微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

芹菜任务没有发送给经纪人

如何解决芹菜任务没有发送给经纪人

当我尝试将任务发送给代理 (RabbitMQ) 时,它挂起。

import * as React from "react";

type ChangeEventHandlerLegacy = (field: string,event: React.ChangeEvent) => void
type Props<T> = {
  field?: T
  onChange: [T] extends [string] ? ChangeEventHandlerLegacy : React.ChangeEventHandler
}

function Input<T>(props: Props<T>) { return null }

const workingLegacy = <Input field="1" onChange={(prop,event) => {}} />
// `onChange` incorrectly resolves to `ChangeEventHandlerLegacy | React.ChangeEventHandler`:
const brokenNative = <Input onChange={(event) => { console.log(event) }} />
// Explicitly providing `field` fixes the type:
const workingNative = <Input field={undefined} onChange={(event) => { console.log(event) }} />

// Using createElement results in `onChange` always resolving to `React.ChangeEventHandler`,likely due to `field` being `unkNown`
const brokenLegacy2 = React.createElement(Input,{ field: '2',onChange: (a,b,c) => console.log(a,c) })
const workingNative2 = React.createElement(Input,{ onChange: (event) => console.log(event) })

如果我同步运行任务,它会按预期工作。

# python shell
promise = foo.s(first_arg="2").apply_async()
# blocking indefinitely. I expected a promise object.

如果我用 ctrl+c 中断 # python shell promise = foo.s(first_arg="2").apply() >>> hello argument 2 ,我会得到一些线索的回溯:

.apply_async()

代理连接字符串在系统中如下所示:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py",line 32,in __call__
    return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception,another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py",line 173,in _connect
    host,port,family,socket.soCK_STREAM,SOL_TCP)
  File "/usr/local/lib/python3.7/socket.py",line 752,in getaddrinfo
    for res in _socket.getaddrinfo(host,type,proto,flags):
socket.gaierror: [Errno -9] Address family for hostname not supported

During handling of the above exception,another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py",line 325,in retry_over_time
    return fun(*args,**kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py",line 866,in _connection_factory
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py",line 801,in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/pyamqp.py",line 128,in establish_connection
    conn.connect()
  File "/usr/local/lib/python3.7/site-packages/amqp/connection.py",line 323,in connect
    self.transport.connect()
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py",line 113,in connect
    self._connect(self.host,self.port,self.connect_timeout)
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py",line 184,in _connect
    "Failed to resolve broker hostname"))
  File "/usr/local/lib/python3.7/site-packages/amqp/transport.py",line 197,in _connect
    self.sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception,another exception occurred:

Traceback (most recent call last):
  File "<stdin>",line 1,in <module>
  File "/usr/local/lib/python3.7/site-packages/celery/canvas.py",line 225,in apply_async
    return _apply(args,kwargs,**options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py",line 565,in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py",line 749,in send_task
    amqp.send_task_message(P,name,message,**options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py",line 532,in send_task_message
    **properties
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py",line 178,in publish
    exchange_name,declare,File "/usr/local/lib/python3.7/site-packages/kombu/connection.py",line 525,in _ensured
    return fun(*args,**kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py",in _publish
    channel = self.channel
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py",line 206,in _get_channel
    channel = self._channel = channel()
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py",line 34,in __call__
    value = self.__value__ = self.__contract__()
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py",line 221,in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py",line 884,in default_channel
    self._ensure_connection(**conn_opts)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py",line 439,in _ensure_connection
    callback,timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py",line 339,in retry_over_time
    sleep(1.0)

python 中的代理连接字符串:

~$ env | grep broKER
CELERY_broKER=pyamqp://guest@172.23.0.3//

之前提示RabbitMQ没有运行,或者连接字符串坏;我的芹菜工人(消费者)进程能够使用相同的连接字符串进行连接。

# python shell
from src.celery import app
app.pool.connection
>>> Connection: amqp://guest:**@localhost:5672//

这就是我将应用程序/生产者连接到代理的方式。 celeryconfig.py 文件包含代理 url 后端、并发等的设置。

-------------- celery@f9ab48fc6b63 v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-4.15.0-20-generic-x86_64-with-debian-9.12 2021-03-05 07:56:29
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_statst_api:0x7f15b6de0450
- ** ---------- .> transport:   amqp://guest:**@my-rabbit:5672//
- ** ---------- .> results:     postgresql://docker:**@pg_db:5432/
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . foo_task
  . (long list of tasks)

[2021-03-05 07:56:30,564: INFO/MainProcess] Connected to amqp://guest:**@my-rabbit:5672//
[2021-03-05 07:56:30,581: INFO/MainProcess] mingle: searching for neighbors
[2021-03-05 07:56:31,622: INFO/MainProcess] mingle: all alone
[2021-03-05 07:56:31,647: INFO/MainProcess] celery@f9ab48fc6b63 ready.

解决方法

问题出在我的配置文件中。 Celery 没有找到属性 broker_url 并且没有给出任何警告。相反,芹菜默默地设置了一个默认的 amqp://guest:**@localhost:5672//。在此处查看详细信息https://github.com/celery/celery/issues/6661

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?