如何将具有范围值的列添加到DataFrame

我有具有当前结构的数据框

user_id | country | event |
1       | CA      | 1     |
2       | USA     | 1     |

我想添加周期范围为(0-n)的新列,并得到类似的内容

user_id | country | event |period|
1       | CA      | 1     |1
1       | CA      | 1     |2
1       | CA      | 1     |...
1       | CA      | 1     |n

2       | USA     | 1     |1
2       | USA     | 1     |2
2       | USA     | 1     |...
2       | USA     | 1     |n

据我了解,它应该是一些窗口函数和withColumn函数

w = (Window.partitionBy(['user_id','country','event'])
df = df.withColumn('period',(???).over(w))

如何添加新列,并同时按一定范围添加新行?

rennana123 回答:如何将具有范围值的列添加到DataFrame

首先使用spark.range()创建另一个包含句点的DataFrame。例如,使用n=3

n = 3
periods = spark.range(1,n+1).withColumnRenamed("id","period")
periods.show()
#+------+
#|period|
#+------+
#|     1|
#|     2|
#|     3|
#+------+

现在用df crossJoin来获得所需的输出:

df = df.crossJoin(periods)
df.show()
#+-------+-------+-----+------+
#|user_id|country|event|period|
#+-------+-------+-----+------+
#|      1|     CA|    1|     1|
#|      1|     CA|    1|     2|
#|      1|     CA|    1|     3|
#|      2|    USA|    1|     1|
#|      2|    USA|    1|     2|
#|      2|    USA|    1|     3|
#+-------+-------+-----+------+

请注意,range实际上并未实现DataFrame,因此笛卡尔乘积将不会很昂贵。

df.explain()
#== Physical Plan ==
#BroadcastNestedLoopJoin BuildRight,Cross
#:- Scan ExistingRDD[user_id#0,country#1,event#2]
#+- BroadcastExchange IdentityBroadcastMode
#   +- *(1) Project [id#31L AS period#33L]
#      +- *(1) Range (1,4,step=1,splits=2)
本文链接:https://www.f2er.com/3145682.html

大家都在问