How to split an ArrayType(StructType) Spark dataframe column into multiple columns in PySpark?

I am reading XML using databricks spark-xml with the schema below. The child element X_PAT can occur multiple times, so to handle that I used ArrayType(StructType); the next transformation is to create multiple columns out of this single column.

<root_tag>
   <id>fff9</id>
   <X1000>
      <X_PAT>
         <X_PAT01>IC</X_PAT01>
         <X_PAT02>EDISUPPORT</X_PAT02>
         <X_PAT03>TE</X_PAT03>
      </X_PAT>
      <X_PAT>
         <X_PAT01>IC1</X_PAT01>
         <X_PAT02>EDISUPPORT1</X_PAT02>
         <X_PAT03>TE1</X_PAT03>
      </X_PAT>
   </X1000>
</root_tag>

from pyspark.sql import SparkSession
from pyspark.sql.types import *

jar_path = "/Users/nsrinivas/com.databricks_spark-xml_2.10-0.4.1.jar"

spark = SparkSession.builder.appName("Spark - XML read").master("local[*]") \
    .config("spark.jars",jar_path) \
    .config("spark.executor.extraClasspath",jar_path) \
    .config("spark.executor.extralibrary",jar_path) \
    .config("spark.driver.extraClasspath",jar_path) \
    .getOrCreate()

xml_schema = StructType()
xml_schema.add("id", StringType(), True)
x1000 = StructType([
    StructField("X_PAT", ArrayType(StructType([
        StructField("X_PAT01", StringType()),
        StructField("X_PAT02", StringType()),
        StructField("X_PAT03", StringType())
    ])))
])
xml_schema.add("X1000", x1000, True)

df = spark.read.format("xml").option("rowTag","root_tag").option("valuetag",False) \
    .load("root_tag.xml",schema=xml_schema)

df.select("id","X1000.X_PAT").show(truncate=False)

The output I get is as follows:

+------------+--------------------------------------------+
|id          |X_PAT                                       |
+------------+--------------------------------------------+
|fff9        |[[IC1, SUPPORT1, TE1], [IC2, SUPPORT2, TE2]]|
+------------+--------------------------------------------+

But I want X_PAT flattened out into multiple columns as shown below; I will then rename the columns.

+-----+--------+--------+-------+--------+--------+-------+
|id   |X_PAT01 |X_PAT02 |X_PAT03|X_PAT01 |X_PAT02 |X_PAT03|
+-----+--------+--------+-------+--------+--------+-------+
|fff9 |IC1     |SUPPORT1|TE1    |IC2     |SUPPORT2|TE2    |
+-----+--------+--------+-------+--------+--------+-------+

Then I will rename the new columns as follows:

id|XPAT_1_01|XPAT_1_02|XPAT_1_03|XPAT_2_01|XPAT_2_02|XPAT_2_03|

I tried using X1000.X_PAT.* but it throws the following error: pyspark.sql.utils.AnalysisException: 'Can only star expand struct data types. Attribute: ArrayBuffer(L_1000A, S_PER);'
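
(For reference, this error means '.*' only expands a struct, so a single element of the array can be star-expanded once it has been selected; a minimal sketch against the dataframe above:)

from pyspark.sql.functions import col

# '.*' works on a struct, not on an array of structs, so select one
# array element first and then star-expand it:
df.select("id", col("X1000.X_PAT").getItem(0).alias("p")) \
  .select("id", "p.*") \
  .show()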

Any ideas?

jujishou1984's answer:

Try this:

from pyspark.sql import functions as F

df = spark.createDataFrame([('1', [['IC1', 'SUPPORT1', 'TE1'], ['IC2', 'SUPPORT2', 'TE2']])], ['id', 'X_PAT01'])


Define a function that parses the data:

def create_column(df):
    # Pull the nested array from the first row back to the driver
    data = df.select('X_PAT01').collect()[0][0]
    # Add one literal column per string element, named X_PAT_<outer>_0<inner>
    for each_list in range(len(data)):
        for each_item in range(len(data[each_list])):
            df = df.withColumn('X_PAT_' + str(each_list) + '_0' + str(each_item),
                               F.lit(data[each_list][each_item]))
    return df
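
Note that this works by collecting the array to the driver and reading only the first row ([0][0]); the literal columns therefore repeat the first row's values on every row, so this is really only suitable for a single-row dataframe like the example above.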

Call it:

df = create_column(df)

Output:

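The original answer showed a screenshot of the result here; tracing the function above on the single example row, df.show(truncate=False) should print roughly:

+---+--------------------------------------------+----------+----------+----------+----------+----------+----------+
|id |X_PAT01                                     |X_PAT_0_00|X_PAT_0_01|X_PAT_0_02|X_PAT_1_00|X_PAT_1_01|X_PAT_1_02|
+---+--------------------------------------------+----------+----------+----------+----------+----------+----------+
|1  |[[IC1, SUPPORT1, TE1], [IC2, SUPPORT2, TE2]]|IC1       |SUPPORT1  |TE1       |IC2       |SUPPORT2  |TE2       |
+---+--------------------------------------------+----------+----------+----------+----------+----------+----------+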

Another answer:

Here is a simple way to flatten the array elements horizontally, as per your requirement:

from pyspark.sql.functions import col

df2 = (df1
       .select('id', *(col('X_PAT')
                       .getItem(i)  # fetch the i-th nested array element
                       .getItem(j)  # fetch the j-th string within that element
                       .alias(f'X_PAT_{i+1}_{str(j+1).zfill(2)}')  # format the column alias
                       for i in range(2)  # outer loop over array elements
                       for j in range(3)  # inner loop over strings
                       ))
       )

Input and output:

Input(df1):

+----+--------------------------------------------+
|id  |X_PAT                                       |
+----+--------------------------------------------+
|fff9|[[IC1, SUPPORT1, TE1], [IC2, SUPPORT2, TE2]]|
+----+--------------------------------------------+

Output(df2):

+----+----------+----------+----------+----------+----------+----------+
|  id|X_PAT_1_01|X_PAT_1_02|X_PAT_1_03|X_PAT_2_01|X_PAT_2_02|X_PAT_2_03|
+----+----------+----------+----------+----------+----------+----------+
|fff9|       IC1|  SUPPORT1|       TE1|       IC2|  SUPPORT2|       TE2|
+----+----------+----------+----------+----------+----------+----------+

Although this involves for loops, the operations are performed directly on the dataframe (no collecting or converting to an RDD), so you shouldn't run into any problems.
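
If the array shape is not known up front, the two loop bounds can be derived from the data first. A minimal sketch, assuming X_PAT is an array of arrays as in the example above and that every row has the same shape (this extra aggregation pass is not part of the original answer):

from pyspark.sql import functions as F

# Derive the loop bounds from the data itself (costs one extra Spark job):
n_outer = df1.select(F.max(F.size('X_PAT'))).first()[0]
n_inner = df1.select(F.max(F.size(F.col('X_PAT').getItem(0)))).first()[0]
# ...then use range(n_outer) and range(n_inner) in the select above.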
