-
如何使用Kafka数据源指定流查询的Kafka自定义配置(例如Confluent Cloud身份验证)?
我想使用针对Confluent Cloud的结构化流进行读写。问题是我无法在文档中找到进行身份验证的方法。 < -
如何在流式查询中使用MLlib模型(“字段“功能”不存在失败。”)?
我正在尝试使用保存的Mllib模型来预测实时流数据的情绪。 我尝试了所有发现的建议,但仍然出现 -
如何使用Spark结构化流配置Confluent的Schema Registry and Avro序列化程序?
我还没有找到任何文档或示例来说明如何将Schema Registry and Avro序列化程序的设置从Confluent传递到Spark结构 -
使用Kafka源进行Spark结构化流式处理,在查询运行时更改主题分区的数量
我已经建立了一个从Kafka主题读取的Spark结构化流查询。 如果在运行Spark查询时更改了主题中的分区数, -
如何使用流数据帧进行rdd转换
我想对流数据帧进行自定义转换,例如: <pre><code>lines = spark \ .readStream....blabla df1 = line.rdd.map(xxx) -
如何在PySpark中使用foreach或foreachBatch写入数据库?
我想使用Python(PySpark)从Kafka源到MariaDB进行Spark结构化流(Spark 2.4.x)。 我想使用流式Spark数据框 -
如何处理JSON文档(来自MongoDB)并在结构化流中写入HBase?
我正在获取mongoDB文档,然后经过处理后,我想使用Bson.Document库将其存储到Hbase中 将流媒体方法从S -
如何在结构化流式处理的pyspark中使用use foreach运算符(“ DataStreamWriter”对象失败,没有属性“ foreach”)?
我在pyspark 2.3.4中使用结构化流。 我正尝试使用<code>foreach</code>运算符,如下所示: <pre><code>que -
如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?
我有一个<strong> Azure Eventhub </strong>,它正在流式传输数据(JSON格式)。 我将其读取为Spark数据帧,并使 -
Spark结构化流:意外错误:状态:STATUS_INVALID_HANDLE = 0xC0000008
我在Hortonworks集群(v 2.6.5)上以客户端模式在Yarn上运行了Spark结构化流(v 2.3.0)作业。该过程从Kafka主 -
如何使用(Py)Spark结构化流为带有时间戳(来自Kafka)的JSON记录定义架构? -显示空值
问题是,我在通过PySpark阅读Kafka消息后得到了<code>null</code>值。 我使用Spark 2.3.1 / Scala 2.11.12 <p -
如何使用writeStream将Spark流传递给kafka主题
我正在使用提供流的twitter流功能。我需要使用Spark writeStream函数,例如:<a href="https://people.apache.org/~pwend -
如何将基本身份验证传递给Confluent Schema Registry?
我想从融合的云主题中读取数据,然后再写入另一个主题。 在本地主机上,我没有遇到任何重大 -
为什么即使我通过设置`spark.sql.shuffle.partitions`来更改默认配置,结构化流阶段也大多要执行200个任务?
我有一个在yarn模式下运行的spark结构化流应用程序。 我正在尝试减少任务数量,并且我注意到大 -
如何使用PySpark结构化流计算时间戳之间的差异
PySpark结构化流媒体存在以下问题。 我的流数据中的每一行都有一个用户ID和一个时间戳。现在, -
如何在静态数据集上运行流查询?
当我将结构化流应用于静态数据集时,spark引擎将获取静态数据集的全部数据。在这种情况下,使用流处 -
Spark + kafka集成中的问题
我正在通过DataSet读取CSV文件,然后将该文件发送到Kafka。 spark-submit作业工作正常,但是当程序将文件发 -
如何将流查询的数据写入Hive?
我正在使用Spark结构化流从HDFS读取数据。我想将该dataFrame保存到Hive。 我已经这样做了,但是它给 -
如何在foreachBatch中使用临时表?
我们正在构建一个流平台,在该平台上,批量处理SQL至关重要。 <pre><code>val query = streamingDataSet.writeS -
“ GroupStateTimeout.ProcessingTimeTimeout()”完成后,Spark结构化流中的密钥不会失效
我正在编写结构化流的代码,其中我从Kafka队列中订阅数据,然后将原始数据写回到Hbase。在这笔交易之 -
如何在Spark结构化流媒体中使用scikit pickle模型?
我们有一个用于梯度增强模型的泡菜,我们想在udf中使用它来通过Spark结构化流对流数据进行评分。 -
如何获取Spark独立集群中的应用程序状态?
根据官方spark文档,我们可以使用<br/>检查状态 <code>spark-submit --master spark://IP-ADDRESS:PORT --status SUBMISSION_ID< -
如何使用外部数据库(PostgreSQL)作为流查询中的输入?
我正在尝试在Postgresql中实现流输入更新。 具体来说,我想在火花输入流中使用Postgresql作为数据源。 -
如何计算流数据集中的数组字段中元素的数量(一个除外)?
我使用Spark 2.1.0.cloudera1。 我在流数据帧中有一个数组,该数组中的数据如下所示: <pre><code>[&# -
使用kafka流在pyspark中将以前的数据与当前数据一起使用的方法
我正在从生产者发送dict对象,并使用pyspark创建一个新对象。但是我要形成的obj的类型也需要键,先前数 -
如何使用Azure Cosmos DB连接器从流查询中写入CosmosDB?
我有一个简单的结构化流应用程序,输出接收器应为CosmosDB。当我调用writeStream方法时,弹出以下错误。 -
如何为foreachBatch的batchId设置起点?
我面临的问题是,我的过程依赖于foreachBatch的batchId,作为对流水线第二阶段准备就绪的某种控制。因此 -
结构化Spark流指标检索
我有一个带有结构化Spark流的应用程序,我想获取一些指标,例如调度延迟,延迟等。通常,这些指标可 -
如何在结构化查询中使用scikit学习模型?
我正在尝试将使用泡菜检索的scikit模型应用于结构化流数据帧的每一行。 我尝试使用pandas_udf(版 -
如何在Spark Dataset <Row>中使用自定义类型创建新列
我有一个输入数据集,它是来自kafka-connect的记录,我将其转换为强类型数据集,然后执行一些转换。之