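How to fix an Airflow FileSensor task that does not work and is always stuck in the queued state
I am trying to sense a file with an Airflow DAG, but my FileSensor is always stuck in the queued state. I tried the code sample below. Is there anything I am missing? By the way, my Airflow version is 2.0.1.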
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, date, timedelta
import airflow

default_args = {
    "depends_on_past": False,
    "start_date": datetime.now() - timedelta(minutes=10),
}

with airflow.DAG("fs_test_dag", default_args=default_args, schedule_interval="@once") as dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    # Poke every 30 seconds for testfile.csv, relative to the connection's base path.
    sensor_task = FileSensor(task_id="file_sensor_task", poke_interval=30, filepath="testfile.csv")

    start_task >> sensor_task >> stop_task
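Solution
I could not reproduce the problem. FileSensor waits for a file in a directory that can be specified in the connection the sensor uses. By default that directory is /. Are you sure the file is created where you expect it? Right now your sensor is waiting for /testfile.csv. Is that intended?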
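To make the path resolution concrete, here is a minimal sketch, assuming the sensor simply joins the connection's base path with filepath and then checks that the result is a regular file. The helper names resolve_sensor_path and file_is_present are hypothetical and are not part of Airflow; the real sensor reads the base path from its connection.

```python
import os
import tempfile


def resolve_sensor_path(base_path: str, filepath: str) -> str:
    """Join the connection's base path with the sensor's filepath."""
    return os.path.join(base_path, filepath)


def file_is_present(base_path: str, filepath: str) -> bool:
    """Return True if the resolved path exists as a regular file."""
    return os.path.isfile(resolve_sensor_path(base_path, filepath))


# With a base path of "/", a relative filepath resolves to the filesystem root.
print(resolve_sensor_path("/", "testfile.csv"))

# Demonstrate the check against a temporary directory used as the base path.
with tempfile.TemporaryDirectory() as base:
    print(file_is_present(base, "testfile.csv"))  # file not created yet
    open(os.path.join(base, "testfile.csv"), "w").close()
    print(file_is_present(base, "testfile.csv"))  # file now exists
```

With os.path.join, an absolute filepath such as /data/testfile.csv replaces the base path entirely, so passing an absolute path is one way to remove the ambiguity about where the sensor is looking.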
I created a dummy file and ran the DAG, and it completed successfully:
root@64299244fb44:/opt/airflow# touch /testfile.csv
root@64299244fb44:/opt/airflow# airflow dags backfill -s 2021-05-25 fs_test_dag
/opt/airflow/airflow/cli/commands/dag_command.py:62 PendingDeprecationWarning: --ignore-first-depends-on-past is deprecated as the value is always set to True
[2021-05-31 20:48:15,940] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags
[2021-05-31 20:48:16,736] {base_executor.py:82} INFO - Adding to queue: ['airflow','tasks','run','fs_test_dag','start','2021-05-25T00:00:00+00:00','--ignore-depends-on-past','--local','--pool','default_pool','--subdir','/files/dags/tasks.py','--cfg-path','/tmp/tmp8ttb6fz5']
[2021-05-31 20:48:21,476] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow',488] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2
[2021-05-31 20:48:21,531] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags/tasks.py
Running <TaskInstance: fs_test_dag.start 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:26,503] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2
[2021-05-31 20:48:26,527] {base_executor.py:82} INFO - Adding to queue: ['airflow','file_sensor_task','/tmp/tmpbslxmgxo']
[2021-05-31 20:48:31,503] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow',516] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-05-31 20:48:31,547] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags/tasks.py
Running <TaskInstance: fs_test_dag.file_sensor_task 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:36,501] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-05-31 20:48:36,'stop','/tmp/tmpvoqqs24l']
[2021-05-31 20:48:41,498] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow',512] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 2 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-05-31 20:48:41,542] {dagbag.py:487} INFO - Filling up the DagBag from /files/dags/tasks.py
Running <TaskInstance: fs_test_dag.stop 2021-05-25T00:00:00+00:00 [queued]> on host 64299244fb44
[2021-05-31 20:48:46,520] {dagrun.py:444} INFO - Marking run <DagRun fs_test_dag @ 2021-05-25T00:00:00+00:00: backfill__2021-05-25T00:00:00+00:00,externally triggered: False> successful
[2021-05-31 20:48:46,525] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 3 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-05-31 20:48:46,528] {local_executor.py:387} INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
[2021-05-31 20:48:46,663] {backfill_job.py:831} INFO - Backfill done. Exiting.
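Could you share the name of the queue associated with the worker on which you want to run this task? I do not see a queue name passed in the task parameters, so Airflow will expect a worker listening on the queue named by default_queue in airflow.cfg.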
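As a quick way to check which queue name is configured, the following is a minimal sketch that scans a config file's text for default_queue entries. The helper find_default_queue and the sample_cfg contents are hypothetical and are not part of Airflow. The sketch searches every section, on the assumption that the section holding default_queue may differ between Airflow versions.

```python
import configparser


def find_default_queue(cfg_text: str) -> dict:
    """Return {section: value} for every default_queue entry in the config text."""
    # Interpolation is disabled because config values may contain '%' characters.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {
        section: parser.get(section, "default_queue")
        for section in parser.sections()
        if parser.has_option(section, "default_queue")
    }


# Hypothetical excerpt of an airflow.cfg, used only for illustration.
sample_cfg = """
[core]
executor = CeleryExecutor

[celery]
default_queue = default
"""

print(find_default_queue(sample_cfg))
```

To run it against a real installation, read the file contents first, for example with open(path).read(), and pass the text to find_default_queue. If a task sets its own queue argument, the worker has to listen on that queue instead.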