1.概述

Kafka是一个分布表示实时数据流平台,可独立部署在单台服务器上,也可部署在多台服务器上构成集群。它提供了发布与订阅的功能,用户可以发送数据到Kafka集群中,也可以从Kafka集群中读取数据。之前在Kafka 2.8.0版本时,Kafka社区提出了KRaft协议的概念,现在社区发布了Kafka 3.0,里面涉及优化和新增了很多功能,其中就包含KRaft协议的改机。今天,笔者就给大家介绍一下Kafka 3.0新增了哪些特性以及优化了哪些功能。

2.内容

在 Kafka 3.0 中包含了许多重要的新功能,其中比较显著的变化如下所示:

  • 弃用对Java 8 和Scala 2.12 的支持;
  • Kafka Raft 支持元数据主题的快照以及自动管理仲裁中的其他改进;
  • 默认情况下为Kafka 生产者提供更加强大的交付保证;
  • 弃用消息格式 v0 和 v1;
  • OffsetFetch 和 FindCoordinator 请求中的优化;
  • 更灵活的 Mirror Maker 2 配置和 Mirror Maker 1 的弃用;
  • 能够在 Kafka Connect 中的单个调用中重新其中连接器的任务;
  • 现在默认启用连接器日志上下文和连接器客户单覆盖;
  • Kafka Streams 中时间戳同步的增强语义;
  • 改进了 Stream 和 TaskId 的公共 API;
  • Kafka 中的默认 serde 变为 null。

2.1 关于升级到 Kafka 3.0

在Kafka 3.0中,社区对于Zookeeper的版本已经升级到3.6.3了,其中我们可以预览 KRaft 模式,但是无法从 2.8 或者更早的版本升级到该模式。许多实现依赖 jar 现在在运行时类路劲中可用,而不是在编译和运行时类路劲中。升级后的编译错误可以通过显示添加缺少的依赖 jar 或更新应用程序以不使用内部类来修复。

消费者配置的默认值 session.timeout.ms 从10 秒增加到了45 秒,而Broker配置 log.message.format.version 和 Topic 配置 message.format.version 已经被启用。两种配置的值始终假定为 3.0 或者更高,通过 inter.broker.protocol.version 来配置。如果设置了 log.message.format.version 或者 message.format.version 建议在升级到 3.0的同时清理掉这两个属性,同时设置 inter.broker.protocol.version 值为 3.0 。

Streams API 删除了在 2.5.0 或者更早版本中弃用的所有弃用 API,Kafka Streams 不再对“connect:json”模块有编译时的依赖,依赖此传递依赖项的项目必须明确声明它。

现在,通过指定的自定义主体构建起实现 principal.builder.class 现在必须实现 KafkaPrincipalSerde 接口以允许Broker 之间的转发。另外,一些过时的类,方法和工具以及从clients、connect、core、和tools模块进行了删除。

该Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)方法已被弃用。请使用 Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)来替换,ConsumerGroupMetadata 可以通过检索KafkaConsumer#groupMetadata()更强大的语义。需要注意的是,完整的消费者组元数据集只有 Brokers 或 2.5 或更高版本才能支持,因此你必须升级你的 Kafka 集群以获得更强的语义。否则,你可以通过new ConsumerGroupMetadata(consumerGroupId)与较老版本的Broker进行交互。

连接器中 internal.key.converter 和 internal.value.converter 属性已被完全删除。自版本 2.0.0 起,不推荐使用这些 Connect 工作器属性。现在被硬编码为使用 schemas.enable 设置为的 JSON 转换器false。如果你的集群一直在使用不同的内部键或值转换器,你可以按照官网文档中概述的迁移步骤,将你的 Connect 集群安全地升级到 3.0。 基于 Connect 的 MirrorMaker (MM2) 包括对支持的更改IdentityReplicationPolicy,无需重命名 Topic 即可启用复制。DefaultReplicationPolicy默认情况下仍然使用现有的,但可以通过 replication.policy 配置属性启用身份复制 。这对于从旧版 MirrorMaker (MM1) 迁移的用户,或者对于不希望 Topic 重命名的具有简单单向复制拓扑的用例特别有用。请注意IdentityReplicationPolicy与 DefaultReplicationPolicy 不同,无法根据 Topic 名称阻止复制循环,因此在构建复制拓扑时要注意避免循环。

