如何使用Kafka数据源指定流查询的Kafka自定义配置(例如Confluent Cloud身份验证)?

我想使用针对Confluent Cloud的结构化流进行读写。问题是我无法在文档中找到进行身份验证的方法。

我有下一个数据连接:

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-nq5ga.westeurope.azure.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginmodule required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
security.protocol=SASL_SSL

在没有密码的情况下针对localhost进行测试时,我没有任何问题。

val inputStream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers",brokers)
    .option("subscribe",inputTopic)
    .option("startingOffsets",startingOffsetsValue)
    .load()

 outputStream.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers",outputBrokers)
    .option("topic",outputTopic)
    .option("checkpointLocation",pathCheckpoint)
    .start()
    .awaitTermination()

有人知道如何通过身份验证配置才能到达融合云

pacheco 回答:如何使用Kafka数据源指定流查询的Kafka自定义配置(例如Confluent Cloud身份验证)?

引用官方文档Kafka Specific Configurations

  

可以通过带有kafka.前缀,例如stream.option("kafka.bootstrap.servers","host:port")

的DataStreamReader.option来设置Kafka自己的配置。

这样,我们可以传递连接数据,例如

.option("kafka.ssl.endpoint.identification.algorithm","https")
本文链接:https://www.f2er.com/3167601.html

大家都在问