本文介绍了Spark createDataFrame(df.rdd, df.schema) 与 checkPoint 以打破血统的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用

val df=longLineageCalculation(....)val newDf=sparkSession.createDataFrame(df.rdd, df.schema)newdf.join……

为了节省计算计划的时间,但是文档说检查点是建议的削减"计划的方法.血统.但我不想为将 RDD 保存到磁盘而付出代价.

我的进程是一个批处理进程,时间不长,可以毫无问题地重新启动,因此检查点对我没有好处(我认为).

使用my"可能会出现哪些问题?方法?(文档建议检查点,这是更昂贵的,而不是打破血统的检查点,我想知道原因)

我唯一能猜到的是,如果在我的血统破坏"之后某个节点出现故障,也许我的过程会失败,而检查点的过程会正常工作?(如果 DF 被缓存而不是检查点呢?)

谢谢!

来自 SMaZ 的回答、我自己的知识和他提供的文章.使用 createDataframe(这是一个 Dev-API,因此使用我的"/您自己承担风险)会将谱系保留在内存中(对我来说不是问题,因为我没有内存问题并且谱系不大).

有了这个,看起来(未 100% 测试)Spark 应该能够在失败时重建所需的任何东西.

由于我没有在以下执行中使用数据,所以我会继续缓存+createDataframe 与检查点(如果我没记错的话,是实际上是缓存+saveToHDFS+createDataFrame").

我的过程不是那么重要(如果它崩溃),因为用户总是期待结果并且他们手动启动它,所以如果出现问题,他们可以重新启动(+Spark 将重新启动它)或打电话给我,所以无论如何我可以冒一些风险,但我 99% 确定没有风险:)

解决方案

让我从创建数据框开始:

val newDf=sparkSession.createDataFrame(df.rdd, df.schema)

如果我们仔细查看 SparkSession 类,则此方法使用 @DeveloperApi 进行注释.要了解此注释的含义,请查看 DeveloperApi

面向开发人员的低级、不稳定的 API.

开发者 API 可能会在次要版本的 Spark 中更改或删除.

因此不建议将此方法用于生产解决方案,在开源世界中称为使用风险自担.

然而,让我们深入挖掘当我们从 RDD 调用 createDataframe 时会发生什么.它正在调用 internalCreateDataFrame 私有方法并创建LogicalRDD.

LogicalRDD 是在以下情况下创建的:

  • 请求数据集检查点
  • 请求 SparkSession 从内部二进制行的 RDD 创建一个 DataFrame

所以它与checkpoint操作无异,无需物理保存数据集.它只是从内部二进制行和模式的 RDD 创建 DataFrame.这可能会截断内存中的谱系,但不会截断物理级别.

所以我认为这只是创建另一个RDD的开销,不能用作checkpoint的替代.

现在,检查点是截断谱系图并将其保存到可靠的分布式/本地文件系统的过程.

为什么要检查点?

  • 如果计算需要很长时间谱系太长依赖太多的RDD

  • 保留大量血统信息会带来内存成本.

  • 即使在 Spark 应用程序终止后,检查点文件也不会自动删除,因此我们可以将其用于其他进程

使用我的"方法会出现哪些问题?(文档建议检查点,这是更昂贵的,而不是这个打破血统,我想知道原因)

这篇文章将详细介绍有关缓存和检查点的信息.IIUC,您的问题更多是关于我们应该在哪里使用检查点.让我们讨论一些检查点有用的实际场景

  1. 假设我们有一个数据集,我们要在该数据集上执行 100 次迭代操作,每次迭代都将最后一次迭代结果作为输入(Spark MLlib 用例).现在,在这个迭代过程中,谱系将在此期间增长.这里定期检查点数据集(假设每 10 次迭代)将确保在发生任何故障时,我们可以从最后一个故障点开始该过程.
  2. 让我们举一些批处理示例.想象一下,我们有一批正在创建一个具有繁重血统或复杂计算的主数据集.现在经过一些固定的时间间隔,我们得到了一些应该使用早期计算的主数据集的数据.在这里,如果我们检查我们的主数据集,那么它可以重用于来自不同 sparkSession 的所有后续进程.

我的过程是一个批处理过程,时间不长,可以重新启动没有问题,所以检查点对我没有好处(我想想).

这是正确的,如果您的流程不是繁重计算/大沿袭,那么就没有检查点的意义.经验法则 是如果您的数据集没有多次使用并且可以比所用时间和用于检查点/缓存的资源更快地重新构建,那么我们应该避免它.它将为您的流程提供更多资源.

I'm currently using

val df=longLineageCalculation(....)
val newDf=sparkSession.createDataFrame(df.rdd, df.schema)
newDf.join......

In order to save time when calculating plans, however docs say that checkpointing is the suggested way to "cut" lineage. BUT I don't want to pay the price of saving the RDD to disk.

My process is a batch process which is not-so-long and can be restarted without issues, so checkpointing is not benefit for me (I think).

What are the problems which can arise using "my" method? (Docs suggests checkpointing, which is more expensive, instead of this one for breaking lineages and I would like to know the reason)

Only think I can guess is that if some node fails after my "lineage breaking" maybe my process will fail while the checkpointed one would have worked correctly? (what If the DF is cached instead of checkpointed?)

Thanks!

EDIT:

解决方案

Let me start with creating dataframe with below line :

If we take close look into SparkSession class then this method is annotated with @DeveloperApi. To understand what this annotation means please take a look into below lines from DeveloperApi class

So it is not advised to use this method for production solutions, called as Use at your own risk implementation in open source world.

However, Let's dig deeper what happens when we call createDataframe from RDD. It is calling the internalCreateDataFrame private method and creating LogicalRDD.

LogicalRDD is created when:

  • Dataset is requested to checkpoint
  • SparkSession is requested to create a DataFrame from an RDD of internal binary rows

So it is nothing but the same as checkpoint operation without saving the dataset physically. It is just creating DataFrame From RDD Of Internal Binary Rows and Schema. This might truncate the lineage in memory but not at the Physical level.

So I believe it's just the overhead of creating another RDDs and can not be used as a replacement of checkpoint.

Now, Checkpoint is the process of truncating lineage graph and saving it to a reliable distributed/local file system.

Why checkpoint?

  • If computation takes a long time or lineage is too long or Depends too many RDDs

  • Keeping heavy lineage information comes with the cost of memory.

  • The checkpoint file will not be deleted automatically even after the Spark application terminated so we can use it for some other process

This article will give detail information on cache and checkpoint. IIUC, your question is more on where we should use the checkpoint. let's discuss some practical scenarios where checkpointing is helpful

  1. Let's take a scenario where we have one dataset on which we want to perform 100 iterative operations and each iteration takes the last iteration result as input(Spark MLlib use cases). Now during this iterative process lineage is going to grow over the period. Here checkpointing dataset at a regular interval(let say every 10 iterations) will assure that in case of any failure we can start the process from last failure point.
  2. Let's take some batch example. Imagine we have a batch which is creating one master dataset with heavy lineage or complex computations. Now after some regular intervals, we are getting some data which should use earlier calculated master dataset. Here if we checkpoint our master dataset then it can be reused for all subsequent processes from different sparkSession.

That's correct, If your process is not heavy-computation/Big-lineage then there is no point of checkpointing. Thumb rule is if your dataset is not used multiple time and can be re-build faster than the time is taken and resources used for checkpoint/cache then we should avoid it. It will give more resources to your process.

这篇关于Spark createDataFrame(df.rdd, df.schema) 与 checkPoint 以打破血统的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-03 00:30