微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python Redis Pub-Sub-多线程的模式?异步?

如何解决Python Redis Pub-Sub-多线程的模式?异步?

我有一个由 Redis pubsub 消息触发的函数。问题在于,这个函数会加载视频并将其逐帧流式传输到 Redis,而它是阻塞执行的,因此在它运行期间后续的消息无法得到处理。

解决这类问题最简单有效的模式是什么?

订阅

# Connect to the local Redis server and fail fast if it does not answer.
conn = redis.Redis(host="localhost",port="6379")
if not conn.ping():
    raise Exception('Redis unavailable')

# Subscribe to the "Feed" channel and handle events as they arrive.
pubsub = conn.pubsub()
pubsub.subscribe("Feed")
data = None
for message in pubsub.listen():
    logging.info("received pubsub message")
    logging.info(message)
    logging.info(message['type'])
    # Only events of type "message" are treated as published payloads.
    if message['type'] != "message":
        continue
    try:
        # Decoding happens inside the try so that a malformed payload is
        # logged instead of terminating the listener loop.
        data = json.loads(message.get("data"))
        source = data.get("source") if data else None
        if not source:
            continue
        args.infile = source
        loader = Video(infile=source,fps=30.0)
        # load() is defined as load(loader, args) and reads the module-level
        # `conn`, so the connection is not passed explicitly.
        # NOTE: this call runs in the listener's own thread; no further
        # pubsub message is processed until it returns.
        load(loader,args)
    except Exception:
        # `except error:` referenced an undefined name and would itself
        # raise NameError; catch Exception at this top-level boundary.
        logging.error("Error occurred",exc_info=True)

视频类:

class Video:
    """Iterator that yields ``(count, img)`` pairs read through OpenCV.

    A source whose string form consists only of decimal digits is treated as
    a camera index; anything else is treated as a file. File sources are
    throttled towards their frame rate and rewound to frame 0 when a read
    fails; camera frames are flipped with ``cv2.flip(img, 1)``.
    """

    def __init__(self,infile=0,fps=30.0):
        """Open the capture source.

        Args:
            infile: camera index (int or digit-only string) or a file path.
            fps: frame rate requested from a camera; for files the rate is
                read from the capture instead.

        Raises:
            Exception: any error raised while opening or configuring the
                capture is logged and re-raised.
        """
        # Set here as well as in __iter__ so that calling next() directly,
        # without iter(), does not fail on a missing attribute.
        self.count = -1
        try:
            self.isFile = not str(infile).isdecimal()
            print('video: self.isFile',self.isFile)
            self.ts = time.time()
            self.infile = infile
            # A digit-only string (e.g. "0" decoded from JSON) must be passed
            # as an int, otherwise OpenCV interprets it as a file name.
            source = self.infile if self.isFile else int(str(self.infile))
            self.cam = cv2.VideoCapture(source)
            if not self.isFile:
                self.cam.set(cv2.CAP_PROP_FPS,fps)
                self.fps = fps
                # Todo: some cameras don't respect the fps directive
                self.cam.set(cv2.CAP_PROP_FRAME_WIDTH,800)
                self.cam.set(cv2.CAP_PROP_FRAME_HEIGHT,600)
            else:
                self.fps = self.cam.get(cv2.CAP_PROP_FPS)
                # Presumably tracks the recent per-frame delay; the class is
                # defined outside this snippet -- confirm its semantics.
                self.sma = SimpleMovingAverage(value=0.1,count=19)
        except Exception:
            # The original `except error as error` referenced an unbound
            # name, so it could never log. Log, then re-raise so callers do
            # not receive a half-initialised object.
            logging.error("Error occurred",exc_info=True)
            raise

    def __iter__(self):
        """Reset the frame counter and return the iterator itself."""
        self.count = -1
        return self

    def __next__(self):
        """Return ``(count, img)`` for the next frame.

        Raises:
            StopIteration: when no frame can be read (for files, even after
                rewinding to the first frame).
            Exception: unexpected errors are logged and re-raised.
        """
        try:
            self.count += 1
            if not self.fps:
                # Fall back to 30 fps when the rate is 0 or missing.
                self.fps = 30.0
            # Respect FPS for files
            if self.isFile:
                delta = time.time() - self.ts
                self.sma.add(delta)
                # Equivalent to sleeping max(0, 1/fps - sma.current).
                time.sleep(max(0,(1 - self.sma.current*self.fps)/self.fps))
                self.ts = time.time()

            # Read image
            ret_val,img0 = self.cam.read()
            if not ret_val and self.isFile:
                # Rewind to the first frame and try once more.
                self.cam.set(cv2.CAP_PROP_POS_FRAMES,0)
                ret_val,img0 = self.cam.read()
        except Exception:
            # Output unexpected Exceptions.
            logging.exception("Exception occurred")
            raise

        if not ret_val:
            # Previously an `assert` whose handler fell through and returned
            # None, which broke tuple unpacking in the consumer and never
            # ended the iteration. Signal exhaustion explicitly instead.
            logging.error("Error occurred: Video Error")
            raise StopIteration

        # Preprocess
        img = img0
        if not self.isFile:
            img = cv2.flip(img,1)

        return self.count,img

    def __len__(self):
        """Always report a length of 0.

        NOTE(review): this makes instances falsy in boolean context.
        """
        return 0

