本文介绍了如何在PySpark中使用foreach或foreachBatch写入数据库?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用Python(PySpark)从Kafka源到MariaDB进行Spark结构化流(Spark 2.4.x).

I want to do Spark Structured Streaming (Spark 2.4.x) from a Kafka source to a MariaDB with Python (PySpark).

我想使用流式Spark数据框,而不是静态或Pandas数据框.

I want to use the streamed Spark dataframe and not the static nor Pandas dataframe.

似乎必须使用foreachforeachBatch,因为根据 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

It seems that one has to use foreach or foreachBatch since there are no possible database sinks for streamed dataframes according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.

这是我的尝试:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user":"writer", "password":"1234"}
# schema
schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
  .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mytopic") \
  .load() \
  .selectExpr("Timestamp", "cast (value as string) as json") \
  .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
  .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreach(process_row).start()

我得到一个错误:

为什么?

推荐答案

在Jacek的支持下,我可以修复示例:

With the support of Jacek, I could fix my example:

def process_row(df, epoch_id):
    df2.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreachBatch(process_row).start()

您还必须将 epoch_id 放入函数参数中.否则,您会在spark日志文件中得到jupyter笔记本中未显示的错误.

You also must put the epoch_id into the function parameters. Otherwise you get errors in the spark log file that are not shown in the jupyter notebook.

这篇关于如何在PySpark中使用foreach或foreachBatch写入数据库?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-21 00:22