如何解决NetMQ多线程的最新示例
我正在尝试创建一个服务器/多个客户端NetMQ测试程序(我相信这应该使用Router-Dealer模式),该程序将允许服务器使用单独线程上的工作程序来处理唯一的客户端请求(并回复每个客户端分别),但是我找不到一个适用于当前nuget版本(撰写本文时为v4.0.1.5)的工作示例。
每个客户端仅应发送一个请求(由于是Dealer套接字,因此包含其ID),并从服务器获取其专用响应。服务器应具有线程池(IOCP样式),该线程池将消耗消息并发送回响应。
我在NetMQ / Samples github中找到了Multithreaded example,这似乎是我想要的东西,但是它使用了QueueDevice
来将客户与工作人员联系起来,并且该类似乎已经在某个时候删除:
var queue = new QueueDevice(
"tcp://localhost:5555","tcp://localhost:5556",DeviceMode.Threaded);
然后netmq.readthedocs.io上有一个Router-dealer example,它使用NetMQPoller
,但是由于它调用了不存在的RouterSocket.ReceiveMessage()
和{{1}而不会编译}方法,这些方法已被删除:
NetMQSocket.Receive(out bool hasmore)
也许这些很容易修复,但是我是NetMQ新手,我只想为C#工作获得一个合适的“多线程Hello World”。
NetMQ API与0MQ官方文档中的API稍有不同(C#更为惯用且使用新的低分配.NET构造),因此,我很难理解如何正确移植原始C示例。 / p>
有人可以为我提供这种情况的最新示例的资源,还是可以帮助我了解使用新的NetMQ API的正确方法是什么?
还是不建议使用该库,而我应该使用clrzmq4 .NET包装器?从我所看到的来看,NetMQ使用public static void Main(string[] args)
{
// NOTES
// 1. Use ThreadLocal<DealerSocket> where each thread has
// its own client DealerSocket to talk to server
// 2. Each thread can send using it own socket
// 3. Each thread socket is added to poller
const int delay = 3000; // millis
var clientSocketPerThread = new ThreadLocal<DealerSocket>();
using (var server = new RouterSocket("@tcp://127.0.0.1:5556"))
using (var poller = new NetMQPoller())
{
// Start some threads,each with its own DealerSocket
// to talk to the server socket. Creates lots of sockets,// but no nasty race conditions no shared state,each
// thread has its own socket,happy days.
for (int i = 0; i < 3; i++)
{
Task.Factory.StartNew(state =>
{
DealerSocket client = null;
if (!clientSocketPerThread.IsValueCreated)
{
client = new DealerSocket();
client.Options.Identity =
Encoding.Unicode.GetBytes(state.ToString());
client.Connect("tcp://127.0.0.1:5556");
client.ReceiveReady += Client_ReceiveReady;
clientSocketPerThread.Value = client;
poller.Add(client);
}
else
{
client = clientSocketPerThread.Value;
}
while (true)
{
var messagetoServer = new NetMQMessage();
messagetoServer.AppendEmptyFrame();
messagetoServer.Append(state.ToString());
PrintFrames("Client Sending",messagetoServer);
client.SendMultipartMessage(messagetoServer);
Thread.Sleep(delay);
}
},string.Format("client {0}",i),TaskCreationoptions.LongRunning);
}
// start the poller
poller.RunAsync();
// server loop
while (true)
{
var clientMessage = server.ReceiveMessage();
PrintFrames("Server receiving",clientMessage);
if (clientMessage.FrameCount == 3)
{
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server {1}",clientOriginalMessage,DateTime.Now.ToLongTimeString());
var messagetoClient = new NetMQMessage();
messagetoClient.Append(clientAddress);
messagetoClient.AppendEmptyFrame();
messagetoClient.Append(response);
server.SendMultipartMessage(messagetoClient);
}
}
}
}
static void PrintFrames(string operationType,NetMQMessage message)
{
for (int i = 0; i < message.FrameCount; i++)
{
Console.WriteLine("{0} Socket : Frame[{1}] = {2}",operationType,i,message[i].ConvertToString());
}
}
static void Client_ReceiveReady(object sender,NetMQSocketEventArgs e)
{
bool hasmore = false;
e.socket.Receive(out hasmore);
if (hasmore)
{
string result = e.socket.ReceiveFrameString(out hasmore);
Console.WriteLine("REPLY {0}",result);
}
}
并在代码库周围遍历ref结构,因此看来它应该是高性能的。
解决方法
我已经通过netmq.readthedocs.io解决了router-dealer example的几个问题,现在代码可以正常工作了。我将其发布在这里,也许会有用(也许@somdoron或其他人也可以签出并更新文档)。
该示例并未完全实现我想要的(拥有一批公平地获得工作的工人),我相信我必须实现单独的代理线程。
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
internal static class Program
{
public static void Main(string[] args)
{
InitLogger();
const int NumClients = 5;
Log($"App started with {NumClients} clients\r\n");
using (var server = new RouterSocket("@tcp://127.0.0.1:5556"))
using (var poller = new NetMQPoller())
{
// Start some threads,each with its own DealerSocket
// to talk to the server socket. Creates lots of sockets,// but no nasty race conditions no shared state,each
// thread has its own socket,happy days.
for (int i = 0; i < NumClients; i++)
{
var clientNo = i + 1;
Task.Factory.StartNew(async () =>
{
var rnd = new Random(31 + clientNo * 57);
var clientId = $"C{clientNo}";
DealerSocket client = new DealerSocket();
client.Options.Identity = Encoding.ASCII.GetBytes(clientId);
client.Connect("tcp://127.0.0.1:5556");
client.ReceiveReady += (sender,e) =>
{
var msg = e.Socket.ReceiveMultipartMessage(3);
var clientid = Encoding.ASCII.GetString(e.Socket.Options.Identity);
Log($"Client '{clientid}' received <- server",msg);
};
poller.Add(client);
while (true)
{
var messageToServer = new NetMQMessage();
messageToServer.Append(NetMQFrame.Empty);
messageToServer.Append($"Some data ({clientId}|{rnd.Next():X8})",Encoding.ASCII);
Log($"Client {clientId} sending -> server: ",messageToServer);
client.SendMultipartMessage(messageToServer);
await Task.Delay(3000);
}
},TaskCreationOptions.LongRunning);
}
// start the poller
poller.RunAsync();
// server loop
while (true)
{
var clientMessage = server.ReceiveMultipartMessage();
if (clientMessage.FrameCount < 1)
{
Log("Server received invalid frame count");
continue;
}
var clientid = clientMessage[0];
Log($"Server received <- {clientid.ConvertToString()}: ",clientMessage);
var msg = clientMessage.Last().ConvertToString(Encoding.ASCII);
string response = $"Replying to '{msg}'";
{
var messageToClient = new NetMQMessage();
messageToClient.Append(clientid);
messageToClient.Append(NetMQFrame.Empty);
messageToClient.Append(response,Encoding.ASCII);
Log($"Server sending -> {clientid.ConvertToString()}: {response}");
server.SendMultipartMessage(messageToClient);
}
}
}
}
#region Poor man's console logging
static BlockingCollection<string> _logQueue;
// log using a blocking collection
private static void InitLogger()
{
_logQueue = new BlockingCollection<string>();
Task.Factory.StartNew(() =>
{
foreach (var msg in _logQueue.GetConsumingEnumerable())
{
Console.WriteLine(msg);
}
});
}
// thread id,timestamp,message
static void Log(string msg)
{
var thid = Thread.CurrentThread.ManagedThreadId;
var time = GetTimestamp().ToString("HH:mm:ss.fff");
_logQueue.Add($"[T{thid:00}] {time}: {msg}");
}
// log all frames in a message
static void Log(string operationType,NetMQMessage message)
{
var frames = string.Join(",",message.Select((m,i) => $"F[{i}]={{{m.ConvertToString(Encoding.ASCII)}}}"));
Log($"{operationType} {message.FrameCount} frames: " + frames);
}
// if possible,use win10 high precision timestamps
static DateTime GetTimestamp()
{
if (Environment.OSVersion.Platform == PlatformID.Win32NT && Environment.OSVersion.Version.Major >= 10) // win 10
{
long fileTime;
WinApi.GetSystemTimePreciseAsFileTime(out fileTime);
return DateTime.FromFileTimeUtc(fileTime);
}
return DateTime.UtcNow;
}
static class WinApi
{
[DllImport("Kernel32.dll",CallingConvention = CallingConvention.Winapi)]
internal static extern void GetSystemTimePreciseAsFileTime(out long filetime);
}
#endregion
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。