一、事务

1. 事务简介

1.1 事务场景
  1. producer发的多条消息组成⼀个事务这些消息需要对consumer同时可⻅或者同时不可⻅
  2. producer可能会给多个topic,多个partition发消息,这些消息也需要能放在⼀个事务⾥⾯,这就形成了⼀个典型的分布式事务
  3. kafka的应⽤场景经常是应⽤先消费⼀个topic,然后做处理再发到另⼀个topic,这个consume-transform-produce过程需要放到⼀个事务⾥⾯,⽐如在消息处理或者发送的过程中如果失败了,消费偏移量也不能提交
  4. producer或者producer所在的应⽤可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务
1.2 关键概念和推导
  1. 因为producer发送消息可能是分布式事务,所以引⼊了常⽤的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引⼊的Group Coordinator在选举和failover上⾯类似
  2. 事务管理中事务⽇志是必不可少的,kafka使⽤⼀个内部topic来保存事务⽇志,这个设计和之前使⽤内部topic保存偏移量的设计保持⼀致。事务⽇志使Transaction Coordinator管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务⽇志只⽤保存最近的事务状态
  3. 因为事务存在commit和abort两种操作,⽽客户端⼜有read committed和read uncommitted两种隔离级别,所以消息队列必须能标识事务状态,这个被称作Control Message
  4. producer挂掉重启或者漂移到其它机器,需要能关联到之前的未完成事务,所以需要有⼀个唯⼀标识符来进⾏关联,这个就是TransactionalId,⼀个producer挂了,另⼀个有相同TransactionalId的producer能够接着处理这个事务未完成的状态。kafka⽬前没有引⼊全局序,所以也没有transaction id,这个TransactionalId是⽤户提前配置的
  5. TransactionalId能关联producer,也需要避免两个使⽤相同TransactionalId的producer同时存在,所以引⼊了producer epoch来保证对应⼀个TransactionalId只有⼀个活跃的producer epoch
1.3 事务语义

多分区原子写入:
事务能够保证Kafka topic下每个分区的原⼦写⼊。事务中所有的消息都将被成功写⼊或者丢弃。
⾸先,我们来考虑⼀下原⼦读取-处理-写⼊周期是什么意思。简⽽⾔之,这意味着如果某个应⽤程序在某个topic tp0的偏移量X处读取到了消息A,并且在对消息A进⾏了⼀些处理(如B = F(A)),之后将消息B写⼊topic tp1,则只有当消息A和B被认为被成功地消费并⼀起发布,或者完全不发布时,整个读取过程写⼊操作是原⼦的。
现在,只有当消息A的偏移量X被标记为已消费,消息A才从topic tp0消费,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写⼊⼀个名为offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。
由于offset commit只是对Kafka topic的另⼀次写⼊,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原⼦写⼊也启⽤原⼦读取-处理-写⼊循环:提交偏移量X到offset topic和消息B到tp1的写⼊将是单个事务的⼀部分,所以整个步骤都是原⼦的。

粉碎“僵尸实例”:
我们通过为每个事务Producer分配⼀个称为transactional.id的唯⼀标识符来解决僵⼫实例的问题。在进程重新启动时能够识别相同的Producer实例。
API要求事务性Producer的第⼀个操作应该是在Kafka集群中显示注册transactional.id。 当注册的时候,Kafka broker⽤给定的transactional.id检查打开的事务并且完成处理。 Kafka也增加了⼀个与transactional.id相关的epoch。Epoch存储每个transactional.id内部元数据。
⼀旦epoch被触发,任何具有相同的transactional.id和旧的epoch的⽣产者被视为僵⼫,Kafka拒绝来⾃这些⽣产者的后续事务性写⼊。
简⽽⾔之:Kafka可以保证Consumer最终只能消费⾮事务性消息或已提交事务性消息。它将保留来⾃未完成事务的消息,并过滤掉已中⽌事务的消息。

1.4 事务的使用场景

在⼀个原⼦操作中,根据包含的操作类型,可以分为三种情况,前两种情况是事务引⼊的场景,最后⼀种没⽤:

  1. 只有Producer⽣产消息;
  2. 消费消息和⽣产消息并存,这个是事务场景中最常⽤的情况,就是我们常说的consume-transform-produce模式
  3. 只有consumer消费消息,这种操作其实没有什么意义,跟使⽤⼿动提交效果⼀样,⽽且也不是事务属性引⼊的⽬的,所以⼀般不会使⽤这种情况
1.5 事务配置

创建消费者代码,需要

  • 将配置中的⾃动提交属性(auto.commit)进⾏关闭
  • ⽽且在代码⾥⾯也不能使⽤⼿动提交commitSync()或者commitAsync()
  • 设置isolation.level

创建生产者,代码如下,需要

  • 配置transactional.id属性
  • 配置enable.idempotence属性

事务相关配置
Broker configs:

Producer configs:

Consumer configs:

1.6 事务工作原理

