Problem Description
The following has already been implemented:
- A Kafka producer extracts data from Twitter using Spark Streaming.
- A Kafka consumer ingests the data into a Hive external table (on HDFS).
This is working fine so far. The only issue I am facing is that when my app inserts data into the Hive table, it creates a small file for every single row of data.
Below is the code:
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

// Define which topics to read from
val topic = "topic_twitter"
val groupId = "group-1"
// Application-specific Kafka consumer wrapper, connecting through ZooKeeper
val consumer = KafkaConsumer(topic, groupId, "localhost:2181")
// Create SparkContext
val sparkContext = new SparkContext("local[2]", "KafkaConsumer")
// Create HiveContext
val hiveContext = new HiveContext(sparkContext)
hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS twitter_data (tweetId BIGINT, tweetText STRING, userName STRING, tweetTimeStamp STRING, userLang STRING)")
hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS demo (foo STRING)")
The Hive demo table is already populated with one single record. The Kafka consumer loops through the data for topic "topic_twitter", processes each row, and inserts it into the Hive table:
// Build one INSERT statement per tweet; note that the STRING values must be
// wrapped in single quotes for the generated HiveQL to parse
val hiveSql = "INSERT INTO TABLE twitter_data SELECT STACK(1, " +
  tweetID + ", '" +
  tweetText + "', '" +
  userName + "', '" +
  tweetTimeStamp + "', '" +
  userLang + "') FROM demo LIMIT 1"
hiveContext.sql(hiveSql)
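For one row, the generated hiveSql string expands to something like the statement below (the values are invented purely for illustration). Each such INSERT runs as its own Hive job and appends its own output file under the table's HDFS directory, which is why a separate small file shows up for every row:

INSERT INTO TABLE twitter_data
SELECT STACK(1, 123456789, 'hello world', 'someUser', '2016-01-01 12:00:00', 'en')
FROM demo LIMIT 1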
Below are images from my Hadoop environment (the twitter_data and demo tables).
Last 10 files created in HDFS:
As you can see, no file is larger than 200 KB. Is there a way to merge these files into a single file?
Recommended Answer
[take 2] OK, so you can't properly "stream" data into Hive. But you can add a periodic compaction post-processing job...
- create your table with 3 partitions, e.g. (role='activeA'), (role='activeB'), (role='archive') -- a DDL sketch follows this list
- point your Spark inserts to (role='activeA')
- at some point, switch to (role='activeB'), then dump every record collected so far in the "A" partition into "archive", hoping that Hive's default config will do a decent job of limiting fragmentation
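A minimal sketch of what the partitioned table could look like, assuming the column list from the question and the answer's role partition column (this DDL is not part of the original answer, and the existing non-partitioned twitter_data table would have to be dropped or replaced first):

CREATE EXTERNAL TABLE IF NOT EXISTS twitter_data (
  tweetId BIGINT,
  tweetText STRING,
  userName STRING,
  tweetTimeStamp STRING,
  userLang STRING
)
PARTITIONED BY (role STRING);

The periodic compaction step from the last bullet then becomes: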
INSERT INTO TABLE twitter_data PARTITION (role='archive')
SELECT ... FROM twitter_data WHERE role='activeA';

TRUNCATE TABLE twitter_data PARTITION (role='activeA');
- at some point, switch back to "A", etc.
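On the Spark side, one way to wire up the A/B switch is to keep the name of the currently active collection partition in a variable and target it explicitly in each INSERT. This is only a sketch following the answer's naming scheme; activePartition and insertTweet are made-up names, not part of the original code:

// Hypothetical sketch: write each tweet into the currently active collection partition.
// The periodic compaction job flips this between "activeA" and "activeB".
var activePartition = "activeA"

def insertTweet(tweetID: Long, tweetText: String, userName: String,
                tweetTimeStamp: String, userLang: String): Unit = {
  // Note: real tweet text would need its single quotes escaped before being embedded in HiveQL
  val hiveSql =
    s"INSERT INTO TABLE twitter_data PARTITION (role='$activePartition') " +
    s"SELECT STACK(1, $tweetID, '$tweetText', '$userName', '$tweetTimeStamp', '$userLang') " +
    "FROM demo LIMIT 1"
  hiveContext.sql(hiveSql)
}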
One last word: if Hive still creates too many files on each compaction job, try tweaking a few parameters in your session, just before the INSERT, e.g.
set hive.merge.mapfiles =true;
set hive.merge.mapredfiles =true;
set hive.merge.smallfiles.avgsize=1024000000;
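For example, these could be applied from the same HiveContext right before running the compaction INSERT. This is only a sketch of how to issue the answer's suggested settings from the question's code; whether the Hive merge settings actually take effect depends on how the compaction statement ends up being executed:

hiveContext.sql("set hive.merge.mapfiles=true")
hiveContext.sql("set hive.merge.mapredfiles=true")
hiveContext.sql("set hive.merge.smallfiles.avgsize=1024000000")
// ...then run the INSERT INTO ... PARTITION (role='archive') compaction statement from above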