本文介绍了有没有办法控制从Spark数据帧创建的hdfs中零件文件的数量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我将sparksql查询产生的DataFrame保存在HDFS中时,它会生成大量零件文件,每个零件文件的大小为1.4 KB.有没有一种方法可以增加文件的大小,因为每个零件文件都包含大约2条记录.

When i save the DataFrame resulting from sparksql query in HDFS, it generates large number of part files with each one at 1.4 KB. is there a way to increase size of file as every part file contains about 2 records.

df_crimes_dates_formated = spark.sql('SELECT CONCAT( SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) AS DATES , Primary_Type , COUNT(1) AS COUNT  FROM crimes_data Group By CONCAT( SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) , Primary_Type ORDER BY CONCAT( SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) , COUNT(1) DESC' )

df_crimes_dates_formated.write.save('hdfs:///user/maria_dev/crimes/monthly_crimes/') 

推荐答案

您可以使用 .repartition() (或).coalesce() (取决于您的用例)来控制HDFS中的文件数.

You can use either .repartition() (or) .coalesce() depending on your usecase, to control number of files in HDFS.

#to get number of partitions of dataframe, spark creates part files depends on number of partitions in dataframe
>>> df_crimes_dates_formated.rdd.getNumPartitions()

#create 10 part files in HDFS
>>> df_crimes_dates_formated.repartition(10).write.save('hdfs:///user/maria_dev/crimes/monthly_crimes/') 

Caluculating number of partitons dynamically:

Caluculating number of partitons dynamically:

您可以得出每个分区将具有的行数,这样
将提供所需的文件大小,然后将其除以数据帧计数以动态确定分区数.

You can come up with number of rows that each partition will have, So that
will give desired file size then divide that with dataframe count to dynamically decide number of partitions.

df.count()
#3

#req rows for each partition
rows=1
par=df.count()/rows
partitions=int('1' if par <= 0 else par)

#repartition with partitions value
df.repartition(partitions).rdd.getNumPartitions()
#3


此外:


In addition:

从Spark-2.2开始,如果我们在数据帧中有1个分区,并且使用 maxRecordsPerFile 选项控制要写入文件的行数.

From Spark-2.2 if we have 1 partition in dataframe and control number of rows getting written to the file use maxRecordsPerFile option.

#assuming df_crimes_dates_formated having 1 partition then spark creates each file with 100 records in it.
df_crimes_dates_formated.write.option("maxRecordsPerFile", 100).save("hdfs:///user/maria_dev/crimes/monthly_crimes/")

这篇关于有没有办法控制从Spark数据帧创建的hdfs中零件文件的数量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-22 21:24