Kafka 的稳定性-LMLPHP

  1. 事务协调器和事务⽇志
    事务协调器是每个Kafka内部运⾏的⼀个模块。事务⽇志是⼀个内部的主题。每个协调器拥有事务⽇志所在分区的⼦集,即这些 borker 中的分区都是Leader。
    每个transactional.id都通过⼀个简单的哈希函数映射到事务⽇志的特定分区,事务⽇志⽂件__transaction_state-0。这意味着只有⼀个Broker拥有给定的transactional.id
    通过这种⽅式,我们利⽤Kafka可靠的复制协议和Leader选举流程来确保事务协调器始终可⽤,并且所有事务状态都能够持久化。
    值得注意的是,事务⽇志只保存事务的最新状态⽽不是事务中的实际消息。消息只存储在实际的Topic的分区中。事务可以处于诸如“Ongoing”,“prepare commit”和“Completed”之类的各种状态中。正是这种状态和关联的元数据存储在事务⽇志中。

  2. 事务数据流
    数据流在抽象层⾯上有四种不同的类型

    • producer和事务coordinator的交互
      执⾏事务时,Producer向事务协调员发出如下请求:
      • initTransactions APIcoordinator注册⼀个transactional.id。 此时,coordinator使⽤该transactional.id关闭所有待处理的事务,并且会避免遇到僵⼫实例,由具有相同的transactional.id的Producer的另⼀个实例启动的任何事务将被关闭和隔离。每个Producer会话只发⽣⼀次。
      • 当Producer在事务中第⼀次将数据发送到分区时,⾸先向coordinator注册分区
      • 当应⽤程序调⽤commitTransactionabortTransaction时,会向coordinator发送⼀个请求以开始两阶段提交协议。
    • Coordinator和事务⽇志交互
      随着事务的进⾏,Producer发送上⾯的请求来更新Coordinator上事务的状态。事务Coordinator会在内存中保存每个事务的状态,并且把这个状态写到事务⽇志中(这是以三种⽅式复制的,因此是持久保存的)。
      事务Coordinator是读写事务⽇志的唯⼀组件。如果⼀个给定的Borker故障了,⼀个新的Coordinator会被选为新的事务⽇志的Leader,这个事务⽇志分割了这个失效的代理,它从传⼊的分区中读取消息并在内存中重建状态。
    • Producer将数据写⼊⽬标Topic所在分区
      在Coordinator的事务中注册新的分区后,Producer将数据正常地发送到真实数据所在分区。这与producer.send流程完全相同,但有⼀些额外的验证,以确保Producer不被隔离。
    • Topic分区和Coordinator的交互
      • 在Producer发起提交(或中⽌)之后,协调器开始两阶段提交协议。
      • 在第⼀阶段,Coordinator将其内部状态更新为“prepare_commit”并在事务⽇志中更新此状态。⼀旦完成了这个事务,⽆论发⽣什么事,都能保证事务完成。
      • Coordinator然后开始阶段2,在那⾥它将事务提交标记写⼊作为事务⼀部分的Topic分区。
      • 这些事务标记不会暴露给应⽤程序,但是在read_committed模式下被Consumer使⽤来过滤掉被中⽌事务的消息,并且不返回属于开放事务的消息(即那些在⽇志中但没有事务标记与他们相关联)
      • ⼀旦标记被写⼊,事务协调器将事务标记为“完成”,并且Producer可以开始下⼀个事务。

2. 幂等性

Kafka在引⼊幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:
⽣产中,会出现各种不确定的因素,⽐如在Producer在发送给Broker的时候出现⽹络异常。⽐如以下这种异常情况的出现:
Kafka 的稳定性-LMLPHP

上图这种情况,当Producer第⼀次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(⽐如⽹络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。

幂等性
保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的⼀致性。
所谓幂等性,数学概念就是:f(f(x)) = f(x)。f函数表示对消息的处理。
⽐如,银⾏转账,如果失败,需要重试。不管重试多少次,都要保证最终结果⼀定是⼀致的。

幂等性实现
添加唯⼀ID,类似于数据库的主键,⽤于唯⼀标记⼀个消息。
Kafka为了实现幂等性,它在底层设计架构中引⼊了ProducerIDSequenceNumber

  • ProducerID:在每个新的Producer初始化时,会被分配⼀个唯⼀的ProducerID,这个ProducerID对客户端使⽤者是不可⻅的。
  • SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应⼀个从0开始单调递增的SequenceNumber值。
    Kafka 的稳定性-LMLPHP

同样,这是⼀种理想状态下的发送流程。实际情况下,会有很多不确定的因素,⽐如Broker在发送Ack信号给Producer时出现⽹络异常,导致发送失败。异常情况如下图所示:
Kafka 的稳定性-LMLPHP
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发⽣异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引⼊了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,⽽之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有⼀条(x2,y2),不会出现重复发送的情况。

客户端在⽣成Producer时,会实例化如下代码:

// 实例化⼀个Producer对象
Producer<String, String> producer = new KafkaProducer<>(props);

org.apache.kafka.clients.producer.internals.Sender类中,在run()中有⼀个maybeWaitForPid()⽅法,⽤来⽣成⼀个ProducerID,实现代码如下:

private void maybeWaitForPid() {
    if (transactionState == null)
        return;
    while (!transactionState.hasPid()) {
        try {
            Node node = awaitLeastLoadedNodeReady(requestTimeout);
            if (node != null) {
                ClientResponse response = sendAndAwaitInitPidRequest(node);
                if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                    InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                    transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                } else {
                    log.error("Received an unexpected response type for an InitPidRequest from {}. " + "We will back off and try again.", node);
                }
            } else {
                log.debug("Could not find an available broker to send InitPidRequest to. " + "We will back off and try again.");
            }
        } catch (Exception e) {
            log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
        }
        log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
        time.sleep(retryBackoffMs);
        metadata.requestUpdate();
    }
}

3. 事务操作

在Kafka事务中,⼀个原⼦性操作,根据操作类型可以分为3种情况。情况如下:

  • 只有Producer⽣产消息,这种场景需要事务的介⼊;
  • 消费消息和⽣产消息并存,⽐如Consumer&Producer模式,这种场景是⼀般Kafka项⽬中⽐较常⻅的模式,需要事务介⼊;
  • 只有Consumer消费消息,这种操作在实际项⽬中意义不⼤,和⼿动Commit Offsets的结果⼀样,⽽且这种场景不是事务的引⼊⽬的。
// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();

// 开启事务
void beginTransaction() throws ProducerFencedException;

// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 提交事务
void commitTransaction() throws ProducerFencedException;

// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;

案例1:单个Producer,使⽤事务保证消息的仅⼀次发送:

package com.mfc.kafka.demo.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class MyTransactionalProducer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 提供客户端ID
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
        // 事务ID
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
        // 要求ISR都确认
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
        // 初始化事务
        producer.initTransactions();

        // 开启事务
        producer.beginTransaction();

        try {
            // producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_01"));
            producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_02"));
        	// int i = 1 / 0;
            // 提交事务
            producer.commitTransaction();
        } catch (Exception ex) {
            // 中⽌事务
            producer.abortTransaction();
        } finally {
            // 关闭⽣产者
            producer.close();
        }
    }
}

案例2:在消费-转换-⽣产模式,使⽤事务保证仅⼀次发送。

package com.mfc.kafka.demo;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MyTransactional {
    public static KafkaProducer<String, String> getProducer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 设置client.id
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");

        // 设置事务id
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");

        // 需要所有的ISR副本确认
        configs.put(ProducerConfig.ACKS_CONFIG, "all");

        // 启⽤幂等性
        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
        return producer;
    }

    public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // 设置消费组ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");

        // 不启⽤消费者偏移量的⾃动确认,也不要⼿动确认
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 只读取已提交的消息
        // configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
        return consumer;
    }

    public static void main(String[] args) {
    	String consumerGroupId = "consumer_grp_id_101";
        KafkaProducer<String, String> producer = getProducer();
        KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);

        // 事务的初始化
        producer.initTransactions();
        //订阅主题
        consumer.subscribe(Collections.singleton("tp_tx_01"));

        final ConsumerRecords<String, String> records = consumer.poll(1_000);

        // 开启事务
        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
            	System.out.println(record);
                producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));
                offsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)); // 偏移量表示下⼀条要消费的消息
            }
            // 将该消息的偏移量提交作为事务的⼀部分,随事务提交和回滚(不提交消费偏移量)
            producer.sendOffsetsToTransaction(offsets, consumerGroupId);
            // int i = 1 / 0;

            // 提交事务
            producer.commitTransaction();


        } catch (Exception e) {
        	e.printStackTrace();
            // 回滚事务
            producer.abortTransaction();

        } finally {
            // 关闭资源
            producer.close();
            consumer.close();
        }
    }
}

二、控制器

Kafka集群包含若⼲个broker,broker.id指定broker的编号,编号不要重复。
Kafka集群上创建的主题,包含若⼲个分区。
每个分区包含若⼲个副本,副本因⼦包括了Follower副本和Leader副本。
副本⼜分为ISR(同步副本分区)和OSR(⾮同步副本分区)。
Kafka 的稳定性-LMLPHP
控制器就是⼀个broker。
控制器除了⼀般broker的功能,还负责Leader分区的选举。

Broker 选举
集群⾥第⼀个启动的broker在Zookeeper中创建临时节点<KafkaZkChroot>/controller
其他broker在该控制器节点创建Zookeeper watch对象,使⽤Zookeeper的监听机制接收该节点的变更。
即:Kafka通过Zookeeper的分布式锁特性选举集群控制器
下图中,节点/myKafka/controller是⼀个zookeeper临时节点,其中"brokerid":0,表示当前控制器是broker.id为 0 的broker。
Kafka 的稳定性-LMLPHP

每个新选出的控制器通过 Zookeeper 的条件递增操作获得⼀个全新的、数值更⼤的 controller epoch。其他 broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧epoch 的消息,就会忽略它们,以防⽌“脑裂”。
⽐如当⼀个Leader副本分区所在的broker宕机,需要选举新的Leader副本分区,有可能两个具有不同纪元数字的控制器都选举了新的Leader副本分区,如果选举出来的Leader副本分区不⼀样,听谁的?脑裂了。有了纪元数字,直接使⽤纪元数字最新的控制器结果。
Kafka 的稳定性-LMLPHP
当控制器发现⼀个 broker 已经离开集群,那些失去Leader副本分区的Follower分区需要⼀个新Leader(这些分区的⾸领刚好是在这个 broker 上)。

  1. 控制器需要知道哪个broker宕机了?
  2. 控制器需要知道宕机的broker上负责的时候哪些分区的Leader副本分区?

下图中,<KafkaChroot>/brokers/ids/0保存该broker的信息,此节点为临时节点,如果broker节点宕机,该节点丢失。
集群控制器负责监听ids节点,⼀旦节点⼦节点发送变化,集群控制器得到通知。
Kafka 的稳定性-LMLPHP
控制器遍历这些Follower副本分区,并确定谁应该成为新Leader分区,然后向所有包含新Leader分区和现有Follower的 broker 发送请求。该请求消息包含了谁是新Leader副本分区以及谁是Follower副本分区的信息。随后,新Leader分区开始处理来⾃⽣产者和消费者的请求,⽽跟随者开始从新Leader副本分区消费消息。
当控制器发现⼀个 broker 加⼊集群时,它会使⽤ broker ID 来检查新加⼊的 broker 是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加⼊的 broker 和其他 broker,新 broker上的副本分区开始从Leader分区那⾥消费消息,与Leader分区保持同步。

结论

  1. Kafka 使⽤ Zookeeper 的分布式锁选举控制器,并在节点加⼊集群或退出集群时通知控制器。
  2. 控制器负责在节点加⼊或离开集群时进⾏分区Leader选举。
  3. 控制器使⽤epoch 来避免“脑裂”。“脑裂”是指两个节点同时认为⾃⼰是当前的控制器。

