如何解决将 Airflow BigQueryInsertJobOperator 和 BigQueryGetDataOperator Priority 更改为 Batch
我在此处 (https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/airflow/providers/google/cloud/operators/bigquery.html#BigQueryGetDataOperator) 阅读了 BigQuery 作业操作员的 Apache Airflow 文档,但找不到如何将作业优先级更改为批处理。怎么做?
解决方法
BigQueryExecuteQueryOperator
有 priority 参数,可以用 INTERACTIVE
/BATCH
设置,默认为 INTERACTIVE
:
execute_insert_query = BigQueryExecuteQueryOperator(
task_id="execute_insert_query",sql=INSERT_ROWS_QUERY,use_legacy_sql=False,location=location,priority='BATCH',)
BigQueryInsertJobOperator
没有。
我认为您可以创建一个继承自 BigQueryInsertJobOperator
的自定义运算符,并通过覆盖 _submit_job
函数来添加它:
class MyBigQueryInsertJobOperator(BigQueryInsertJobOperator):
def __init__(
self,priority: str = 'INTERACTIVE',**kwargs,) -> None:
super().__init__(**kwargs)
self.priority = priority
def _submit_job(
self,hook: BigQueryHook,job_id: str,) -> BigQueryJob:
# Submit a new job
job = hook.insert_job(
configuration=self.configuration,project_id=self.project_id,location=self.location,job_id=job_id,priority=self.priority,)
# Start the job and wait for it to complete and get the result.
job.result()
return job
虽然我没有测试,但它应该可以工作。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。