如何处理JSON文档(来自MongoDB)并在结构化流中写入HBase?

我正在获取mongoDB文档,然后经过处理后,我想使用Bson.Document库将其存储到Hbase中

将流媒体方法从Spark Kafkastreaming更改为结构化流 因此,使用kafkaUtils的早期方法是生成Dstream [Document]

在结构化流媒体中,我正在获取数据集[Document]

scala> val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers","brokerList").option("subscribe",s"topic_name").load().selectExpr("CAST(value AS STRING)")
stream: org.apache.spark.sql.DataFrame = [value: string]

scala> val strming_doc = stream.map(record => record.getas[String]("value")
scala> org.apache.spark.sql.Dataset[String] = [value: string]

对于进一步的处理,我需要从数据集中获取文档

scala> val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers",s"topic_name").load().selectExpr("CAST(value AS STRING)")
stream: org.apache.spark.sql.DataFrame = [value: string]

scala> val strming_doc = stream.map(record => record.getas[String]("value")
scala> org.apache.spark.sql.Dataset[String] = [value: string]

我需要从数据集中获取文档,基本上是从mongoDB获取数据

xiaoshiyy 回答:如何处理JSON文档(来自MongoDB)并在结构化流中写入HBase?

似乎您需要foreachforeachBatch运算符才能将流查询的结果写入HBase。请查阅官方文档中的Using Foreach and ForeachBatch

本文链接:https://www.f2er.com/3135361.html

大家都在问