我正在处理 kafka 主题并尝试使用 pyspark 在我的本地机器上创建一个 readStream。
我已经通过 home-brew 通过以下命令安装了 spark brew install apache-spark
我遵循了很多教程,但无处可去。
我还尝试了将 kafka 与 Spark 连接的 Guid -> https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html。
但这也无济于事。
以下是我将 pyspark 与 Confluent Kafka Topic 连接起来的代码
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkkafka").config("spark.master","local[*]").getOrCreate()
df = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers","--xxx--:--xx--") \
.option("subscribe","NAME OF THE TOPIC") \
.option("startingOffsets","latest") \
.option("security.protocol","some protocol") \
.option("mechanisms","PLAIN") \
.option("[protocol]username","XXX-username-XXX") \
.option("[protocol]password","---xxx--password----") \
.option("schema.registry.url","--- scheme registry url ---") \
.option("basic.auth.credentials.source","auth source") \
.option("basic.auth.user.info","info of user") \
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
print(df)
我尝试了两种方式来执行这段代码。
$: python3 fileName
$: pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.0
这两件事都不起作用。
如果有人已经尝试连接 confluent-kafka 和 pyspark。使用近乎实时的流式传输,您能否指导我一些步骤或一些参考资料,以便我解决此问题。
提前致谢