如何解决测试 PubSub 订阅回调
我有一个小应用程序,它在 Google Pub/Sub 中调用 SubscriberClient 的 subscribe 方法。同时,在实际的订阅调用中,我尝试在调用回调之前和之后做一些处理。
客户端.py
@wrapt.patch_function_wrapper('google.cloud.pubsub_v1','SubscriberClient.subscribe')
def subscribe_handler(wrapped,instance,*args,**kwargs):
cbk = args[1]
def callback_handler(message):
print('before')
cbk(message)
print('after')
return wrapped(*[args[0],callback_handler],**kwargs)
def callback(msg):
print(msg.data)
msg.ack()
future = SubscriberClient.subscribe(subscription_path,callback)
test.py
class TestPubSubSubscribe(unittest.TestCase):
def test_subscribe_handler(self):
def handle_message(msg):
print(msg.data)
msg.ack()
creds = mock.Mock(spec=credentials.Credentials)
sub_client = SubscriberClient(credentials=creds)
msg = self.create_message(b"foo",baz="bacon",spam="eggs") # using some helper to create a PubSubMessage message.
# call the subscribe
future = sub_client.subscribe("test-sub-path",handle_message)
在我的测试中,最后,我想调用 SubscriberClient 的 subscribe 方法,但我希望它以某种方式使用我创建的消息 msg
而不是让它从实际 API 中获取它。
有人知道怎么做吗?我应该嘲笑/修补什么?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。