我正在创建一个新的DataFrame,其中包含来自Join的少量记录。

val joined_df = first_df.join(second_df, first_df.col("key") ===
second_df.col("key") && second_df.col("key").isNull, "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()

除计数操作外,其他所有操作都很快(在一秒钟之内)。 RDD转换开始,实际上需要几个小时才能完成。有什么办法可以加快速度吗?
INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

最佳答案



这是合理的:count之前的所有操作都称为转换,而这种类型的spark操作是惰性的,即,在调用操作之前(在示例中为count)它不做任何计算。

第二个问题在repartition(1)中:

请记住,您将失去spark提供的所有并行性,并且您的计算将在一个执行程序(如果处于独立模式下则为核心)中运行,因此您必须删除此步骤或将 1 更改为与CPU内核数(独立模式)或执行程序数(集群模式)。



如果我正确理解,您会将DataFrame转换为RDD,这实际上是spark中的不良做法,您应尽可能避免这种转换。
这是因为DataFrameDataset中的数据是使用特殊 Spark 编码器(如果我还记得的话称为tungstant)进行编码的,它比JVM序列化编码器占用的内存少得多,因此这种转换意味着spark将更改您的类型。来自他自己的数据(通过仅处理编码的数据,而不是序列化要使用的数据,然后反序列化),将自己的数据(占用的少得多的内存并让spark 优化进行了很多换向)到JVM数据类型,这就是为什么DataFrameDatasetRDD强大

希望这对您有帮助

关于scala - 依靠Spark Dataframe非常慢,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45142105/

10-10 08:13