How to update a PySpark DataFrame using foreach

I have a PySpark DataFrame, and I want to process each row and update/delete/insert rows based on some logic. I tried using `foreach` and `foreachPartition`, but I cannot figure out how they would return the modified data so that the actual DataFrame gets updated.

data = [
    {"city": "s", "latitude": "51", "longitude": "5", "region": "Europe", "date_range": "date_last_year"},
    {"city": "s", "latitude": "5", "longitude": "5.67", "region": "Europe", "date_range": "date_all_time"},
    {"city": "Aalborg", "latitude": "57.03", "longitude": "9.007", "region": "Europe", "date_range": "date_last_year"},
    {"city": "Aalborg", "latitude": "57.033", "longitude": "9.0007", "region": "Europe", "date_range": "date_last_year"},
    {"city": "Aalborg", "latitude": "57.0", "longitude": "9.97", "region": "Europe", "date_range": "date_last_year"},
    {"city": "Aarau", "latitude": "47.32", "longitude": "8.05", "region": "Europe", "date_range": "date_last_year"},
]

from pyspark import SparkContext
from pyspark.sql import SQLContext, functions as sf

sc = SparkContext()
sqlContext = SQLContext(sc)

df = sc.parallelize(data).toDF()

def myfunction(row):
    if float(row.latitude) > 50:
        print('do_something')
        # need to access "df" to do some operations

df.foreach(myfunction)
df.show()

# output
do_something
do_something
do_something
do_something
+-------+--------------+--------+---------+------+                              
|   city|    date_range|latitude|longitude|region|
+-------+--------------+--------+---------+------+
|      s|date_last_year|      51|        5|Europe|
|      s| date_all_time|       5|     5.67|Europe|
|Aalborg|date_last_year|   57.03|    9.007|Europe|
|Aalborg|date_last_year|  57.033|   9.0007|Europe|
|Aalborg|date_last_year|    57.0|     9.97|Europe|
|  Aarau|date_last_year|   47.32|     8.05|Europe|
+-------+--------------+--------+---------+------+

I want to pass `df` into the foreach function, or return the modified rows from the foreach call and aggregate them. How can I do that?
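For context, Spark DataFrames are immutable and `foreach`/`foreachPartition` run on the executors and return nothing to the driver, so per-row "updates" are normally expressed as transformations that produce a new DataFrame. The sketch below is one common pattern, not code from the question; the "Northern Europe" label and the extra Bern row are made-up examples of an update and an insert, and `data` is the sample list defined above.

from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data)  # same sample data as above

# "Update": rewrite a column where a condition holds
# ("Northern Europe" is just an illustrative label).
updated = df.withColumn(
    "region",
    sf.when(sf.col("latitude").cast("double") > 50, sf.lit("Northern Europe"))
      .otherwise(sf.col("region")),
)

# "Delete": keep only the rows that should survive.
kept = updated.filter(sf.col("latitude").cast("double") > 10)

# "Insert": build the new rows as another DataFrame and union them in
# (the Bern row is invented for this example).
extra = spark.createDataFrame(
    [("Bern", "date_last_year", "46.95", "7.45", "Europe")],
    ["city", "date_range", "latitude", "longitude", "region"],
)
result = kept.unionByName(extra)
result.show()

If the per-row logic really has to be arbitrary Python, `df.rdd.map(...)` or `flatMap(...)` returning `Row` objects, followed by `toDF()`, can rebuild a DataFrame from the modified rows; `foreach` itself always returns `None` and cannot modify `df`.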
