This article looks at the question "Is there a good way to join a stream with a changing table in Spark?" and walks through an answer that may be a useful reference for anyone facing the same problem.

Problem Description

Our Spark environment: Databricks 4.2 (includes Apache Spark 2.3.1, Scala 2.11)

What we are trying to achieve: we want to enrich streaming data with some reference data that is updated regularly. The enrichment is done by joining the stream with the reference data.

What we implemented: we implemented two Spark jobs (jars). The first one updates a Spark table TEST_TABLE every hour (let's call it the "reference data") by using

<dataset>.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE")

and then calling spark.catalog.refreshTable("TEST_TABLE").
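
A minimal sketch of what this hourly writer job could look like; the source of the reference data is not given in the question, so the Parquet path below is a hypothetical placeholder:

import org.apache.spark.sql.{SaveMode, SparkSession}

object ReferenceDataWriter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("reference-data-writer").getOrCreate()

    // Hypothetical source of the reference data
    val referenceData = spark.read.parquet("/mnt/source/reference_data")

    // Overwrite the managed table every hour, as described above
    referenceData.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE")

    // Invalidate any cached metadata/data for the table
    spark.catalog.refreshTable("TEST_TABLE")
  }
}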

The second job (let's call it "streaming data") uses Spark Structured Streaming to read some data as a stream, join it with the table TEST_TABLE via DataFrame.transform(), and write it to another system. We read the reference data with spark.read.table("TEST_TABLE") inside the function called by .transform(), so we get the latest values in the table. Unfortunately, the second application crashes every time the first application updates the table. The following message is shown in the Log4j output:

18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
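
For context, the streaming side described above boils down to something like the following sketch, with a placeholder source and sink and a hypothetical join column "key" assumed to exist in TEST_TABLE:

import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamingEnricher {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("streaming-enricher").getOrCreate()

    // Placeholder streaming source; the real job reads from another system
    val stream = spark.readStream.format("rate").load().selectExpr("value AS key", "timestamp")

    // Enrichment: read the reference table and join it to the stream
    def withReferenceData(df: DataFrame): DataFrame = {
      val reference = spark.read.table("TEST_TABLE")
      df.join(reference, Seq("key"))
    }

    val enriched = stream.transform(withReferenceData)

    // Placeholder sink; the real job writes to another system
    enriched.writeStream.format("console").start().awaitTermination()
  }
}

Note that .transform() runs once when the query is defined, so the reference relation is resolved at that point rather than on every micro-batch.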

We also tried to invalidate the cache before reading the table, but that decreased performance and the application still crashed. We suspect the root cause is the lazy evaluation of the reference dataset, which still "points" to the old data that no longer exists.
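Concretely, that attempt amounts to a variant of the enrichment function with an explicit refresh before each read (again assuming the hypothetical "key" join column), used as stream.transform(RefreshBeforeRead.withRefreshedReferenceData(spark)):

import org.apache.spark.sql.{DataFrame, SparkSession}

object RefreshBeforeRead {
  def withRefreshedReferenceData(spark: SparkSession)(df: DataFrame): DataFrame = {
    spark.catalog.refreshTable("TEST_TABLE") // drop cached metadata and file listings for the table
    df.join(spark.read.table("TEST_TABLE"), Seq("key"))
  }
}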

Do you have any suggestions for what we could do to prevent this problem, or what the best approach is for joining a stream with dynamic reference data?

Recommended Answer

Join to the reference data and do not cache it; this ensures you always go back to the source. Look up the latest version of the data, identified by a primary key plus a counter, where the counter is the one closest to (or equal to) a counter you maintain in the streaming application. Every hour, write again: append all reference data that is still current, but with an incremented counter, i.e. as a new version. Use Parquet for this.
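
A rough sketch of this scheme; the storage path and the "key" column are hypothetical, since the answer only describes the approach:

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit, max}

object VersionedReferenceData {
  // Hypothetical storage location for the versioned reference data
  val refPath = "/mnt/reference/TEST_TABLE"

  // Hourly job: append the full, still-current reference data under a new, incremented version.
  // Existing files are never overwritten or deleted, so running readers keep seeing valid files.
  def appendReferenceVersion(refData: DataFrame, version: Long): Unit = {
    refData
      .withColumn("version", lit(version))
      .write
      .mode(SaveMode.Append)
      .partitionBy("version")
      .parquet(refPath)
  }

  // Enrichment: join against the newest version whose counter is closest to
  // (or equal to) the counter the streaming application maintains.
  def joinLatestReference(spark: SparkSession, stream: DataFrame, streamCounter: Long): DataFrame = {
    val ref = spark.read.parquet(refPath)
    val latestVersion = ref
      .filter(col("version") <= streamCounter)
      .agg(max(col("version")).cast("long"))
      .first()
      .getLong(0)
    val snapshot = ref.filter(col("version") === latestVersion).drop("version")
    stream.join(snapshot, Seq("key")) // "key" stands in for the primary key of the reference data
  }
}

Because the hourly job only appends new files, the streaming query never loses files it has already resolved, which avoids the FileNotFoundException above. How the streaming application advances its counter between micro-batches is left open here.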

That concludes this article on "Is there a good way to join a stream with a changing table in Spark?". We hope the recommended answer is helpful.
