Problem description

I am trying to use Spark Structured Streaming in Update output mode to write to a file. I found this StructuredSessionization example, and it works fine as long as the console format is configured. But if I change the sink to:

 val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("json")
  .option("path", "/work/output/data")
  .option("checkpointLocation", "/work/output/checkpoint")
  .start()

I get the following error:

 Exception in thread "main" org.apache.spark.sql.AnalysisException: Data source json does not support Update output mode;
        at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:279)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:286)
        at palyground.StructuredStreamingMergeSpans$.main(StructuredStreamingMergeSpans.scala:84)
        at palyground.StructuredStreamingMergeSpans.main(StructuredStreamingMergeSpans.scala)

Can I use Update mode and use the FileFormat to write the result table to a file sink? In the source code I found a pattern match that ensures Append mode.

Recommended answer

You cannot write to a file in Update mode using Spark Structured Streaming, because the built-in file sink supports only Append mode. You need to write a ForeachWriter instead. I have written a simple ForeachWriter here; you can modify it according to your requirements.

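import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}

val writerForText = new ForeachWriter[Row] {
    var fileWriter: FileWriter = _

    // Called once per partition for every epoch (micro-batch); return true to receive its rows.
    override def open(partitionId: Long, version: Long): Boolean = {
      FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
      // Open in append mode so that a later micro-batch does not truncate earlier output.
      fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"), true)
      true
    }

    // Write each row as one comma-separated line.
    override def process(value: Row): Unit = {
      fileWriter.append(value.toSeq.mkString(",")).append("\n")
    }

    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }
  }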
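Spark's documentation describes how it drives a ForeachWriter. For each partition and each epoch (micro-batch), it calls `open`. If `open` returned true, it calls `process` for every row. It then calls `close` with any error seen while processing. The plain-Scala sketch below mimics that loop so the file handling can be checked without a cluster.

It rests on a few assumptions. `RowWriter`, `CsvAppendWriter`, `runEpochs` and `readLines` are hypothetical names, not Spark API. `File.mkdirs` stands in for commons-io `FileUtils.forceMkdir`. Note that `new FileWriter(file)` truncates an existing file, so the sketch passes `append = true` to keep rows written in earlier epochs.

```scala
import java.io.{File, FileWriter}
import java.nio.file.Files
import scala.io.Source

// Hypothetical stand-in for Spark's ForeachWriter with the same three callbacks.
trait RowWriter[T] {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// Hypothetical writer: one "temp" file per partition directory, one CSV line per row.
class CsvAppendWriter(baseDir: File) extends RowWriter[Seq[Any]] {
  private var fileWriter: FileWriter = null

  override def open(partitionId: Long, epochId: Long): Boolean = {
    val dir = new File(baseDir, partitionId.toString)
    dir.mkdirs()
    // append = true: a later epoch adds lines instead of truncating the file.
    fileWriter = new FileWriter(new File(dir, "temp"), true)
    true
  }

  override def process(value: Seq[Any]): Unit = {
    fileWriter.append(value.mkString(",")).append("\n")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (fileWriter != null) fileWriter.close()
  }
}

// Mimics the call order: for each epoch and partition, call open, process the rows
// if open returned true, then call close with any error seen while processing.
def runEpochs[T](writer: RowWriter[T], epochs: Seq[Map[Long, Seq[T]]]): Unit =
  for ((partitions, epochId) <- epochs.zipWithIndex; (partitionId, rows) <- partitions) {
    val opened = writer.open(partitionId, epochId.toLong)
    var error: Throwable = null
    try {
      if (opened) rows.foreach(writer.process)
    } catch {
      case t: Throwable => error = t; throw t
    } finally {
      writer.close(error)
    }
  }

def readLines(f: File): List[String] = {
  val src = Source.fromFile(f)
  try src.getLines().toList
  finally src.close()
}

val base = Files.createTempDirectory("foreach-sketch").toFile
runEpochs[Seq[Any]](new CsvAppendWriter(base), Seq(
  Map(0L -> Seq(Seq("a", 1)), 1L -> Seq(Seq("b", 1))), // epoch 0
  Map(0L -> Seq(Seq("a", 2)))                          // epoch 1
))
println(readLines(new File(new File(base, "0"), "temp"))) // List(a,1, a,2)
```

Partition 0 ends up with both epochs' rows. Without the append flag, the second `open` would leave only `a,2`.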

val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .foreach(writerForText)
  .start()
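In Update mode, each trigger emits only the rows whose state changed since the previous trigger. A session that changes in several micro-batches is therefore written once per change, and the output files accumulate older versions of it. If a downstream reader needs only the latest value per session, it has to resolve that itself.

Below is a minimal plain-Scala sketch. It assumes the key is the first comma-separated field, with no commas inside keys. It also assumes lines are read in the order they were written, which holds within one append-mode file but is only an assumption across files. `latestPerKey` is a hypothetical helper.

```scala
// Collapse an append-only log of update-mode rows ("key,rest...") to the latest row per key.
// Assumes the key is the first comma-separated field and lines are in write order.
def latestPerKey(lines: Seq[String]): Map[String, String] =
  lines.foldLeft(Map.empty[String, String]) { (acc, line) =>
    val key = line.takeWhile(_ != ',')
    acc.updated(key, line) // a later row for the same key replaces the earlier one
  }

println(latestPerKey(Seq("a,1", "b,1", "a,2"))) // Map(a -> a,2, b -> b,1)
```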