2.1.1 目的

虽然 internal.key.converter 和 internal.value.converter 中 Connect 工作器属性,以及以这些名称为前缀的所有属性都已弃用,但是有时候用户仍会尝试使用这些属性进行调试,在与未弃用的Key 和 Value转化器相关的属性意外混淆后,或者只是对其进行盲目的配置后,进行调试。这些实验的结果可能会产生不好的后果,配置了新内保转换器却无法读取具有较旧内部转换器的内保 Topic 数据,这最多会导致偏移量和连机器配置的丢失。

以下连接属性会将被删除:

  • internal.key.converter
  • internal.value.converter 
  • internal.key.converter.   # 以工作器内部密钥转换器为前缀的属性
  • internal.value.converter.   # 以工作线程的内部值转换器为前缀的属性

Connect 的行为就好像上面没有提供一样。具体来说,对于它的键和值转换器,它将使用开箱即用的 JsonConverter,配置为 schemas.enable 属性值为 false 。

2.1.2 升级步骤

运行未使用JsonConverter 并对 schemas.enable 设置 false 的 Connect 集群用户,可以按照以下步骤将其 Connect 集群升级到 3.0:

  1. 停止集群上的所有工作线程
  2. 对于每个内部主题(配置、偏移量和状态):
    1. 创建一个新主题来代替现有主题
    2. 对于现有主题中的每条消息:
      1. 使用 Connect 集群的旧内部键和值转换器反序列化消息的键和值
      2. 使用 禁用模式的JSON 转换器序列化消息的键和值(通过将schemas.enable属性设置为false)
      3. 用新的键和值向新的内部主题写一条消息
  3. 重新配置每个 Connect worker 以使用步骤 2 中新创建的内部主题
  4. 启动集群上的所有worker

2.2 新功能

在本次 Kafka 3.0  版本中新增了以下功能:

  • 添加了InsertHeader 和 DropHeader 连接转换 
  • 在 KRaft 模式中实现 createPartitions
  • 如果分区从 fetcher 中删除,副本 fetcher 不应在发散时期更新分区状态

2.2.1 添加 InsertHeader 和 DropHeader

之前在核心 Kafka 产品中引入了 Headers,在 Kafka Connect Framework 中公开它们将是有利的。Kafka 的 Header 是带有二进制值的简单名称,而 Connect API 已经有一个非常有用的层来处理不同类型的数据。Connect 的 Header 支持应该使用像 Kafka 这样的字符串名称,但使用与 Connect 记录键和值相同的类型来表示值。这将提供与 Connect 框架的其余部分的一致性,并使连接器和转换能够轻松地访问、修改和创建记录上的 Header。

Kafka 将 Header 定义为具有字符串名称和二进制值,但 Connect 将使用用于记录键和值的相同机制来表示 Header 值。每个 Header 值可能有一个对应的 Schema,允许连接器和转换以一致的方式处理 Header 值、记录键和记录值。Connect 将定义一种 HeaderConverter 机制以类似于Converter框架的方式序列化和反序列化标头值 ,这样现有的 Converter实现也可以实现 HeaderConverter. 由于来自不同供应商的连接器和转换可能被组合到单个管道中,因此不同的连接器和转换可以轻松地将 Header 值从原始形式转换为连接器和/或转换期望的类型,这一点很重要。

注意:
为了简洁和清晰,显示的代码不包括 JavaDoc,但提议的更改确实包括所有公共 API 和方法的 JavaDoc。
09-25 22:52