I have the following two DataFrames, and I want to join them to produce data in a new shape:
df = sqlContext.createDataFrame([
    ("A011021", "15", "2020-01-01", "2020-12-31", "4"),
    ("A011021", "15", "2020-01-01", "2020-12-31", "4"),
    ("A011021", "15", "2020-01-01", "2020-12-31", "4"),
    ("A011021", "15", "2020-01-01", "2020-12-31", "3"),
    ("A011022", "6", "2020-01-01", "2020-12-31", "3"),
    ("A011022", "6", "2020-01-01", "2020-12-31", "3")],
    ["rep_id", "sales_target", "start_date", "end_date", "st_new"])
df.createOrReplaceTempView('df')
+-------+------------+----------+----------+------+
|rep_id |sales_target|start_date|end_date  |st_new|
+-------+------------+----------+----------+------+
|A011021|15          |2020-01-01|2020-12-31|4     |
|A011021|15          |2020-01-01|2020-12-31|4     |
|A011021|15          |2020-01-01|2020-12-31|4     |
|A011021|15          |2020-01-01|2020-12-31|3     |
|A011022|6           |2020-01-01|2020-12-31|3     |
|A011022|6           |2020-01-01|2020-12-31|3     |
+-------+------------+----------+----------+------+
df2 = sqlContext.createDataFrame([
    ("A011021", "15", "2020-01-01", "2020-12-31", "2020-01-01", "2020-03-31"),
    ("A011021", "15", "2020-01-01", "2020-12-31", "2020-04-01", "2020-06-30"),
    ("A011021", "15", "2020-01-01", "2020-12-31", "2020-07-01", "2020-09-30"),
    ("A011021", "15", "2020-01-01", "2020-12-31", "2020-10-01", "2020-12-31"),
    ("A011022", "6", "2020-01-01", "2020-06-30", "2020-01-01", "2020-03-31"),
    ("A011022", "6", "2020-01-01", "2020-06-30", "2020-04-01", "2020-06-30")],
    ["rep_id", "sales_target", "start_date", "end_date", "new_sdt", "new_edt"])
