The problem is that I get null values after reading Kafka messages with PySpark.
I am using Spark 2.3.1 / Scala 2.11.12.
My code:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DateType

# 'spark' is the SparkSession (e.g. the one the pyspark shell provides)
allData = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mysql.login") \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka delivers 'value' as binary, so cast it to a string first
df = allData.selectExpr("cast(value as string)", "timestamp", "topic")

# Expected layout of each JSON message
detailSchema = StructType() \
    .add("username", StringType()) \
    .add("login_time", DateType())

df2 = df.select(from_json(col('value'), detailSchema).alias('data'), 'timestamp', 'topic')

writeStream3 = df2 \
    .writeStream \
    .trigger(processingTime='4 seconds') \
    .format('console') \
    .outputMode('update') \
    .start()

writeStream3.awaitTermination()
The messages read with kafka-console-consumer.sh look like this:
$ kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic mysql.login \
--from-beginning
{"username":"hello kitty","login_time":1572866627000}
{"username":"chitara","login_time":1572867234000}
{"username":"hello kitty","login_time":1572868094000}
However, when I read the messages from Spark, I cannot see the parsed value. It becomes null after this line:

df2 = df.select(from_json(col('value'), detailSchema).alias('data'), 'timestamp', 'topic')
The output of my code is:
+--------------------+--------------------+-----------+
| value| timestamp| topic|
+--------------------+--------------------+-----------+
|{"username":"hell...|2019-11-12 13:55:...|mysql.login|
|{"username":"chit...|2019-11-12 13:55:...|mysql.login|
|{"username":"hell...|2019-11-12 13:55:...|mysql.login|
|{"username":"leon...|2019-11-12 13:55:...|mysql.login|
|{"username":"chit...|2019-11-12 13:55:...|mysql.login|
...
+----+--------------------+-----------+
|data| timestamp| topic|
+----+--------------------+-----------+
|null|2019-11-12 13:55:...|mysql.login|
|null|2019-11-12 13:55:...|mysql.login|
|null|2019-11-12 13:55:...|mysql.login|
|null|2019-11-12 13:55:...|mysql.login|
|null|2019-11-12 13:55:...|mysql.login|
...
+--------+-----+
|username|count|
+--------+-----+
| null| 242|
+--------+-----+
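To rule out Kafka itself, I would expect the same null from a plain batch DataFrame built from one of the message literals (a minimal repro sketch reusing spark and detailSchema from the code above; 'sample' is just an illustrative name, and I have not confirmed the literal is byte-identical to what Kafka delivers):

# Minimal batch sketch: parse one message literal with the same
# schema, with no Kafka involved
sample = spark.createDataFrame(
    [('{"username":"hello kitty","login_time":1572866627000}',)],
    ['value'])
sample.select(from_json(col('value'), detailSchema).alias('data')) \
    .show(truncate=False)
# If parsing itself is the problem, this should also print: |null|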
So I think the problem is related to parsing, which is why I see null values after the from_json call. Why does this happen, and how can I fix it?
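In case it helps narrow things down, one check I plan to try (a sketch only, not run yet; 'usernameOnly' is just an illustrative name) is parsing with a schema that keeps only the string field:

# Hypothetical check: if 'data' comes back non-null with a
# username-only schema, the DateType login_time field is likely
# what makes from_json null out the whole record
usernameOnly = StructType().add("username", StringType())
df.select(from_json(col('value'), usernameOnly).alias('data'), 'timestamp') \
    .writeStream \
    .format('console') \
    .outputMode('update') \
    .start()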