如何解决通过Google Cloud功能将大量任务排队
我正在尝试通过每天调用一次外部API使用云功能来更新数据。
到目前为止,我有:
-
云计划已设置为调用功能1
-
功能1-遍历项目并为每个项目创建一个任务
-
任务-使用功能1提供的数据调用功能2
-
功能2-调用外部API来获取数据并更新数据库
问题在于每天有约2k个项目需要更新,并且云功能会在此之前超时,因此为什么我将它们放在队列中。但是,即使将项目放入队列中,对于云功能来说也花费了太长时间,因此在将其全部添加之前就已经超时了。
是否有一种简单的方法可以一次将多个任务批量添加到队列中?
是否通过失败,更好的解决方案?
全部用python编写
功能1的代码
def refresh(request):
for i in items:
# Create a client.
client = tasks_v2.CloudTasksClient()
# TODO(developer): Uncomment these lines and replace with your values.
project = 'my-project'
queue = 'refresh-queue'
location = 'europe-west2'
name = i['name'].replace(' ','')
url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name={name}"
# Construct the fully qualified queue name.
parent = client.queue_path(project,location,queue)
# Construct the request body.
task = {
"http_request": { # Specify the type of request.
"http_method": tasks_v2.HttpMethod.GET,"url": url,# The full url path that the task will be sent to.
}
}
# Use the client to build and send the task.
response = client.create_task(request={"parent": parent,"task": task})
解决方法
回答您的问题“是否有一种简单的方法可以一次将多个任务批量添加到队列?”根据公众的documentation,最好的方法是实施双重注入模式。
为此,您将有一个新队列,您将在其中添加一个包含原始队列的多个任务的单个任务,然后在该队列的接收端,您将有一个获取此数据的服务。任务,并在第二个队列中为每个条目创建一个任务。
此外,我建议您将500/50/5模式用于冷队列。这将有助于任务队列和Cloud Function服务以安全比率提高。
,Chris32的答案是正确的,但是我在您的代码片段中注意到的一件事是您应该在for循环之外创建客户端。
def refresh(request):
# Create a client.
client = tasks_v2.CloudTasksClient()
# TODO(developer): Uncomment these lines and replace with your values.
project = 'my-project'
queue = 'refresh-queue'
location = 'europe-west2'
for i in items:
name = i['name'].replace(' ','')
url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name={name}"
# Construct the fully qualified queue name.
parent = client.queue_path(project,location,queue)
# Construct the request body.
task = {
"http_request": { # Specify the type of request.
"http_method": tasks_v2.HttpMethod.GET,"url": url,# The full url path that the task will be sent to.
}
}
# Use the client to build and send the task.
response = client.create_task(request={"parent": parent,"task": task})
在应用引擎中,我将在client = tasks_v2.CloudTasksClient()
之外在文件级别执行def refresh
,但是我不知道这对云功能是否重要。
第二件事,
修改“功能2”以采用多个“名称”,而不只是一个。然后可以在“功能1”中一次向“功能2”发送10个名称
BATCH_SIZE = 10 # send 10 names to Function 2
def refresh(request):
# Create a client.
client = tasks_v2.CloudTasksClient()
# ...
for i in range(0,len(items),BATCH_SIZE)]:
items_batch = items[i:i + BATCH_SIZE]
names = ','.join([i['name'].replace(' ','') for i in items_batch])
url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?names={names}"
# Construct the fully qualified queue name.
# ...
如果这2个快速修复方法不起作用,则必须将“功能1”分为“功能1A”和“功能1B”
功能1A:
BATCH_SIZE = 100 # send 100 names to Function 1B
def refresh(request):
client = tasks_v2.CloudTasksClient()
for i in range(0,'') for i in items_batch])
url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-1b?names={names}"
# send the task.
response = client.create_task(request={
"parent": client.queue_path('my-project','europe-west2','refresh-queue'),"task": {
"http_request": {"http_method": tasks_v2.HttpMethod.GET,"url": url}
}})
功能1B:
BATCH_SIZE = 10 # send 10 names to Function 2
def refresh(request):
# set `names` equal to the query param `names`
client = tasks_v2.CloudTasksClient()
for i in range(0,len(names),BATCH_SIZE)]:
names_batch = ','.join(names[i:i + BATCH_SIZE])
url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-2?names={names_batch}"
# send the task.
response = client.create_task(request={
"parent": client.queue_path('my-project',"url": url}
}})
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。