pyspark UDF 函数返回类型 位置 ROW

在我的 spark 数据框中,我有一个 这是架构

root
 |-- locations: array (nullable = true)
 |    |-- element: struct (containsnull = true)
 |    |    |-- address_line_2: string (nullable = true)
 |    |    |-- continent: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- geo: string (nullable = true)
 |    |    |-- is_primary: boolean (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- locality: string (nullable = true)
 |    |    |-- most_recent: boolean (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- postal_code: string (nullable = true)
 |    |    |-- region: string (nullable = true)
 |    |    |-- street_address: string (nullable = true)
 |    |    |-- subregion: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- zip_plus_4: string (nullable = true)

这里是位置示例

[Row(locations=[Row(address_line_2=None,continent='north america',country='united states',geo='40.41,-74.36',is_primary=True,last_updated=None,locality='old bridge',most_recent=True,name='old bridge,new jersey,united states',postal_code=None,region='new jersey',street_address=None,subregion=None,type=None,zip_plus_4=None)])]

如您所见,有一个名为 isPrimary 的字段,基于我想选择的字段是我编写的函数


def geoLambda(locations):

    """
    Pre process geo locations
    :param x:
    :return: dict
    """
    try:
        for x in locations:
            if x.get("is_primary") == "True" or x.get("is_primary") == True:
                data = x
                data = data.get("geo",None)
                if data is None:
                    lat,lon = -83,135
                else:
                    lat,lon = data.split(",")
                Payload = {"lat":float(lat),"lon":float(lon)}
                return Payload
            else:
                pass
    except Exception as e:
        print("EXCEPTION: {} ".format(e))
        lat,135
        Payload = {"lat":float(lat),"lon":float(lon)}
        return Payload
udfValueToCategoryGeo = udf(geoLambda,StructType())
df = df.withColumn("myloc",udfValueToCategoryGeo("locations"))

输出

 |-- myloc: struct (nullable = true)

----+
|   {}|
|   {}|
|   {}|
|   {}|
|   {}|
|   {}|
|   {}|

如果我选择类型为字符串

udfValueToCategoryGeo = udf(geoLambda,StringType())
df = df.withColumn("myloc",udfValueToCategoryGeo("locations"))
|               myloc|
+--------------------+
|{lon=135.0,lat=-...|
|{lon=135.0,lat=-...|

我一直不知道为什么?

相同的功能在熊猫中运行良好,但我不想使用熊猫任何帮助都会很棒

这是单行的样子

位置 ROW

[{'name': 'princeton,'locality': 'princeton','region': 'new jersey','subregion': None,'country': 'united states','continent': 'north america','type': None,'geo': '40.34,-74.65','postal_code': None,'zip_plus_4': None,'street_address': None,'address_line_2': None,'most_recent': True,'is_primary': True,'last_updated': '2021-03-01'}]

任何帮助

g109007 回答:pyspark UDF 函数返回类型 位置 ROW

我就是这样解决的


def geoLambda(locations):
  for x in locations:
      if x["is_primary"] == True:
          data = x["geo"]
          if data is None:
              lat,lon = -83,135
          else:
              lat,lon = data.split(",")
          Payload = {"lat":float(lat),"lon":float(lon)}
          return Payload
      else:
          pass

udfValueToCategoryGeo = udf(geoLambda,StructType(
    
[
 
  StructField('lat',nullable=True,dataType=FloatType()),StructField('lon',dataType=FloatType())
]

))
df = df.withColumn("myloc",udfValueToCategoryGeo("locations"))
本文链接:https://www.f2er.com/7484.html

大家都在问