微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Google PubSub Push SubscriptionTimeouts from GCS Bucket upload call in Cloud Run (Python)

如何解决Google PubSub Push SubscriptionTimeouts from GCS Bucket upload call in Cloud Run (Python)

我正在尝试开发一个图像处理管道,将视频上传一个 GCS 存储桶,将所有帧提取为 jpg 图像,然后将这些图像上传到另一个 GCS 存储桶。我正在使用 PubSub 推送订阅来触发云运行服务。不幸的是,该服务无法在推送订阅的 10 分钟最大请求响应超时时间内可靠地处理视频。我已经跟踪了这个问题,似乎将帧上传到 GCS 导致了瓶颈。这些视频平均包含大约 28000 帧(30FPS,长度约 15 分钟)。我认为这应该在提供的时间内是可能的。所有服务都在同一地区/地区。

有没有办法提高这些 GCS blob 上传的吞吐量?当我使用 gsutil 将视频 blob 从存储桶复制到另一个存储桶(在同一区域内)时,需要几秒钟的时间。

我尝试过增加/减少线程数、增加服务 cpu 数和增加服务内存数。我看不出任何变化。 writes over 1000/Sec 的 GCS 速率限制,但我认为我还没有接近这个限制。

我的服务复制 main.py 脚本作为 Google Cloud Run Vision Tutorial 的一部分。唯一的修改是在 video.py 中更改对我的处理例程的调用。我已经包括在帖子的底部video.py 运行实际处理。

Cloud Run 服务配备 1 个 cpu、512 MiB、15 分钟超时 Cloud PubSub 订阅(推送订阅) 10 分钟超时(最大值)

video.py:

import os
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor

import cv2

from google.cloud import storage
from google.oauth2 import service_account


def upload(blob : storage.blob.Blob,buf : "numpy.ndarray"):
    blob.upload_from_string(buf.tobytes(),content_type="image/jpeg")


def process(data : dict):
    src_client = storage.Client()
    src_bucket = src_client.get_bucket(data["bucket"])
    src_blob = src_bucket.get_blob(data["name"])

    pathname = os.path.dirname(data["name"])
    basename,ext = os.path.splitext(os.path.basename(data["name"]))

    signing_creds = \
        service_account.Credentials.from_service_account_file("key.json")

    url = src_blob.generate_signed_url(
            credentials=signing_creds,version="v4",expiration=timedelta(minutes=20),method="GET"
        )

    count = extract_frames(url,basename,pathname)


def extract_frames(
        signed_url : str,basename : str,pathname : str,dst_bucket_name : str = "extracted-frames"
    ) -> int:

    dst_client = storage.Client()
    dst_bucket = dst_client.get_bucket(dst_bucket_name)

    count = 0
    vid = cv2.VideoCapture(signed_url)

    with ThreadPoolExecutor() as executor:
        ret,frame = vid.read()

        while ret:
            enc_ret,buf = cv2.imencode(".jpg",frame)

            if not enc_ret:
                msg = f'Bad Encoding [Frame: {count:06}]'
            else:
                blob_name = f"{pathname}/{basename}-{count:06}.jpg"
                blob = dst_bucket.blob(blob_name)
                executor.map(upload,(blob,buf))

            count += 1
            ret,frame = vid.read()

    vid.release()
    return count

ma​​in.py:

import base64
import json
import os

from flask import Flask,request

# import image
import video


app = Flask(__name__)


@app.route("/",methods=["POST"])
def index():
    envelope = request.get_json()
    if not envelope:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}",400

    if not isinstance(envelope,dict) or "message" not in envelope:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}",400

    # Decode the Pub/Sub message.
    pubsub_message = envelope["message"]

    if isinstance(pubsub_message,dict) and "data" in pubsub_message:
        try:
            data = json.loads(base64.b64decode(pubsub_message["data"]).decode())

        except Exception as e:
            msg = (
                "Invalid Pub/Sub message: "
                "data property is not valid base64 encoded JSON"
            )
            print(f"error: {e}")
            return f"Bad Request: {msg}",400

        # Validate the message is a Cloud Storage event.
        if not data["name"] or not data["bucket"]:
            msg = (
                "Invalid Cloud Storage notification: "
                "expected name and bucket properties"
            )
            print(f"error: {msg}")
            return f"Bad Request: {msg}",400

        try:
            # image.blur_offensive_images(data)
            video.process(data)
            return ("",204)

        except Exception as e:
            print(f"error: {e}")
            return ("",500)

    return ("",500)

解决方法

您的情况下的模式,如果您有更长的时间,则可以扩展。未来的视频,首先是将视频分成较小的序列(比如 3 或 5 分钟的视频),然后将这些序列存储在 Cloud Storage 中。

然后一个新事件运行一个新服务(或相同,根据您的设计),然后您提取所有图像。如果您需要一个唯一的来源,您可以将您的文件命名为具有相同的前缀,从而能够在后续过程中重复使用它。


与并行化的想法相同,您还可以想象利用 Cloud Run 的多 CPU 容量在同一实例中并行处理视频块。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。