如何解决Python Redis Pub-Sub-多线程的模式?异步?
我有一个由 Redis pubsub 消息触发的函数。问题在于,这个加载视频并将其逐帧流式传输到 Redis 的函数被阻塞,因此后续消息无法通过。
解决这类问题最简单有效的模式是什么?
订阅:
conn = redis.Redis(host="localhost",port="6379")
if not conn.ping():
raise Exception('Redis unavailable')
pubsub = conn.pubsub()
pubsub.subscribe("Feed")
data = None
for message in pubsub.listen():
logging.info("received pubsub message")
logging.info(message)
logging.info(message['type'])
if message['type'] == "message":
data = json.loads(message.get("data"))
if data and data['source']:
try:
args.infile = data['source']
loader = Video(infile=data.get("source"),fps=30.0)
load(loader,conn,args)
except error:
logging.error("Error occurred",exc_info=True)
视频类:
class Video:
def __init__(self,infile=0,fps=30.0):
try:
self.isFile = not str(infile).isdecimal()
print('video: self.isFile',self.isFile)
self.ts = time.time()
self.infile = infile
self.cam = cv2.VideoCapture(self.infile)
if not self.isFile:
self.cam.set(cv2.CAP_PROP_FPS,fps)
self.fps = fps
# Todo: some cameras don't respect the fps directive
self.cam.set(cv2.CAP_PROP_FRAME_WIDTH,800)
self.cam.set(cv2.CAP_PROP_FRAME_HEIGHT,600)
else:
self.fps = self.cam.get(cv2.CAP_PROP_FPS)
self.sma = SimpleMovingAverage(value=0.1,count=19)
except error as error:
# Output expected AssertionErrors.
logging.error("Error occurred",exc_info=True)
def __iter__(self):
self.count = -1
return self
def __next__(self):
try:
self.count += 1
if not self.fps:
self.fps = 30.0
# Respect FPS for files
if self.isFile:
delta = time.time() - self.ts
self.sma.add(delta)
time.sleep(max(0,(1 - self.sma.current*self.fps)/self.fps))
self.ts = time.time()
# Read image
ret_val,img0 = self.cam.read()
if not ret_val and self.isFile:
self.cam.set(cv2.CAP_PROP_POS_FRAMES,0)
ret_val,img0 = self.cam.read()
assert ret_val,'Video Error'
# Preprocess
img = img0
if not self.isFile:
img = cv2.flip(img,1)
return self.count,img
except AssertionError as error:
# Output expected AssertionErrors.
redisLog("Error occurred",exc_info=True)
except Exception as exception:
# Output unexpected Exceptions.
logging.exception("Exception occurred")
def __len__(self):
return 0
加载函数:
def load(loader,args):
try:
for (count,img) in loader:
_,data = cv2.imencode(args.fmt,img)
msg = {
'count': count,'image': data.tobytes()
}
_id = conn.xadd(args.output,msg,maxlen=args.maxlen)
if args.verbose:
print('frame: {} id: {}'.format(count,_id))
if args.count is not None and count+1 == args.count:
print('Stopping after {} frames.'.format(count))
break
except AssertionError:
logging.error("Error occurred",exc_info=True)
raise
解决方法
我最终使用了多处理。这有点痛苦,因为您只能将 Pickelable 参数传递给子进程,这意味着连接已断开。因此,我必须从子进程内部进行连接。
procs = []
logging.basicConfig(level=logging.DEBUG)
for message in pubsub.listen():
logging.info("received pubsub message")
logging.info(message)
logging.info(message['type'])
try:
if message['type'] == "message":
data = json.loads(message.get("data"))
if data and data['source']:
for proc in procs:
if proc.is_alive():
proc.terminate()
proc.join(timeout=0)
procs.pop(0)
loaderProcess = multiprocessing.Process(target=load,args = (data.get("source"),args,))
procs.append(loaderProcess)
loaderProcess.start()
continue
except Exception as e:
logging.error("Error occurred",exc_info=True)
...
def load(source,args):
try:
conn = redis.Redis(host="localhost",port="6379")
if not conn.ping():
raise Exception('Redis unavailable')
loader = Video(infile=source,fps=30.0)
for (count,img) in loader:
_,data = cv2.imencode(args.fmt,img)
msg = {
'count': count,'image': data.tobytes()
}
_id = conn.xadd(args.output,msg,maxlen=args.maxlen)
if args.verbose:
print('frame: {} id: {}'.format(count,_id))
if args.count is not None and count+1 == args.count:
print('Stopping after {} frames.'.format(count))
break
except AssertionError:
logging.error("Error occurred",exc_info=True)
raise
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。