如何解决Google PubSub Push SubscriptionTimeouts from GCS Bucket upload call in Cloud Run (Python)

我正在尝试开发一个图像处理管道,将视频上传一个 GCS 存储桶,将所有帧提取为 jpg 图像,然后将这些图像上传到另一个 GCS 存储桶。我正在使用 PubSub 推送订阅来触发云运行服务。不幸的是,该服务无法在推送订阅的 10 分钟最大请求响应超时时间内可靠地处理视频。我已经跟踪了这个问题,似乎将帧上传到 GCS 导致了瓶颈。这些视频平均包含大约 28000 帧(30FPS,长度约 15 分钟)。我认为这应该在提供的时间内是可能的。所有服务都在同一地区/地区。

有没有办法提高这些 GCS blob 上传的吞吐量?当我使用 gsutil 将视频 blob 从存储桶复制到另一个存储桶(在同一区域内)时,需要几秒钟的时间。

我尝试过增加/减少线程数、增加服务 cpu 数和增加服务内存数。我看不出任何变化。 writes over 1000/Sec 的 GCS 速率限制,但我认为我还没有接近这个限制。

我的服务复制 main.py 脚本作为 Google Cloud Run Vision Tutorial 的一部分。唯一的修改是在 video.py 中更改对我的处理例程的调用。我已经包括在帖子的底部video.py 运行实际处理。

Cloud Run 服务配备 1 个 cpu、512 MiB、15 分钟超时 Cloud PubSub 订阅(推送订阅) 10 分钟超时(最大值)


import os
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor

import cv2

from google.cloud import storage
from google.oauth2 import service_account

def upload(blob : storage.blob.Blob,buf : "numpy.ndarray"):

def process(data : dict):
    src_client = storage.Client()
    src_bucket = src_client.get_bucket(data["bucket"])
    src_blob = src_bucket.get_blob(data["name"])

    pathname = os.path.dirname(data["name"])
    basename,ext = os.path.splitext(os.path.basename(data["name"]))

    signing_creds = \

    url = src_blob.generate_signed_url(

    count = extract_frames(url,basename,pathname)

def extract_frames(
        signed_url : str,basename : str,pathname : str,dst_bucket_name : str = "extracted-frames"
    ) -> int:

    dst_client = storage.Client()
    dst_bucket = dst_client.get_bucket(dst_bucket_name)

    count = 0
    vid = cv2.VideoCapture(signed_url)

    with ThreadPoolExecutor() as executor:
        ret,frame = vid.read()

        while ret:
            enc_ret,buf = cv2.imencode(".jpg",frame)

            if not enc_ret:
                msg = f'Bad Encoding [Frame: {count:06}]'
                blob_name = f"{pathname}/{basename}-{count:06}.jpg"
                blob = dst_bucket.blob(blob_name)

            count += 1
            ret,frame = vid.read()

    return count


import base64
import json
import os

from flask import Flask,request

# import image
import video

app = Flask(__name__)

def index():
    envelope = request.get_json()
    if not envelope:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}",400

    if not isinstance(envelope,dict) or "message" not in envelope:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}",400

    # Decode the Pub/Sub message.
    pubsub_message = envelope["message"]

    if isinstance(pubsub_message,dict) and "data" in pubsub_message:
            data = json.loads(base64.b64decode(pubsub_message["data"]).decode())

        except Exception as e:
            msg = (
                "Invalid Pub/Sub message: "
                "data property is not valid base64 encoded JSON"
            print(f"error: {e}")
            return f"Bad Request: {msg}",400

        # Validate the message is a Cloud Storage event.
        if not data["name"] or not data["bucket"]:
            msg = (
                "Invalid Cloud Storage notification: "
                "expected name and bucket properties"
            print(f"error: {msg}")
            return f"Bad Request: {msg}",400

            # image.blur_offensive_images(data)
            return ("",204)

        except Exception as e:
            print(f"error: {e}")
            return ("",500)

    return ("",500)


您的情况下的模式,如果您有更长的时间,则可以扩展。未来的视频,首先是将视频分成较小的序列(比如 3 或 5 分钟的视频),然后将这些序列存储在 Cloud Storage 中。


与并行化的想法相同,您还可以想象利用 Cloud Run 的多 CPU 容量在同一实例中并行处理视频块。

