如何解决如何在 SparkKubernetesOperator 运算符中将 execution_date 作为参数传递?
我试图找到一种方法将 execution_Date 传递给 SparkKubernetesOperator。 无论如何都可以通过它,因为我将使用 spark run 和 s3 分区的执行日期。
submit_compaction_to_spark = SparkKubernetesOperator(
task_id="submit_compaction_to_spark",application_file="/k8s/compaction_s3.yml",namespace=kubernetes_namespace,kubernetes_conn_id="kubernetes",params={
"warehouse_path": s3_path,"snapshot_expire_time": execution_date,"partition_filter": execution_date,"k8s_namespace": kubernetes_namespace,"docker_image_tag": docker_image_tag,}
解决方法
遗憾的是,params
仅向 jinja 公开自定义值,但不会在其中呈现 jinja 模板。
例如,让我们看看这个 PythonOperator。
op = PythonOperator(
task_id="my_operator",python_callable=lambda **context: print(context['params']),params={
"date": "{{ execution_date }}"
},dag=dag
)
日期键的值是文字字符串 "{{ execution_date }}"
而不是呈现的值。
[2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}
BaseOperator 中的 params 钩子允许你传递一个字典 模板的参数和/或对象。请花点时间 了解参数 my_param 如何通过 模板。
您可以在 Airflow Documentation 中阅读有关带有参数的 Jinja 模板的更多信息。
可以以其他方式使用 execution_date
。SparkKubernetesOperator 通过这些设置利用 jinja 模板。
template_fields = ['application_file','namespace']
template_ext = ('yaml','yml','json')
SparkKubernetesOperator 有两个模板化字段,application_file
和 namespace
,这意味着您可以使用 jinja 模板作为值。如果您引用具有这些扩展名的文件,它将在其中呈现文件和 jinja 模板。
让我们修改您提供的运算符。
submit_compaction_to_spark = SparkKubernetesOperator(
task_id="submit_compaction_to_spark",application_file="/k8s/compaction_s3.yml",namespace=kubernetes_namespace,kubernetes_conn_id="kubernetes",params={
"k8s_namespace": kubernetes_namespace,"warehouse_path": s3_path,}
)
我将猜测 /k8s/compaction_s3.yml
的样子并添加一些 jinja 模板。
---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
namespace: "{{ params.k8s_namespace }}"
labels:
warehouse_path: "{{ params.k8s_namespace }}"
date: "{{ ds }}"
spec:
type: Scala
mode: cluster
image: "gcr.io/spark-operator/spark:v2.4.4"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
sparkVersion: "2.4.4"
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 2.4.4
serviceAccount: spark
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 2.4.4
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
您可以检查 DAG 中任务实例的渲染模板视图。
另请参考 Airflow 文档中的 example DAG 和 sample application_file。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。