如何在 SparkKubernetesOperator 运算符中将 execution_date 作为参数传递?

如何解决如何在 SparkKubernetesOperator 运算符中将 execution_date 作为参数传递?

我试图找到一种方法将 execution_Date 传递给 SparkKubernetesOperator。 无论如何都可以通过它,因为我将使用 spark run 和 s3 分区的执行日期。

submit_compaction_to_spark = SparkKubernetesOperator(
        task_id="submit_compaction_to_spark",application_file="/k8s/compaction_s3.yml",namespace=kubernetes_namespace,kubernetes_conn_id="kubernetes",params={
            "warehouse_path": s3_path,"snapshot_expire_time": execution_date,"partition_filter": execution_date,"k8s_namespace": kubernetes_namespace,"docker_image_tag": docker_image_tag,}

解决方法

遗憾的是,params 仅向 jinja 公开自定义值,但不会在其中呈现 jinja 模板。

例如,让我们看看这个 PythonOperator。

op = PythonOperator(
    task_id="my_operator",python_callable=lambda **context: print(context['params']),params={
        "date": "{{ execution_date }}"
    },dag=dag
)

日期键的值是文字字符串 "{{ execution_date }}" 而不是呈现的值。

[2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}

BaseOperator 中的 params 钩子允许你传递一个字典 模板的参数和/或对象。请花点时间 了解参数 my_param 如何通过 模板。

您可以在 Airflow Documentation 中阅读有关带有参数的 Jinja 模板的更多信息。


可以以其他方式使用 execution_dateSparkKubernetesOperator 通过这些设置利用 jinja 模板。

template_fields = ['application_file','namespace']  
template_ext = ('yaml','yml','json')

SparkKubernetesOperator 有两个模板化字段,application_filenamespace,这意味着您可以使用 jinja 模板作为值。如果您引用具有这些扩展名的文件,它将在其中呈现文件和 jinja 模板。

让我们修改您提供的运算符。

submit_compaction_to_spark = SparkKubernetesOperator(
        task_id="submit_compaction_to_spark",application_file="/k8s/compaction_s3.yml",namespace=kubernetes_namespace,kubernetes_conn_id="kubernetes",params={
            "k8s_namespace": kubernetes_namespace,"warehouse_path": s3_path,}
)

我将猜测 /k8s/compaction_s3.yml 的样子并添加一些 jinja 模板。

---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
  namespace: "{{ params.k8s_namespace }}"
  labels:
    warehouse_path: "{{ params.k8s_namespace }}"
    date: "{{ ds }}"
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.4"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.4
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.4
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

您可以检查 DAG 中任务实例的渲染模板视图。

另请参考 Airflow 文档中的 example DAGsample application_file

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?