微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Azure 服务总线将 csv 行发送到队列

如何解决Azure 服务总线将 csv 行发送到队列

我正在测试用于 Azure 服务总线的 Python SDK,以将消息发送到队列。基本测试工作正常,因为我向队列发送了一个字符串,如下所示:

import csv
from azure.servicebus import ServiceBusClient,ServiceBusMessage

CONNECTION_STR = "CONN_STR"
QUEUE_NAME= "queue name"
def send_single_message(sender):
    message = ServiceBusMessage("Single Message")
    sender.send_messages(message)
    print("Sent a single message")


servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR,logging_enable=True)

with servicebus_client:
    sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
    with sender:
        send_single_message(sender)
print("Done sending messages")
print("-----------------------")

with servicebus_client:
    receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME,max_wait_time=5)
    with receiver:
        for msg in receiver:
            print("Received: " + str(msg))
            receiver.complete_message(msg)

现在我想要实现的下一步是遍历一个 csv 文件,并为每一行将其发送到队列中。

所以我尝试遍历这个 csv 文件,并将这些行发送到队列中。如下:

def send_a_list_of_messages(sender):
    to_queue = []
    with open('final_result.csv') as f:
        reader = csv.reader(f)
        for row in reader:
            to_queue.append(row)
    print(to_queue)
    message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
    sender.send_messages(message)
    print("Sent a single message")


servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR,logging_enable=True)

with servicebus_client:
    sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
    with sender:
        send_a_list_of_messages(sender)

print("Done sending messages")
print("-----------------------")

但是当我运行我的代码时,我收到了这个错误

Traceback (most recent call last):
  File "servicebus.py",line 23,in <module>
    send_a_list_of_messages(sender)
  File "servicebus.py",line 13,in send_a_list_of_messages
    message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
  File "servicebus.py",in <listcomp>
    message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
  File "/Users/usr/opt/anaconda3/lib/python3.8/site-packages/azure/servicebus/_common/message.py",line 110,in __init__
    self._build_message(body)
  File "/Users/usr/opt/anaconda3/lib/python3.8/site-packages/azure/servicebus/_common/message.py",line 190,in _build_message
    raise TypeError(
TypeError: ServiceBusMessage body must be a string,bytes,or None.  Got instead: <class 'list'>

我知道服务需要特定的正文类型,但我很确定 to_queue[] 将是字符串类型。

如果有人能帮我解决这个问题,我将不胜感激。 如果您有任何其他问题,请告诉我。

非常感谢

编辑:

我通过将 ServiceBusMessage 转换为字符串来解决正文类型问题,如下所示:

message = [ServiceBusMessage(str(to_queue)) for _ in range(len(to_queue))]

但现在我收到关于大小限制的错误

azure.servicebus.exceptions.MessageSizeExceededError: ServiceBusMessageBatch has reached its size limit: 262144

谁能帮我解决这个问题。

我尝试以不同的方式处理工作流程,该方法应为每一行触发多次。如下:

for msg in to_queue:
    print(msg)
    def send_a_list_of_messages(sender):
        print(to_queue)
        message = [ServiceBusMessage(str(to_queue)) for _ in range(len(to_queue))]
        sender.send_messages(message)
        print("Sent a single message")

但我得到了同样的错误

我决定实现一个 for 循环来触发 csv 文件中每一行的函数,从而暂时克服了大小问题。

for msg in to_queue:
    # print(msg)
    def send_a_list_of_messages(sender):
        # print(to_queue)
        message = [ServiceBusMessage(str(msg))]
        sender.send_messages(message)
        print("msg sent: " + str(msg))

这部分有效,因为该函数仅发送 csv 文件中的最后一行。

解决方法

  1. ServiceBusMessage 仅接受 str/bytes,但最初您以列表形式发送。如果在您转换为 str 后它仍然失败,则可能是其他地方出了问题。你能提供一个你的 csv 文件的样本吗?

  2. 当列表/批量消息的大小超过最大限制时,会发生 MessageSizeExceededError。您可以在此处阅读有关限制和批次的更多信息:https://docs.microsoft.com/en-us/python/api/azure-servicebus/azure.servicebus.servicebusmessagebatch?view=azure-python。 为确保您不超过限制并安全地发送您的一批邮件,您应该使用以下模式:https://github.com/Azure/azure-sdk-for-python/blob/ac60fb93ea23f32e5ed5a83532c1ed6aa49921b5/sdk/servicebus/azure-servicebus/samples/sync_samples/send_queue.py#L32

如果您有任何问题,请告诉我!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。