微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python3.6 有 erlang 风格的消息队列吗?

如何解决Python3.6 有 erlang 风格的消息队列吗?

我正在寻找可用于在 multiprocess.Processes 之间进行通信的 python 3.6(这个确切版本)的消息队列实现,具体来说,它应该是多生产者、单消费者、具有优先接收权的 fifo应用程序特定类型的消息(例如,如果队列中间有一条系统消息(用 erlang 术语),而队列头部有一条普通消息,则下一次接收应返回系统消息而不是普通消息)

但我怀疑会有这样的库,所以问题变成了,是否有任何 stdlib 或第三方库给了我一大块共享内存或更好的列表,这样我就可以读写支持的缓冲区,但是内存/列表并使用诸如 mp.Lock?

之类的东西来保护顺序

multiprocessing.Manager 使用 tcp,并启动一个新进程

解决方法

我对 Erlang 不太熟悉,但是,根据您描述需求的方式,我认为您可以采用使用 multiprocessing.Queue 并在阅读消息之前对消息进行排序的方法。

这个想法是为每个进程都有一个 multiprocessing.Queue(FIFO 消息队列)。当进程 A 向进程 B 发送消息时,进程 A 将其消息连同消息的优先级放入进程 B 的消息队列中。当进程读取其消息时,它会将消息从 FIFO 队列传输到列表中,然后在处理消息之前对列表进行排序。消息首先按优先级排序,然后是它们到达消息队列的时间。

这是在 Windows 上使用 Python 3.6 测试过的示例。

from multiprocessing import Process,Queue
import queue
import time

def handle_messages(process_id,message_queue):
    # Keep track of the message number to ensure messages with the same priority
    # are read in a FIFO fashion.
    message_no = 0
    messages = []
    while True:
        try:
            priority,contents = message_queue.get_nowait()
            messages.append((priority,message_no,contents))
            message_no+=1
        except queue.Empty:
            break
    # Handle messages in correct order.
    for message in sorted(messages):
        print("{}: {}".format(process_id,message[-1]))
    
def send_message_with_priority(destination_queue,message,priority):
    # Send a message to a destination queue with a specified priority.
    destination_queue.put((-priority,message))

def process_0(my_id,queues):
    while True:
        # Do work
        print("Doing work...")
        time.sleep(5)
        # Receive messages
        handle_messages(my_id,queues[my_id])
            
def process_1(my_id,queues):
    message_no = 0
    while True:
        # Do work
        time.sleep(1)
        # Receive messages
        handle_messages(my_id,queues[my_id])
        send_message_with_priority(queues[0],"This is message {} from process {}".format(message_no,my_id),1)
        message_no+=1
        
def process_2(my_id,queues):
    message_no = 0
    while True:
        # Do work
        time.sleep(3)
        # Receive messages
        handle_messages(my_id,"This is urgent message {} from process {}".format(message_no,2)
        message_no+=1

if __name__ == "__main__":
    qs = {i: Queue() for i in range(3)}
    processes = [Process(target=p,args=(i,qs)) for i,p in enumerate([process_0,process_1,process_2])]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。