您可以从pyspark.sql.functions.map_from_entries
使用此功能
如果我们认为您的数据框为df,则应执行以下操作:
if (bill.has(keyInt))
bill.delete(keyInt)
else
bill.add(keyInt)
,
这是我的方法。
代码
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
df = spark.createDataFrame([(1,'t1','a'),(1,'t2','b'),(2,'t3','t4','c'),'t5',\
(3,'t6',(3,'t7','t8','a')],\
('id','time','cat'))
(df.groupBy(['id','cat'])
.agg(F.count(F.col('cat')).cast(StringType()).alias('counted'))
.select(['id',F.concat_ws('->',F.col('cat'),F.col('counted')).alias('arrowed')])
.groupBy('id')
.agg(F.collect_list('arrowed'))
.show()
)
输出
+-------+---------------------+
| id|collect_list(arrowed)|
+-------+---------------------+
| 1 | [a -> 1,b -> 1] |
| 3 | [a -> 3] |
| 2 | [b -> 2,c -> 1] |
+-------+---------------------+
编辑
(df.groupBy(['id','cat'])
.count()
.select(['id',F.create_map('cat','count').alias('map')])
.groupBy('id')
.agg(F.collect_list('map').alias('cat'))
.show()
)
#+---+--------------------+
#| id| cat|
#+---+--------------------+
#| 1|[[a -> 1],[b -> 1]]|
#| 3| [[a -> 3]]|
#| 2|[[b -> 2],[c -> 1]]|
#+---+--------------------+
,
类似于yasi的答案
import pyspark.sql.functions as F
df1 = df.groupby("id","cat").count()
df2 = df1.groupby("id")\
.agg(F.map_from_arrays(F.collect_list("cat"),F.collect_list("count"))).alias("cat"))
本文链接:https://www.f2er.com/3161849.html