When we try to stream data from an SSL-enabled Kafka topic, we get the following error. Could you help us with this issue?
19/11/07 13:26:54 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1573151189884] Added fetcher for partitions ArrayBuffer()
19/11/07 13:26:54 WARN ConsumerFetcherManager$LeaderFinderThread: [spark-streaming-consumer_dvtcbddc101.corp.cox.com-1573151189725-d40a510f-leader-finder-thread],Failed to find leader for Set([inst_monitor_status_test,2],[inst_monitor_status_test,0],1])
java.lang.NullPointerException
at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:408)
at kafka.cluster.Broker.connectionString(Broker.scala:62)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
PySpark code:
from __future__ import print_function
import sys
import json
from operator import add
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

def handler(message):
    # Collect each micro-batch RDD to the driver and print its records.
    records = message.collect()
    for record in records:
        print(record)

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 10)  # 10-second batch interval
    zkQuorum, topic = sys.argv[1:]
    # Receiver-based stream that finds the brokers through ZooKeeper.
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.pprint()
    kvs.foreachRDD(handler)
    ssc.start()
    ssc.awaitTermination()
spark-submit command:
/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 dsstream2.py host:2181 inst_monitor_status_test
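
A possible direction, offered as an assumption rather than a confirmed diagnosis: as far as I know, the spark-streaming-kafka-0-8 integration used by KafkaUtils.createStream does not support SSL/TLS, while the Structured Streaming Kafka source (spark-sql-kafka-0-10, already on the --packages list above) passes any option prefixed with "kafka." to the underlying Kafka consumer. The sketch below shows what reading the same topic over SSL might look like with that source. The broker address, truststore path, password, and the helper names build_kafka_ssl_options, read_topic, and main are all hypothetical placeholders, not values from our cluster.

```python
def build_kafka_ssl_options(bootstrap_servers, topic, truststore_path, truststore_password):
    """Build reader options for the Structured Streaming Kafka source over SSL.

    Keys prefixed with "kafka." are handed to the Kafka consumer as-is;
    "subscribe" is an option of the Spark source itself.
    """
    return {
        "kafka.bootstrap.servers": bootstrap_servers,  # brokers, not ZooKeeper
        "subscribe": topic,
        "kafka.security.protocol": "SSL",
        "kafka.ssl.truststore.location": truststore_path,
        "kafka.ssl.truststore.password": truststore_password,
    }


def read_topic(spark, options):
    """Return a streaming DataFrame for the topic, given an existing SparkSession."""
    return spark.readStream.format("kafka").options(**options).load()


def main():
    # Imported here so the option builder above can be used without Spark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("KafkaSslSketch").getOrCreate()
    options = build_kafka_ssl_options(
        "broker-host:9093",            # hypothetical SSL listener
        "inst_monitor_status_test",
        "/path/to/truststore.jks",     # hypothetical path
        "changeit",                    # hypothetical password
    )
    messages = read_topic(spark, options).selectExpr("CAST(value AS STRING)")
    query = messages.writeStream.format("console").start()
    query.awaitTermination()
```

To try it, main() would be called from a script submitted with a single spark-sql-kafka-0-10 package whose version matches the installed Spark. Note that it takes the broker list (host:port of the SSL listener) instead of the ZooKeeper quorum.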