如何解决基于队列的通信:发送返回队列是个好主意吗? 共享内存类型
def client(worker_queue,message):
answer_queue = queue.Queue(maxsize=1)
worker_queue.put((answer_queue,message))
result = answer_queue.get(timeout=10)
def worker():
while True:
answer_queue,message = worker_queue.get()
result = do_someting_with(message)
answer_queue.put(result)
worker_queue.task_done()
(primitiv worker 只是一个例子。在其他情况下,可能需要在多个回调之间传递“answer_queue”)
这是个好主意,还是会遇到问题(例如内存管理)?
有没有更好的方法来做到这一点?
我知道 asyncio
有像 futures
这样的东西来处理这样的问题,但目前我正在寻找(也)适用于多线程的东西。
解决方法
Python 使用内存地址来访问类变量及其方法。使用队列只是确保每个工作人员都有一个唯一的内存地址位置来放置其答案。
确保,如果变量仍然是局部变量,当您不再需要该变量时,重新分配它,以便垃圾回收可以释放队列正在使用的内存,或者使用 del
关键字清除内存以手动执行,例如 del answer_queue
。
您可以使用任何数据类型来完成在 worker 和 client 之间传输数据的作用,因为所有类方法都是通过内存地址访问的;然而,最常见的数据传输方式是:
使用队列
queue.Queue()
已经非常优化并且非常通用,因此它是一种在客户端和工作器之间执行通信的可靠方式。
import queue
import datetime
from threading import Thread
def queue_client(worker_queue,message):
answer_queue = queue.Queue(maxsize=1)
worker_queue.put((answer_queue,message))
result = answer_queue.get(timeout=60)
print('Bytes received from worker:',len(result))
def queue_worker(worker_queue):
answer_queue,message = worker_queue.get()
do_something = lambda x: x[::-1]
result = do_something(message)
answer_queue.put(result)
if __name__ == '__main__':
large_message = 'X' * (2<<30) #2GB
worker_queue = queue.Queue()
client = Thread(target=queue_client,args=(worker_queue,large_message,))
worker = Thread(target=queue_worker,))
start_time = datetime.datetime.now()
client.start()
worker.start()
client.join()
worker.join()
dt = datetime.datetime.now() - start_time
print('Time elapsed using Queue:',dt.total_seconds()) #~1.47 secs on my machine
共享内存类型
由于在多线程时可以直接访问变量地址的内存,所以可以使用共享内存;这通常是访问数据的最快方式,但您需要某种类型的数据容器对象。
一个非常基本的示例(可以将工作人员生成、回答提交等添加到管理器中):
import datetime
from threading import Thread
class DataManager:
def __init__(self,):
self.tasks = {
#worker_id: message
}
self.answers = {
#worker_id: answer
}
def shared_client(datamanager,worker_id,message):
datamanager.tasks[worker_id] = message
while not datamanager.answers.get(worker_id,None): #Wait for answer
pass
result = datamanager.answers[worker_id]
print('Bytes received from worker:',len(result))
def shared_worker(data_manager,worker_id):
while not data_manager.tasks.get(worker_id,None): #Wait for task to get assigned
pass
message = data_manager.tasks[worker_id]
do_something = lambda x: x[::-1]
result = do_something(message)
data_manager.answers[worker_id] = result
if __name__ == '__main__':
large_message = 'X' * (2<<30) #2GB
data_manager = DataManager()
worker_id = 0
client = Thread(
target=shared_client,args=(data_manager,)
)
worker = Thread(target=shared_worker,worker_id))
start_time = datetime.datetime.now()
client.start()
worker.start()
client.join()
worker.join()
dt = datetime.datetime.now() - start_time
print('Time elapsed using Shared Memory:',dt.total_seconds()) #~1.44 secs on my machine
如果您使用诸如 Multiprocessing.Value 和 Multiprocessing.Array (link) 之类的共享内存类型/ctypes,可能会有更优雅(甚至可能更快)的解决方案。
管道
管道作为一种连接,一端在侦听,另一端在发送数据。
重要的是,当发送大于 ~32MB 的数据包时,这比队列更有利,同时多处理。此外,它是可序列化的,不像 queue.Queue()
.
使用管道的示例:
import datetime
from threading import Thread
from multiprocessing import Pipe
def client(conn,message):
conn.send(message) #Send message to worker
result = conn.recv() #Receive result
print('Bytes received from worker:',len(result))
def worker(conn):
message = conn.recv() #Get message from client
do_something = lambda x: x[::-1]
result = do_something(message)
conn.send(result) #Send result to client
if __name__ == '__main__':
large_message = 'X' * (2<<30) #2GB
client_conn,worker_conn = Pipe(duplex=True) #Bidirectional pipe
client_process = Thread(target=client,args=(client_conn,large_message))
worker_process = Thread(target=worker,args=(worker_conn,))
start_time = datetime.datetime.now()
client_process.start()
worker_process.start()
client_process.join()
worker_process.join()
dt = datetime.datetime.now() - start_time
print('Time elapsed using Pipe:',dt.total_seconds()) #~9.07 secs on my machine
代理
代理在客户端上注册函数,并允许通过调用底层公开函数的工作线程来执行它们。当您的工作人员与您的客户端在不同的计算机上时,这会比较棘手,但对于集群计算来说是必要的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。