Azure 与服务总线的集成主题

如何解决Azure 与服务总线的集成主题

我正在使用 ADP 的 API 构建集成。已经有一个我将要使用的端点 - 这与 Azure 服务总线相关联。我知道主题名称,但我无法理解需要在 Azure 中创建的下一个工件。我希望传入的消息作为数据到达 EDW 或数据湖(它可能以 XML 格式出现,我可能需要将其转换为 azure sql 数据库):

  • 我是否应该创建另一个与 EDW 相关联的订阅,以便从服务总线主题获取消息? (我以前没有尝试过)
  • 还是应该创建一个逻辑应用程序来直接读取服务总线或服务总线主题? (我也没有尝试过)

我需要设计一个可扩展的解决方案 - 任何见解将不胜感激

谢谢

解决方法

您能否扩展此答案,或者是否有文档可以查看如何 将服务总线主题链接到 azure 函数,然后转储到数据中 湖?

我不知道你用的是什么语言,这里是用python实现的。

您可以使用服务总线触发器来监听来自服务总线队列或主题的消息。然后就可以使用数据湖SDK保存消息了:

使用azure function service bus trigger监听消息:

import logging

import azure.functions as func


def main(msg: func.ServiceBusMessage):

    #put the logic of process the message here

    logging.info('Python ServiceBus queue trigger processed message: %s',msg.get_body().decode('utf-8'))

function.json

{
  "scriptFile": "__init__.py","bindings": [
    {
      "name": "msg","type": "serviceBusTrigger","direction": "in","queueName": "queuename","connection": "bowman1012_SERVICEBUS"
    }
  ]
}

并使用如下代码将消息附加到数据湖:

from azure.storage.filedatalake import DataLakeServiceClient 
connect_str = "DefaultEndpointsProtocol=https;AccountName=0730bowmanwindow;AccountKey=xxxxxx;EndpointSuffix=core.windows.net"
datalake_service_client = DataLakeServiceClient.from_connection_string(connect_str)
myfilesystem = "test"
myfolder     = "test"
myfile       = "FileName.txt"

file_system_client = datalake_service_client.get_file_system_client(myfilesystem)            
directory_client = file_system_client.create_directory(myfolder)         
directory_client = file_system_client.get_directory_client(myfolder)
print("11111")
try:
    file_client = directory_client.get_file_client(myfile)
    file_client.get_file_properties().size
    data = "Test2"   
    print("length of data is "+str(len(data)))
    print("This is a test123")
    filesize_previous = file_client.get_file_properties().size
    print("length of currentfile is "+str(filesize_previous))
    file_client.append_data(data,offset=filesize_previous,length=len(data))
    file_client.flush_data(filesize_previous+len(data))
except:
    file_client = directory_client.create_file(myfile)
    data = "Test2"   
    print("length of data is "+str(len(data)))
    print("This is a test")
    filesize_previous = 0
    print("length of currentfile is "+str(filesize_previous))
    file_client.append_data(data,length=len(data))
    file_client.flush_data(filesize_previous+len(data))

如果需要在本地开发azure函数,需要“azure函数核心工具”、“语言环境”、“VS Code和azure函数扩展”。

欲了解更多信息,请看这个:

https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local?tabs=windows%2Ccsharp%2Cbash

https://docs.microsoft.com/en-us/azure/azure-functions/functions-reference-python

https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=python

这是数据湖SDK的API参考(在这个网页中你可以找到基于python和azure与各种服务交互的所有方法):

https://docs.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.datalakeserviceclient?view=azure-python

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?