I have two PySpark dataframes. The first one, df1, looks like this:
+-----+-----+----------+-----+
| name| type|timestamp1|score|
+-----+-----+----------+-----+
|name1|type1|2012-01-10| 11|
|name2|type1|2012-01-10| 14|
|name3|type2|2012-01-10| 2|
|name3|type2|2012-01-17| 3|
|name1|type1|2012-01-18| 55|
|name1|type1|2012-01-19| 10|
+-----+-----+----------+-----+
The second one, df2, looks like this:
+-----+-------------------+-------+-------+
| name| timestamp2|string1|string2|
+-----+-------------------+-------+-------+
|name1|2012-01-10 00:00:00| A| aa|
|name2|2012-01-10 00:00:00| A| bb|
|name3|2012-01-10 00:00:00| C| cc|
|name4|2012-01-17 00:00:00| D| dd|
|name3|2012-01-10 00:00:00| C| cc|
|name2|2012-01-17 00:00:00| A| bb|
|name2|2012-01-17 00:00:00| A| bb|
|name4|2012-01-10 00:00:00| D| dd|
|name3|2012-01-17 00:00:00| C| cc|
+-----+-------------------+-------+-------+
The two dataframes share one column, name. Each unique value of name in df2 has a unique pair of string1 and string2 values.
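(As a quick sanity check on that property, once df2 exists an aggregation like the one below should show exactly one distinct (string1, string2) pair per name; the column alias "pairs" is just an illustrative name.)

import pyspark.sql.functions as F

# Each name in df2 should map to exactly one (string1, string2)
# combination, so every "pairs" value below should be 1.
df2.groupBy("name").agg(
    F.countDistinct("string1", "string2").alias("pairs")
).show()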
I want to join df1 and df2 into a new dataframe df3, such that df3 contains all the rows of df1 (same structure and values), but with the string1 and string2 values from df2 attached to the matching name in each row. Here is how the merged dataframe (df3) should look:
+-----+-----+----------+-----+-------+-------+
| name| type|timestamp1|score|string1|string2|
+-----+-----+----------+-----+-------+-------+
|name1|type1|2012-01-10| 11| A| aa|
|name2|type1|2012-01-10| 14| A| bb|
|name3|type2|2012-01-10| 2| C| cc|
|name3|type2|2012-01-17| 3| C| cc|
|name1|type1|2012-01-18| 55| A| aa|
|name1|type1|2012-01-19| 10| A| aa|
+-----+-----+----------+-----+-------+-------+
How can I obtain the above dataframe (df3)?
I tried the following:

df3 = df1.join(df2.select("name", "string1", "string2"), on=["name"], how="left")

But this gives me a dataframe with 14 rows, containing multiple (duplicate) row entries.
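I suspect the extra rows come from df2 containing repeated (name, string1, string2) combinations, so the left join fans out. A minimal sketch of what I think should work, assuming each name really does map to a single (string1, string2) pair as described above, is to deduplicate the lookup columns before joining:

# Reduce df2 to one row per unique (name, string1, string2)
# combination, then left-join onto df1. This relies on the
# assumption that each name maps to exactly one such pair.
lookup = df2.select("name", "string1", "string2").distinct()
df3 = df1.join(lookup, on="name", how="left")
df3.show()

Is distinct() the right tool here, or is there a more idiomatic way to do this kind of lookup join?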
You can use the code below to generate df1 and df2:
from pyspark.sql import Row, SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Build df1 row by row to match the first table above.
df1_Stats = Row("name", "type", "timestamp1", "score")
df1_stat1 = df1_Stats("name1", "type1", "2012-01-10", 11)
df1_stat2 = df1_Stats("name2", "type1", "2012-01-10", 14)
df1_stat3 = df1_Stats("name3", "type2", "2012-01-10", 2)
df1_stat4 = df1_Stats("name3", "type2", "2012-01-17", 3)
df1_stat5 = df1_Stats("name1", "type1", "2012-01-18", 55)
df1_stat6 = df1_Stats("name1", "type1", "2012-01-19", 10)
df1_stat_lst = [df1_stat1, df1_stat2, df1_stat3, df1_stat4, df1_stat5, df1_stat6]
df1 = spark.createDataFrame(df1_stat_lst)

# Build df2 row by row to match the second table above.
df2_Stats = Row("name", "timestamp2", "string1", "string2")
df2_stat1 = df2_Stats("name1", "2012-01-10 00:00:00", "A", "aa")
df2_stat2 = df2_Stats("name2", "2012-01-10 00:00:00", "A", "bb")
df2_stat3 = df2_Stats("name3", "2012-01-10 00:00:00", "C", "cc")
df2_stat4 = df2_Stats("name4", "2012-01-17 00:00:00", "D", "dd")
df2_stat5 = df2_Stats("name3", "2012-01-10 00:00:00", "C", "cc")
df2_stat6 = df2_Stats("name2", "2012-01-17 00:00:00", "A", "bb")
df2_stat7 = df2_Stats("name2", "2012-01-17 00:00:00", "A", "bb")
df2_stat8 = df2_Stats("name4", "2012-01-10 00:00:00", "D", "dd")
df2_stat9 = df2_Stats("name3", "2012-01-17 00:00:00", "C", "cc")
df2_stat_lst = [
    df2_stat1, df2_stat2, df2_stat3, df2_stat4, df2_stat5,
    df2_stat6, df2_stat7, df2_stat8, df2_stat9,
]
df2 = spark.createDataFrame(df2_stat_lst)