Failed to find leader for topic; java.lang.NullPointerException at org.apache.kafka.common.utils.Utils.formatAddress

We are facing the following error when we try to stream data from an SSL-enabled Kafka topic. Could you help us with this issue?

19/11/07 13:26:54 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1573151189884] Added fetcher for partitions ArrayBuffer()
19/11/07 13:26:54 WARN ConsumerFetcherManager$LeaderFinderThread: [spark-streaming-consumer_dvtcbddc101.corp.cox.com-1573151189725-d40a510f-leader-finder-thread],Failed to find leader for Set([inst_monitor_status_test,2],[inst_monitor_status_test,0],[inst_monitor_status_test,1])
java.lang.NullPointerException
        at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:408)
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
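
The stack trace points at a known limitation rather than a bug in the job itself: the receiver-based KafkaUtils.createStream from spark-streaming-kafka-0-8 discovers brokers through ZooKeeper using the old consumer protocol, which has no SSL support, so the metadata it fetches for an SSL-only listener can come back with a null host, and Utils.formatAddress then throws the NullPointerException. One way to confirm that the SSL listener itself is healthy is a quick check outside Spark; a minimal sketch with the kafka-python client, assuming it is installed, with placeholder host, port and CA-file path:

from kafka import KafkaConsumer

# Connect straight to the broker's SSL listener (not the plaintext port).
consumer = KafkaConsumer(
    bootstrap_servers="kafka_server:9093",   # placeholder SSL listener
    security_protocol="SSL",
    ssl_cafile="/home/path/ca_cert.pem",     # placeholder CA certificate
)
print(consumer.topics())    # should include inst_monitor_status_test
consumer.close()

If this check passes while the Spark job still fails, the problem is the 0-8 connector, not the cluster.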

PySpark code:

from __future__ import print_function

import sys

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def handler(message):
    records = message.collect()
    for record in records:
        print(record)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    zkQuorum, topic = sys.argv[1:]
    # Receiver-based stream through ZooKeeper (old Kafka 0.8 consumer API)
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

spark-submit command:

/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 dsstream2.py host:2181 inst_monitor_status_test
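
A side note on the command above: it puts the 0-8 DStream connector and two different versions of spark-sql-kafka-0-10 (2.1.0 and 2.3.0) on one classpath, which invites version conflicts. For the Structured Streaming approach in the answer below, a single spark-sql-kafka-0-10 artifact matching the cluster's Spark version is enough. A sketch of pinning it from the driver script instead of the command line, assuming Spark 2.3.0 on Scala 2.11:

from pyspark.sql import SparkSession

# spark.jars.packages must be set before the session's JVM starts,
# so this only takes effect when no SparkSession/SparkContext exists yet.
spark = SparkSession.builder \
    .appName("PythonStreamingDirectKafkaWordCount") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0") \
    .getOrCreate()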

Answer from ZENGMINGMIN2009:

Thanks for your input. I passed the SSL parameters using the approach below, and it works as expected. The receiver-based 0.8 createStream API has no SSL support, so this switches to the Structured Streaming Kafka source, where the security settings are passed as kafka.-prefixed options.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

#  Spark session :

spark = SparkSession.builder.appName('PythonStreamingDirectKafkaWordCount').getOrCreate()

#  Kafka Topic Details :

KAFKA_TOPIC_NAME_CONS = "topic_name"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'kafka_server:9093'

#  Creating  readstream DataFrame :

df = spark.readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
     .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
     .option("startingOffsets", "earliest") \
     .option("kafka.security.protocol", "SASL_SSL") \
     .option("kafka.client.id", "Client_id") \
     .option("kafka.sasl.kerberos.service.name", "kafka") \
     .option("kafka.ssl.truststore.location", "/home/path/kafka_trust.jks") \
     .option("kafka.ssl.truststore.password", "password_rd") \
     .option("kafka.sasl.kerberos.keytab", "/home/path.keytab") \
     .option("kafka.sasl.kerberos.principal", "path") \
     .load()

df1 = df.selectExpr("CAST(value AS STRING)")

#  Creating  Writestream DataFrame :

query = df1.writeStream \
    .option("path", "target_directory") \
    .format("csv") \
    .option("checkpointLocation", "chkpint_directory") \
    .outputMode("append") \
    .start()

# Block on the streaming query itself (the DStream StreamingContext is not used here).
query.awaitTermination()
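
Since the same kafka.-prefixed options also work for batch reads, a quick way to validate the SSL/Kerberos settings before starting the stream is a one-off spark.read against the topic; a sketch reusing the constants and paths defined above:

# One-off batch read: fails fast here if the security settings are wrong.
test_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
    .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.kerberos.service.name", "kafka") \
    .option("kafka.ssl.truststore.location", "/home/path/kafka_trust.jks") \
    .option("kafka.ssl.truststore.password", "password_rd") \
    .load()
print(test_df.count())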