Transforming a PySpark DataFrame for FP-Growth

I have a PySpark DF of ids and purchases that I'm trying to transform for use with FP-Growth. Currently I have multiple rows for a given id, with each row relating to a single purchase.

I'd like to transform this dataframe into a form with two columns: one for the id (one row per id), and a second containing the list of distinct purchases for that id.

I tried using a user-defined function (UDF) to map the distinct purchases onto the distinct ids, but I get a "py4j.Py4JException: Method __getstate__([]) does not exist". Thanks to @Mithril I see that "You can't use sparkSession object, spark.DataFrame object or other Spark distributed objects in udf and pandas_udf, because they can't be pickled."
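For reference, the failing pattern was roughly the following (a hypothetical minimal sketch, not the exact code from the question; it closes over the DataFrame spk_df_1 that is defined in the snippet below):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType

# HYPOTHETICAL anti-pattern: the lambda captures spk_df_1, a Spark
# distributed object. Serialising the closure for the executors fails,
# raising "py4j.Py4JException: Method __getstate__([]) does not exist".
items_for_id = udf(
    lambda i: [r[0] for r in
               spk_df_1.filter(col("id") == i).select("item").distinct().collect()],
    ArrayType(StringType()),
)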

So I've implemented the TERRIBLE approach below (which works, but does not scale):

# Let's create some fake transactions
customers = [1,2,3,1,1]
purchases = ['cake','tea','beer','fruit','cake']

# Let's create a spark DF to capture the transactions
transactions = zip(customers,purchases)
spk_df_1 = spark.createDataFrame(list(transactions),["id","item"])

# Let's have a look at the resulting spark dataframe
spk_df_1.show()

# Register the dataframe as a temp view so the SQL query below can see it
spk_df_1.createOrReplaceTempView("TBLdf")

# Let's capture the ids and the list of their distinct purchases in a
# list of tuples
nums1 = []
import pandas as pd
import pyspark.sql.functions as f

# for each distinct id let's get the list of their distinct purchases

for id in spark.sql("SELECT DISTINCT(id) FROM TBLdf").rdd.map(lambda row: row[0]).collect():
    purchase = spk_df_1.filter(f.col("id") == id).select("item").distinct().rdd.map(lambda row: row[0]).collect()
    nums1.append((id, purchase))


# Let's see what our list of transaction tuples looks like
print(nums1)
print("\n")

# Let's turn the list of transaction tuples into a pandas dataframe
df_pd = pd.DataFrame(nums1)

# Finally let's turn our pandas dataframe into a PySpark DataFrame
df2 = spark.createDataFrame(df_pd)
df2.show()

Output:

+---+-----+
| id| item|
+---+-----+
|  1| cake|
|  2|  tea|
|  3| beer|
|  1|fruit|
|  1| cake|
+---+-----+

[(1, ['fruit', 'cake']), (3, ['beer']), (2, ['tea'])]


+---+-------------+
|  0|            1|
+---+-------------+
|  1|[fruit, cake]|
|  3|       [beer]|
|  2|        [tea]|
+---+-------------+

If anyone has any suggestions, I would be very grateful.

superhsq's answer to: Transforming a PySpark DataFrame for FP-Growth

This is a task for collect_set, which creates a set of items with no duplicates:

import pyspark.sql.functions as F

# Let's create some fake transactions
customers = [1,2,3,1,1]
purchases = ['cake','tea','beer','fruit','cake']

# Let's create a spark DF to capture the transactions
transactions = zip(customers,purchases)
spk_df_1 = spark.createDataFrame(list(transactions),["id","item"])
spk_df_1.show()

spk_df_1.groupby('id').agg(F.collect_set('item')).show()

Output:

+---+-----+
| id| item|
+---+-----+
|  1| cake|
|  2|  tea|
|  3| beer|
|  1|fruit|
|  1| cake|
+---+-----+

+---+-----------------+
| id|collect_set(item)|
+---+-----------------+
|  1|    [fruit, cake]|
|  3|           [beer]|
|  2|            [tea]|
+---+-----------------+
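
The aggregated column can then be fed straight into Spark's FP-Growth implementation, which is what the question is ultimately after. A minimal sketch, assuming Spark 2.2+ (where pyspark.ml.fpm.FPGrowth is available); the 'items' alias and the minSupport/minConfidence values are illustrative choices, not from the original post:

from pyspark.ml.fpm import FPGrowth

# Alias the aggregated column so FPGrowth can find it by name
basket_df = spk_df_1.groupby('id').agg(F.collect_set('item').alias('items'))

# Thresholds are placeholders; tune them for real data
fp = FPGrowth(itemsCol='items', minSupport=0.2, minConfidence=0.5)
model = fp.fit(basket_df)

model.freqItemsets.show()        # frequent itemsets
model.associationRules.show()    # association rules derived from them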