Spark 2.2引入了Kafka的结构化流源。据我了解,它依靠HDFS检查点目录来存储偏移量并保证“完全一次”的消息传递。

但是旧的扩展坞(例如https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/)说,Spark Streaming检查点无法跨应用程序或Spark升级恢复,因此不是很可靠。作为解决方案,有一种实践支持将偏移量存储在支持MySQL或RedshiftDB之类的事务的外部存储中。

如果要将偏移量从Kafka源存储到事务性数据库,如何从结构化流批处理中获取偏移量?

以前,可以通过将RDD强制转换为HasOffsetRanges来完成:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

但是使用新的Streaming API时,我的DatasetInternalRow,但找不到轻松的方式来获取偏移量。 Sink API只有addBatch(batchId: Long, data: DataFrame)方法,我该如何为给定的批次ID获取偏移量?

最佳答案



正确。

每个触发器Spark结构化流都将偏移量保存到检查点位置(使用offset选项或checkpointLocation Spark属性定义或随机分配)中的spark.sql.streaming.checkpointLocation目录中,该偏移量应保证偏移量最多可以处理一次。该功能称为提前写入日志

检查点位置中的另一个目录是commits目录,用于完成的流式批处理,每个批处理有一个文件(文件名是批处理ID)。

引用Fault Tolerance Semantics中的官方文档:



每次执行触发器时,StreamExecution都会检查目录并“计算”已经处理了哪些偏移量。这使至少包含一次语义,而总共恰好包含一次



您将它们称为“旧的”是有原因的,不是吗?

他们指的是旧的(我认为)无效的Spark Streaming,不仅保留了偏移量,还保留了导致查询点几乎不可用的情况的整个查询代码,例如当您更改代码时。

现在的时代已经过去,结构化流传输更加谨慎地确定检查点和检查点。



一个解决方案可能是实现或以某种方式使用用于处理偏移量检查点的MetadataLog接口(interface)。那行得通。



目前不可能。

我的理解是您将而不是能够做到这一点,因为流的语义对您隐藏了。您只应该而不是处理这种称为“偏移”的低级“事物”,Spark结构化流用于提供一次保证。

引用Michael Armbrust在Spark Summit上的讲话Easy, Scalable, Fault Tolerant Stream Processing with Structured Streaming in Apache Spark:



further in the talk (on the next slide):



的一种使用StreamingQueryProgress(从任何来源,包括Kafka)获取偏移量的方法,您可以使用StreamingQueryListeneronQueryProgress回调来拦截。



使用StreamingQueryProgress,您可以使用SourceProgress访问sources属性,该属性可以为您提供所需的内容。

关于apache-spark - 如何获取Kafka偏移量以进行结构化查询,以进行手动和可靠的偏移量管理?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46153105/

10-16 03:21