PySpark: Convert JSON to a DataFrame

I have a .json file that I need to convert into a DataFrame. The file looks like this:

{
  "id": "517379","created_at": "2020-11-16T04:25:03Z","company": "1707","invoice": [
    {
      "invoice_id": "4102","date": "2020-11-16T04:25:03Z","id": "517379","cantidad": "21992.47","extra_data": {
        "column": "ASDFG","credito": "Crédito"
      }
    }
  ]
}

I need it in this form, as a DataFrame:

id,created_at,company,invoice_id,date,id,cantidad,column,credito
517379,2020-11-16T04:25:03Z,1707,4102,517379,21992.47,ASDFG,Crédito
Answer from haijun110120: PySpark: Convert JSON to a DataFrame

By default, Spark tries to parse each line as a separate JSON document, so if your file contains JSON objects that span multiple lines you will end up with a DataFrame that has a single _corrupt_record column. To fix this, set the read option multiline to true.
test.json

[
   {
      "id":"517379","created_at":"2020-11-16T04:25:03Z","company":"1707","invoice":[
         {
            "invoice_id":"4102","date":"2020-11-16T04:25:03Z","id":"517379","cantidad":"21992.47","extra_data":{
               "column":"ASDFG","credito":"Crédito"
            }
         }
      ]
   },{
      "id":"1234","created_at":"2020-11-16T04:25:03Z","company":"1707","invoice":[
         {
            "invoice_id":"4102","date":"2020-11-16T04:25:03Z","id":"517379","cantidad":"21992.47","extra_data":{
               "column":"ASDFG","credito":"Crédito"
            }
         }
      ]
   }
]

PySpark code

from pyspark.sql import SparkSession

# Create (or reuse) a Spark session, then read the whole file as multi-line JSON.
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("multiline", "true").json("test.json")

df.printSchema()
root
 |-- company: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- invoice: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cantidad: string (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- extra_data: struct (nullable = true)
 |    |    |    |-- column: string (nullable = true)
 |    |    |    |-- credito: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- invoice_id: string (nullable = true)

df.show()
+-------+--------------------+------+--------------------+
|company|          created_at|    id|             invoice|
+-------+--------------------+------+--------------------+
|   1707|2020-11-16T04:25:03Z|517379|[{21992.47,2020-...|
|   1707|2020-11-16T04:25:03Z|  1234|[{21992.47,2020-...|
+-------+--------------------+------+--------------------+
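For comparison, here is a minimal sketch of the failure mode described above, i.e. what the same read returns when the multiline option is left out (the exact behaviour may vary slightly between Spark versions):

# Without multiline, each physical line of test.json is parsed as a separate
# JSON document, so none of the records can be reconstructed and Spark falls
# back to a single _corrupt_record column.
df_bad = spark.read.json("test.json")
df_bad.printSchema()
# root
#  |-- _corrupt_record: string (nullable = true)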

If you know the schema, it is better to provide it: this reduces processing time and parses every field into the type you need.

from pyspark.sql import types as pyspark_types
from pyspark.sql import functions as pyspark_functions

# Explicit schema matching the file: everything is a string except the two
# timestamp fields, and "invoice" is an array of structs.
schema = pyspark_types.StructType(fields=[
    pyspark_types.StructField("id", pyspark_types.StringType()),
    pyspark_types.StructField("created_at", pyspark_types.TimestampType()),
    pyspark_types.StructField("company", pyspark_types.StringType()),
    pyspark_types.StructField("invoice", pyspark_types.ArrayType(
        pyspark_types.StructType(fields=[
            pyspark_types.StructField("invoice_id", pyspark_types.StringType()),
            pyspark_types.StructField("date", pyspark_types.TimestampType()),
            pyspark_types.StructField("id", pyspark_types.StringType()),
            pyspark_types.StructField("cantidad", pyspark_types.StringType()),
            pyspark_types.StructField("extra_data", pyspark_types.StructType(fields=[
                pyspark_types.StructField("column", pyspark_types.StringType()),
                pyspark_types.StructField("credito", pyspark_types.StringType()),
            ])),
        ])
    )),
])
df = spark.read.option("multiline", "true").json("test.json", schema=schema)

df.printSchema()
root
 |-- id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- company: string (nullable = true)
 |-- invoice: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- invoice_id: string (nullable = true)
 |    |    |-- date: timestamp (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- cantidad: string (nullable = true)
 |    |    |-- extra_data: struct (nullable = true)
 |    |    |    |-- column: string (nullable = true)
 |    |    |    |-- credito: string (nullable = true)

df.show()
+------+-------------------+-------+--------------------+
|    id|         created_at|company|             invoice|
+------+-------------------+-------+--------------------+
|517379|2020-11-16 04:25:03|   1707|[{4102,2020-11-1...|
|  1234|2020-11-16 04:25:03|   1707|[{4102,2020-11-1...|
+------+-------------------+-------+--------------------+

# To get completely denormalized dataframe

df = df.withColumn("invoice",pyspark_functions.explode_outer("invoice")) \
        .select("id","company","created_at","invoice.invoice_id","invoice.date","invoice.id","invoice.cantidad","invoice.extra_data.*")

In the second example, Spark does not try to infer the schema; since one is provided, it casts each field to the type you specified (in the second example, created_at and date are timestamps instead of strings).
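
If you prefer to keep the inferred all-string schema and convert afterwards, a minimal sketch of casting the top-level timestamp with to_timestamp (the JSON values are ISO-8601 strings, which recent Spark versions cast without an explicit format):

df_cast = spark.read.option("multiline", "true").json("test.json")

# Cast the top-level string column to a timestamp after the read.
df_cast = df_cast.withColumn("created_at", pyspark_functions.to_timestamp("created_at"))

Casting the nested invoice.date inside the array is more involved, which is one more reason to provide the schema up front as shown above.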
