如何解决Airflow:同一个操作符实例可以多次重用和执行,在运行之间保持状态吗?
能否请您解释一下,在某些情况下,操作符实例是否可以重用,execute()
方法将被多次执行并在 execute()
次运行之间保持状态?
换句话说,这种情况在 Airflow 中是否可行:
-
Operator 中的 self 变量在 init 中初始化。
-
execute() 方法读取自变量并更改它。
-
execute() 在同一个 operator 实例上再运行一次,例如因为重启或其他原因 并且可以读取先前执行运行更改的自变量?
class MyOperator(BaSEOperator): def __init__(self,param_1 ... param_n): self.var1=param_1 def execute(self,context): #do some logic with self variable self.var1 += 1 #
解决方法
由于以下原因,您描述的场景是不可能的。
当 Airflow Scheduler 将您的任务实例分派到队列时,该任务会在工作线程的每个心跳中初始化。
这是因为 DagBag 填充的每个心跳都会初始化操作符实例。
运行之间存储的任何值在重新初始化时都会重置。
如果您需要在运行之间存储值,您可以使用 Variable
模型来存储这些值。
from airflow.models import Variable
def execute(self,context):
#do some logic with self variable
var1 = Variable.get(
"count",deserialize_json=True,default_var=0
)
var1 += 1
Variable.set("count",var1)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。