微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在kafka消费者中读取和处理高优先级消息?

如何解决如何在kafka消费者中读取和处理高优先级消息?

有没有办法先处理高优先级的消息?

我尝试创建三个主题“高”、“中”和“低”,并使用一个消费者订阅了所有三个主题,如果“高”主题中有未处理的消息,它将暂停另外两个主题。有没有更好的方式来实现消息优先级?

我尝试使用下面给出的逻辑。

topics = ['high','medium','low']
consumer.subscribe(topics)
high_topic_partition = TopicPartition(priority['high'],0)
medium_topic_partition = TopicPartition(priority['medium'],0)
low_topic_partition = TopicPartition(priority['low'],0)

while True:

    messages = consumer.poll() 
    high_priotity_unprocessed_msg = consumer.end_offsets([high_topic_partition])[high_topic_partition] - consumer.position(high_topic_partition)
    medium_priotity_unprocessed_msg = consumer.end_offsets([medium_topic_partition])[medium_topic_partition] - consumer.position(medium_topic_partition)
    low_priotity_unprocessed_msg = consumer.end_offsets([low_topic_partition])[low_topic_partition] - consumer.position(low_topic_partition)

    if high_priotity_unprocessed_msg >0:  
     consumer.pause(medium_topic_partition)
            consumer.pause(low_topic_partition)

        else:
            consumer.resume(medium_topic_partition)

            if medium_priotity_unprocessed_msg >0:
                consumer.pause(low_topic_partition)
            else:
                consumer.resume(low_topic_partition)
        if messages:
            process(messages)

解决方法

您可以评估的一个选项基本上只是在更高优先级的消息上具有更多的并行性...

例如:

Topic1 (Priority Low):    1 partitions
Topic2 (Priority medium): 5 partitions
Topic3 (Priority High):  20 partitions

然后基本上有:

  • 1 个消费者从 topic1
  • 获取数据
  • 来自 topic2
  • 的 5 个消费者
  • 来自 topic3
  • 的 20 个消费者

?现在,我建议您最简单的方法是编写一次代码……但是将“主题名称”的配置外部化……然后将其扩展(当然使用容器) ...请参考这篇阅读:

例如,代码可以很简单:

SuperAwesomeAppBinaryCode:

topic = %MY_TOPIC_NAME_INJECTED_BY_ENV_VAR%
consumer.subscribe(topic)

while True:

    messages = consumer.poll() 
    if messages:
        process(messages)

现在,如果我们部署了该代码,比如 K8s,您可以有 3 个不同的部署,运行相同的代码,但为每种情况注入正确的主题,例如:

低优先级消息

apiVersion: apps/v1
kind: Deployment
metadata:
  name: LowPriorityProcessor
  labels:
    app: LowPriorityProcessor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: LowPriorityProcessor
  template:
    metadata:
      labels:
        app: LowPriorityProcessor
    spec:
      containers:
      - name: LowPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic1
        ports:
        - containerPort: 80

中优先级消息

apiVersion: apps/v1
kind: Deployment
metadata:
  name: MediumPriorityProcessor
  labels:
    app: MediumPriorityProcessor
spec:
  replicas: 5
  selector:
    matchLabels:
      app: MediumPriorityProcessor
  template:
    metadata:
      labels:
        app: MediumPriorityProcessor
    spec:
      containers:
      - name: MediumPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic2
        ports:
        - containerPort: 80

高优先级消息

apiVersion: apps/v1
kind: Deployment
metadata:
  name: HighPriorityProcessor
  labels:
    app: HighPriorityProcessor
spec:
  replicas: 20
  selector:
    matchLabels:
      app: HighPriorityProcessor
  template:
    metadata:
      labels:
        app: HighPriorityProcessor
    spec:
      containers:
      - name: HighPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic3
        ports:
        - containerPort: 80

然后让并行性发挥它的魔力 ? 如果仔细检查,从一个“k8s 部署”到另一个“k8s 部署”的唯一变化是主题和副本数量。

注意事项:

  • 你可以在没有 K8s 的情况下实现这一点......使用 Docker Swarm 甚至只是 docker-compose 或手动运行实例?‍♂️,但是你为什么要重新发明轮子,但肯定在某些边缘情况下,没有太多选择...
  • 可以在here
  • 找到有关此主题的精彩阅读

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。