如何在PySpark中将SQL函数与UDAF结合/链接

我试图在PySpark的Spark数据帧上使用一堆预定义的sql函数以及我自己的UDAF

    @F.udf
    def mode(v):
     from collections import Counter
     x = [w[0] for w in Counter(v).most_common(5)]
     return x

   funs = [mean,max,min,stddev,approxCountDistinct,mode]
   columns = df.columns
   expr = [f(col(c)) for f in funs for c in columns]

   s = df.agg(*expr).collect()

当我尝试将udf与其他功能一起使用时,我得到: org.apache.spark.sql.AnalysisException:分组表达式序列为空。在窗口函数中以'{avg(CAST(DBN AS DOUBLE))换行为avg(DBN)或在first()(或first_value)中以'DBN'换行关心您获得的价值。;

但是当我跑步时:

funs = [mode]
   columns = df.columns
   expr = [f(collect_list(col(c))) for f in funs for c in columns]

   s = df.agg(*expr).collect()

它给出正确的结果,但仅适用于我的UDF,而不能给出其他功能。

有没有一种方法可以将collect_list函数合并到udf中,以便我可以将udf与其他函数一起运行。

c332515642 回答:如何在PySpark中将SQL函数与UDAF结合/链接

您收到此错误,因为您正在聚合功能中使用udf,而您应该在其中使用UDAF。 1.您可以按照How to define and use a User-Defined Aggregate Function in Spark SQL?定义自己的UDAF,或者 2.您可以手动进行聚合,然后传递到udf。由于您想在调用udf之前使用collect_list,因此可以执行以下操作:

@F.udf
    def mode(v):
     from collections import Counter
     x = [w[0] for w in Counter(v).most_common(5)]
     return x

funs = [mean,max,min,stddev,approxCountDistinct,mode]
my_funs = [mode]
expr = [f(collect_list(col(c))) if f in my_funs  else f(col(c)) for f in funs for c in columns]
s = df.agg(*expr).collect()

在上面的代码中,collect_list用于在对列调用udf之前进行聚合。

本文链接:https://www.f2er.com/3097971.html

大家都在问