$ hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadataw-r--r-- 3 hdfs 68K 2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248rw-r--r-- 3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compactw-r--r-- 3 hdfs 68K 2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250...$ hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250v1{路径":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet",大小":9866555,"isDir":false,修改时间":1582750862638,"blockReplication:3," blockSize:134217728," action:" add}{路径":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet",大小":526513,"isDir":false,修改时间":1582750862834,"blockReplication:3," blockSize:134217728," action:" add}... 所以,我们要做的是:构建一个轮询 _spark_metadata 文件夹的Spark Streaming Job.我们使用 fileStream ,因为它允许我们定义要使用的文件过滤器.该流中的每个条目都是这些JSON行之一,将对其进行解析以提取文件路径和大小. 按文件所属的父文件夹(映射到每个Impala分区)将文件分组.对于每个文件夹:读取仅仅加载目标Parquet文件的数据框(以避免与其他作业写入文件的竞争情况)计算要写入的块数(使用JSON中的size字段和目标块大小)将数据帧压缩到所需的分区数量,并将其写回到HDFS 执行DDL REFRESH TABLE myTable PARTITION([从新文件夹派生的分区键] 最后,删除源文件我们取得的成就是: 通过对每个分区和批处理执行一次刷新来限制DDL. 通过配置批处理时间和块大小,我们可以使我们的产品适应具有更大或更小的数据集的不同部署方案. 该解决方案非常灵活,因为我们可以为Spark Streaming作业分配更多或更少的资源(执行程序,内核,内存等),也可以启动/停止(使用其自己的检查点系统). 我们还在研究在执行此过程的同时应用一些数据重新分区的方法,以使分区尽可能接近最佳大小. We have a Hadoop-based solution (CDH 5.15) where we are getting new files in HDFS in some directories. On top os those directories we have 4-5 Impala (2.1) tables. The process writing those files in HDFS is Spark Structured Streaming (2.3.1)Right now, we are running some DDL queries as soon as we get the files written to HDFS:ALTER TABLE table1 RECOVER PARTITONS to detect new partitions (and their HDFS directories and files) added to the table.REFRESH table1 PARTITIONS (partition1=X, partition2=Y), using all the keys for each partition.Right now, this DDL is taking a bit too long and they are getting queued in our system, damaging the data availability of the system.So, my question is: Is there a way to do this data incorporation more efficiently?We have considered:Using the ALTER TABLE .. RECOVER PARTITONS but as per the documentation, it only refreshes new partitions.Tried to use REFRESH .. PARTITON ... with multiple partitions at once, but the statement syntaxis does not allow to do that.Tried batching the queries but the Hive JDBC drives does not support batching queries.Shall we try to do those updates in parallel given that the system is already busy?Any other way you are aware of?Thanks!VictorNote: The way in which we know what partitions need refreshed is by using HDFS events as with Spark Structured Streaming we don´t know exactly when the files are written.Note #2: Also, the files written in HDFS are sometimes small, so it would be great if it could be possible to merge those files at the same time. 解决方案 Since nobody seems to have the answer for my problem, I would like to share the approach we took to make this processing more efficient, comments are very welcome.We discovered (doc. is not very clear on this) that some of the information stored in the Spark "checkpoints" in HDFS is a number of metadata files describing when each Parquet file was written and how big was it:$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadataw-r--r-- 3 hdfs 68K 2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248rw-r--r-- 3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compactw-r--r-- 3 hdfs 68K 2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250...$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250v1{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}...So, what we did was:Build a Spark Streaming Job polling that _spark_metadata folder.We use a fileStream since it allow us to define the file filter to use.Each entry in that stream is one of those JSON lines, which is parsed to extract the file path and size.Group the files by the parent folder (which maps to each Impala partition) they belong to.For each folder:Read a dataframe loading only the targeted Parquet files (to avoid race conditions with the other job writing the files)Calculate how many blocks to write (using the size field in the JSON and a target block size)Coalesce the dataframe to the desired number of partitions and write it back to HDFSExecute the DDL REFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]Finally, delete the source filesWhat we achieved is:Limit the DDLs, by doing one refresh per partition and batch.By having batch time and block size configurable, we are able to adapt our product to different deployment scenarios with bigger or smaller datasets.The solution is quite flexible, since we can assign more or less resources to the Spark Streaming job (executors, cores, memory, etc.) and also we can start/stop it (using its own checkpointing system).We are also studying the possibily of applying some data repartitioning, while doing this process, to have partitions as close as possible to the most optimum size. 这篇关于如何有效地更新文件非常频繁修改的Impala表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!
05-16 21:02