如何解决mangos的问题——Golang包提供的nanomsg总线协议
我想用nanomsg
/nng
作为完全分布式对等多节点网络的通信基础,帮助构建拓扑发现和维护的动态能力。现在我陷入了它的 Golang 包 mangos
。
在python和pynng(nanomsg的python绑定)中做了同样的工作,但是当我使用go并通过mangos调用相应的方法时,它们的行为完全不同。谜题主要有三方面:
OptionRecvDeadline 是下一次 Recv 超时之前的时间。该值是一个时间。持续时间。可以传递零值以指示不应应用超时。负值表示非阻塞操作。默认情况下没有超时。
我相应地尝试了负值,但 Recv()
仍然阻塞。我还应该做什么?以及如何理解“零超时”和“非阻塞”的区别?
-
dialer
返回的(s *socket) NewDialer(...)
似乎在调用dialer.Close()
后仍然存在,因为调用下一个dialer.Dial()
时会发生错误,报告它仍然是“正在使用的地址” .但是当我再次尝试Close()
dialer
时,发生错误并报告它已经关闭。我还尝试了以下选项的不同组合,但所有尝试都失败了
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true // or false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond // or zero
opts[mangos.OptionKeepAliveTime] = time.Millisecond // or even smaller
opts[mangos.OptionKeepAlive] = false // or true
如果我想彻底杀死拨号器,或者想在一段时间后重新使用“伪封闭”拨号器,我该怎么办?
- bus-type-Socket 的
Send()
很奇怪。通常每个节点都应该定期在我的代码中发送一条消息。我从网络中关闭了一个节点(比如“Node-X”)的物理连接,让它离线一段时间,然后重新连接到网络。我发现 Node-X 在重新连接时会立即重新发送大量消息。但我真正期望的是,即使 Node-X 没有邻居,它也可以将这些消息发送到空中。
不知道有没有什么办法可以解决这些问题。我想它可能缺少一些选项或配置,但我没能弄明白。
package main
import (
"fmt"
"os"
"time"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/bus"
// register transports
_ "go.nanomsg.org/mangos/v3/transport/all"
)
var (
sock mangos.socket
DialerMap map[string]*mangos.Dialer
opts map[string]interface{}
)
func main() {
var err error
opts = make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true
opts[mangos.OptionMaxReconnectTime] = time.Millisecond
// opts[mangos.OptionKeepAliveTime] = time.Millisecond
opts[mangos.OptionKeepAlive] = false
DialerMap = make(map[string]*mangos.Dialer)
if sock,err = bus.NewSocket(); err != nil {
fmt.Println("bus.NewSocket error. ",err)
os.Exit(1)
}
TargetUUID := "node-A"
TargetAddr := "tcp://192.168.0.172:60000" // this should be changed to a available address
MyDial(TargetUUID,TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID,TargetAddr)
time.Sleep(time.Second * 2)
MyDial(TargetUUID,TargetAddr)
time.Sleep(100 * time.Second)
}
func MyDial(TargetUUID string,TargetAddr string) (mangos.Dialer,error) {
_,is_exist := DialerMap[TargetUUID]
var err error
var dialer mangos.Dialer
if !is_exist {
dialer,err = sock.NewDialer(TargetAddr,opts)
if err != nil {
} else {
DialerMap[TargetUUID] = &dialer
}
}
dialer = *DialerMap[TargetUUID]
err = dialer.Dial()
if err != nil {
fmt.Println("Dialer fails to dial()",err)
} else {
fmt.Println("Dialer succeeds to dial()")
}
return dialer,err
}
func MyClose(TargetUUID string,TargetAddr string) {
dialerAddr,is_exist := DialerMap[TargetUUID]
if !is_exist {
fmt.Println("Dialer does not exist")
}
dialer := *dialerAddr
err := dialer.Close()
if err != nil {
fmt.Println("dialer fails to close.",err)
} else {
fmt.Println("dialer succeeds to close")
}
}
和控制台输出是
Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed
解决方法
对于此类问题,我通常不会监控 stackoverflow 或 reddit——我们确实有一个不和谐频道(来自 mangos 和 NNG 主页的链接)以及一个邮件列表。
话虽如此,让我看看能不能帮上忙(我是 NNG 和 mangos 的作者):
- 总线支持OptionRecvDeadline。但是,您是正确的,它不支持具有负值的非阻塞模式,相反,负值被视为零,并充当阻塞。这是一个文档错误。要实现逻辑非阻塞,请使用值“1”,这意味着一纳秒,这在逻辑上等同于非阻塞,尽管粒度可能受到调度程序延迟的限制。 (在这种情况下,它就像执行“go close(channel);
我会看看如何修复文档。
-
在拨号器上调用 Close() 是正确的做法。它会一直存在,直到管道关闭,它会自动关闭。您使用非常的重拨时间可能会混淆这一点——老实说,我没有考虑过微小的重拨时间——通常这样做是不好的形式,因为它意味着如果对等点不可用,您的代码将在试图重新连接的处理器上剧烈旋转。我通常建议至少 10 毫秒的重试间隔上限。 (mangos.OptionMaxReconnectTime)
-
我认为您看到了排队的效果,但我不能 100% 确定 - 我需要查看一个测试用例来重现这一点。总线协议肯定是尽力而为的传递,如果没有连接的对等点,则消息会被丢弃在地板上。 (只是重新检查了一下。)
感谢@Garrett D'Amore
的回复,我现在可以用另一种方式解决我的问题,而且我(作为一个对底层通信层知之甚少的新 Golang 粉丝)很抱歉用这样一个基本而愚蠢的问题来打扰您。
问题(1)作者很好地回答了。
问题(3)可能与问题(2)相结合,因为作者将机制描述如下,从而消除了发送缓冲累积的可能性。
当然,总线协议是尽力而为的传递,如果没有连接的对等点,则消息会被丢弃在地板上。 (只是重新检查了一下。)
问题(2),我第一次尝试将mangos.OptionMaxReconnectTime
设置为100 ms
,但问题依然存在。第二次,我尝试了各种options
组合来配置socket和拨号器,但都失败了。
最后,既然作者指出
在拨号器上调用 Close() 是正确的做法。它会一直存在,直到管道关闭,它会自动关闭。您使用非常短的重拨时间可能会混淆这一点。
我转向另一种关闭旧拨号器的方法,即明确关闭它拥有的所有管道。为了实现这一点,可以定义一个回调处理程序,如
var pipe_c chan
func callbackHandler(event mangos.PipeEvent,pipe mangos.Pipe) {
pAddr := &pipe
pipe_c <- pAddr
}
然后将 callbackHandler 附加到套接字
sock.SetPipeEventHook(callbackHandler)
通过这样做,用户可以获得(私有变量)管道。当一个人想关闭拨号连接时,他或她可以这样做
dialer.Close() // try best to close a dialer automatically
for pAddr,num := range pipeSet {
(*pAddr).Close() // explicitly close all the pipes of the dialer
}
然后将“伪封闭”拨号器放在一边。当您想再次连接到远程地址时,可以创建并使用一个新的拨号程序。
我不知道旧的“伪封闭”拨号器是否会累积在内存中。但这已经是我能找到的唯一解决方案了。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。