This article looks at how to deal with Hive creating multiple small files in HDFS for every insert; hopefully the answer below is a useful reference for anyone hitting the same problem.

Problem description

The following is already in place:

  1. A Kafka producer pulls data from Twitter using Spark Streaming.
  2. A Kafka consumer ingests that data into a Hive external table (on HDFS).

This is working fine so far. The only issue I am facing is that when my app inserts data into the Hive table, it creates a small file for every single row of data.

Below is the code:

// Imports needed by the snippet below
  import org.apache.spark.SparkContext
  import org.apache.spark.sql.hive.HiveContext

// Define which topics to read from
  val topic = "topic_twitter"
  val groupId = "group-1"
  val consumer = KafkaConsumer(topic, groupId, "localhost:2181") // Kafka consumer helper from the original app

// Create SparkContext
  val sparkContext = new SparkContext("local[2]", "KafkaConsumer")

// Create HiveContext
  val hiveContext = new HiveContext(sparkContext)

// External tables the consumer writes to
  hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS twitter_data (tweetId BIGINT, tweetText STRING, userName STRING, tweetTimeStamp STRING, userLang STRING)")
  hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS demo (foo STRING)")

The Hive demo table is already populated with a single record. The Kafka consumer loops through the data for topic = "topic_twitter", processes each row, and populates the Hive table:

// Build the INSERT statement for one tweet; string values must be wrapped
// in single quotes to form valid HiveQL literals.
val hiveSql = "INSERT INTO TABLE twitter_data SELECT STACK(1, " +
    tweetID        + ", '"  +
    tweetText      + "', '" +
    userName       + "', '" +
    tweetTimeStamp + "', '" +
    userLang       + "') FROM demo LIMIT 1"

hiveContext.sql(hiveSql)

Below are screenshots from my Hadoop environment showing the twitter_data and demo tables, and the last 10 files created in HDFS.

As you can see, each file is no larger than about 200 KB. Is there a way to merge these files into a single file?

Recommended answer

[take 2] OK, so you can't properly "stream" data into Hive. But you can add a periodic compaction post-processing job...

  • create your table with 3 partitions, e.g. (role='activeA'), (role='activeB'), (role='archive') (see the sketch after this list)
  • point your Spark inserts to (role='activeA')
  • at some point, switch to (role='activeB')
  • then dump every record collected in the "A" partition into "archive", hoping that the default Hive configuration does a reasonable job of limiting fragmentation
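
As a rough sketch of that layout: the column list is copied from the question's DDL, the role partition column comes from the answer, and the table is declared managed rather than EXTERNAL here so that the TRUNCATE shown further down is accepted by Hive.

  // Sketch only: same columns as the original twitter_data table, plus a
  // "role" partition column. Declared without EXTERNAL so that
  // TRUNCATE ... PARTITION is allowed later.
  hiveContext.sql(
    "CREATE TABLE IF NOT EXISTS twitter_data (" +
    "tweetId BIGINT, tweetText STRING, userName STRING, " +
    "tweetTimeStamp STRING, userLang STRING) " +
    "PARTITIONED BY (role STRING)")

  // Streaming inserts go into the currently active partition.
  hiveContext.sql(
    "INSERT INTO TABLE twitter_data PARTITION (role='activeA') " +
    "SELECT STACK(1, " + tweetID + ", '" + tweetText + "', '" + userName +
    "', '" + tweetTimeStamp + "', '" + userLang + "') FROM demo LIMIT 1")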

INSERT INTO TABLE twitter_data PARTITION (role='archive')
SELECT ... FROM twitter_data WHERE role='activeA';

TRUNCATE TABLE twitter_data PARTITION (role='activeA');
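
If the compaction is driven from the same Spark application, it could look roughly like this; the explicit column list (taken from the question's table definition) and the idea of calling it periodically are assumptions, not part of the original answer:

  // Copy everything collected in the active partition into "archive" in a
  // single job, then empty the active partition. Call this periodically,
  // after switching the streaming inserts to the other partition.
  def compactPartition(active: String): Unit = {
    hiveContext.sql(
      "INSERT INTO TABLE twitter_data PARTITION (role='archive') " +
      "SELECT tweetId, tweetText, userName, tweetTimeStamp, userLang " +
      "FROM twitter_data WHERE role='" + active + "'")
    hiveContext.sql("TRUNCATE TABLE twitter_data PARTITION (role='" + active + "')")
  }

  compactPartition("activeA") // run once the inserts have been switched to activeB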

At some point, switch back to "A", and so on.

One last word: if Hive still creates too many files on each compaction job, then try tweaking some parameters in your session just before the INSERT, e.g.

set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.smallfiles.avgsize=1024000000;
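
From the Spark code above, these can be issued through the same HiveContext just before the compaction INSERT. Whether Spark honours the hive.merge.* settings depends on the Spark and Hive versions in use, so treat this as something to verify rather than rely on:

  // Ask Hive to merge small output files at the end of the compaction job;
  // values are the ones suggested above (avgsize is in bytes, roughly 1 GB).
  hiveContext.sql("SET hive.merge.mapfiles=true")
  hiveContext.sql("SET hive.merge.mapredfiles=true")
  hiveContext.sql("SET hive.merge.smallfiles.avgsize=1024000000")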

That concludes this article on Hive creating multiple small files in HDFS for every insert; hopefully the recommended answer above is helpful.
