我的问题与Spark Streaming和一个JSON列有关,即func_params列。但是正如您在下面的架构定义中看到的那样,我将其作为字符串获取。这里的问题是json本身是动态的。我需要检查json中是否有某些值,并根据它们做出决策。但我不知道如何将列转换为实际的json。我看过了,但是所有内容都有一个架构,在这种情况下,我之前并不了解该架构。
#Define schema of json coming from kafka
schema = StructType().add("id","string") \
.add("function_params","string") \
.add("srv_source","string") \
.add("user_id","string") \
.add("entrytimestamp","string")
#load data into spark-structured streaming
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","localhost:9092") \
.option("subscribe","dic_rest") \
.load() \
.selectExpr("CAST(value AS STRING) as message") \
.select(from_json(col("message").cast("string"),schema).alias("json")) \
.select("json.*")
#need to code here the column to json and then work with it such
#as if the json has a key I create an entry for another kafka topic.
任何帮助将不胜感激。我感到这很容易,而我却变得比实际困难。