This article explains how Spark reads large files (petabytes of data) when the file cannot fit in Spark's main memory. It should be a useful reference for anyone facing the same problem.

Problem Description

What will happen with large files in these cases?

1) Spark gets the data location from the NameNode. Will Spark stop at this point because the data size is too large, according to the information from the NameNode?

2) Spark partitions the data according to the datanode block size, but all of the data cannot be stored in main memory. We are not using StorageLevel here, so what will happen?

3) Spark partitions the data; some of it will be stored in main memory. Once the data in main memory has been processed, will Spark load the rest of the data from disk?

Recommended Answer

First of all, Spark only starts reading in the data when an action (like count, collect or write) is called. Once an action is called, Spark loads the data in partitions - the number of concurrently loaded partitions depends on the number of cores you have available. So in Spark you can think of 1 partition = 1 core = 1 task. Note that all concurrently loaded partitions have to fit into memory, or you will get an OOM.
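A minimal PySpark sketch of this laziness (the file path and session setup below are illustrative, not from the question): nothing is read while the DataFrame is being defined, and partitions are only loaded, a core's worth at a time, when the action runs.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-read-demo").getOrCreate()

# Building the plan only - no data is read from HDFS yet.
df = spark.read.text("hdfs:///data/huge_file")      # placeholder path
errors = df.filter(df.value.contains("ERROR"))       # transformation, still lazy

# The count action triggers the actual read: partitions are loaded and
# processed a core's worth at a time, never the whole file at once.
print(errors.count())
```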

Assuming that you have several stages, Spark then runs the transformations from the first stage on the loaded partitions only. Once it has applied the transformations on the data in the loaded partitions, it stores the output as shuffle data and then reads in more partitions. It then applies the transformations on these partitions, stores the output as shuffle data, reads in more partitions and so forth until all the data has been read.
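A rough sketch of how a wide transformation splits a job into stages, assuming the same kind of text input as above (paths are placeholders): the groupBy introduces a shuffle boundary, so the output of the first stage is written as shuffle data before the second stage aggregates it.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("shuffle-demo").getOrCreate()
df = spark.read.text("hdfs:///data/huge_file")        # placeholder path

words = df.select(F.explode(F.split(F.col("value"), " ")).alias("word"))

# Stage 1: load partitions, split lines into words, write shuffle files.
# Stage 2: read the shuffled data back grouped by key and count it.
counts = words.groupBy("word").count()
counts.write.mode("overwrite").parquet("hdfs:///tmp/word_counts")  # placeholder path
```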

If you apply no transformation but only do, for instance, a count, Spark will still read in the data in partitions, but it will not store any data in your cluster, and if you do the count again it will read in all the data once more. To avoid reading in the data several times, you can call cache or persist, in which case Spark will try to store the data in your cluster. With cache (which is the same as persist(StorageLevel.MEMORY_ONLY)) it will store all partitions in memory - if they don't fit in memory you will get an OOM. If you call persist(StorageLevel.MEMORY_AND_DISK) it will store as much as it can in memory and put the rest on disk. If the data doesn't fit on disk either, the OS will usually kill your workers.
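A small sketch of the difference described here, again with placeholder paths: persisting with StorageLevel.MEMORY_AND_DISK lets the second action reuse cached partitions instead of re-reading the file from HDFS.

```python
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("persist-demo").getOrCreate()
df = spark.read.text("hdfs:///data/huge_file")        # placeholder path

# Keep as many partitions in memory as fit and spill the rest to local disk.
df.persist(StorageLevel.MEMORY_AND_DISK)

print(df.count())   # first action: reads from HDFS and populates the cache
print(df.count())   # second action: served from the cached partitions
```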

Note that Spark has its own memory management system. Some of the memory that you assign to your Spark job is used to hold the data being worked on, and some of the memory is used for storage if you call cache or persist.
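For illustration only, these are the configuration knobs behind that split in Spark's unified memory model; the values shown are the usual defaults, not a recommendation for this workload.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("memory-config-demo")
    .config("spark.executor.memory", "8g")           # example size, not a recommendation
    .config("spark.memory.fraction", "0.6")          # share of heap for execution + storage
    .config("spark.memory.storageFraction", "0.5")   # part of that protected for cached data
    .getOrCreate()
)
```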

I hope this explanation helps :)

This concludes the article on how Spark reads large files (petabytes) when a file cannot fit in Spark's main memory. We hope the recommended answer above is helpful.
