使用 xom_pull() 在两个 SimpleHttpOperator 之间传递信息

如何解决使用 xom_pull() 在两个 SimpleHttpOperator 之间传递信息

我对气流相当陌生,我目前正在尝试在我的 SimpleHttpOperators 之间传递信息。

这是检索数据的位置:

request_city_information = SimpleHttpOperator(
http_conn_id='overpass',task_id='basic_city_information',headers={"Content-Type": "application/x-www-form-urlencoded"},method='POST',data=f'[out:json]; node[name={name_city}][capital]; out center;',response_filter=lambda response: response.json()['elements'][0],dag=dag,)

然后我想在以下运算符中使用此响应:

request_city_attractions = SimpleHttpOperator(
http_conn_id='overpass',task_id='city_attractions',data=f"[out:json];(nwr[tourism='attraction'][wikidata](around:{search_radius},"
     f"{request_city_information.xcom_pull(context='ti')['lat']}"
     f",10););out body;>;out skel qt;",dag=dag)

如您所见,我尝试通过 request_city_information.xcom_pull(context='ti') 访问响应。但是,我的上下文在这里似乎是错误的。

由于我的数据已经写入 XComs,我认为我不需要 XCOM_push='True',如建议的 here

自气流 2.x 以来,XCom 似乎发生了变化,因为我发现许多建议的解决方案对我不起作用。
我相信我的思维过程存在重大差距,我只是不知道在哪里。

我将不胜感激任何对示例的引用或帮助! 提前致谢

解决方法

我现在用完全不同的方法解决了它,如果你们知道第一个是如何工作的,我很乐意对此进行解释。

这是我的解决方案:

with DAG(
    'city_info',default_args=dafault_args,description='xcom test',schedule_interval=None,) as dag:
#TODO: Tasks with conn_id
def get_city_information(**kwargs):
    payload = f'[out:json]; node[name={name_city}][capital]; out center;'
    #TODO: Request als Connection
    r = requests.post('https://overpass-api.de/api/interpreter',data=payload)
    ti = kwargs['ti']
    ti.xcom_push('basic_city_information',r.json())


get_city_information_task = PythonOperator(
    task_id='get_city_information_task',python_callable=get_city_information
)


def get_city_attractions(**kwargs):
    ti = kwargs['ti']
    city_information = ti.xcom_pull(task_ids='get_city_information_task',key='basic_city_information')
    payload = f"[out:json];(nwr[tourism='attraction'][wikidata](around:{search_radius}" \
              f",{city_information['elements'][0]['lat']},{city_information['elements'][0]['lon']}" \
              f"););out body;>;out skel qt;"
    r = requests.post('https://overpass-api.de/api/interpreter',data=payload)
    #TODO: Json as Object
    ti.xcom_push('city_attractions',r.json())


get_city_attractions_task = PythonOperator(
    task_id='get_city_attractions_task',python_callable=get_city_attractions
)

get_city_information_task >> get_city_attractions_task

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?