使用方式

KafkaProducer 发送消息主要有以下 3 种方式:

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

    // 发送并忘记(fire-and-forget)
    producer.send(record);

    // 同步发送
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();

    // 异步发送
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {

        }
    });

    producer.close();

具体的发送流程可以参考 KafkaProducer发送流程简析

KafkaProducer 是线程安全的,多个线程可以共享同一个 KafkaProducer 对象。

配置解析

client.id

 该参数可以是任意的字符串,broker 会用它来识别消息的来源,会在日志和监控指标里展示。

bootstrap.servers

 该属性指定 broker 的地址列表。
 清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。
 不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

key.serializer & value.serializer

 这两个属性必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类。
 生产者会使用这个类把键值对象序列化成字节数组。

receive.buffer.bytes & send.buffer.bytes

 设置 socket 读写数据时用到的 TCP 缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。
 当生产者或消费者与 broker 处于不同的机房时,可以适当增大这些值

buffer.memory

 设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
 如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
 此时KafkaProducer.send()会阻塞等待内存释放,等待时间超过 max.block.ms 后会抛出超时异常。

compression.type

 该参数指定了消息被发送给 broker 之前,使用哪一种压缩算法(snappygziplz4)进行压缩。
 使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

batch.size

 该参数指定了一个批次可以使用的内存字节数(而不是消息个数)。
 消息批次ProducerBatch包含了一组将要发送至同个分区的消息,当批次被填满,批次里的所有消息会被立即发送出去。

 不过生产者并不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也可能被发送。
 所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。
 但如果设置得太小,生产者会频繁地发送消息,会增加一些额外的网络开销。

linger.ms

 该参数指定了生产者在发送批次之前等待的时间。
 生产者会在批次填满或等待时间达到 linger.ms 时把批次发送出去。
 设置linger.ms>0会增加延迟,但也会提升吞吐量(一次性发送更多的消息,每个消息的开销就变小了)。

acks

 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
 这个参数决定令消息丢失的可能性:

  • acks=0 生产者发出消息后不等待来自服务器的响应
    如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。
    不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

  • acks=1 只要集群的 leader 节点收到消息,生产者就会收到一个来自服务器的成功响应
    如果消息无法到达 leader 节点(比如:leader节点崩溃,新的 leader 还没有被选举出来),生产者会收到一个错误响应。
    为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新 leader,消息还是会丢失。

    这个时候的吞吐量取决于使用的是同步发送还是异步发送:

    • 发送端阻塞等待服务器的响应(通过调用 Future.get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)
    • 发送端使用回调可以缓解延迟问题,不过吞吐量仍受在途消息数量的限制(比如:生产者在收到服务器响应之前可以发送多少个消息)
  • acks=all 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
    这种模式是最安全的,就算有服务器发生崩溃,数据也不会丢失。
    不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。

retries

 该参数决定了生产者可以重发消息的次数(每次重试之间等待 retry.backoff.ms)。
 服务器返回临时性的错误(比如:分区找不到 leader)时,生产者会自动重试,没必要在代码逻辑里处理可重试的错误。
 作为开发者,只需要处理那些不可重试的错误(比如:消息字节数超过单个发送批次上限)或重试次数超出上限的情况即可。

max.in.flight.requests.per.connection

 该参数指定生产者,最多可以发送未响应在途消息批次数量。
 在途消息批次越多,会占用更多的内存,不过也会提升吞吐量。

 当retries > 0max.in.flight.requests.per.connection > 1时,可能出现消息乱序。
 如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。
 如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。

 一般不建议设置retries=0,而是令max.in.flight.requests.per.connection = 1来保证消息顺序。
 在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker,即使发生重试消息也不会乱序。
 不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

高级特性

幂等

当 broker 失效时生产者可能会自动重试,导致一条消息被重复写入多次。
为了避免这种情况,Kafka 在生产者端提供来幂等保证:同一条消息被生产者发送多次,但在 broker端这条消息只会被写入日志一次

