如何解决在循环中将事件发送到 azure 事件中心
我们将大型(200mb 到 5GB)CSV 文件存储在 azure blob 中。 我们将它们下载到我们的容器中。 遍历每一行,提取一个字段,它是一个 JSON 负载 将 JSON 转换为 AVRO 字节数组并将其序列化 我们基于并通过在融合模式注册表中查找模式进行序列化。 然后我们将有效负载作为事件发送到 EventHubs 目前,我们一行一行地调用它。见下面的代码。大约需要 1 秒/行。 检查emit_msg(send_msg) 函数
def process_files() :
filenames = find_csv_filenames("./fp")
print("\n")
print('Total files found = ',len(filenames))
print(filenames)
print("\n")
if len(filenames) > 0 :
for f in filenames:
start = time.time()
print("now working on ....",f)
with open(f) as csv_file:
csv_reader = csv.reader(csv_file,delimiter=',')
line_count = 0
#csv_reader can contain upto 50 million rows
for row in csv_reader:
if line_count == 0:
line_count += 1
send_msg = "Line # is " + str(line_count) + " And Line value is : " + row[0]
print(send_msg)
#send to event hub
emit_msg(send_msg)
line_count += 1
print("")
print(f'Processed {line_count:,} lines.')
end = time.time()
print(f'Completed processing in {round(end-start)} seconds.')
print("______________________________________________________")
print("\n")
def emit_msg(emit_string):
client = EventHubProducerClient.from_connection_string(eh_cs,eventhub_name=eh_name)
event_data_batch = client.create_batch()
event_data_batch.add(EventData(emit_string))
with client:
client.send_batch(event_data_batch)
我们尝试了矢量化,见下面的代码。仍然需要相同的时间。
def process_file(filename) :
start = time.time()
print(f"working on .... {filename} \n")
df = dd.read_csv(filename,usecols=[6])
print(df.head(4))
print("")
print(f'Processed {df.shape[0].compute(scheduler="processes"):,} lines.\n')
df[df.columns[0]].apply(emit_msg,meta=('Order ID','int') ).compute(scheduler="processes")
end = time.time()
print(f'Completed processing in {round(end-start)} seconds.\n')
print("______________________________________________________")
print("\n")
我们的直接瓶颈在于发布到事件中心。这当前需要 1 秒/消息发射。 我们的 SLA 是发布 100mb/sec。即每秒 100,000 条消息。 非常感谢任何帮助。
一如既往地感谢!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。