I have a PySpark DataFrame, and I want to process each row and update/delete/insert rows based on some logic. I tried using "foreach" and "foreachPartition", but I can't figure out how they would return the modified data so that I can update the actual DataFrame.
data = [
    {"city": "s", "latitude": "51", "longitude": "5", "region": "Europe", "date_range": "date_last_year"},
    {"city": "s", "latitude": "5", "longitude": "5.67", "region": "Europe", "date_range": "date_all_time"},
    {"city": "Aalborg", "latitude": "57.03", "longitude": "9.007", "region": "Europe", "date_range": "date_last_year"},
    {"city": "Aalborg", "latitude": "57.033", "longitude": "9.0007", "region": "Europe", "date_range": "date_last_year"},
    {"city": "Aalborg", "latitude": "57.0", "longitude": "9.97", "region": "Europe", "date_range": "date_last_year"},
    {"city": "Aarau", "latitude": "47.32", "longitude": "8.05", "region": "Europe", "date_range": "date_last_year"},
]
from pyspark import SparkContext
from pyspark.sql import SQLContext,functions as sf
sc = SparkContext()
sqlContext = SQLContext(sc)
df = sc.parallelize(data).toDF()
def myfunction(row):
    if float(row.latitude) > 50:
        print('do_something')
        # need to access "df" here to do some operations

df.foreach(myfunction)
df.show()
# output
do_something
do_something
do_something
do_something
+-------+--------------+--------+---------+------+
| city| date_range|latitude|longitude|region|
+-------+--------------+--------+---------+------+
| s|date_last_year| 51| 5|Europe|
| s| date_all_time| 5| 5.67|Europe|
|Aalborg|date_last_year| 57.03| 9.007|Europe|
|Aalborg|date_last_year| 57.033| 9.0007|Europe|
|Aalborg|date_last_year| 57.0| 9.97|Europe|
| Aarau|date_last_year| 47.32| 8.05|Europe|
+-------+--------------+--------+---------+------+
I want to either pass "df" into the foreach function, or return values from each foreach call and aggregate them. How can I do that?