00 背景

在实际开发过程中,会遇到希望将采集的数据进入到kafka主题的同一分区中的需求场景,为消费者消费数据提供便利,例如,同一会员的消费信息、浏览信息等等。
此篇主题是探讨flume采集的数据如何进入到kafka主题的同一分区,并且希望能够在为类似需求场景提供解决方案的时候,提供可复制的思路,起到举一反三的效果。

01 工具

02 思路

flume采集的数据如何进入到kafka主题的同一分区?按照个人理解,这个问题需要清楚数据在flume和kafka之间流转的过程,也就是说flume将数据发送给kafka的时候,中间的具体细节是怎样的?
查看flume官网,可以看到flume数据流模型,如下图所示。
从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
可以看出,首先,flume source组件负责接收外部数据源发送的事件,接着,channel组件负责存储source发送事件,然后sink组件负责消费channel中的事件,最后,由sink组件将事件发送到外部存储或者下一个agent。
外部存储有很多种类,比如hdfs、hive、hbase、kafka等等。flume为我们提供相应的sink,比如hdfs sink、hive sink、hbase sink、kafka sink等等。
由于此次是flume和kafka对接,就重点关注kafka sink。可以在官网看到kafka sink具体的信息,如下图所示。
从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
可以看出,kafka sink将数据发布到kafka topic。
kafka Sink使用FlumeEvent headers中的topic和key属性将事件发送给Kafka。如果headers中存在topic,事件将被发送到指定的topic,并且覆盖掉配置文件中sink的kafka.topic属性值。如果headers中存在key,kafka将使用key在topic分区之间划分数据。具有相同key的Events将被发送到相同分区。如果key为null,events将被随机发送到某个分区。
到这里,官网已经给出了答案,只要key相同,就会被发送到相同分区。那么kafka sink是怎么样把具有相同key的events发送到相同分区的呢?查看kafka sink源码,如下图所示。
从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP

代码注释
Flume Sink可以向Kafka发布消息。它可以用在任何Flume agent和channel的通用实现。消息可以是任何事件,key是从header中读取的字符串。对于分区的使用,使用拦截器生成带有分区key的header。
必要的属性:
brokerList——可以是部分列表,但是至少有2个kafka broker,以供HA使用,但是,任何以kafka.开头的属性都会被传递给kafka生产者。阅读Kafka生产者文档,可以看到哪些配置会被使用。
可选属性:
topic——有一个默认属性,而且,如果需要支持具有不同主题的事件,可以在event header 中设置;
batchSize——在一次批处理中要处理多少消息。更大的批处理提高了吞吐量,同时增加了延迟;
requiredAcks—0(不安全)1(至少一个broker接收,默认值)-1(所有broker接收);
useFlumeEventFormat—在序列化到Kafka时保留事件头;
header 属性值 (每个事件):
topic
key

下面看下kafka sink的代码结构,如下图官网给出的信息。
从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
从上图可以看出,process()方法完成从通道提取事件并将其转发的核心处理,所以接下来根据process()来进行分析。

第一步:查看配置信息

从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
上面是kafka sink的配置信息常量

第二步:获取topic和key信息

从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
上面代码表示,从event中获取data、headers后,会分别获取topic、key信息。获取topic信息时,首先判断allowTopicOverride属性值,从官网上面可知,默认为true。
如果值为true,则使用headers中的topic。
如果值为false,则使用配置文件中的topic。如下面的代码所示。
从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
接着从headers中获取key信息。

第三步:KafkaProducer调用send()方法将消息发送到kafka

从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
如果配置文件中未指定defaultPartitionId和partitionIdHeader值,将会生成一个不带分区信息的ProducerRecord,然后交由KafkaProducer调用send()方法将消息发送到kafka。

第四步:查看send()方法

从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
send()方法根据topic和key值分别生成序列化后的key和value,再获取分区信息。那么是如何获取分区信息的?
从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
此时,record.partition()返回的结果为null,接下来就直接调用DefaultPartitioner(默认分区器)的partition()方法
从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
partition()方法会从kafka集群中获取topic的所有分区信息,由于 keyBytes不为null,会通过对keyBytes进行hash来选择分区。
到这里,便可以清楚地知道key相同,在进行选择kafka分区时,partition也会相同。
看下官方给出的默认分区的策略:
从源码看flume采集的数据是如何进入到kafka主题的同一分区-LMLPHP
如果record中指定分区,会直接使用指定的分区;
如果record中未指定分区,但是key指定了,会基于key的hash来选择分区;
如果record中未指定分区或者key,会以循环的方式选择分区;

结论

在进行需求开发时,确定好哪种信息可以作为key,然后将key设置到headers中,便可以实现进入到kafka同一分区中。

10-07 16:08