• 文档内出现的 ${KAFKA_BROKERS} 表示 kafka 的连接地址,${ZOOKEEPER_CONNECT} 表示 zk 的连接地址,需要替换成自己的实际 ip 地址

创建一个演示 topic

kafka-topics.sh --create --zookeeper ${ZOOKEEPER_CONNECT} --replication-factor 1 --partitions 3 --topic test-topic-update
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic test-topic-update
Topic:test-topic-update PartitionCount:3        ReplicationFactor:1     Configs:segment.bytes=1073741824
        Topic: test-topic-update        Partition: 0    Leader: 5       Replicas: 5     Isr: 5
        Topic: test-topic-update        Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: test-topic-update        Partition: 2    Leader: 0       Replicas: 0     Isr: 0
  • 关于输出内容的概念
    • 分区(Partition)
      • 主题(Topic)在 Kafka 中的数据被分成一个或多个分区。每个分区是一个有序且持久化的消息日志。
      • 分区允许 Kafka 集群进行水平扩展,使多个消费者能够并行地处理主题的消息。
      • 消费者组中的每个消费者负责处理一个或多个分区的消息。
    • 领导者(Leader)
      • 每个分区都有一个领导者,领导者负责处理该分区的所有读写请求。
      • 生产者向领导者发送消息,消费者从领导者读取消息。
      • 领导者也负责维护分区的复制和同步。
    • 副本(Replicas)
      • 为了提高数据的冗余和可用性,每个分区可以有多个副本,包括一个领导者副本和零个或多个追随者副本。
      • 领导者副本处理写请求,追随者副本用于数据冗余和读请求。
    • 同步副本集(In-Sync Replicas,ISR)
      • 同步副本集是指在分区的所有副本中,与领导者副本保持同步的副本。
      • 领导者和同步副本集中的副本是可用于读取的,其他追随者副本可能会有一些延迟。

生产一些数据

  • 手动生产 300 条数据
kafka-verifiable-producer.sh --broker-list ${KAFKA_BROKERS} --topic test-topic-update --max-messages 300

使用消费者组消费数据

  • 消费者组不存在的情况下,没有返回被消费的数据,过两三秒之后,可以中断这个命令,然后使用下面的 --describe 来验证
kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS} --topic test-topic-update --group test-topic-update-group
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group
GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 2          100             100             0               -               -               -
test-topic-update-group test-topic-update 0          100             100             0               -               -               -
test-topic-update-group test-topic-update 1          100             100             0               -               -               -
GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 2          100             200             100             -               -               -
test-topic-update-group test-topic-update 0          100             200             100             -               -               -
test-topic-update-group test-topic-update 1          100             200             100             -               -               -

增加分区

  • 在增加分区的场景下比较方便,直接使用 --alter 就能实现,这里将原来的 3 分区改成 12 分区
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --alter --topic test-topic-update --partitions 12

无新数据产生,有旧数据未消费

kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic test-topic-update
Topic:test-topic-update PartitionCount:12       ReplicationFactor:1     Configs:segment.bytes=1073741824
        Topic: test-topic-update        Partition: 0    Leader: 5       Replicas: 5     Isr: 5
        Topic: test-topic-update        Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: test-topic-update        Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-update        Partition: 3    Leader: 3       Replicas: 3     Isr: 3
        Topic: test-topic-update        Partition: 4    Leader: 2       Replicas: 2     Isr: 2
        Topic: test-topic-update        Partition: 5    Leader: 4       Replicas: 4     Isr: 4
        Topic: test-topic-update        Partition: 6    Leader: 5       Replicas: 5     Isr: 5
        Topic: test-topic-update        Partition: 7    Leader: 1       Replicas: 1     Isr: 1
        Topic: test-topic-update        Partition: 8    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-update        Partition: 9    Leader: 3       Replicas: 3     Isr: 3
        Topic: test-topic-update        Partition: 10   Leader: 2       Replicas: 2     Isr: 2
        Topic: test-topic-update        Partition: 11   Leader: 4       Replicas: 4     Isr: 4
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group
GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 2          100             200             100             -               -               -
test-topic-update-group test-topic-update 0          100             200             100             -               -               -
test-topic-update-group test-topic-update 1          100             200             100             -               -               -
kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS} --topic test-topic-update --group test-topic-update-group --max-messages 300
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group
GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 0          100             100             0               -               -               -
test-topic-update-group test-topic-update 7          0               0               0               -               -               -
test-topic-update-group test-topic-update 5          0               0               0               -               -               -
test-topic-update-group test-topic-update 1          100             100             0               -               -               -
test-topic-update-group test-topic-update 6          0               0               0               -               -               -
test-topic-update-group test-topic-update 2          100             100             0               -               -               -
test-topic-update-group test-topic-update 3          0               0               0               -               -               -
test-topic-update-group test-topic-update 10         0               0               0               -               -               -
test-topic-update-group test-topic-update 9          0               0               0               -               -               -
test-topic-update-group test-topic-update 8          0               0               0               -               -               -
test-topic-update-group test-topic-update 11         0               0               0               -               -               -
test-topic-update-group test-topic-update 4          0               0               0               -               -               -
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --delete --topic test-topic-update

