By default, Spark tries to parse each line as a standalone JSON document, so if your file contains JSON objects that span multiple lines, you end up with a dataframe that has a single _corrupt_record column. To fix this, you need to set the read option multiline to true.
test.json
[
    {
        "id":"517379","created_at":"2020-11-16T04:25:03Z","company":"1707","invoice":[
            {
                "invoice_id":"4102","date":"2020-11-16T04:25:03Z","id":"517379","cantidad":"21992.47","extra_data":{
                    "column":"ASDFG","credito":"Crédito"
                }
            }
        ]
    },{
        "id":"1234","created_at":"2020-11-16T04:25:03Z","company":"1707","invoice":[
            {
                "invoice_id":"4102","date":"2020-11-16T04:25:03Z","id":"1234","cantidad":"21992.47","extra_data":{
                    "column":"ASDFG","credito":"Crédito"
                }
            }
        ]
    }
]
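For comparison, reading this file without the multiline option should produce exactly the single _corrupt_record column described above; a minimal sketch (df_bad is just an illustrative name):

# Without multiline, each physical line must be a complete JSON document;
# a pretty-printed file fails to parse and, under the default PERMISSIVE
# mode, lands in a single _corrupt_record column.
df_bad = spark.read.json("test.json")
df_bad.printSchema()
# root
#  |-- _corrupt_record: string (nullable = true)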
pyspark code
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # create or reuse the session
df = spark.read.option("multiline", "true").json("test.json")
df.printSchema()
root
|-- company: string (nullable = true)
|-- created_at: string (nullable = true)
|-- id: string (nullable = true)
|-- invoice: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- cantidad: string (nullable = true)
| | |-- date: string (nullable = true)
| | |-- extra_data: struct (nullable = true)
| | | |-- column: string (nullable = true)
| | | |-- credito: string (nullable = true)
| | |-- id: string (nullable = true)
| | |-- invoice_id: string (nullable = true)
df.show()
+-------+--------------------+------+--------------------+
|company| created_at| id| invoice|
+-------+--------------------+------+--------------------+
| 1707|2020-11-16T04:25:03Z|517379|[{21992.47,2020-...|
| 1707|2020-11-16T04:25:03Z| 1234|[{21992.47,2020-...|
+-------+--------------------+------+--------------------+
If you know the schema, it is better to provide it, both to cut processing time and to parse every field into the type you need:
from pyspark.sql import types as pyspark_types
from pyspark.sql import functions as pyspark_functions
schema = pyspark_types.StructType(fields=[
    pyspark_types.StructField("id", pyspark_types.StringType()),
    pyspark_types.StructField("created_at", pyspark_types.TimestampType()),
    pyspark_types.StructField("company", pyspark_types.StringType()),
    pyspark_types.StructField("invoice", pyspark_types.ArrayType(
        pyspark_types.StructType(fields=[
            pyspark_types.StructField("invoice_id", pyspark_types.StringType()),
            pyspark_types.StructField("date", pyspark_types.TimestampType()),
            pyspark_types.StructField("id", pyspark_types.StringType()),
            pyspark_types.StructField("cantidad", pyspark_types.StringType()),
            pyspark_types.StructField("extra_data", pyspark_types.StructType(fields=[
                pyspark_types.StructField("column", pyspark_types.StringType()),
                pyspark_types.StructField("credito", pyspark_types.StringType())
            ]))
        ])
    )),
])
df = spark.read.option("multiline","true").json("test.json",schema=schema)
df.printSchema()
root
|-- id: string (nullable = true)
|-- created_at: timestamp (nullable = true)
|-- company: string (nullable = true)
|-- invoice: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- invoice_id: string (nullable = true)
| | |-- date: timestamp (nullable = true)
| | |-- id: string (nullable = true)
| | |-- cantidad: string (nullable = true)
| | |-- extra_data: struct (nullable = true)
| | | |-- column: string (nullable = true)
| | | |-- credito: string (nullable = true)
df.show()
+------+-------------------+-------+--------------------+
| id| created_at|company| invoice|
+------+-------------------+-------+--------------------+
|517379|2020-11-16 04:25:03| 1707|[{4102,2020-11-1...|
| 1234|2020-11-16 04:25:03| 1707|[{4102,2020-11-1...|
+------+-------------------+-------+--------------------+
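As a side note (not part of the original answer), the json reader also accepts the schema as a DDL-formatted string, which can be more compact than building the StructType by hand; a sketch assuming the same field layout as above:

# Same schema expressed as a DDL string; backticks protect the field
# name "column", which could otherwise clash with a SQL keyword.
ddl_schema = (
    "id STRING, created_at TIMESTAMP, company STRING, "
    "invoice ARRAY<STRUCT<invoice_id: STRING, date: TIMESTAMP, id: STRING, "
    "cantidad: STRING, extra_data: STRUCT<`column`: STRING, credito: STRING>>>"
)
df = spark.read.option("multiline", "true").json("test.json", schema=ddl_schema)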
# To get completely denormalized dataframe
df = df.withColumn("invoice",pyspark_functions.explode_outer("invoice")) \
.select("id","company","created_at","invoice.invoice_id","invoice.date","invoice.id","invoice.cantidad","invoice.extra_data.*")
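One caveat: the select above produces two output columns that are both named id (the record id and the invoice line id), which can be ambiguous downstream. A possible variant of the same flattening that renames the nested one (invoice_line_id and df_flat are made-up names, not from the original answer):

# Alternative to the statement above, run against the dataframe as read
# (before any flattening): alias invoice.id to avoid a duplicate "id"
df_flat = df.withColumn("invoice", pyspark_functions.explode_outer("invoice")) \
    .select("id", "company", "created_at",
            "invoice.invoice_id", "invoice.date",
            pyspark_functions.col("invoice.id").alias("invoice_line_id"),
            "invoice.cantidad", "invoice.extra_data.*")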
In the second example, Spark does not try to infer the schema; since one is supplied, it instead casts each column to the type you declared (created_at and date come out as timestamps rather than strings in the second example).
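Relatedly (an addition, not from the original answer): under the default PERMISSIVE mode, a value that cannot be cast to its declared type simply becomes null; if you would rather fail loudly, the reader's mode option supports FAILFAST:

# FAILFAST raises on the first record that does not match the declared
# schema, instead of silently nulling out the offending fields
df_strict = (spark.read
    .option("multiline", "true")
    .option("mode", "FAILFAST")
    .json("test.json", schema=schema))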