如何使用Spark结构化流配置Confluent的Schema Registry and Avro序列化程序?

我还没有找到任何文档或示例来说明如何将Schema Registry and Avro序列化程序的设置从Confluent传递到Spark结构化流(数据块)。

此设置:

  

//架构注册表特定设置   basic.auth.credentials.source = USER_INFO   schema.registry.basic.auth.user.info =:   schema.registry.url = https://psrc-1kk8p.westeurope.azure.confluent.cloud

     

//使用架构注册表启用Avro序列化程序   key.serializer = io.confluent.kafka.serializers.KafkaAvroSerializer   value.serializer = io.confluent.kafka.serializers.KafkaAvroSerializer

这是我当前的代码:

val inputStream = spark.readStream
    .format("kafka")
    .option("kafka.ssl.endpoint.identification.algorithm","https")
    .option("kafka.sasl.mechanism","PLAIN")
    .option("kafka.request.timeout.ms","20000")
    .option("kafka.bootstrap.servers",brokers)
    .option("kafka.retry.backoff.ms","500")
    .option("kafka.sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginmodule required username=<my_user_name> password=<my_password>;")
    .option("kafka.security.protocol","SASL_SSL")
    .option("subscribe",inputTopic)
    .option("startingOffsets","latest")
    .load()

我希望我能获得火花以连接设置并进行身份验证。

liuxuefei184 回答:如何使用Spark结构化流配置Confluent的Schema Registry and Avro序列化程序?

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3145718.html

大家都在问