如何使用(Py)Spark结构化流为带有时间戳(来自Kafka)的JSON记录定义架构? -显示空值

问题是,我在通过PySpark阅读Kafka消息后得到了null值。

我使用Spark 2.3.1 / Scala 2.11.12

我的代码:

allData = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers","localhost:9092") \
  .option("subscribe","mysql.login") \
  .option("startingOffsets","earliest") \
  .load()

df = allData.selectExpr("cast(value as string)","timestamp","topic" )

detailSchema = StructType() \
    .add("username",StringType()) \
    .add("login_time",DateType())

df2 = df.select(from_json(col('value'),detailSchema).alias('data'),'timestamp','topic')

writeStream3 = df2 \
    .writeStream \
    .trigger(processingTime= '4 seconds') \
    .format('console') \
    .outputMode('update') \
    .start()

writeStream3.awaitTermination()    

kafka-console-consumer.sh读取的消息如下:

$ kafka-console-consumer.sh \
    --bootstrap-server 127.0.0.1:9092 \
    --topic mysql.login \
    --from-beginning
{"username":"hello kitty","login_time":1572866627000}
{"username":"chitara","login_time":1572867234000}
{"username":"hello kitty","login_time":1572868094000}

但是,当我尝试阅读消息时,看不到该值。在以下行之后,它显示为null

df2 = df.select(from_json(col('value'),'topic')

我的代码的输出是:

+--------------------+--------------------+-----------+
|               value|           timestamp|      topic|
+--------------------+--------------------+-----------+
|{"username":"hell...|2019-11-12 13:55:...|mysql.login|
|{"username":"chit...|2019-11-12 13:55:...|mysql.login|
|{"username":"hell...|2019-11-12 13:55:...|mysql.login|
|{"username":"leon...|2019-11-12 13:55:...|mysql.login|
|{"username":"chit...|2019-11-12 13:55:...|mysql.login|
...

+----+--------------------+-----------+
|data|           timestamp|      topic|
+----+--------------------+-----------+
|null|2019-11-12 13:55:...|mysql.login|
|null|2019-11-12 13:55:...|mysql.login|
|null|2019-11-12 13:55:...|mysql.login|
|null|2019-11-12 13:55:...|mysql.login|
|null|2019-11-12 13:55:...|mysql.login|
...

+--------+-----+
|username|count|
+--------+-----+
|    null|  242|
+--------+-----+

我认为该问题与解析有关,这就是为什么我在null函数之后看到from_json值的原因。为什么?如何解决?

yxsyg 回答:如何使用(Py)Spark结构化流为带有时间戳(来自Kafka)的JSON记录定义架构? -显示空值

tl; dr TimestampType使用login_time


由于login_time是时间戳记,因此您应该使用适当的类型,例如TimestampTypeLongType

来自official documentation

  

在字符串不可解析的情况下,返回null

这正是您从from_json所获得的,因为该架构与输入行不匹配。

本文链接:https://www.f2er.com/3113785.html

大家都在问