Pyspark-循环遍历structType和ArrayType在structfield中进行类型转换

我对pyspark很陌生,这个问题令我感到困惑。基本上,我正在寻找一种通过structType或ArrayType进行类型转换的可缩放方法。

我的数据模式示例:

root
 |-- _id: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- card_rates: struct (nullable = true)
 |    |-- rate_1: integer (nullable = true)
 |    |-- rate_2: integer (nullable = true)
 |    |-- rate_3: integer (nullable = true)
 |    |-- card_fee: integer (nullable = true)
 |    |-- payment_method: string (nullable = true)
 |-- online_rates: array (nullable = true)
 |    |-- element: struct (containsnull = true)
 |    |    |-- rate_1: integer (nullable = true)
 |    |    |-- rate_2: integer (nullable = true)
 |    |    |-- online_fee: double (nullable = true)
 |-- updated: timestamp (nullable = true)

如您在此处看到的,card_rates是结构体,而online_rates是结构体数组。我正在寻找遍历以上所有字段并有条件地进行类型转换的方法。理想情况下,如果应该是数字,则应将其转换为双精度型;如果应该是字符串,则应将其转换为字符串型。我需要循环,因为这些rate_*字段可能会随着时间增长。

但是现在,我对能够循环播放它们并将它们全部转换为字符串表示满意,因为我对pyspark非常陌生,并且仍在尝试中。

我想要的输出模式:

root
 |-- _id: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- card_rates: struct (nullable = true)
 |    |-- rate_1: double (nullable = true)
 |    |-- rate_2: double (nullable = true)
 |    |-- rate_3: double (nullable = true)
 |    |-- card_fee: double (nullable = true)
 |    |-- payment_method: string (nullable = true)
 |-- online_rates: array (nullable = true)
 |    |-- element: struct (containsnull = true)
 |    |    |-- rate_1: double (nullable = true)
 |    |    |-- rate_2: double (nullable = true)
 |    |    |-- online_fee: double (nullable = true)
 |-- updated: timestamp (nullable = true)

我正在提出如何执行此操作的想法。

我从这里获得了参考:PySpark convert struct field inside array to string

但是此解决方案对字段进行硬编码,并且不会真正在字段上循环。

请帮助。

aa469011586 回答:Pyspark-循环遍历structType和ArrayType在structfield中进行类型转换

这是借助StructType.simpleString_parse_datatype_string内置函数的一种解决方案:

from pyspark.sql.types import *

df_schema = StructType([
  StructField("_id",StringType(),True),StructField("created",TimestampType(),StructField("card_rates",StructType([
                  StructField("rate_1",IntegerType(),StructField("rate_2",StructField("rate_3",StructField("card_fee",True)])),StructField("online_rates",ArrayType(
                  StructType(
                    [
                      StructField("rate_1",StructField("online_fee",DoubleType(),True)
                    ]),StructField("updated",True)])

schema_str = df_schema.simpleString() # this gives -> struct<_id:string,created:timestamp,card_rates:struct<rate_1:int,rate_2:int,rate_3:int,card_fee:int,card_fee:int>,online_rates:array<struct<rate_1:int,online_fee:double>>,updated:timestamp>

double_schema = schema_str.replace(':int',':double')

# convert back to StructType
final_schema = _parse_datatype_string(double_schema)
final_schema

  1. 首先使用schema.simpleString将您的模式转换为简单的字符串
  2. 然后将所有:int替换为:double
  3. 最后用_parse_datatype_string将修改后的字符串模式转换为StructType

更新:

为了避免@jxc指出的反引号问题,更好的解决方案是对元素进行递归扫描,如下所示:

def transform_schema(schema):

  if schema == None:
    return StructType()

  updated = []
  for f in schema.fields:
    if isinstance(f.dataType,IntegerType):
      # if IntegerType convert to DoubleType
      updated.append(StructField(f.name,f.nullable))
    elif isinstance(f.dataType,ArrayType):
      # if ArrayType unpack the array type(elementType),do recursion then wrap results with ArrayType 
      updated.append(StructField(f.name,ArrayType(transform_schema(f.dataType.elementType))))
    elif isinstance(f.dataType,StructType):
      # if StructType do recursion
      updated.append(StructField(f.name,transform_schema(f.dataType)))
    else:
      # else handle all the other cases i.e TimestampType,StringType etc
      updated.append(StructField(f.name,f.dataType,f.nullable))   

  return StructType(updated)

# call the function with your schema
transform_schema(df_schema)

说明:该函数遍历模式中的每个项目(StructType),并尝试将int字段(StructField)转换为双精度型。最后,将转换后的架构(StructType)传递到上一层(父StructType)。

输出:

StructType(List(
  StructField(_id,StringType,true),StructField(created,TimestampType,StructField(card_rates,StructType(List(StructField(rate_1,DoubleType,StructField(rate_2,StructField(rate_3,StructField(card_fee,true))),StructField(online_rates,ArrayType(
    StructType(List(
      StructField(rate_1,StructField(online_fee,StructField(updated,true)))
本文链接:https://www.f2er.com/3165254.html

大家都在问