如何解决当我尝试处理轮询器时 NetMQ 卡住了请求-回复模式
这是我第一个使用 NetMQ (ZMQ) 框架的项目,所以,可能我没有完全理解如何使用它。
我创建了一个包含两个应用程序的 Windows 窗体项目,一个向另一个发送“ping”并接收“pong”作为答案。该协议并不复杂,并使用请求-回复模式,所有命令都有一个标识目标的第一部分,如“查询”或“通知”,以及包含命令或消息本身的第二部分。在这种情况下,一个应用发送一个“query-ping”,另一个应用发送“inform-pong”。
我创建了一个类来封装所有的脏作业,所以主窗体可以非常简单地使用协议。一切正常,但是当我尝试关闭应用程序时,它卡在轮询器中并且应用程序永远不会关闭。在 Visual Studio 中,我可以看到暂停和停止按钮,但没有出现任何异常或错误:
当我单击暂停按钮时,我收到此消息(应用程序处于中断模式):
如果我点击“继续执行”,应用会回到相同的状态并且永远不会关闭。
如果我删除轮询器,应用程序会正常关闭,但当然,轮询器不起作用,应用程序也不再应答。
这是来自 Form1 的代码:
using CommomLib;
using System;
using System.Windows.Forms;
namespace Test1
{
public partial class Form1 : Form
{
ZmqCommunication zmqComm = new ZmqCommunication();
int portNumber;
string status;
public Form1()
{
InitializeComponent();
InitializeZmqComm();
}
public void InitializeZmqComm()
{
// Calls the ZMQ initialization.
portNumber = zmqComm.InitializeComm();
if (portNumber == 0)
status = "Ini error";
else
status = "Ini ok";
}
// Executes a ping command.
private void button1_Click(object sender,EventArgs e)
{
richTextBox1.Clear();
richTextBox1.AppendText(zmqComm.RequestPing(55001) + "\n");
}
}
}
这是来自我的 NetMQ 类的代码。它在一个单独的库项目中。在我的 dispose 方法中,我尝试了 Remove、dispose 和 StopAsync 的所有组合,但没有任何效果:
using NetMQ;
using NetMQ.sockets;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
namespace CommomLib
{
public class ZmqCommunication
{
ResponseSocket serverComm = new ResponseSocket();
NetMQPoller serverPoller;
int _portNumber;
public ZmqCommunication()
{
_portNumber = 55000;
}
// Problem here! The serverPoller gets stuck.
public void dispose()
{
//serverPoller.RemoveAnddispose(serverComm);
//serverComm.dispose();
if (serverPoller.Isdisposed)
Debug.WriteLine("A");
//serverPoller.RemoveAnddispose(serverComm);
serverPoller.Remove(serverComm);
//serverPoller.StopAsync();
serverPoller.dispose();
serverComm.dispose();
if (serverPoller.Isdisposed)
Debug.WriteLine("B");
Thread.Sleep(500);
if (serverPoller.Isdisposed)
Debug.WriteLine("C");
Thread.Sleep(500);
if (serverPoller.Isdisposed)
Debug.WriteLine("D");
}
// ZMQ initialization.
public int InitializeComm()
{
bool ok = true;
bool tryAgain = true;
// Looks for a port.
while (tryAgain && ok)
{
try
{
serverComm.Bind("tcp://127.0.0.1:" + _portNumber);
tryAgain = false;
}
catch (NetMQ.AddressAlreadyInUseException)
{
_portNumber++;
tryAgain = true;
}
catch
{
ok = false;
}
}
if (!ok)
return 0; // Error.
// Set up the pooler.
serverPoller = new NetMQPoller { serverComm };
serverComm.ReceiveReady += (s,a) =>
{
RequestInterpreter();
};
// start polling (on this thread)
serverPoller.RunAsync();
return _portNumber;
}
// Message interpreter.
private void RequestInterpreter()
{
List<string> message = new List<string>();
if (serverComm.TryReceiveMultipartStrings(ref message,2))
{
if (message[0].Contains("query"))
{
// Received the command "ping" and answers with a "pong".
if (message[1].Contains("ping"))
{
serverComm.SendMoreFrame("inform").SendFrame("pong");
}
}
}
}
// Send the command "ping".
public string RequestPing(int port)
{
using (var requester = new RequestSocket())
{
Debug.WriteLine("Running request port {0}",port);
requester.Connect("tcp://127.0.0.1:" + port);
List<string> msgResp = new List<string>();
requester.SendMoreFrame("query").SendFrame("ping");
if (requester.TryReceiveMultipartStrings(new TimeSpan(0,10),ref msgResp,2))
{
if (msgResp[0].Contains("inform"))
{
return msgResp[1];
}
}
}
return "error";
}
}
}
解决方法
你可以尝试调用 NetMQConfig.Cleanup();在窗口关闭事件中?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。