col

我的问题与Spark Streaming和一个JSON列有关,即func_params列。但是正如您在下面的架构定义中看到的那样,我将其作为字符串获取。这里的问题是json本身是动态的。我需要检查json中是否有某些值,并根据它们做出决策。但我不知道如何将列转换为实际的json。我看过了,但是所有内容都有一个架构,在这种情况下,我之前并不了解该架构。

#Define schema of json coming from kafka
schema = StructType().add("id","string") \
                 .add("function_params","string") \
                 .add("srv_source","string") \
                 .add("user_id","string") \
                 .add("entrytimestamp","string")

#load data into spark-structured streaming
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers","localhost:9092") \
  .option("subscribe","dic_rest") \
  .load() \
  .selectExpr("CAST(value AS STRING) as message") \
  .select(from_json(col("message").cast("string"),schema).alias("json")) \
  .select("json.*")

#need to code here the column to json and then work with it such 
#as if the json has a key I create an entry for another kafka topic.

任何帮助将不胜感激。我感到这很容易,而我却变得比实际困难。

b_bunny 回答:col

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3165840.html

大家都在问