使用 Cloud Composer 上的 KubernetesPodOperator 通过 Cloud Functions 将变量传递到容器

如何解决使用 Cloud Composer 上的 KubernetesPodOperator 通过 Cloud Functions 将变量传递到容器

我正在尝试从在 Google Cloud Functions 上运行的后台函数获取事件和上下文变量数据,并将这些值传递给在 Cloud Composer / Airflow 上运行 KubernetesPodoperator 的容器。

代码的第一部分是我的云函数,它触发了一个名为 gcs_to_pubsub_topic_dag 的 dag,我想传递和访问的是 json 中的数据,特别是 "conf": event 数据。

#!/usr/bin/env python
# coding: utf-8

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_ScopE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'

def trigger_dag(event,context=None):
    client_id = '###############.apps.googleusercontent.com'
    webserver_id = '###############'
    # The name of the DAG you wish to trigger
    dag_name = 'gcs_to_pubsub_topic_dag'
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )
    print(f' This is my webserver url: {webserver_url}')
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url,client_id,method='POST',json={"conf": event,"replace_microseconds": 'false'})

def make_iap_request(url,method='GET',**kwargs):

    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    google_open_id_connect_token = id_token.fetch_id_token(Request(),client_id)

    resp = requests.request(
        method,url,headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)},**kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code,resp.headers,resp.text))
    else:
        return resp.text

def main(event,context=None):
    """
    Call the main function,sets the order in which to run functions.
    """
    
    trigger_dag(event,context=None)

    return 'Script has run without errors !!'

if (__name__ == "__main__"):
    main()

被触发的 dag 运行这个 KubernetesPodoperator 代码

kubernetes_pod_operator.KubernetesPodoperator(
    # The ID specified for the task.
    task_id=TASK_ID,# Name of task you want to run,used to generate Pod ID.
    name=TASK_ID,# Entrypoint of the container,if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=[f'python3','execution_file.py'],# The namespace to run within Kubernetes,default namespace is `default`.
    namespace=KUBERNETES_NAMESPACE,# location of the docker image on google container repository
    image=f'eu.gcr.io/{GCP_PROJECT_ID}/{CONTAINER_ID}:{IMAGE_VERSION}',#Always pulls the image before running it.
    image_pull_policy='Always',# The env_var template variable allows you to access variables defined in Airflow UI.
    env_vars = {'GCP_PROJECT_ID':GCP_PROJECT_ID,'DAG_CONF':{{ dag_run.conf }}},dag=dag)

最后我想让 DAG_CONF 在被调用的容器图像 execution_file.py 脚本中打印:

#!/usr/bin/env python
# coding: utf-8

from gcs_unzip_function import main as gcs_unzip_function
from gcs_to_pubsub_topic import main as gcs_to_pubsub_topic
from os import listdir,getenv

GCP_PROJECT_ID = getenv('GCP_PROJECT_ID')
DAG_CONF = getenv('DAG_CONF')

print('Test run')
    
print(GCP_PROJECT_ID)

print (f'This is my dag conf {DAG_CONF}')
 
print(type(DAG_CONF))    

此时代码触发了 dag 并返回:

Test run

GCP_PROJECT_ID (this is set in the airflow environment variables)

This is my dag conf None

class 'nonetype

我希望 DAG_CONF 通过的地方

解决方法

我有一种方法可以访问有关在使用 KubernetesPodOperator 运行的容器内触发 dag 的对象的数据。

帖子 request code 保持不变,但我想强调的是,您可以将任何内容传递给字典中的 conf 元素。

make_iap_request(
    webserver_url,client_id,method='POST',json={"conf": event,"replace_microseconds": 'false'})

dag 代码要求您创建一个自定义类来评估 dag_run 和 .conf 元素,然后参数访问我们从发布请求发送的 json。 article在做这部分时阅读。

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

class CustomKubernetesPodOperator(KubernetesPodOperator):

def execute(self,context):
    json = str(context['dag_run'].conf)
    arguments = [f'--json={json}']
    self.arguments.extend(arguments)
    super().execute(context)


CustomKubernetesPodOperator(
    # The ID specified for the task.
    task_id=TASK_ID,# Name of task you want to run,used to generate Pod ID.
    name=TASK_ID,# Entrypoint of the container,if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=[f'python3','execution_file.py'],# The namespace to run within Kubernetes,default namespace is `default`.
    namespace=KUBERNETES_NAMESPACE,# location of the docker image on google container repository
    image=f'eu.gcr.io/{GCP_PROJECT_ID}/{CONTAINER_ID}:{IMAGE_VERSION}',#Always pulls the image before running it.
    image_pull_policy='Always',# The env_var template variable allows you to access variables defined in Airflow UI.
    env_vars = {'GCP_PROJECT_ID':GCP_PROJECT_ID},dag=dag)

在容器中运行的代码使用 argparse 将参数作为字符串获取,然后使用 ast 字面量将其改回要在代码中访问的字典:

import ast
import argparse
from os import listdir,getenv

def main(object_metadata_dict):
    """
    Call the main function,sets the order in which to run functions.
    """

    print(f'This is my metadata as a dictionary {object_metadata_dict}')

    print (f'This is my bucket {object_metadata_dict["bucket"]}')

    print (f'This is my file name {object_metadata_dict["name"]}')

    return 'Script has run without errors !!'

if (__name__ == "__main__"):
    parser = argparse.ArgumentParser(description='Staging to live load process.')
    parser.add_argument("--json",type=str,dest="json",required = False,default = 'all',\
                    help="List of metadata for the triggered object derived 
                          from cloud function backgroud functions.")
    args = parser.parse_args()
    json=args.json
    object_metadata_dict=ast.literal_eval(json)
    main(object_metadata_dict)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?