消费进度监控3 种方法:

  1. 使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本
  2. 使用 Kafka Java Consumer API 编程
  3. 使用 Kafka 自带的 JMX 监控指标

Kafka 自带命令

kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。

$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值。

  • 示例: Kafka 集群的连接信息,即 localhost:9092。消费者组名:testgroup

它会按照消费者组订阅主题的分区进行展示,每个分区一行数据;其次,除了主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(即 LOG-END-OFFSET 列值)、该消费者组当前最新消费消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前两者的差值)、消费者实例 ID消费者连接 Broker 的主机名以及消费者的 CLIENT-ID 信息。

Kafka - 消费进度监控(Consumer Lag)-LMLPHP

Kafka Java Consumer API [ Kafka 2.0.0 ]

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (AdminClient client = AdminClient.create(props)) {
        ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
        try {
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                        entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // 处理中断异常
            // ...
            return Collections.emptyMap();
        } catch (ExecutionException e) {
            // 处理ExecutionException
            // ...
            return Collections.emptyMap();
        } catch (TimeoutException e) {
            throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
        }
    }
}

Kafka JMX 监控指标

Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标,里面有很多属性。和我们今天所讲内容相关的有两组属性:records-lag-max 和 records-lead-min它们分别表示此消费者在测试窗口时间内曾经达到的最大的 Lag 值和最小的 Lead 值。

Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理。

监控到 Lag 越来越大,消费者程序变得越来越慢了,至少是追不上生产者程序了.

Lead 越来越小,甚至是快接近于 0 了,消费者端要丢消息了

Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。倘若你的消费者程序足够慢,慢到它要消费的数据快被 Kafka 删除了,这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象。

Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。

01-30 12:06