
Airflow KubernetesPodOperator times out on local MicroK8s

How to fix an Airflow KubernetesPodOperator that times out on a local MicroK8s cluster

I am trying to launch a test pod with the KubernetesPodOperator. As the image I am using Docker's hello-world example, which I pushed to the local registry of my MicroK8s installation.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.pod import Port
from airflow.utils.dates import days_ago
from datetime import timedelta

ports = [Port('http', 80)]

default_args = {
    'owner': 'user',
    'start_date': days_ago(5),
    'email': ['user@mail'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

workflow = DAG(
    'kubernetes_helloworld',
    default_args=default_args,
    description='Our first DAG',
    schedule_interval=None,
)

op = DummyOperator(task_id='dummy', dag=workflow)

t1 = KubernetesPodOperator(
    dag=workflow,
    namespace='default',
    image='localhost:32000/hello-world:registry',
    name='pod2',
    task_id='pod2',
    is_delete_operator_pod=True,
    hostnetwork=False,
    get_logs=True,
    do_xcom_push=False,
    in_cluster=False,
    ports=ports,
)

op >> t1

When I trigger the DAG, it keeps running and retries launching the pod indefinitely. This is the log output I get in Airflow:

Reading local file: /home/user/airflow/logs/kubernetes_helloworld/pod2/2021-03-17T16:25:11.142695+00:00/4.log
[2021-03-17 16:30:00,315] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [queued]>
[2021-03-17 16:30:00,319] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [queued]>
[2021-03-17 16:30:00,319] {taskinstance.py:1042} INFO - 
--------------------------------------------------------------------------------
[2021-03-17 16:30:00,320] {taskinstance.py:1043} INFO - Starting attempt 4 of 1
[2021-03-17 16:30:00,320] {taskinstance.py:1044} INFO - 
--------------------------------------------------------------------------------
[2021-03-17 16:30:00,330] {taskinstance.py:1063} INFO - Executing <Task(KubernetesPodOperator): pod2> on 2021-03-17T16:25:11.142695+00:00
[2021-03-17 16:30:00,332] {standard_task_runner.py:52} INFO - Started process 9021 to run task
[2021-03-17 16:30:00,335] {standard_task_runner.py:76} INFO - Running: ['airflow','tasks','run','kubernetes_helloworld','pod2','2021-03-17T16:25:11.142695+00:00','--job-id','57','--pool','default_pool','--raw','--subdir','DAGS_FOLDER/kubernetes_helloworld.py','--cfg-path','/tmp/tmp5ss4g6q4','--error-file','/tmp/tmp9t3l8emt']
[2021-03-17 16:30:00,336] {standard_task_runner.py:77} INFO - Job 57: Subtask pod2
[2021-03-17 16:30:00,357] {logging_mixin.py:104} INFO - Running <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [running]> on host 05nclorenzvm01.internal.cloudapp.net
[2021-03-17 16:30:00,369] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=user
AIRFLOW_CTX_DAG_OWNER=user
AIRFLOW_CTX_DAG_ID=kubernetes_helloworld
AIRFLOW_CTX_TASK_ID=pod2
AIRFLOW_CTX_EXECUTION_DATE=2021-03-17T16:25:11.142695+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-03-17T16:25:11.142695+00:00
[2021-03-17 16:32:09,805] {connectionpool.py:751} WARNING - retrying (Retry(total=2,connect=None,read=None,redirect=None,status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f812fc23eb0>: Failed to establish a new connection: [Errno 110] Connection timed out')': /api/v1/namespaces/default/pods?labelSelector=dag_id%3Dkubernetes_helloworld%2Cexecution_date%3D2021-03-17T162511.1426950000-e549b02ea%2Ctask_id%3Dpod2

When I launch the pod in Kubernetes itself, without Airflow, it runs fine. What am I doing wrong?

I have tried the following:

  • Using a sleep command to keep the container from exiting
  • Different images, e.g. pyspark
  • Reinstalling Airflow and MicroK8s

Airflow v2.0.1, MicroK8s v1.3.7, Python 3.8, Ubuntu 18.04 LTS

Solution

Unfortunately, I still have not figured out the problem with MicroK8s.

However, I was able to get the KubernetesPodOperator working in Airflow with minikube. The following code runs without any problems:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import configuration as conf
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'user',
    'start_date': days_ago(5),
    'email': ['user@airflow.de'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

namespace = conf.get('kubernetes', 'NAMESPACE')

if namespace == 'default':
    config_file = '/home/user/.kube/config'
    in_cluster = False
else:
    in_cluster = True
    config_file = None

dag = DAG('example_kubernetes_pod', schedule_interval='@once', default_args=default_args)

with dag:
    k = KubernetesPodOperator(
        namespace=namespace,
        image="hello-world",
        labels={"foo": "bar"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster,  # if True, look up the config inside the cluster; if False, use config_file
        cluster_context='minikube',  # ignored when in_cluster is True
        config_file=config_file,
        is_delete_operator_pod=True,
        get_logs=True)

To answer your question: I assume you are running the task against a local MicroK8s cluster without a VM.

Airflow is probably unable to connect to the K8s control plane to trigger the pod. Add cluster_context='microk8s':

t1 = KubernetesPodOperator(
    dag=workflow,
    namespace='default',
    image='localhost:32000/hello-world:registry',
    name='pod2',
    task_id='pod2',
    get_logs=True,
    do_xcom_push=False,
    in_cluster=False,
    cluster_context='microk8s',
    config_file='/path/to/config',
    ports=ports,
)

To see which cluster context is in use, run the following command and redirect its output to a config file (inside the Airflow project):

microk8s.kubectl config view --flatten > config

Output:

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0.............
    server: https://127.0.0.1:16443
  name: microk8s-cluster
contexts:
- context:
    cluster: microk8s-cluster
    user: admin
  name: microk8s
current-context: microk8s
kind: Config
preferences: {}
users:
- name: admin
  user:
     token: SldHNFQ3ek9yUGh4TVhWN......................................
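To sanity-check which endpoint the operator will try to reach, you can parse the flattened kubeconfig and print the API server address for the current context; if that address is not reachable from the Airflow host, you get exactly the "[Errno 110] Connection timed out" error shown in the logs above. A minimal sketch, assuming you export the config in JSON form instead (`microk8s.kubectl config view --flatten -o json`); the `SAMPLE_CONFIG` string and the helper function are illustrative, with values shortened:

```python
import json

# Illustrative sample of the flattened kubeconfig in JSON form, mirroring
# the YAML output above (certificate and token data omitted).
SAMPLE_CONFIG = """
{
  "apiVersion": "v1",
  "kind": "Config",
  "current-context": "microk8s",
  "clusters": [
    {"name": "microk8s-cluster",
     "cluster": {"server": "https://127.0.0.1:16443"}}
  ],
  "contexts": [
    {"name": "microk8s",
     "context": {"cluster": "microk8s-cluster", "user": "admin"}}
  ]
}
"""

def api_server_for_context(config_text: str, context_name: str) -> str:
    """Return the API server URL that the given kubeconfig context points at."""
    cfg = json.loads(config_text)
    context = next(c for c in cfg["contexts"] if c["name"] == context_name)
    cluster_name = context["context"]["cluster"]
    cluster = next(c for c in cfg["clusters"] if c["name"] == cluster_name)
    return cluster["cluster"]["server"]

if __name__ == "__main__":
    cfg = json.loads(SAMPLE_CONFIG)
    server = api_server_for_context(SAMPLE_CONFIG, cfg["current-context"])
    # The Airflow host must be able to reach this address; otherwise the
    # pod launch times out as in the log output above.
    print(server)  # https://127.0.0.1:16443
```

With the sample above this prints `https://127.0.0.1:16443`, the MicroK8s API server address from the config output shown earlier.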
