This article describes how to handle Hive creating multiple small files in HDFS for every insert. It should be a useful reference for anyone facing the same problem; read on to learn more.

Problem Description

The following has already been achieved:

  1. Kafka Producer pulling data from Twitter using Spark Streaming.
  2. Kafka Consumer ingesting data into a Hive external table (on HDFS).

While this is working fine so far, there is only one issue I am facing: when my app inserts data into the Hive table, it creates a small file with each row of data per file.

Below is the code:

  import org.apache.spark.SparkContext

  // Define which topics to read from
  val topic = "topic_twitter"
  val groupId = "group-1"
  val consumer = KafkaConsumer(topic, groupId, "localhost:2181")

  // Create SparkContext
  val sparkContext = new SparkContext("local[2]", "KafkaConsumer")

  // Create HiveContext
  val hiveContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)

  // Create the target table and the single-row helper table
  hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS twitter_data (tweetId BIGINT, tweetText STRING, userName STRING, tweetTimeStamp STRING, userLang STRING)")
  hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS demo (foo STRING)")

The Hive demo table is already populated with a single record. The Kafka consumer loops through the data for topic = "topic_twitter", processing each row and populating the Hive table:

val hiveSql = "INSERT INTO TABLE twitter_data SELECT STACK( 1," + 
    tweetID        +","  + 
    tweetText      +"," + 
    userName       +"," +
    tweetTimeStamp +","  +
    userLang + ") FROM demo limit 1"

hiveContext.sql(hiveSql)

Below are images from my Hadoop environment, showing the twitter_data and demo tables.

The last 10 files created in HDFS:

As you can see, the file sizes are no more than 200 KB each. Is there a way I can merge these files into one file?

Solution

[take 2] OK, so you can't properly "stream" data into Hive. But you can add a periodic compaction post-processing job...

  • create your table with 3 partitions, e.g. (role='activeA'), (role='activeB'), (role='archive')
  • point your Spark inserts to (role='activeA')
  • at some point, switch to (role='activeB')
  • then dump every record that you have collected in the "A" partition into "archive", hoping that the Hive default config will do a good job of limiting fragmentation

    INSERT INTO TABLE twitter_data PARTITION (role='archive')
    SELECT ...
    FROM twitter_data WHERE role='activeA';

    TRUNCATE TABLE twitter_data PARTITION (role='activeA');

  • at some point, switch back to "A", and so on (see the sketch right after this list)
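
As a rough illustration of this rotation from the Spark side, here is a minimal Scala sketch. It reuses the hiveContext from the question's code and assumes a partitioned, managed table (called twitter_data_part here, a hypothetical name) so that TRUNCATE ... PARTITION works; treat it as a sketch of the idea, not a drop-in implementation.

  // Hypothetical partitioned, managed table used for the rotation.
  hiveContext.sql(
    "CREATE TABLE IF NOT EXISTS twitter_data_part " +
    "(tweetId BIGINT, tweetText STRING, userName STRING, tweetTimeStamp STRING, userLang STRING) " +
    "PARTITIONED BY (role STRING)")

  // Partition currently targeted by the streaming inserts ('activeA' or 'activeB').
  var activeRole = "activeA"

  // Streaming side: every insert goes into the currently active partition.
  def insertActive(selectClause: String): Unit =
    hiveContext.sql(s"INSERT INTO TABLE twitter_data_part PARTITION (role='$activeRole') $selectClause")

  // Periodic compaction job: flip the active partition, then fold the old one into 'archive'.
  def compact(): Unit = {
    val oldRole = activeRole
    activeRole = if (oldRole == "activeA") "activeB" else "activeA"
    hiveContext.sql(
      "INSERT INTO TABLE twitter_data_part PARTITION (role='archive') " +
      "SELECT tweetId, tweetText, userName, tweetTimeStamp, userLang " +
      s"FROM twitter_data_part WHERE role='$oldRole'")
    hiveContext.sql(s"TRUNCATE TABLE twitter_data_part PARTITION (role='$oldRole')")
  }

A scheduler (cron, Oozie, or a simple timer inside the app) would call compact() periodically, mirroring the A/B switch described above.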

One last word: if Hive still creates too many files on each compaction job, then try tweaking some parameters in your session, just before the INSERT, e.g.

  set hive.merge.mapfiles = true;
  set hive.merge.mapredfiles = true;
  set hive.merge.smallfiles.avgsize = 1024000000;
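
If the INSERT is issued from the Spark application rather than from a Hive session, the same settings can presumably be applied through the existing hiveContext just before the compaction query; a minimal sketch:

  // Session-level merge settings (same values as above), applied before the compaction INSERT.
  hiveContext.sql("SET hive.merge.mapfiles=true")
  hiveContext.sql("SET hive.merge.mapredfiles=true")
  hiveContext.sql("SET hive.merge.smallfiles.avgsize=1024000000")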

This concludes the article on Hive creating multiple small files for every insert into HDFS. We hope the answer above helps, and thank you for your continued support!