有新数据产生,有旧数据未消费

  • 同样,先扩容分区
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --alter --topic test-topic-update --partitions 12
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group
kafka-verifiable-producer.sh --broker-list ${KAFKA_BROKERS} --topic test-topic-update --max-messages 100
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group
GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 2          100             108             8               -               -               -
test-topic-update-group test-topic-update 0          100             108             8               -               -               -
test-topic-update-group test-topic-update 1          100             109             9               -               -               -
kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS} --topic test-topic-update --group test-topic-update-group --max-messages 100
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group
GROUP                   TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-topic-update-group test-topic-update 0          108             108             0               -               -               -
test-topic-update-group test-topic-update 7          9               9               0               -               -               -
test-topic-update-group test-topic-update 5          8               8               0               -               -               -
test-topic-update-group test-topic-update 1          109             109             0               -               -               -
test-topic-update-group test-topic-update 6          9               9               0               -               -               -
test-topic-update-group test-topic-update 2          108             108             0               -               -               -
test-topic-update-group test-topic-update 3          8               8               0               -               -               -
test-topic-update-group test-topic-update 10         8               8               0               -               -               -
test-topic-update-group test-topic-update 9          8               8               0               -               -               -
test-topic-update-group test-topic-update 8          8               8               0               -               -               -
test-topic-update-group test-topic-update 11         8               8               0               -               -               -
test-topic-update-group test-topic-update 4          9               9               0               -               -               -

增加副本

创建 json 文件

  • kafka-reassign-partitions.sh 是 Kafka 提供的命令行工具,用于重新分配主题分区的副本。这个工具允许你重新定义主题分区副本的分布,以实现负载均衡、故障恢复或集群扩展等目的
{"version":1, "partitions":[
{"topic":"test-topic-update","partition":0,"replicas":[5,4]},
{"topic":"test-topic-update","partition":1,"replicas":[1,3]},
{"topic":"test-topic-update","partition":2,"replicas":[0,2]},
{"topic":"test-topic-update","partition":3,"replicas":[3,0]},
{"topic":"test-topic-update","partition":4,"replicas":[2,1]},
{"topic":"test-topic-update","partition":5,"replicas":[4,5]},
{"topic":"test-topic-update","partition":6,"replicas":[5,4]},
{"topic":"test-topic-update","partition":7,"replicas":[1,3]},
{"topic":"test-topic-update","partition":8,"replicas":[0,2]},
{"topic":"test-topic-update","partition":9,"replicas":[3,0]},
{"topic":"test-topic-update","partition":10,"replicas":[2,1]},
{"topic":"test-topic-update","partition":11,"replicas":[4,5]}]
}

使用指定的 json 文件增加 topic 的副本数

kafka-reassign-partitions.sh --zookeeper ${ZOOKEEPER_CONNECT} --execute --reassignment-json-file add_rep_test_topic_update.json

使用指定的 json 文件查看 topic 的副本数增加的进度

kafka-reassign-partitions.sh --zookeeper ${ZOOKEEPER_CONNECT} --verify --reassignment-json-file add_rep_test_topic_update.json
Reassignment of partition test-topic-update-0 completed successfully
Reassignment of partition test-topic-update-7 completed successfully
Reassignment of partition test-topic-update-5 completed successfully
Reassignment of partition test-topic-update-1 completed successfully
Reassignment of partition test-topic-update-6 completed successfully
Reassignment of partition test-topic-update-2 completed successfully
Reassignment of partition test-topic-update-3 completed successfully
Reassignment of partition test-topic-update-10 completed successfully
Reassignment of partition test-topic-update-9 completed successfully
Reassignment of partition test-topic-update-8 completed successfully
Reassignment of partition test-topic-update-11 completed successfully
Reassignment of partition test-topic-update-4 completed successfully

查看 topic 情况

kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic test-topic-update
Topic:test-topic-update PartitionCount:12       ReplicationFactor:2     Configs:segment.bytes=1073741824
        Topic: test-topic-update        Partition: 0    Leader: 5       Replicas: 5,4   Isr: 5,4
        Topic: test-topic-update        Partition: 1    Leader: 1       Replicas: 1,3   Isr: 3,1
        Topic: test-topic-update        Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: test-topic-update        Partition: 3    Leader: 3       Replicas: 3,0   Isr: 3,0
        Topic: test-topic-update        Partition: 4    Leader: 1       Replicas: 2,1   Isr: 1,2
        Topic: test-topic-update        Partition: 5    Leader: 4       Replicas: 4,5   Isr: 5,4
        Topic: test-topic-update        Partition: 6    Leader: 5       Replicas: 5,4   Isr: 4,5
        Topic: test-topic-update        Partition: 7    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: test-topic-update        Partition: 8    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: test-topic-update        Partition: 9    Leader: 3       Replicas: 3,0   Isr: 0,3
        Topic: test-topic-update        Partition: 10   Leader: 1       Replicas: 2,1   Isr: 1,2
        Topic: test-topic-update        Partition: 11   Leader: 4       Replicas: 4,5   Isr: 4,5
09-01 11:22