背景

面试一个时,面试官问了一个问题,Kafka如何做到顺序消息。我回答只给Kafka的Topic创建一个分区,发送到该Topic的消息在Kafka中就是有序的。

面试官又问,如果Topic有多个分区呢?我回答消息发送者在发送消息的时候,指定分区进行发送,可以在发送消息时,每次指定相同的Key。但是面试官说这样做不到,我后面去查了资料,是可以做到的,我当时也没有反驳,毕竟我是一个求职者,跟面试官产生冲突也不太好。而且可能面试官也只知道其他的方式,不知道基于这种方式可以将消息发送到指定分区。

写个博客记录下。

有哪些方式可以将消息发送到指定分区?

当一个Topic中有多个分区的时候,如何将消息发送到指定分区呢?

方式一:基于key

下面的第二个参数,partitionA就是message的key。
Kafka会将具有相同的key的消息发送到同一分区,这是通过哈希函数实现的。
此外,Kafka会按照消息产生的顺序被一致性的接受,这就保证了同一分区内消息的顺序性。

kafkaProducer.send("order-topic", "partitionA", "critical data");
kafkaProducer.send("order-topic", "partitionA", "more critical data");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");

方式二:自定义分区器

Kafka允许自定义分区器,允许用户根据Topic、message key、message val、cluster等信息,自定义将消息发送到哪个分区。

自定义分区器:

public class CustomPartitioner implements Partitioner {
	// PREMIUM的意思是额外加价
    private static final int PREMIUM_PARTITION = 0;
    // NORMAL的意思是正常、标准
    private static final int NORMAL_PARTITION = 1;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String customerType = extractCustomerType(key.toString());
        // 判断提取出的单词里面是否含有premium,如果有,则将其发送到第0号分区,否则发送到第1号分区。
        // 美团外卖有个加钱提前送达的服务,可以采用这种方式来实现。
        return "premium".equalsIgnoreCase(customerType) ? PREMIUM_PARTITION : NORMAL_PARTITION;
    }

    private String extractCustomerType(String key) {
        String[] parts = key.split("_");
        return parts.length > 1 ? parts[1] : "normal";
    }
}

在创建KafkaTemplate时,将自定义分区器设置到KafkaTemplate的属性里面去

// 在实际的SpringBoot项目中,可以将这个KafkaTemplate注入到Spring容器中
private KafkaTemplate<String, String> setProducerToUseCustomPartitioner() {
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
    DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);

    return new KafkaTemplate<>(producerFactory);
}

测试代码。
将高级客户订单和普通客户订单区分开来,进行不同的处理。

// 在实际的SpringBoot项目中,可以从Spring容器中获取这个KafkaTemplate
KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();
// 根据自定义分区器,当key为123_premium,则消息会被发送到第0号分区。
kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
// 根据自定义分区器,当key为456_normal,不含有premium,则消息会被发送到第1号分区。
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

方式三:直接指定分区序号

第二个参数0、1就是指定的分区号码,发送消息时,直接指定分区,将消息发送到指定的分区。

kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");

其他方式

在下面的参考文章当中,还看到了一个粘性分区器,但是没看太懂,而且不为大家所熟知,所以就没有太关注。
将数据发送到 Kafka 中的特定分区

参考

将数据发送到 Kafka 中的特定分区

05-03 10:29