微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 Go 中使用 SASL EXTERNAL 使用 qpid-proton 客户端库连接到 AMQP 1.0 RabbitMQ

如何解决在 Go 中使用 SASL EXTERNAL 使用 qpid-proton 客户端库连接到 AMQP 1.0 RabbitMQ

我正在尝试使用 https://github.com/apache/qpid-proton 提供的 golang 实现,通过 SASL EXTERNAL 机制通过自签名证书提供的身份验证与 RabbitMQ 建立 TLS 连接。目标是无需在 URI 中指定用户名和密码即可连接到 RabbitMQ。

RabbitMQ 使用以下配置运行:

      auth_mechanisms.1 = EXTERNAL
      auth_mechanisms.2 = PLAIN
      auth_mechanisms.3 = AMQPLAIN

插件

  • rabbitmq_amqp1_0
  • rabbitmq_auth_mechanism_ssl

我已确认我能够使用 Node.js 库 (https://github.com/amqp/rhea) 连接 SASL EXTERNAL,并且我已确认与 PLAIN 和 ANONYMOUS 的连接可与 qpid-proton 库中的 Go 一起使用,但已被无法通过 Go 连接 SASL EXTERNAL。

我的客户端代码没有返回任何错误,但是 RabbitMQ 错误日志告诉我客户端关闭了 TCP 连接

2021-06-24 18:57:22.029 [info] <0.16358.106> accepting AMQP connection <0.16358.106> (127.0.0.1:50610 -> 127.0.0.1:5671)
2021-06-24 18:57:23.030 [warning] <0.16358.106> closing AMQP connection <0.16358.106> (127.0.0.1:50610 -> 127.0.0.1:5671):
client unexpectedly closed TCP connection

我的客户端代码如下:

package main

import (
        "fmt"
        "github.com/apache/qpid-proton/go/pkg/amqp"
        "github.com/apache/qpid-proton/go/pkg/electron"
        "os"
        "crypto/tls"
        "io/IoUtil"
        "crypto/x509"
        "time"
)

func main() {
        keyPair,err := tls.LoadX509KeyPair("client.crt","client.key")

        if err != nil {
                fmt.Println("Failed to load certificate:",err)
                os.Exit(1)
        }

        rootCa,err := IoUtil.ReadFile("rootCA.crt")
        if err != nil {
                fmt.Println("Failed to read root CA:",err)
                os.Exit(1)
        }
        certPool := x509.NewCertPool()
        certPool.AppendCertsFromPEM(rootCa)

        tlsConfig := &tls.Config{
                RootCAs: certPool,InsecureSkipVerify: true,Certificates: []tls.Certificate{keyPair},}

        container := electron.NewContainer("myContainer")

        tlsConn,err := tls.Dial("tcp","rabbitmq.default.svc.cluster.local:5671",tlsConfig)
        if err != nil {
                fmt.Println("Failed to open TLS connection:",err)
                os.Exit(1)
        }
        defer tlsConn.Close()

        conn,err := container.Connection(
                tlsConn,electron.SASLEnable(),electron.SASLAllowedMechs("EXTERNAL"),)
        defer conn.Close(err)

        if err != nil {
                fmt.Println("Failed to open AMQP connection",err)
                os.Exit(1)
        }

        sess,err := conn.Session()

        sender,err := sess.Sender(electron.Target("demo-queue"))

        if err != nil {
                fmt.Println("Creating sender Failed:",err)
                os.Exit(1)
        }

        for i := int64(0); i < 100000 ; i++ {
                msg := amqp.NewMessage()
                body := fmt.Sprintf("Test message %d",i)
                msg.Marshal(body)
                sender.SendSync(msg)
                time.Sleep(1*time.Second)
        }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。