如何解决此代码的某些部分是否会导致 NetMQ 抛出 SocketException?
我为分布式应用程序创建了一个基于 NetMQ 的简单通信系统。我有一个 Hub
,多个 Node
可以通过它传递消息,还有一个 Bus
,多个 BusNode
可以用于发布/订阅。
我遇到了一个很少出现的问题,我希望我今天已经解决了。应用程序每周只会崩溃几次,并且在 Windows 事件日志中发布了一个异常。我显然无法弄清楚我的来源的哪一部分导致了这种情况。到目前为止,我还无法在我的开发机器上重现该问题。抱歉,我只有这个截图,没有文字。
我认为解决方案是使用 NetMQQueue,因为事实证明消息是从多个线程发送的,而我认为不是。所以我在 Node
和 BusNode
中实现了 NetMQQueue 的使用,这很简单明了。
但是,还有两个地方我不确定东西是否是线程安全的,或者我需要修复一些东西。即在 Hub
和 Bus
中。这就是两人的样子。
我的问题是:这两个是理智的,还是线程问题在这里也是一个危险?
中心
type Hub(connectionStrings: string list) =
let poller = new NetMQPoller()
let router = new RouterSocket()
do
router.ReceiveReady.Add (fun x ->
let message = router.ReceiveMultipartMessage()
if message.FrameCount = 4 then
// incoming is 0:source 1:destination 2:timestamp 3:msg
let m = NetMQMessage()
m.Append message.[1] // destination (for routing - will be removed by router)
m.Append message.[0] // source
m.Append message.[1] // destination
m.Append message.[2] // timestamp
m.Append message.[3] // msg
router.SendMultipartMessage m
else
log.Warning "Unexpected frame size in Hub."
)
connectionStrings |> List.iter router.Bind
poller.Add router
poller.RunAsync()
member _.Stop() =
poller.Stop()
interface Idisposable with
member _.dispose() =
if not poller.Isdisposed then
poller.Stop()
poller.RemoveAnddispose router
dispose poller
公交车
type Bus(xpubs: string list,xsubs: string list) =
let forward (inSocket: NetMQSocket) (outSocket: NetMQSocket) =
let mutable more = inSocket.HasIn
while more do
let mutable msg = new Msg()
msg.InitEmpty()
inSocket.Receive &msg
more <- msg.HasMore
outSocket.Send (&msg,more)
let poller = new NetMQPoller()
let xpub = new XPublisherSocket()
let xsub = new XSubscriberSocket()
do
xpubs |> List.iter xpub.Bind
xsubs |> List.iter xsub.Bind
xpub.ReceiveReady.Add (fun x -> forward xpub xsub)
xsub.ReceiveReady.Add (fun x -> forward xsub xpub)
poller.Add xpub
poller.Add xsub
poller.RunAsync()
member _.Stop() =
poller.Stop()
interface Idisposable with
member _.dispose() =
if not poller.Isdisposed then
poller.Stop()
poller.RemoveAnddispose xpub
poller.RemoveAnddispose xsub
dispose poller
为了完整起见,我还向您展示了 Node 和 BusNode,它们现在使用 NetMQQueue,我认为至少在这里可以解决问题。
节点
type Node(nodeId: NodeId,connectionString: string) =
let receiveEvent = new Event<NodeReceiveEventArgs>()
let queue = new NetMQQueue<NetMQMessage>(0)
let dealer = new DealerSocket()
let poller = new NetMQPoller()
do
queue.ReceiveReady.Add (fun x ->
let mutable isDone = false
while not isDone do
let mutable m: NetMQMessage = null
if x.Queue.TryDequeue(&m,TimeSpan.Zero) then
dealer.SendMultipartMessage m
else
isDone <- true
)
dealer.Options.Identity <- NodeId.crack nodeId
dealer.ReceiveReady.Add (fun x ->
let m = dealer.ReceiveMultipartMessage()
match messagetoArgs m with
| Ok args ->
// log.Debug $"Rcv ({args.source.asText} → {nodeId.asText}) {Misc.getDuCaseName args.Message}"
receiveEvent.Trigger args
| Error s -> log.Warning $"Rcv : {s}"
)
dealer.Connect connectionString
poller.Add queue
poller.Add dealer
poller.RunAsync()
interface Idisposable with
member _.dispose() =
if not poller.Isdisposed then
poller.Stop()
poller.RemoveAnddispose dealer
poller.RemoveAnddispose queue
dispose poller
member _.OnReceive = receiveEvent.Publish
member _.Send(destination: NodeId,message: IMessage) =
// log.Debug $"Snd ({nodeId.asText} → {destination.asText}) {Misc.getDuCaseName message}"
let m = createMqMessage destination DateTime.Now message
queue.Enqueue m
总线节点
type BusNode(nodeId: NodeId,xpub: string,xsub: string) =
let receiveEvent = new Event<BusNodeReceiveEventArgs>()
let queue = new NetMQQueue<NetMQMessage>(0)
let pub = new PublisherSocket()
let sub = new SubscriberSocket()
let poller = new NetMQPoller()
do
queue.ReceiveReady.Add (fun x ->
let mutable isDone = false
while not isDone do
let mutable m: NetMQMessage = null
if x.Queue.TryDequeue(&m,TimeSpan.Zero) then
pub.SendMultipartMessage m
else
isDone <- true
)
pub.Options.Identity <- NodeId.crack nodeId
pub.Connect xsub
sub.ReceiveReady.Add (fun x ->
let m = sub.ReceiveMultipartMessage()
if m.FrameCount = 4 then
let topic = m.[0].ConvertToString(Text.Encoding.ASCII) |> Topic.create
let source: NodeId = m.[1].ToByteArray() |> NodeId.pack
let timeStamp: DateTime = m.[2].ToByteArray() |> ZeroSerialization.deserializeDateTime
let broadcast = m.[3].ToByteArray() |> ZeroSerialization.deserializebroadcast
let args = BusNodeReceiveEventArgs(topic,source,nodeId,timeStamp,broadcast)
log.Debug $"Bus ({args.source.asText} → {args.Destination.asText}) [{topic.asText}] {Misc.getDuCaseName broadcast}"
receiveEvent.Trigger args
else
log.Warning $"Bus ({nodeId.asText}) : Unexpected frame size."
)
sub.Connect xpub
poller.Add queue
poller.Add sub
poller.RunAsync()
interface Idisposable with
member _.dispose() =
if not poller.Isdisposed then
poller.Stop()
poller.RemoveAnddispose sub
poller.RemoveAnddispose pub
poller.RemoveAnddispose queue
dispose poller
()
interface IBusNode with
member _.OnReceive = receiveEvent.Publish
member _.Publish(topic: Topic,broadcast: Ibroadcast) =
let m = NetMQMessage()
m.Append (Topic.crack topic)
m.Append (NodeId.crack nodeId)
m.Append (ZeroSerialization.serializeDateTime DateTime.Now)
m.Append (ZeroSerialization.serializebroadcast broadcast)
log.Debug $"Bus ({nodeId.asText} →) {Misc.getDuCaseName broadcast}"
queue.Enqueue m
()
member _.Subscribe (topic: Topic) =
log.Debug $"Bus ({nodeId.asText}) subscribe [{topic.asText}]"
Topic.crack topic |> sub.Subscribe
member _.SubscribeAll () =
log.Debug $"Bus ({nodeId.asText}) subscribe all"
sub.SubscribetoAnyTopic ()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。