Airflow FileSensor task not working, always stuck in queued state


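I'm trying to use an Airflow DAG to sense a file, but my FileSensor task is always stuck in the queued state. I tried the code below. Is there anything I'm missing? By the way, my Airflow version is 2.0.1.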

from datetime import datetime, timedelta

import airflow
# Airflow 2.0 module paths; airflow.contrib.sensors.file_sensor and
# airflow.operators.dummy_operator still work in 2.0 but are deprecated aliases
from airflow.operators.dummy import DummyOperator
from airflow.sensors.filesystem import FileSensor

default_args = {
    "depends_on_past": False,
    # datetime.now(), not datetime.Now(); a dynamic start_date like this is generally discouraged
    "start_date": datetime.now() - timedelta(minutes=10),
}

with airflow.DAG("fs_test_dag", default_args=default_args, schedule_interval="@once") as dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    # A relative filepath is resolved against the base path of the fs_default connection
    sensor_task = FileSensor(task_id="file_sensor_task", poke_interval=30, filepath="testfile.csv")

start_task >> sensor_task >> stop_task

Solution

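I can't reproduce the issue. FileSensor waits for a file in a directory that can be specified in the connection used by the operator (fs_default unless you pass fs_conn_id). By default that is the / directory. Are you sure the file is created where you expect it? Currently your sensor is waiting for /testfile.csv; is that what you intended?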
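A rough plain-Python sketch of how the sensor builds the path it checks. It assumes, as described above, that the base directory comes from the `path` key in the connection's extra JSON, is joined to `filepath` with `os.path.join`, and is then matched with `glob`. `resolve_sensor_path` and `file_is_present` are hypothetical helpers for illustration, not Airflow APIs, and the `{"path": "/"}` value is an assumed stand-in for a default `fs_default` connection.

```python
import glob
import json
import os


def resolve_sensor_path(connection_extra: str, filepath: str) -> str:
    """Hypothetical helper: combine a connection's base path with filepath.

    connection_extra is the JSON "extra" field of a filesystem connection,
    e.g. '{"path": "/"}'. A missing "path" key falls back to an empty base.
    """
    base_path = json.loads(connection_extra or "{}").get("path", "")
    # os.path.join discards base_path when filepath is already absolute.
    return os.path.join(base_path, filepath)


def file_is_present(full_path: str) -> bool:
    """Simplified poke: True if any regular file matches the glob pattern."""
    return any(os.path.isfile(p) for p in glob.glob(full_path))


if __name__ == "__main__":
    # Relative filepath: resolved against the connection's base path.
    print(resolve_sensor_path('{"path": "/"}', "testfile.csv"))
    # Absolute filepath: the base path is ignored.
    print(resolve_sensor_path('{"path": "/"}', "/data/testfile.csv"))
    # A different base path changes where a relative filepath points.
    print(resolve_sensor_path('{"path": "/opt/airflow"}', "testfile.csv"))
```

If the real sensor joins paths this way, passing an absolute `filepath` to `FileSensor` removes any doubt about what the connection's base path is.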

I created a dummy file and ran the DAG, and it worked fine:

root@64299244fb44:/opt/airflow# touch /testfile.csv
root@64299244fb44:/opt/airflow# airflow dags backfill -s 2021-05-25 fs_test_dag
/opt/airflow/airflow/cli/commands/dag_command.py:62 PendingDeprecationWarning: --ignore-first-depends-on-past is deprecated as the value is always set to True
[2021-05-31 20:48:15,940] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags
[2021-05-31 20:48:16,736] {base_executor.py:82} INFO - Adding to queue: ['airflow','tasks','run','fs_test_dag','start','2021-05-25T00:00:00+00:00','--ignore-depends-on-past','--local','--pool','default_pool','--subdir','/files/dags/tasks.py','--cfg-path','/tmp/tmp8ttb6fz5']
[2021-05-31 20:48:21,476] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow', …]
[…,488] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2
[2021-05-31 20:48:21,531] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags/tasks.py
Running <TaskInstance: fs_test_dag.start 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:26,503] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2
[2021-05-31 20:48:26,527] {base_executor.py:82} INFO - Adding to queue: ['airflow', …, 'file_sensor_task', …, '/tmp/tmpbslxmgxo']
[2021-05-31 20:48:31,503] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow', …]
[…,516] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-05-31 20:48:31,547] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags/tasks.py
Running <TaskInstance: fs_test_dag.file_sensor_task 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:36,501] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-05-31 20:48:36,…] … [… 'stop', …, '/tmp/tmpvoqqs24l']
[2021-05-31 20:48:41,498] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow', …]
[…,512] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 2 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-05-31 20:48:41,542] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags/tasks.py
Running <TaskInstance: fs_test_dag.stop 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:46,520] {dagrun.py:444} INFO - Marking run <DagRun fs_test_dag @ 2021-05-25T00:00:00+00:00: backfill__2021-05-25T00:00:00+00:00,externally triggered: False> successful
[2021-05-31 20:48:46,525] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 3 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-05-31 20:48:46,528] {local_executor.py:387} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
[2021-05-31 20:48:46,663] {backfill_job.py:831} INFO - Backfill done. Exiting.

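Could you share the name of the queue that the worker you want to run this task on is listening to? I don't see a queue name passed in the task arguments, so Airflow sends the task to the queue named by default_queue in airflow.cfg, and a worker must be consuming that same queue for the task to leave the queued state.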
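The queue selection described above can be sketched as follows. This is a simplified model, not Airflow's code: it assumes an explicit `queue` argument on the task takes precedence and that otherwise the `default_queue` option in the `[operators]` section of `airflow.cfg` (the Airflow 2.0.x layout) is used. `effective_queue` and `SAMPLE_CFG` are hypothetical names for illustration.

```python
import configparser
from typing import Optional

# Hypothetical airflow.cfg fragment; in a real deployment this comes from
# your own config file, and "default" is only an example value.
SAMPLE_CFG = """
[operators]
default_queue = default
"""


def effective_queue(task_queue: Optional[str], cfg_text: str) -> str:
    """Hypothetical helper: return the queue a task would be sent to.

    An explicit queue on the task wins; otherwise fall back to the
    [operators] default_queue value from the config text.
    """
    if task_queue:
        return task_queue
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.get("operators", "default_queue")


if __name__ == "__main__":
    print(effective_queue(None, SAMPLE_CFG))       # no queue= on the task
    print(effective_queue("sensors", SAMPLE_CFG))  # explicit queue="sensors"
```

With a queue-aware executor such as CeleryExecutor, a task sent to a queue that no worker consumes stays queued, so the fix is either to pass a matching `queue=` to `FileSensor` or to run a worker that listens on the queue this lookup returns.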
