This post covers how to handle "Flume: directory to Avro -> Avro to HDFS - files invalid after transfer". The answer below should be a useful reference for anyone who runs into the same problem.

Problem description

I have users writing AVRO files and I want to use Flume to move all those files into HDFS, so that I can later use Hive or Pig to query/analyse the data.

On the client I installed flume and have a SpoolDir source and AVRO sink like this:

a1.sources = src1
a1.sinks = sink1
a1.channels = c1

a1.channels.c1.type = memory

a1.sources.src1.type = spooldir
a1.sources.src1.channels = c1
a1.sources.src1.spoolDir = {directory}
a1.sources.src1.fileHeader = true
a1.sources.src1.deserializer = avro

a1.sinks.sink1.type = avro
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hostname = {IP}
a1.sinks.sink1.port = 41414

On the hadoop cluster I have this AVRO source and HDFS sink:

a1.sources = avro1
a1.sinks = sink1
a1.channels = c1

a1.channels.c1.type = memory

a1.sources.avro1.type = avro
a1.sources.avro1.channels = c1
a1.sources.avro1.bind = 0.0.0.0
a1.sources.avro1.port = 41414

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hdfs.path = {hdfs dir}
a1.sinks.sink1.hdfs.fileSuffix = .avro
a1.sinks.sink1.hdfs.rollSize = 67108864
a1.sinks.sink1.hdfs.fileType = DataStream
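
For reference, with the two property files above each saved to disk, the agents would typically be launched with the standard flume-ng command, roughly like this (a sketch; the file names client.conf and collector.conf and the conf directory are assumptions, not part of the original setup):

# On the client machine (spooldir -> avro agent)
bin/flume-ng agent --conf conf --conf-file client.conf --name a1 -Dflume.root.logger=INFO,console

# On the hadoop cluster (avro -> hdfs agent)
bin/flume-ng agent --conf conf --conf-file collector.conf --name a1 -Dflume.root.logger=INFO,console

Both agents are named a1 in their respective property files, hence --name a1 in both commands.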

The problem is that the files on HDFS are not valid AVRO files! I am using the Hue UI to check whether a file is a valid AVRO file or not. If I upload an AVRO file that I generated on my PC to the cluster, I can see its contents fine. But the files written by Flume are not valid AVRO files.

I tried the Avro client that is included with Flume, but that didn't work because it sends one Flume event per line, which breaks the Avro files; that is fixed by the spooldir source with deserializer = avro. So I think the problem is in the HDFS sink when it writes the files.

With hdfs.fileType = DataStream it writes the values from the Avro fields rather than the whole Avro file, losing all the schema information. If I use hdfs.fileType = SequenceFile the files are not valid for some reason.
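
As a side note, one way to check a rolled file outside of Hue is to copy it out of HDFS and run Avro's command-line tool against it; a valid Avro container file will print its schema and records, while a broken one fails immediately. This is only a sketch: the HDFS path, file name and avro-tools version are placeholders.

hdfs dfs -get {hdfs dir}/FlumeData.1234567890.avro .
java -jar avro-tools-1.8.2.jar getschema FlumeData.1234567890.avro
java -jar avro-tools-1.8.2.jar tojson FlumeData.1234567890.avro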

Any ideas?

Thanks

Solution

You have to add this to your HDFS sink configuration (the value of this property defaults to TEXT):

a1.sinks.sink1.serializer = avro_event

This should write valid avro files, but with the default schema.
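
Combined with the sink definition from the question, the HDFS sink section would then look something like this (the path placeholder is unchanged):

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hdfs.path = {hdfs dir}
a1.sinks.sink1.hdfs.fileSuffix = .avro
a1.sinks.sink1.hdfs.rollSize = 67108864
a1.sinks.sink1.hdfs.fileType = DataStream
a1.sinks.sink1.serializer = avro_event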

However, since you are using Avro files as your input, you probably want to write Avro files with the same schema. For that you can use the AvroEventSerializer from Cloudera's CDK. Assuming you built the code and placed the jar in Flume's lib directory, you can now define the serializer in the properties file:

a1.sinks.sink1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder

The serializer assumes that the Avro schema is present in the header of every event, either as a URL or as a LITERAL. To use the latter approach (which is less efficient, but might be easier to try out), you must tell the source on the client side to add the schema literal to every event, by adding this property:

a1.sources.src1.deserializer.schemaType = LITERAL
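
To recap, the schema-preserving setup described in this answer comes down to these two changes, one on each side (assuming the CDK jar is already in Flume's lib directory as noted above):

# Client agent: spooldir source attaches the schema literal to every event
a1.sources.src1.deserializer = avro
a1.sources.src1.deserializer.schemaType = LITERAL

# Cluster agent: HDFS sink writes the events back out using that schema
a1.sinks.sink1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder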

That concludes this post on "Flume: directory to Avro -> Avro to HDFS - files invalid after transfer". We hope the answer above is helpful, and thank you for your continued support!