三、可靠性保证

1. 概念

  • 创建Topic的时候可以指定--replication-factor 3,表示分区的副本数,不要超过broker的数量。
  • Leader是负责读写的节点,⽽其他副本则是Follower。Producer只把消息发送到Leader,Follower定期地到Leader上Pull数据。
  • ISR是Leader负责维护的与其保持同步的Replica列表,即当前活跃的副本列表。如果⼀个Follow落后太多,Leader会将它从ISR中移除。落后太多意思是该Follow⻓时间没有向Leader发送fetch请求(参数:replica.lag.time.max.ms默认值:10000)。
  • 为了保证可靠性,可以设置acks=all。Follower收到消息后,会像Leader发送ACK。⼀旦Leader收到了ISR中所有Replica的ACK,Leader就commit,那么Leader就向Producer发送ACK。

2. 副本分配

当某个topic的--replication-factor为N(N>1)时,每个Partition都有N个副本,称作replica。原则上是将replica均匀的分配到整个集群上。不仅如此,partition的分配也同样需要均匀分配。为了更好的负载均衡。
副本分配的三个⽬标:

  1. 均衡地将副本分散于各个broker上
  2. 对于某个broker上分配的分区,它的其他副本在其他broker上
  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第⼀个副本分区通过轮询的⽅式挑选⼀个broker,进⾏分配。该轮询从broker列表的随机位置进⾏轮询。
  2. 其余副本通过增加偏移进⾏分配。

Kafka 的稳定性-LMLPHP

Leader的选举
如果Leader宕机了该怎么办?很容易想到我们在Follower中重新选举⼀个Leader,但是选举哪个作为leader呢?Follower可能已经落后许多了,因此我们要选择的是”最新”的Follow:新的Leader必须拥有与原来Leader commit过的所有信息。
kafka动态维护⼀组同步leader数据的副本(ISR),只有这个组的成员才有资格当选leader,kafka副本写⼊不被认为是已提交,直到所有的同步副本已经接收才认为。这组ISR保存在zookeeper,正因为如此,在ISR中的任何副本都有资格当选leader。

基于Zookeeper的选举⽅式
⼤数据很多组件都有Leader选举的概念,如HBASE等。它们⼤都基于ZK进⾏选举,所有Follow都在ZK上⾯注册⼀个Watch,⼀旦Leader宕机,Leader对应的Znode会⾃动删除,那些Follow由于在Leader节点上注册了Watcher,故可以得到通知,就去参与下⼀轮选举,尝试去创建该节点,ZK会保证只有⼀个Follow创建成功,成为新的Leader。
但是这种⽅式有⼏个缺点:

  1. split-brain。这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保证同⼀时刻所有Replica“看”到的状态是⼀样的,这就可能造成不同Replica的响应不⼀致
  2. herd effect。如果宕机的那个Broker上的Partition⽐较多,会造成多个Watch被触发,造成集群内⼤量的调整
  3. ZooKeeper负载过重。每个Replica都要为此在ZooKeeper上注册⼀个Watch,当集群规模增加到⼏千个Partition时ZooKeeper负载会过重。

基于Controller的选举⽅式
Kafka 0.8后的Leader Election⽅案解决了上述问题,它在所有broker中选出⼀个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的⽅式(⽐ZooKeeper Queue的⽅式更⾼效)通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

  • 优点:极⼤缓解了Herd Effect问题、减轻了ZK的负载,Controller与Leader/Follower之间通过RPC通信,⾼效且实时。
  • 缺点:引⼊Controller增加了复杂度,且需要考虑Controller的Failover

如何处理Replica的恢复:
Kafka 的稳定性-LMLPHP

  1. 只有当ISR列表中所有列表都确认接收数据后,该消息才会被commit。因此只有m1被commit了。即使leader上有m1,m2,m3,consumer此时只能读到m1。
  2. 此时A宕机了。B变成了新的leader了,A从ISR列表中移除。B有m2,B会发给C,C收到m2后,m2被commit。
  3. B继续commit消息4和5
  4. A回来了。注意A并不能⻢上在isr列表中存在,因为它落后了很多,只有当它接受了⼀些数据,⽐如m2 m4 m5,它不落后太多的时候,才会回到ISR列表中。
    思考:m3怎么办呢?
    两种情况:
  5. A重试,重试成功了,m3就恢复了,但是乱序了。
  6. A重试不成功,此时数据就可能丢失了。

如果Replica都死了怎么办?
只要⾄少有⼀个replica,就能保证数据不丢失,可是如果某个partition的所有replica都死了怎么办?有两种⽅案:

  1. 等待在ISR中的副本恢复,并选择该副本作为Leader
  2. 选择第⼀个活过来的副本(不⼀定在 ISR中),作为Leader
    可⽤性和⼀致性的⽭盾:如果⼀定要等待副本恢复,等待的时间可能⽐较⻓,甚⾄可能永远不可⽤。如果是第⼆种,不能保证所有已经commit的消息不丢失,但有可⽤性。
    Kafka默认选⽤第⼆种⽅式,⽀持选择不能保证⼀致的副本。
    可以通过参数unclean.leader.election.enable禁⽤它。

Broker宕机怎么办?
Controller在Zookeeper的/brokers/ids节点上注册Watch。⼀旦有Broker宕机,其在Zookeeper对应的Znode会⾃动被删除,Zookeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。
Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。
对set_p中的每⼀个Partition:

  1. /brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。

  2. 决定该Partition的新Leader。如果当前ISR中有⾄少⼀个Replica还幸存,则选择其中⼀个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意⼀个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。

  3. 将新的Leader,ISR和新的leader_epoch及controller_epoch写⼊/brokers/topics/[topic]/partitions/[partition]/state

    [zk: localhost:2181(CONNECTED) 13] get /brokers/topics/bdstar/partitions/0/state
    {"controller_epoch":1272,"leader":0,"version":1,"leader_epoch":4,"isr":[0,2]}
    

