本文介绍了如何阅读和处理卡夫卡消费者的高优先级消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有没有办法先处理优先级高的邮件?

我尝试创建了三个主题‘High’、‘Medium’和‘Low’,并使用一个使用者订阅了所有这三个主题,如果‘High’主题中有未处理的消息,它将暂停其他两个主题。有没有更好的实现消息优先级的方法?

我尝试使用下面给出的逻辑。

topics = ['high', 'medium', 'low']
consumer.subscribe(topics)
high_topic_partition = TopicPartition(priority['high'], 0)
medium_topic_partition = TopicPartition(priority['medium'], 0)
low_topic_partition = TopicPartition(priority['low'], 0)

while True:

    messages = consumer.poll() 
    high_priotity_unprocessed_msg = consumer.end_offsets([high_topic_partition])[high_topic_partition] - consumer.position(high_topic_partition)
    medium_priotity_unprocessed_msg = consumer.end_offsets([medium_topic_partition])[medium_topic_partition] - consumer.position(medium_topic_partition)
    low_priotity_unprocessed_msg = consumer.end_offsets([low_topic_partition])[low_topic_partition] - consumer.position(low_topic_partition)

    if high_priotity_unprocessed_msg >0:  
     consumer.pause(medium_topic_partition)
            consumer.pause(low_topic_partition)

        else:
            consumer.resume(medium_topic_partition)

            if medium_priotity_unprocessed_msg >0:
                consumer.pause(low_topic_partition)
            else:
                consumer.resume(low_topic_partition)
        if messages:
            process(messages)

推荐答案

您可以评估的一个选项基本上就是在较高优先级的消息上拥有更多的并行度...

例如:

Topic1 (Priority Low):    1 partitions
Topic2 (Priority medium): 5 partitions
Topic3 (Priority High):  20 partitions

然后基本上有:

  • 1消费者从topic1获取数据
  • 来自topic2
  • 的5个消费者
  • 来自topic3
  • 的20位消费者

👆现在,我建议您完成此操作的最简单方法是只需编写一次代码...但将主题名称的配置外部化...然后只需将其放大(当然是使用容器)。请参考以下内容:

例如,代码可以简单到:

SuperAwesomeAppBinaryCode:

topic = %MY_TOPIC_NAME_INJECTED_BY_ENV_VAR%
consumer.subscribe(topic)

while True:

    messages = consumer.poll() 
    if messages:
        process(messages)

现在,如果我们将代码部署在K8上,您可以有3个不同的部署,运行相同的代码,但为每种情况注入正确的主题,例如:

低优先级邮件

apiVersion: apps/v1
kind: Deployment
metadata:
  name: LowPriorityProcessor
  labels:
    app: LowPriorityProcessor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: LowPriorityProcessor
  template:
    metadata:
      labels:
        app: LowPriorityProcessor
    spec:
      containers:
      - name: LowPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic1
        ports:
        - containerPort: 80

中等优先级邮件

apiVersion: apps/v1
kind: Deployment
metadata:
  name: MediumPriorityProcessor
  labels:
    app: MediumPriorityProcessor
spec:
  replicas: 5
  selector:
    matchLabels:
      app: MediumPriorityProcessor
  template:
    metadata:
      labels:
        app: MediumPriorityProcessor
    spec:
      containers:
      - name: MediumPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic2
        ports:
        - containerPort: 80

高优先级邮件

apiVersion: apps/v1
kind: Deployment
metadata:
  name: HighPriorityProcessor
  labels:
    app: HighPriorityProcessor
spec:
  replicas: 20
  selector:
    matchLabels:
      app: HighPriorityProcessor
  template:
    metadata:
      labels:
        app: HighPriorityProcessor
    spec:
      containers:
      - name: HighPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic3
        ports:
        - containerPort: 80

然后让并行性发挥它的魔力😉如果您仔细检查,从一个K8s部署到另一个部署的唯一变化就是主题和副本数量。

备注:

  • 没有K8也能做到这一点...使用Docker🤷‍♂️,甚至只运行docker-compose或手动运行实例,但您为什么要重新发明轮子,但可以肯定的是,在某些边缘情况下,没有太多选项...
  • 有关此主题的好读物here

这篇关于如何阅读和处理卡夫卡消费者的高优先级消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-02 09:34