如何解决使用 xom_pull() 在两个 SimpleHttpOperator 之间传递信息
我对气流相当陌生,我目前正在尝试在我的 SimpleHttpOperators 之间传递信息。
这是检索数据的位置:
request_city_information = SimpleHttpOperator(
http_conn_id='overpass',task_id='basic_city_information',headers={"Content-Type": "application/x-www-form-urlencoded"},method='POST',data=f'[out:json]; node[name={name_city}][capital]; out center;',response_filter=lambda response: response.json()['elements'][0],dag=dag,)
然后我想在以下运算符中使用此响应:
request_city_attractions = SimpleHttpOperator(
http_conn_id='overpass',task_id='city_attractions',data=f"[out:json];(nwr[tourism='attraction'][wikidata](around:{search_radius},"
f"{request_city_information.xcom_pull(context='ti')['lat']}"
f",10););out body;>;out skel qt;",dag=dag)
如您所见,我尝试通过 request_city_information.xcom_pull(context='ti')
访问响应。但是,我的上下文在这里似乎是错误的。
由于我的数据已经写入 XComs,我认为我不需要 XCOM_push='True'
,如建议的 here。
自气流 2.x 以来,XCom 似乎发生了变化,因为我发现许多建议的解决方案对我不起作用。
我相信我的思维过程存在重大差距,我只是不知道在哪里。
我将不胜感激任何对示例的引用或帮助! 提前致谢
解决方法
我现在用完全不同的方法解决了它,如果你们知道第一个是如何工作的,我很乐意对此进行解释。
这是我的解决方案:
with DAG(
'city_info',default_args=dafault_args,description='xcom test',schedule_interval=None,) as dag:
#TODO: Tasks with conn_id
def get_city_information(**kwargs):
payload = f'[out:json]; node[name={name_city}][capital]; out center;'
#TODO: Request als Connection
r = requests.post('https://overpass-api.de/api/interpreter',data=payload)
ti = kwargs['ti']
ti.xcom_push('basic_city_information',r.json())
get_city_information_task = PythonOperator(
task_id='get_city_information_task',python_callable=get_city_information
)
def get_city_attractions(**kwargs):
ti = kwargs['ti']
city_information = ti.xcom_pull(task_ids='get_city_information_task',key='basic_city_information')
payload = f"[out:json];(nwr[tourism='attraction'][wikidata](around:{search_radius}" \
f",{city_information['elements'][0]['lat']},{city_information['elements'][0]['lon']}" \
f"););out body;>;out skel qt;"
r = requests.post('https://overpass-api.de/api/interpreter',data=payload)
#TODO: Json as Object
ti.xcom_push('city_attractions',r.json())
get_city_attractions_task = PythonOperator(
task_id='get_city_attractions_task',python_callable=get_city_attractions
)
get_city_information_task >> get_city_attractions_task
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。