df2.createOrReplaceTempView('df2')
+-------+------------+----------+----------+----------+----------+
|rep_id |sales_target|start_date|end_date  |new_sdt   |new_edt   |
+-------+------------+----------+----------+----------+----------+
|A011021|15          |2020-01-01|2020-12-31|2020-01-01|2020-03-31|
|A011021|15          |2020-01-01|2020-12-31|2020-04-01|2020-06-30|
|A011021|15          |2020-01-01|2020-12-31|2020-07-01|2020-09-30|
|A011021|15          |2020-01-01|2020-12-31|2020-10-01|2020-12-31|
|A011022|6           |2020-01-01|2020-06-30|2020-01-01|2020-03-31|
|A011022|6           |2020-01-01|2020-06-30|2020-04-01|2020-06-30|
+-------+------------+----------+----------+----------+----------+
When I run a query to join the two tables, I get duplicated results like the ones below:
select ds1.*, ds2.new_sdt, ds2.new_edt from df ds1
inner join df2 ds2
on ds1.rep_id = ds2.rep_id
where ds1.rep_id = 'A011021'
+-------+------------+----------+----------+------+----------+----------+
|rep_id |sales_target|start_date|end_date  |st_new|new_sdt   |new_edt   |
+-------+------------+----------+----------+------+----------+----------+
|A011021|15          |2020-01-01|2020-12-31|4     |2020-01-01|2020-03-31|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-01-01|2020-03-31|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-01-01|2020-03-31|
|A011021|15          |2020-01-01|2020-12-31|3     |2020-01-01|2020-03-31|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-04-01|2020-06-30|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-04-01|2020-06-30|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-04-01|2020-06-30|
|A011021|15          |2020-01-01|2020-12-31|3     |2020-04-01|2020-06-30|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-07-01|2020-09-30|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-07-01|2020-09-30|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-07-01|2020-09-30|
|A011021|15          |2020-01-01|2020-12-31|3     |2020-07-01|2020-09-30|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-10-01|2020-12-31|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-10-01|2020-12-31|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-10-01|2020-12-31|
|A011021|15          |2020-01-01|2020-12-31|3     |2020-10-01|2020-12-31|
+-------+------------+----------+----------+------+----------+----------+
Is there a way, using Spark SQL or PySpark functions, to get only the distinct new_sdt, new_edt quarter rows for a given rep_id? Please help.
The expected result is:
select ds1.*, ds2.new_sdt, ds2.new_edt from df ds1
inner join df2 ds2
on ds1.rep_id = ds2.rep_id
+-------+------------+----------+----------+------+----------+----------+
|rep_id |sales_target|start_date|end_date  |st_new|new_sdt   |new_edt   |
+-------+------------+----------+----------+------+----------+----------+
|A011021|15          |2020-01-01|2020-12-31|4     |2020-01-01|2020-03-31|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-04-01|2020-06-30|
|A011021|15          |2020-01-01|2020-12-31|4     |2020-07-01|2020-09-30|
|A011021|15          |2020-01-01|2020-12-31|3     |2020-10-01|2020-12-31|
|A011022|6           |2020-01-01|2020-12-31|3     |2020-01-01|2020-03-31|
|A011022|6           |2020-01-01|2020-12-31|3     |2020-04-01|2020-06-30|
+-------+------------+----------+----------+------+----------+----------+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object InnerJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Tag every row with a sequential id so the two DataFrames can be
    // zipped row-for-row instead of multiplied by a join on the
    // non-unique rep_id. monotonically_increasing_id() lines up here
    // because both small local Datasets are built the same way; it is
    // not guaranteed to align for arbitrarily partitioned data.
    val df1 = List(
      ("A011021", "15", "2020-01-01", "2020-12-31", "4"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "4"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "4"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "3"),
      ("A011022", "6", "2020-01-01", "2020-12-31", "3"),
      ("A011022", "6", "2020-01-01", "2020-12-31", "3"))
      .toDF("rep_id", "sales_target", "start_date", "end_date", "st_new")
      .withColumn("rowid", monotonically_increasing_id())

    val df2 = List(
      ("A011021", "15", "2020-01-01", "2020-12-31", "2020-01-01", "2020-03-31"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "2020-04-01", "2020-06-30"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "2020-07-01", "2020-09-30"),
      ("A011021", "15", "2020-01-01", "2020-12-31", "2020-10-01", "2020-12-31"),
      ("A011022", "6", "2020-01-01", "2020-06-30", "2020-01-01", "2020-03-31"),
      ("A011022", "6", "2020-01-01", "2020-06-30", "2020-04-01", "2020-06-30"))
      .toDF("rep_id", "sales_target", "start_date", "end_date", "new_sdt", "new_edt")
      .withColumn("rowid", monotonically_increasing_id())

    // Inner join on the synthetic rowid: row i of df1 meets row i of df2.
    df1.as("ds1").join(df2.as("ds2"),
        col("ds1.rowid") === col("ds2.rowid"),
        "inner")
      .orderBy(col("ds1.rep_id"), col("ds1.sales_target"), col("st_new").desc)
      .drop("rowid")
      .show()
  }
}
Required output:
+-------+------------+----------+----------+------+-------+------------+----------+----------+----------+----------+
| rep_id|sales_target|start_date| end_date|st_new| rep_id|sales_target|start_date| end_date| new_sdt| new_edt|
+-------+------------+----------+----------+------+-------+------------+----------+----------+----------+----------+
|A011021| 15|2020-01-01|2020-12-31| 4|A011021| 15|2020-01-01|2020-12-31|2020-04-01|2020-06-30|
|A011021| 15|2020-01-01|2020-12-31| 4|A011021| 15|2020-01-01|2020-12-31|2020-01-01|2020-03-31|
|A011021| 15|2020-01-01|2020-12-31| 4|A011021| 15|2020-01-01|2020-12-31|2020-07-01|2020-09-30|
|A011021| 15|2020-01-01|2020-12-31| 3|A011021| 15|2020-01-01|2020-12-31|2020-10-01|2020-12-31|
|A011022| 6|2020-01-01|2020-12-31| 3|A011022| 6|2020-01-01|2020-06-30|2020-04-01|2020-06-30|
|A011022| 6|2020-01-01|2020-12-31| 3|A011022| 6|2020-01-01|2020-06-30|2020-01-01|2020-03-31|
+-------+------------+----------+----------+------+-------+------------+----------+----------+----------+----------+