在发送端设置 enable.idempotence = true 可以开启幂等性,此时配置同时满足以下条件:

  • max.in.flight.requests.per.connection ≤ 5
  • retries > 0
  • acks = all

其工作机制如下:

  • producer 在初始化时必须分配一个 PIDproducer id该过程对用户来说是完全透明的)
  • 发送到 broker 端的每批消息都会被赋予一个单调递增的 SNsequence number用于消息去重(每个分区都有独立的序列号)
  • 接收到消息的 broker 会将批次的(PID, SN)信息一同持久化到对应的分区日志中(保证 leader 切换后去重仍然生效)

若重试导致 broker 接收到小于或等于已知最大序列号的消息,broker 会拒绝写入这些消息,从而保证每条消息也只会被保存在日志中一次。
由于每个 producer 实例都会被分配不同的 PID,该机制只能保证单个 producer 实例的幂等性,无法实现协同多个 producer 实现幂等。

事务

Kafka 事务可以实现 producer 对多个主题和分区的原子写入,并且保证 consumer 不会读取到未提交的数据。

Kafka 要求应用程序必须提供一个全局唯一的 TIDtransactional id

初始化时,producer 首先要向 broker 集群注册其 TID,broker 会根据给定的 TID 检查是否存在未完成的事务。

如果某个 producer 实例失效,该机制能够保证下一个拥有相同 TID 的实例首先完成之前未完成的事务。

此外,broker 还会为自动每个 producer 分配一个epoch用于隔离fencing out失效但仍存活的 producer:

当 producer 参与事务时,broker 会检查是否存在相同的 TID 且 epoch 更大的活跃 producer。

如果存在,则认为当前 producer 是一个僵尸实例zombie instance并拒绝为其提供服务,防止其破坏事务的完整性。

下面是两个常见的应用场景:

实现跨主题原子写入

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("enable.idempotence", "true"); // 开启幂等
    properties.setProperty("transactional.id", "my-transaction-id"); // 设置事务ID

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    ProducerRecord<String, String> record1 = new ProducerRecord<>("topic1", "key1", "value1");
    ProducerRecord<String, String> record2 = new ProducerRecord<>("topic2", "key2", "value2");
    ProducerRecord<String, String> record3 = new ProducerRecord<>("topic3", "key3", "value3");

    producer.initTransactions(); // 初始化事务(只需执行一次)
    try {
        producer.beginTransaction(); // 开始事务

        // 向多个不同的 topic 写入消息
        producer.send(record1);
        producer.send(record2);
        producer.send(record3);

        producer.commitTransaction(); // 提交事务
    } catch (ProducerFencedException e) {
        producer.close(); // 事务ID 已被占用
    } catch (KafkaException e) {
        producer.abortTransaction();
    }

实现 read-process-write 模式

    final String groupID = "my-group-id";

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "localhost:9092");
    producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.setProperty("enable.idempotence", "true"); // 开启幂等
    producerProps.setProperty("transactional.id", "my-transaction-id"); // 设置事务ID

    Properties consumerProps = new Properties();
    consumerProps.setProperty("bootstrap.servers", "localhost:9092");
    consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.setProperty("isolation.level","read_committed"); // 设置隔离级别
    consumerProps.setProperty("group.id", groupID); // 设置消费者组群ID

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

    producer.initTransactions();
    consumer.subscribe(Collections.singletonList("ping"));

    while (true) {

        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); // 读取消息

        try {
            producer.beginTransaction(); // 开启事务

            // 处理消息(可以是任意业务场景)
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for(ConsumerRecord<String, String> record : records){
                offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())); // 记录消费偏移量
                producer.send(new ProducerRecord<>("pong", record.value())); // 发送消息
            }

            producer.sendOffsetsToTransaction(offsets, groupID); // 提交消费偏移量
            producer.commitTransaction(); // 事务提交
        } catch (ProducerFencedException e) {
            producer.close(); // 事务ID 已被占用
        } catch (Exception e){
            producer.abortTransaction(); // 回滚事务
        }
    }

参考资料

01-01 12:47