I want to do Spark Structured Streaming (Spark 2.4.x) from a Kafka source to MariaDB with Python (PySpark).
I want to use the streaming Spark DataFrame, not a static or Pandas DataFrame.
It seems that one has to use foreach or foreachBatch, because there is no database sink for streaming DataFrames according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.
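From my reading of the guide, a foreachBatch variant would hand each micro-batch to a callback as a static DataFrame, so the ordinary JDBC writer should apply there. A rough, untested sketch of that (write_batch is just a name I made up; the url, table and properties are the connection settings and topic used in my attempt below):
def write_batch(batch_df, batch_id):
    # batch_df is a static DataFrame holding one micro-batch, so DataFrame.write is available here
    batch_df.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
query = df2.writeStream.foreachBatch(write_batch).start()
But before switching to that, I would like to understand why my foreach attempt fails.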
Here is my attempt:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField,StructType,StringType,DoubleType,TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user":"writer","password":"1234"}
# schema
schema_simple = StructType([StructField("Signal",StringType()),StructField("Value",DoubleType())])
# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()
# create DataFrame representing the stream
df = spark.readStream \
    .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mytopic") \
    .load() \
    .selectExpr("Timestamp", "cast (value as string) as json") \
    .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
    .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ",df2.isStreaming)
def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreach(process_row).start()
I get an error:
AttributeError: write
Why?