How to read and process high-priority messages in a Kafka consumer?
Is there a way to process high-priority messages first?
I tried creating three topics, "high", "medium", and "low", and subscribing a single consumer to all three; whenever the "high" topic has unprocessed messages, the consumer pauses the other two topics. Is there a better way to implement message priority?
I tried the logic given below.
topics = ['high', 'medium', 'low']
consumer.subscribe(topics)

# One TopicPartition handle per priority topic (partition 0)
high_topic_partition = TopicPartition('high', 0)
medium_topic_partition = TopicPartition('medium', 0)
low_topic_partition = TopicPartition('low', 0)

while True:
    messages = consumer.poll()
    # Unprocessed count = end offset minus the consumer's current position
    high_priority_unprocessed = consumer.end_offsets([high_topic_partition])[high_topic_partition] - consumer.position(high_topic_partition)
    medium_priority_unprocessed = consumer.end_offsets([medium_topic_partition])[medium_topic_partition] - consumer.position(medium_topic_partition)
    low_priority_unprocessed = consumer.end_offsets([low_topic_partition])[low_topic_partition] - consumer.position(low_topic_partition)

    if high_priority_unprocessed > 0:
        consumer.pause(medium_topic_partition)
        consumer.pause(low_topic_partition)
    else:
        consumer.resume(medium_topic_partition)
        if medium_priority_unprocessed > 0:
            consumer.pause(low_topic_partition)
        else:
            consumer.resume(low_topic_partition)

    if messages:
        process(messages)
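The pause/resume decision in the loop above can be factored into a small, broker-free helper that is easy to unit-test (a sketch only; the function name `topics_to_pause` and its plain-dict input are my own illustration, not part of the kafka-python API):

```python
def topics_to_pause(lags):
    """Given a dict of topic -> unprocessed-message count (lag), return
    the set of lower-priority topics to pause. Priority: high > medium > low."""
    if lags.get('high', 0) > 0:
        # Anything pending on 'high' starves both lower tiers
        return {'medium', 'low'}
    if lags.get('medium', 0) > 0:
        # 'high' is drained; 'medium' now starves 'low'
        return {'low'}
    # Nothing higher-priority pending: run everything
    return set()
```

For example, `topics_to_pause({'high': 3, 'medium': 0, 'low': 7})` returns `{'medium', 'low'}`. The main loop would then pause exactly this set and resume the rest, keeping the offset arithmetic and the priority policy separate.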
Solution
One option you could evaluate is basically just to have more parallelism on the higher-priority messages.
For example:
Topic1 (priority low): 1 partition
Topic2 (priority medium): 5 partitions
Topic3 (priority high): 20 partitions
Then, basically, have:
- 1 consumer reading from topic1
- 5 consumers reading from topic2
- 20 consumers reading from topic3
Now, the simplest approach I'd suggest is to write the code once, but externalize the "topic name" configuration, and then scale it out (with containers, of course).
For example, the code can be as simple as:
SuperAwesomeAppBinaryCode:
    topic = %MY_TOPIC_NAME_INJECTED_BY_ENV_VAR%
    consumer.subscribe([topic])
    while True:
        messages = consumer.poll()
        if messages:
            process(messages)
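A runnable version of that sketch might look like the following (assuming kafka-python; `process()`, `topic_from_env`, and the `KAFKA_BOOTSTRAP` variable are my own illustrative names, not part of any library):

```python
import os

def topic_from_env(var='MY_TOPIC_NAME_INJECTED_BY_ENV_VAR'):
    """Read the topic name injected by the deployment environment."""
    topic = os.environ.get(var)
    if not topic:
        raise RuntimeError(f'environment variable {var} is not set')
    return topic

def main():
    # Deferred import: kafka-python is only needed when actually consuming
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        topic_from_env(),
        bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP', 'localhost:9092'),
    )
    for message in consumer:  # kafka-python consumers are iterable
        process(message)      # process() is the application's handler (assumed)

# call main() to start consuming
```

The same binary then does "high", "medium", or "low" work purely depending on what the environment injects.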
Now, if you deploy that code on, say, K8s, you can have 3 different Deployments running the same code, but with the right topic injected for each case, for example:
Low-priority messages

apiVersion: apps/v1
kind: Deployment
metadata:
  name: low-priority-processor
  labels:
    app: low-priority-processor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: low-priority-processor
  template:
    metadata:
      labels:
        app: low-priority-processor
    spec:
      containers:
      - name: low-priority-processor
        image: superawesomeappbinarycode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic1
        ports:
        - containerPort: 80
Medium-priority messages

apiVersion: apps/v1
kind: Deployment
metadata:
  name: medium-priority-processor
  labels:
    app: medium-priority-processor
spec:
  replicas: 5
  selector:
    matchLabels:
      app: medium-priority-processor
  template:
    metadata:
      labels:
        app: medium-priority-processor
    spec:
      containers:
      - name: medium-priority-processor
        image: superawesomeappbinarycode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic2
        ports:
        - containerPort: 80
High-priority messages

apiVersion: apps/v1
kind: Deployment
metadata:
  name: high-priority-processor
  labels:
    app: high-priority-processor
spec:
  replicas: 20
  selector:
    matchLabels:
      app: high-priority-processor
  template:
    metadata:
      labels:
        app: high-priority-processor
    spec:
      containers:
      - name: high-priority-processor
        image: superawesomeappbinarycode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic3
        ports:
        - containerPort: 80
Then let the parallelism do its magic. If you look closely, the only things that change from one K8s Deployment to another are the name, the topic, and the number of replicas.
Notes:
- You can achieve this without K8s, using Docker Swarm, or even just docker-compose, or running the instances by hand, but why reinvent the wheel? Of course, in some edge cases you may not have much choice.
- A great read on this topic can be found here
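As a sketch of the no-K8s route mentioned above, the same fan-out could look like this in docker-compose (service and image names are illustrative, carried over from the Deployments; `deploy.replicas` is honored by Docker Swarm and by recent Compose versions):

```yaml
services:
  low-priority-processor:
    image: superawesomeappbinarycode:1.0.0
    environment:
      MY_TOPIC_NAME_INJECTED_BY_ENV_VAR: topic1
    deploy:
      replicas: 1
  medium-priority-processor:
    image: superawesomeappbinarycode:1.0.0
    environment:
      MY_TOPIC_NAME_INJECTED_BY_ENV_VAR: topic2
    deploy:
      replicas: 5
  high-priority-processor:
    image: superawesomeappbinarycode:1.0.0
    environment:
      MY_TOPIC_NAME_INJECTED_BY_ENV_VAR: topic3
    deploy:
      replicas: 20
```

As with the K8s version, only the topic and the replica count differ between services.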