加载函数

def load(loader,args):
    """Encode each frame from *loader* and append it to a Redis stream.

    The Redis connection is not a parameter: the function reads the
    module-level name ``conn``.

    Args:
        loader: iterable yielding ``(count, img)`` pairs.
        args: options object; the attributes ``fmt``, ``output``, ``maxlen``,
            ``verbose`` and ``count`` are read.

    Raises:
        AssertionError: logged and re-raised.
    """
    try:
        for (count,img) in loader:
            _,data = cv2.imencode(args.fmt,img)
            msg = {
                'count': count,'image': data.tobytes()
            }
            _id = conn.xadd(args.output,msg,maxlen=args.maxlen)
            if args.verbose:
                print('frame: {} id: {}'.format(count,_id))
            # `count` is the index handed out by the loader, so count + 1
            # frames have been sent once the limit is reached.
            if args.count is not None and count+1 == args.count:
                # Report the number of frames sent, not the last index.
                print('Stopping after {} frames.'.format(count+1))
                break
    except AssertionError:
        logging.error("Error occurred",exc_info=True)
        raise

解决方法

我最终使用了多进程(multiprocessing)。这有点麻烦,因为只能把可序列化(picklable)的参数传递给子进程,这意味着 Redis 连接对象无法直接传入。因此,我必须在子进程内部重新建立连接。

# Loader processes started by this listener; emptied before each new start.
procs = []
logging.basicConfig(level=logging.DEBUG)

for message in pubsub.listen():
    logging.info("received pubsub message")
    logging.info(message)
    logging.info(message['type'])
    try:
        # Only events of type "message" are treated as published payloads.
        if message['type'] != "message":
            continue
        data = json.loads(message.get("data"))
        source = data.get("source") if data else None
        if not source:
            continue

        # Stop whatever loader is still running before starting a new one.
        # The original popped index 0 while iterating the same list, which
        # could drop the wrong entry and never removed finished processes.
        for proc in procs:
            if proc.is_alive():
                proc.terminate()
            # Wait briefly so the terminated child is reaped.
            proc.join(timeout=1.0)
        procs.clear()

        # Only the source and args are handed over; load() opens its own
        # Redis connection inside the child process.
        loaderProcess = multiprocessing.Process(target=load,args = (source,args,))
        procs.append(loaderProcess)
        loaderProcess.start()
    except Exception:
        # Keep the listener alive on any per-message failure.
        logging.error("Error occurred",exc_info=True)

...

def load(source,args):
    """Stream frames from *source* into a Redis stream.

    Used as a ``multiprocessing.Process`` target by the listener loop, so it
    opens its own Redis connection instead of receiving one.

    Args:
        source: value passed to ``Video(infile=...)``.
        args: options object; the attributes ``fmt``, ``output``, ``maxlen``,
            ``verbose`` and ``count`` are read.

    Raises:
        Exception: any failure (including ``'Redis unavailable'``) is logged
            and re-raised.
    """
    try:
        conn = redis.Redis(host="localhost",port="6379")
        if not conn.ping():
            raise Exception('Redis unavailable')

        loader = Video(infile=source,fps=30.0)
        for (count,img) in loader:
            _,data = cv2.imencode(args.fmt,img)
            msg = {
                'count': count,'image': data.tobytes()
            }
            _id = conn.xadd(args.output,msg,maxlen=args.maxlen)
            if args.verbose:
                print('frame: {} id: {}'.format(count,_id))
            # `count` is the index handed out by the loader, so count + 1
            # frames have been sent once the limit is reached.
            if args.count is not None and count+1 == args.count:
                # Report the number of frames sent, not the last index.
                print('Stopping after {} frames.'.format(count+1))
                break
    except Exception:
        # The original handler caught only AssertionError, so the plain
        # Exception raised above for an unavailable Redis was never logged.
        logging.error("Error occurred",exc_info=True)
        raise

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。