直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在⼀个RPC操作中发送多个命令从⽽提⾼效率。

Controller宕机怎么办?

每个Broker都会在/controller上注册⼀个Watch。

[zk: localhost:2181(CONNECTED) 19] get /controller
{"version":1,"brokerid":1...............}

当前Controller宕机时,对应的/controller会⾃动消失。所有“活”着的Broker竞选成为新的Controller,会创建新的Controller Path

[zk: localhost:2181(CONNECTED) 19] get /controller
{"version":1,"brokerid":2...............}

注意:只会有⼀个竞选成功(这点由Zookeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为Zookeeper的Watch是⼀次性的,被fire⼀次之后即失效,所以需要重新注册

3. 失效副本

Kafka中,⼀个主题可以有多个分区,增强主题的可扩展性,为了保证靠可⽤,可以为每个分区设置副本数。
只有Leader副本可以对外提供读写服务,Follower副本只负责poll Leader副本的数据,与Leader副本保持数据的同步。
系统维护⼀个ISR副本集合,即所有与Leader副本保持同步的副本列表。
当Leader宕机找不到的时候,就从ISR列表中挑选⼀个分区做Leader。如果ISR列表中的副本都找不到了,就剩下OSR的副本了。
此时,有两个选择:要么选择OSR的副本做Leader,优点是可以⽴即恢复该分区的服务。缺点是可能会丢失数据。
要么选择等待,等待ISR列表中的分区副本可⽤,就选择该可⽤ISR分区副本做Leader。优点是不会丢失数据缺点是会影响当前分区的可⽤性。

四、一致性保证

1. 概念

水位标记
⽔位或⽔印(watermark)⼀词,表示位置信息,即位移(offset)。Kafka源码中使⽤的名字是⾼⽔位,HW(high watermark)。

副本⻆⾊
Kafka分区使⽤多个副本(replica)提供⾼可⽤。

LEO和HW
每个分区副本对象都有两个重要的属性:LEO和HW。

  • LEO:即⽇志末端位移(log end offset),记录了该副本⽇志中下⼀条消息的位移值。如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,Leader LEO和Follower LEO的更新是有区别的。
  • HW:即上⾯提到的⽔位值。对于同⼀个副本对象⽽⾔,其HW值不会⼤于LEO值。⼩于等于HW值的所有消息都被认为是“已备份”的(replicated)。Leader副本和Follower副本的HW更新不同。
    Kafka 的稳定性-LMLPHP

上图中,HW值是7,表示位移是013的消息就是未完全备份(fully replicated)——为什么没有14?LEO指向的是下⼀条消息到来时的位移。

消费者⽆法消费分区下Leader副本中位移⼤于分区HW的消息。

2. Follower副本何时更新LEO

Follower副本不停地向Leader副本所在的broker发送FETCH请求,⼀旦获取消息后写⼊⾃⼰的⽇志中进⾏备份。那么Follower副本的LEO是何时更新的呢?Kafka有两套Follower副本LEO:

  1. ⼀套LEO保存在Follower副本所在Broker的副本管理机中;
  2. 另⼀套LEO保存在Leader副本所在Broker的副本管理机中。Leader副本机器上保存了所有的follower副本的LEO。

Kafka使⽤前者帮助Follower副本更新其HW值;利⽤后者帮助Leader副本更新其HW。

  1. Follower副本的本地LEO何时更新?

    Follower副本的LEO值就是⽇志的LEO值,每当新写⼊⼀条消息,LEO值就会被更新。当Follower发送FETCH请求后,Leader将数据返回给Follower,此时Follower开始Log写数据,从⽽⾃动更新LEO值。

  2. Leader端Follower的LEO何时更新?

    Leader端的Follower的LEO更新发⽣在Leader在处理Follower FETCH请求时。⼀旦Leader接收到Follower发送的FETCH请求,它先从Log中读取相应的数据,给Follower返回数据前,先更新Follower的LEO。

3. Follower副本何时更新HW

Follower更新HW发⽣在其更新LEO之后,⼀旦Follower向Log写完数据,尝试更新⾃⼰的HW值。

⽐较当前LEO值与FETCH响应中Leader的HW值,取两者的⼩者作为新的HW值。

即:如果Follower的LEO⼤于Leader的HW,Follower HW值不会⼤于Leader的HW值。
Kafka 的稳定性-LMLPHP

4. Leader副本何时更新LEO

和Follower更新LEO相同,Leader写Log时⾃动更新⾃⼰的LEO值。

5. Leader副本何时更新HW值

Leader的HW值就是分区HW值,直接影响分区数据对消费者的可⻅性 。

Leader会尝试去更新分区HW的四种情况:

  1. Follower副本成为Leader副本时:Kafka会尝试去更新分区HW。
  2. Broker崩溃导致副本被踢出ISR时:检查下分区HW值是否需要更新是有必要的。
  3. ⽣产者向Leader副本写消息时:因为写⼊消息会更新Leader的LEO,有必要检查HW值是否需要更新
  4. Leader处理Follower FETCH请求时:⾸先从Log读取数据,之后尝试更新分区HW值

结论

当Kafka broker都正常⼯作时,分区HW值的更新时机有两个:

  1. Leader处理PRODUCE请求时
  2. Leader处理FETCH请求时。

Leader如何更新⾃⼰的HW值?Leader broker上保存了⼀套Follower副本的LEO以及⾃⼰的LEO。当尝试确定分区HW时,它会选出所有满⾜条件的副本,⽐较它们的LEO(包括Leader的LEO),并选择最⼩的LEO值作为HW值

需要满⾜的条件,(⼆选⼀):

  1. 处于ISR中
  2. 副本LEO落后于Leader LEO的时⻓不⼤于replica.lag.time.max.ms参数值(默认是10s)

如果Kafka只判断第⼀个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“⽴刻进⼊ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——不允许。因为分区HW定义就是ISR中所有副本LEO的最⼩值。

6. HW和LEO正常更新案例

我们假设有⼀个topic,单分区,副本因⼦是2,即⼀个Leader副本和⼀个Follower副本。我们看下当producer发送⼀条消息时,broker端的副本到底会发⽣什么事情以及分区HW是如何被更新的。

6.1 初始状态

初始时Leader和Follower的HW和LEO都是0(严格来说源代码会初始化LEO为-1,不过这不影响之后的讨论)。Leader中的Remote LEO指的就是Leader端保存的Follower LEO,也被初始化成0。此时,⽣产者没有发送任何消息给Leader,⽽Follower已经开始不断地给Leader发送FETCH请求了,但因为没有数据因此什么都不会发⽣。值得⼀提的是,Follower发送过来的FETCH请求因为⽆数据⽽暂时会被寄存到Leader端的purgatory中,待500ms (replica.fetch.wait.max.ms参数)超时后会强制完成。倘若在寄存期间⽣产者发来数据,则Kafka会⾃动唤醒该FETCH请求,让Leader继续处理。
Kafka 的稳定性-LMLPHP

6.2 Follower发送FETCH请求在Leader处理完PRODUCE请求之后

producer给该topic分区发送了⼀条消息

此时的状态如下图所示:
Kafka 的稳定性-LMLPHP

如上图所示,Leader接收到PRODUCE请求主要做两件事情:

  1. 把消息写⼊Log,同时⾃动更新Leader⾃⼰的LEO
  2. 尝试更新Leader HW值。假设此时Follower尚未发送FETCH请求,Leader端保存的Remote LEO依然是0,因此Leader会⽐较它⾃⼰的LEO值和Remote LEO值,发现最⼩值是0,与当前HW值相同,故不会更新分区HW值(仍为0)

PRODUCE请求处理完成后各值如下,Leader端的HW值依然是0,⽽LEO是1,Remote LEO也是0。

假设此时follower发送了FETCH请求,则状态变更如下:
Kafka 的稳定性-LMLPHP

本例中当follower发送FETCH请求时,Leader端的处理依次是:

  1. 读取Log数据
  2. 更新remote LEO = 0(为什么是0? 因为此时Follower还没有写⼊这条消息。Leader如何确认Follower还未写⼊呢?这是通过Follower发来的FETCH请求中的Fetch offset来确定的)
  3. 尝试更新分区HW:此时Leader LEO = 1,Remote LEO = 0,故分区HW值= min(Leader LEO, Follower Remote LEO) = 0
  4. 把数据和当前分区HW值(依然是0)发送给Follower副本

⽽Follower副本接收到FETCH Response后依次执⾏下列操作:

  1. 写⼊本地Log,同时更新Follower⾃⼰管理的 LEO为1
  2. 更新Follower HW:⽐较本地LEO和 FETCH Response 中的当前Leader HW值,取较⼩者,Follower HW = 0

此时,第⼀轮FETCH RPC结束,我们会发现虽然Leader和Follower都已经在Log中保存了这条消息,但分区HW值尚未被更新,仍为0。

Follower第⼆轮FETCH

分区HW是在第⼆轮FETCH RPC中被更新的,如下图所示:
Kafka 的稳定性-LMLPHP

Follower发来了第⼆轮FETCH请求,Leader端接收到后仍然会依次执⾏下列操作:

  1. 读取Log数据
  2. 更新Remote LEO = 1(这次为什么是1了? 因为这轮FETCH RPC携带的fetch offset是1,那么为什么这轮携带的就是1了呢,因为上⼀轮结束后Follower LEO被更新为1了)
  3. 尝试更新分区HW:此时leader LEO = 1,Remote LEO = 1,故分区HW值= min(Leader LEO, Follower Remote LEO) = 1。
  4. 把数据(实际上没有数据)和当前分区HW值(已更新为1)发送给Follower副本作为Response

同样地,Follower副本接收到FETCH response后依次执⾏下列操作:

  1. 写⼊本地Log,当然没东⻄可写,Follower LEO也不会变化,依然是1。
  2. 更新Follower HW:⽐较本地LEO和当前Leader LEO取⼩者。由于都是1,故更新follower HW = 1 。

此时消息已经成功地被复制到Leader和Follower的Log中且分区HW是1,表明消费者能够消费offset = 0的消息。

6.3 FETCH请求保存在purgatory中,PRODUCE请求到来

当Leader⽆法⽴即满⾜FECTH返回要求的时候(⽐如没有数据),那么该FETCH请求被暂存到Leader端的purgatory中(炼狱),待时机成熟尝试再次处理。Kafka不会⽆限期缓存,默认有个超时时间(500ms),⼀旦超时时间已过,则这个请求会被强制完成。当寄存期间还没超时,⽣产者发送PRODUCE请求从⽽使之满⾜了条件以致被唤醒。此时,Leader端处理流程如下:

  1. Leader写Log(⾃动更新Leader LEO)
  2. 尝试唤醒在purgatory中寄存的FETCH请求
  3. 尝试更新分区HW

7. HW和LEO异常案例

Kafka使⽤HW值来决定副本备份的进度,⽽HW值的更新通常需要额外⼀轮FETCH RPC才能完成。但这种设计是有问题的,可能引起的问题包括:

  1. 备份数据丢失
  2. 备份数据不⼀致
7.1 数据丢失

使⽤HW值来确定备份进度时其值的更新是在下⼀轮RPC中完成的。如果Follower副本在标记上⽅的的第⼀步与第⼆步之间发⽣崩溃,那么就有可能造成数据的丢失。
Kafka 的稳定性-LMLPHP

上图中有两个副本:A和B。开始状态是A是Leader。

假设⽣产者min.insync.replicas为1,那么当⽣产者发送两条消息给A后,A写⼊Log,此时Kafka会通知⽣产者这两条消息写⼊成功。

但是在broker端,Leader和Follower的Log虽都写⼊了2条消息且分区HW已经被更新到2,但Follower HW尚未被更新还是1,也就是上⾯标记的第⼆步尚未执⾏,表中最后⼀条未执⾏。

倘若此时副本B所在的broker宕机,那么重启后B会⾃动把LEO调整到之前的HW值1,故副本B会做⽇志截断(log truncation),将offset = 1的那条消息从log中删除,并调整LEO = 1。此时follower副本底层log中就只有⼀条消息,即offset = 0的消息!

B重启之后需要给A发FETCH请求,但若A所在broker机器在此时宕机,那么Kafka会令B成为新的Leader,⽽当A重启回来后也会执⾏⽇志截断,将HW调整回1。这样,offset=1的消息就从两个副本的log中被删除,也就是说这条已经被⽣产者认为发送成功的数据丢失。

丢失数据的前提是min.insync.replicas=1时,⼀旦消息被写⼊Leader端Log即被认为是committed。延迟⼀轮FETCH RPC更新HW值的设计使follower HW值是异步延迟更新,若在这个过程中Leader发⽣变更,那么成为新Leader的Follower的HW值就有可能是过期的,导致⽣产者本是成功提交的消息被删除。

7.2 Leader和Follower数据离散

除了可能造成的数据丢失以外,该设计还会造成Leader的Log和Follower的Log数据不⼀致。

如Leader端记录序列:m1,m2,m3,m4,m5,…;Follower端序列可能是m1,m3,m4,m5,…。

看图:
Kafka 的稳定性-LMLPHP

假设:A是Leader,A的Log写⼊了2条消息,但B的Log只写了1条消息。分区HW更新到2,但B的HW还是1,同时⽣产者min.insync.replicas仍然为1。

假设A和B所在Broker同时宕机,B先重启回来,因此B成为Leader,分区HW = 1。假设此时⽣产者发送了第3条消息(红⾊表示)给B,于是B的log中offset = 1的消息变成了红框表示的消息,同时分区HW更新到2(A还没有回来,就B⼀个副本,故可以直接更新HW⽽不⽤理会A)之后A重启回来,需要执⾏⽇志截断,但发现此时分区HW=2⽽A之前的HW值也是2,故不做任何调整。此后A和B将以这种状态继续正常⼯作。

显然,这种场景下,A和B的Log中保存在offset = 1的消息是不同的记录,从⽽引发不⼀致的情形出现。

8. Leader Epoch使⽤

8.1 Kafka 解决方案

造成上述两个问题的根本原因在于

  1. HW值被⽤于衡量副本备份的成功与否。
  2. 在出现失败重启时作为⽇志截断的依据。

但HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间发⽣的任何崩溃都可能导致HW值的过期。

Kafka从0.11引⼊了leader epoch来取代HW值。Leader端使⽤内存保存Leader的epoch信息,即使出现上⾯的两个场景也能规避这些问题。

所谓Leader epoch实际上是⼀对值:<epoch, offset>:

  1. epoch表示Leader的版本号,从0开始,Leader变更过1次,epoch+1

  2. offset对应于该epoch版本的Leader写⼊第⼀条消息的offset。因此假设有两对值:

    <0, 0>
    <1, 120>
    

则表示第⼀个Leader从位移0开始写⼊消息;共写了120条[0, 119];⽽第⼆个Leader版本号是1,从位移120处开始写⼊消息。

  1. Leader broker中会保存这样的⼀个缓存,并定期地写⼊到⼀个checkpoint⽂件中。
  2. 当Leader写Log时它会尝试更新整个缓存:如果这个Leader⾸次写消息,则会在缓存中增加⼀个条⽬;否则就不做更新。
  3. 每次副本变为Leader时会查询这部分缓存,获取出对应Leader版本的位移,则不会发⽣数据不⼀致和丢失的情况.
8.2 规避数据丢失

Kafka 的稳定性-LMLPHP

只需要知道每个副本都引⼊了新的状态来保存⾃⼰当leader时开始写⼊的第⼀条消息的offset以及leader版本。这样在恢复的时候完全使⽤这些信息⽽⾮HW来判断是否需要截断⽇志。

8.3 规避数据不一致

Kafka 的稳定性-LMLPHP
依靠Leader epoch的信息可以有效地规避数据不⼀致的问题。

五、消息重复的场景及解决方案

消息重复主要发⽣在以下三个阶段:

  1. 生产者阶段
  2. broker阶段
  3. 消费者阶段

1. 生产者阶段重复场景

1.1 根本原因

⽣产发送的消息没有收到正确的broke响应,导致producer重试。

producer发出⼀条消息,broke落盘以后因为⽹络等种种原因发送端得到⼀个发送失败的响应或者⽹络中断,然后producer收到⼀个可恢复的Exception重试消息导致消息重复。

1.2 重试过程

Kafka 的稳定性-LMLPHP
说明:

  1. new KafkaProducer() 后创建一个线程 KafkaThread 扫描RecordAccumulator中是否有消息
  2. 调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator
  3. 后台线程KafkaThread 扫描到RecordAccumulator中有消息后,将消息发送到Kafka集群
  4. 如果发送成功,那么返回成功
  5. 如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败结果;如果允许重试,把消息再保存到 RecordAccumulator中,等待后台线程KafkaThread 扫描再次发送
1.3 可恢复异常说明

异常是 RetriableException 类型或者 TransactionManager允许重试;RetriableException 类继承关系如下:
Kafka 的稳定性-LMLPHP

1.4 记录顺序问题

如果设置max.in.flight.requests.per.connection大于1 (默认5, 单个连接.上发送的未确认请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出了)。大于1可能会改变记录的顺序,因为如果将两个batch发送到单个分区,第一个batch处理失败并重试, 但是第二个batch处理成功,那么第二个batch处理中的记录可能先出现被消费。

设置max.in.flight.requests.per.connection为1,可能会影响吞吐量,可以解决单个生产者发送顺序问题。如果多个生产者,生产者1先发送一一个请求, 生产者2后发送请求,此时生产者1返回可恢复异常,重试一定次数成功了。虽然生产者1先发送消息,但生产者2发送的消息会被先消费。

2. 生产者发送重复解决方案

启动Kafka的幂等性

要启动Kafka的幂等性,设置enable.idempotence=true,以及ack=allretries>1

ack=0,不重试

可能会丢失消息,适用于吞吐量指标重要性高于数据丢失,如:日志收集

3. 生产者和broker阶段消息丢失场景

ack=0,不重试

生产者发送消息完毕,不管结果,如果发送失败也就丢失了

ack=1,Leader crash

生产者发送消息完毕,只等待Leader写入成功就返回了,Leader 分区丢失了,此时Follower没来得及同步,消息丢失

unclean.leader.election.enable 配置true

允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。 生产者发送异步消息,只等待Lead写入成功就返回,Leader分区丢失,此时ISR中没有Follower, Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失。

4. 解决生产者和broker阶段消息丢失

禁用unclean选举,ack=all

ack=all / -1,tries > 1,unclean.leader.election.enable:false

生产者发完消息,等待Follower同步完再返回, 如果异常则重试。副本的数量可能影响吞吐量,不超过5个,一般三个。

不允许unclean Leader选举。

配置:min.insync.replicas>1

当生产者将acks设置为all (或-1 )时,min.insync.replicas>1。指定确认消息写成功需要的最小副本数量。达不到这个最小值,生产者将引发-个异常(要么是NotEnoughReplicas, 要么是NotEnoughReplicasAfterAppend)。

当一起使用时,min.insync.replicasack允许执行更大的持久性保证。一个典型的场景 是创建一个复制因子为3的主题,设置min.insync复制到2个, 用 all 配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常。

失败的 offset 单独记录

生产者发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进行单独处理。

5. 消费者数据重复场景及解决方案

根本原因

数据消费完没及时提交 offset 到 broker

场景

消息消费端在消费过程中挂掉没有及时提交offset到broke,另一个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。

6. 解决方案

取消自动提交

每次消费完或者程序退出时手动提交。这可能也没法保证一条不重复

下游做幂等

一般是让 下游做幂等或者尽量每消费-条消息都记录offset, 对于少数严格的场景可能需要把offset或唯一ID (例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新。

六、__consumer_offsets

Zookeeper不适合⼤批量的频繁写⼊操作。

Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka_consumer_groups.sh脚本供⽤户查看consumer信息。

创建topic “tp_test_01”

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_test_01 --partitions 5 --replication-factor 1

使⽤kafka-console-producer.sh脚本⽣产消息

[root@node1 ~]# for i in `seq 100`; do echo "hello lagou $i" >> messages.txt; done
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt

由于默认没有指定key,所以根据round-robin⽅式,消息分布到不同的分区上。 (本例中⽣产了100条消息)

验证消息⽣产成功

[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
[root@node1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --brokerlist node1:9092 --topic tp_test_01 --time -1
tp_test_01:2:20
tp_test_01:4:20
tp_test_01:1:20
tp_test_01:3:20
tp_test_01:0:20
[root@node1 ~]#

结果输出表明100条消息全部⽣产成功!

创建⼀个console consumer group

[root@node1 ~]#kafka-console-consumer.sh --bootstrap-server node1:9092 --topic tp_test_01 --from-beginning

获取该consumer group的group id(后⾯需要根据该id查询它的位移信息)

[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

查询__consumer_offsets topic所有内容

注意:运⾏下⾯命令前先要在consumer.properties中设置exclude.internal.topics=false

[root@node1 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。

计算指定consumer group在__consumer_offsets topic中分区信息

这时候就⽤到了第5步获取的group.id(本例中是console-consumer-49366)。Kafka会使⽤下⾯公式计算该group位移保存在__consumer_offsets的哪个分区上:

Math.abs(groupID.hashCode()) % numPartitions

对应的分区=Math.abs("console-consumer-49366".hashCode()) % 50 = 19,即__consumer_offsets的分区19保存了这个consumer group的位移信息。

获取指定consumer group的位移信息

[root@node1 ~]# kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

下⾯是输出结果:

...
[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
...

上图可⻅,该consumer group果然保存在分区11上,且位移信息都是对的(这⾥的位移信息是已消费的位移,严格来说不是第3步中的位移。由于我的consumer已经消费完了所有的消息,所以这⾥的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每⼀⽇志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

06-14 12:48