我还没有找到任何文档或示例来说明如何将Schema Registry and Avro序列化程序的设置从Confluent传递到Spark结构化流(数据块)。
此设置:
//架构注册表特定设置 basic.auth.credentials.source = USER_INFO schema.registry.basic.auth.user.info =: schema.registry.url = https://psrc-1kk8p.westeurope.azure.confluent.cloud
//使用架构注册表启用Avro序列化程序 key.serializer = io.confluent.kafka.serializers.KafkaAvroSerializer value.serializer = io.confluent.kafka.serializers.KafkaAvroSerializer
这是我当前的代码:
val inputStream = spark.readStream
.format("kafka")
.option("kafka.ssl.endpoint.identification.algorithm","https")
.option("kafka.sasl.mechanism","PLAIN")
.option("kafka.request.timeout.ms","20000")
.option("kafka.bootstrap.servers",brokers)
.option("kafka.retry.backoff.ms","500")
.option("kafka.sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginmodule required username=<my_user_name> password=<my_password>;")
.option("kafka.security.protocol","SASL_SSL")
.option("subscribe",inputTopic)
.option("startingOffsets","latest")
.load()
我希望我能获得火花以连接设置并进行身份验证。