Airflow FileSensor task not working, always stuck in the queued state

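I am trying to sense a file from an Airflow DAG, but my FileSensor is always stuck in the queued state. I tried the code sample below. Is there anything I am missing? By the way, my Airflow version is 2.0.1.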

from datetime import datetime, timedelta

from airflow import DAG
# Airflow 2.x import paths; the older contrib / dummy_operator paths are deprecated.
from airflow.operators.dummy import DummyOperator
from airflow.sensors.filesystem import FileSensor

default_args = {
    "depends_on_past": False,
    # datetime.now(), not datetime.Now(). A fixed start_date is generally
    # safer than one that moves every time the file is parsed.
    "start_date": datetime.now() - timedelta(minutes=10),
}

with DAG("fs_test_dag", default_args=default_args, schedule_interval="@once") as dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    # A relative filepath is resolved against the path of the sensor's connection.
    sensor_task = FileSensor(task_id="file_sensor_task", poke_interval=30, filepath="testfile.csv")

    start_task >> sensor_task >> stop_task


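I was not able to reproduce the issue. FileSensor waits for a file in a directory that can be specified in the connection used by the sensor. By default it is the / directory. Are you sure the file is created in the place you expect? Currently your sensor is waiting for /testfile.csv. Is that expected?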
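To make the path question concrete, here is a minimal sketch of how I understand the lookup to work: the base path from the connection is joined with the sensor's filepath, and the result is checked for a matching file. This is not Airflow's own code. The helper names resolve_sensor_path and file_present are made up for illustration, and the sketch assumes a POSIX system with a connection base path of /.

```python
import glob
import os
import tempfile


def resolve_sensor_path(base_path, filepath):
    """Join the connection's base path with the sensor's filepath (hypothetical helper)."""
    return os.path.join(base_path, filepath)


def file_present(base_path, filepath):
    """Return True if a regular file matches the resolved path; glob patterns are allowed."""
    full_path = resolve_sensor_path(base_path, filepath)
    return any(os.path.isfile(path) for path in glob.glob(full_path))


if __name__ == "__main__":
    # With a base path of "/", the relative name from the question lands in the root directory.
    print(resolve_sensor_path("/", "testfile.csv"))

    # The check flips from False to True once the file exists under the base path.
    with tempfile.TemporaryDirectory() as base:
        print(file_present(base, "testfile.csv"))
        open(os.path.join(base, "testfile.csv"), "w").close()
        print(file_present(base, "testfile.csv"))
```

If the file is written anywhere other than the resolved path, a check like this keeps returning False and the sensor keeps poking. Note that this only explains a sensor that runs and waits, not a task that never leaves the queued state.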

I created a dummy file and ran the DAG, and it ran fine:

root@64299244fb44:/opt/airflow# touch /testfile.csv
root@64299244fb44:/opt/airflow# airflow dags backfill -s 2021-05-25 fs_test_dag
/opt/airflow/airflow/cli/commands/ PendingDeprecationWarning: --ignore-first-depends-on-past is deprecated as the value is always set to True
[2021-05-31 20:48:15,940] {} INFO - Filling up the DagBag from /files/dags
[2021-05-31 20:48:16,736] {} INFO - Adding to queue: ['airflow','tasks','run','fs_test_dag','start','2021-05-25T00:00:00+00:00','--ignore-depends-on-past','--local','--pool','default_pool','--subdir','/files/dags/','--cfg-path','/tmp/tmp8ttb6fz5']
[2021-05-31 20:48:21,476] {} INFO - QueuedLocalWorker running ['airflow',488] {} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2
[2021-05-31 20:48:21,531] {} INFO - Filling up the DagBag from /files/dags/
Running <TaskInstance: fs_test_dag.start 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:26,503] {} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2
[2021-05-31 20:48:26,527] {} INFO - Adding to queue: ['airflow','file_sensor_task','/tmp/tmpbslxmgxo']
[2021-05-31 20:48:31,503] {} INFO - QueuedLocalWorker running ['airflow',516] {} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-05-31 20:48:31,547] {} INFO - Filling up the DagBag from /files/dags/
Running <TaskInstance: fs_test_dag.file_sensor_task 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:36,501] {} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-05-31 20:48:36,'stop','/tmp/tmpvoqqs24l']
[2021-05-31 20:48:41,498] {} INFO - QueuedLocalWorker running ['airflow',512] {} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 2 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-05-31 20:48:41,542] {} INFO - Filling up the DagBag from /files/dags/
Running <TaskInstance: fs_test_dag.stop 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:46,520] {} INFO - Marking run <DagRun fs_test_dag @ 2021-05-25T00:00:00+00:00: backfill__2021-05-25T00:00:00+00:00,externally triggered: False> successful
[2021-05-31 20:48:46,525] {} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 3 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-05-31 20:48:46,528] {} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
[2021-05-31 20:48:46,663] {} INFO - Backfill done. Exiting.

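Could you share the queue name associated with the worker you want this task to run on? I do not see any queue name passed in the task arguments, so Airflow will expect a worker listening on the queue named by default_queue in airflow.cfg.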
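Here is a rough sketch of that check, assuming a queue-based executor such as CeleryExecutor. It reads default_queue from an airflow.cfg-style text and tests whether a worker's queues cover the queue the task would end up on. The helpers find_default_queue and worker_can_run are hypothetical, not Airflow APIs. As far as I know the option lives under [celery] in 2.0.x and under [operators] in later releases, so the sketch looks in both.

```python
import configparser


def find_default_queue(cfg_text):
    """Return default_queue from airflow.cfg-style text, or None if it is not set."""
    # interpolation=None so '%' characters elsewhere in the file are not misread.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    for section in ("operators", "celery"):  # assumed locations of the option
        if parser.has_option(section, "default_queue"):
            return parser.get(section, "default_queue")
    return None


def worker_can_run(task_queue, worker_queues, default_queue):
    """A task with no explicit queue falls back to default_queue."""
    effective_queue = task_queue or default_queue
    return effective_queue in worker_queues


if __name__ == "__main__":
    cfg = "[celery]\ndefault_queue = default\n"
    default_queue = find_default_queue(cfg)
    # The sensor in the question sets no queue, so it goes to the default one.
    print(worker_can_run(None, {"default"}, default_queue))
    # A worker that only listens on another queue would never pick it up.
    print(worker_can_run(None, {"files"}, default_queue))
```

If the worker listens on a different queue, either pass queue="..." to the sensor (queue is a standard operator argument) or start the worker on the default queue.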

