如何解决 PostQueuedCompletionStatus 和 GetQueuedCompletionStatus 之间偶尔出现的长达 10 秒的延迟
这与之前的帖子有关 Recv Buffer for IO Completion port always empty
我已将 PostQueuedCompletionStatus 添加到我的代码中,以允许我在 IOCP 工作线程中触发消息。这在大多数情况下都有效,但偶尔我会看到 PostQueuedCompletionStatus 和 GetQueuedCompletionStatus 触发之间有 10 秒的延迟。
从时序上看,我已经确认 GetQueuedCompletionStatus 当时正处于等待状态,工作线程并没有在执行其他部分的代码。我还尝试把工作线程的优先级和优先级类提高到实时优先级范围,但似乎没有任何明显的效果。
关于为什么我有时会在队列中延迟 10 秒有什么想法吗?(真的希望将其缩短到不到一秒)
NetworkHandlerThread.cpp
#include "NetworkHandlerThread.h"
// Worker thread: dequeues completion packets from the I/O completion port and
// dispatches them by operation type.
//
// lpParam is the completion port HANDLE. The loop runs forever; the thread
// returns 0 only when the wait on the completion port itself fails.
//
// NOTE(review): several packets can be in flight for one connection (the
// pending receive plus any posted sends). If more than one of them completes
// with an error, the per-connection context is released by the first failure
// and then read again by the next. Fixing that needs per-connection reference
// counting, which is beyond this function.
DWORD ServerWorkerThread(LPVOID lpParam)
{
    HANDLE CompletionPort = (HANDLE)lpParam;
    DWORD BytesTransferred = 0;
    OVERLAPPED* lpOverlapped = NULL;
    LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
    LPPER_IO_OPERATION_DATA PerIoData = NULL;
    DWORD Flags = 0;
    WSABUF* DataBuf = NULL;
    DWORD RecvBytes = 0;
    Type1MessageParser Type1MsgParser;
    Type2MessageParser Type2MsgParser;
    int DestinationAddress = 0;
    bool IsType1 = false;

    while (TRUE) // run forever
    {
        // Reset the out-parameters so a pointer left over from the previous
        // iteration can never be mistaken for a freshly dequeued packet.
        BytesTransferred = 0;
        ConnectedSocketData = NULL;
        lpOverlapped = NULL;

        // Check for new message
        const BOOL Dequeued = GetQueuedCompletionStatus(CompletionPort,
                                                        &BytesTransferred,
                                                        (PULONG_PTR)&ConnectedSocketData,
                                                        &lpOverlapped,
                                                        INFINITE);

        // The OVERLAPPED is the first member of PER_IO_OPERATION_DATA, so the
        // pointer handed back by the port is also the per-operation record.
        PerIoData = reinterpret_cast<LPPER_IO_OPERATION_DATA>(lpOverlapped);

        if (!Dequeued)
        {
            DWORD Err = GetLastError();

            if (lpOverlapped == NULL)
            {
                // Nothing was dequeued, so the completion key is indeterminate
                // and must not be dereferenced.
                if (Err == WAIT_TIMEOUT)
                {
                    continue;
                }
                // The wait on the port itself failed (for example the port
                // handle was closed); retrying would only spin.
                printf("GetQueuedCompletionStatus() Failed with error %d\n", Err);
                return 0;
            }

            // A packet for a failed I/O operation was dequeued: drop the connection.
            printf("GetQueuedCompletionStatus() Failed with error %d\n", Err);
            if (ConnectedSocketData != NULL)
            {
                if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
                {
                    // Keep the worker alive: one bad connection must not shrink
                    // the thread pool. The context is left untouched in this
                    // case, as before.
                    printf("closesocket() Failed with error %d\n", WSAGetLastError());
                    continue;
                }
                // The context is allocated with `new` in ListenThread, so it
                // must be released with `delete`, not GlobalFree.
                delete ConnectedSocketData;
            }
            // NOTE(review): PerIoData is not released here. Receive operations
            // are allocated with GlobalAlloc (ListenThread) and send operations
            // with `new` (SendMessage), so the matching deallocator depends on
            // the operation type.
            continue;
        }

        if (PerIoData == NULL)
        {
            // A packet posted without an OVERLAPPED carries no operation record.
            continue;
        }

        // We have a message, determine if it's something we received or something we should send.
        if (PerIoData->OperationType == OPERATION_TYPE_RECV)
        {
            /// there is code here that processes the recvs now
            /// but i cut it out to save space. didn't seem to have anything to do with the
            /// issue
            // ConnectedSocketData: this is coming in good and has data
            // PerIoData->Buffer:   this is empty (pointer is good, but no data)
        }
        else if (PerIoData->OperationType == OPERATION_TYPE_SEND)
        {
            /// same as above, there is code here to process sends, but cut to save space.
        }
    }
}
// Thread for handling a listener socket and accepting connections.
//
// lpParam points to a LISTEN_SOCKET_DATA describing the listening socket, its
// accept event and the completion port new connections are attached to.
// Each accepted socket gets a CONNECTED_SOCKET_DATA context (used as the
// completion key) and an initial overlapped receive.
//
// Returns 0 when the loop is left because of a listener-level failure.
DWORD ListenThread(LPVOID lpParam)
{
    LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
    WSANETWORKEVENTS NetworkEvents;
    DWORD dwRet;
    SOCKADDR_IN NewSockAddr;
    SOCKET NewSocket;
    int nLen;

    while (true) // run forever
    {
        // Wait for event
        dwRet = WSAWaitForMultipleEvents(1, &(pSocketData->hAcceptEvent), false, 100, false);
        // nothing happened, back to top
        if (dwRet == WSA_WAIT_TIMEOUT)
            continue;

        // We got an event, find out which one.
        int nRet = WSAEnumNetworkEvents(pSocketData->Socket, pSocketData->hAcceptEvent, &NetworkEvents);
        if (nRet == SOCKET_ERROR)
        {
            wprintf(L"WSAEnumNetworkEvents error %ld\n", WSAGetLastError());
            break;
        }

        // Only accept events are of interest.
        if (!(NetworkEvents.lNetworkEvents & FD_ACCEPT))
            continue;

        // Check for errors. The failure code for the event lives in
        // iErrorCode, not in the thread's last-error value.
        if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] != 0)
        {
            wprintf(L"UnkNown network event error %ld\n",
                    static_cast<long>(NetworkEvents.iErrorCode[FD_ACCEPT_BIT]));
            break;
        }

        // Accept new connection
        nLen = sizeof(SOCKADDR_IN);
        NewSocket = WSAAccept(pSocketData->Socket, (LPSOCKADDR)&NewSockAddr, &nLen, NULL, NULL);
        if (NewSocket == INVALID_SOCKET)
        {
            wprintf(L"accept() error %ld\n", WSAGetLastError());
            break;
        }
        wprintf(L"Accepted Connection %ld", NewSockAddr.sin_addr.S_un.S_addr);

        // Set new connection as TCP connection, No Delay
        //const char chOpt = 1;
        //int nErr = setsockopt(NewSocket,IPPROTO_TCP,TCP_NODELAY,&chOpt,sizeof(char));
        //if (nErr == -1)
        //{
        // wprintf(L"setsockopt() error %ld\n",WSAGetLastError());
        // break;
        //}

        LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;
        ZeroMemory(ConnectedSocketData, sizeof(CONNECTED_SOCKET_DATA));
        ConnectedSocketData->Socket = NewSocket;
        ConnectedSocketData->Port = pSocketData->Port;
        ConnectedSocketData->IOCP = pSocketData->IOCP;
        ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
        ConnectedSocketData->ForwardMessager = pSocketData->ForwardMessager;

        // Add the new socket to the completion port; completions for the socket
        // will be queued up for processing by worker threads.
        if (CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD_PTR)ConnectedSocketData, 0) == NULL)
        {
            wprintf(L"CreateIOCompletionPort error %ld\n", WSAGetLastError());
            delete ConnectedSocketData;
            ConnectedSocketData = NULL;
            closesocket(NewSocket);
            break;
        }

        // Set the PerIoData, will be used at completion time.
        // NOTE(review): PER_IO_OPERATION_DATA contains a std::string member
        // (PacketName) and GlobalAlloc does not run constructors. The member
        // is not touched here.
        LPPER_IO_OPERATION_DATA PerIoData =
            (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA));
        if (PerIoData == NULL)
        {
            wprintf(L"GlobalAlloc error %ld\n", static_cast<long>(GetLastError()));
            closesocket(NewSocket);
            delete ConnectedSocketData;
            break;
        }
        ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));
        PerIoData->BufferLen = 0;
        PerIoData->OperationType = OPERATION_TYPE_RECV;
        PerIoData->Buffer.buf = PerIoData->cBuffer;
        PerIoData->Buffer.len = DATA_BUFSIZE;
        DWORD RecvBytes = 0;
        DWORD Flags = 0;

        // Kick off the first Recv request for the socket; it will be handled
        // through the completion queue.
        if (WSARecv(NewSocket, &(PerIoData->Buffer), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR)
        {
            const int RecvErr = WSAGetLastError();
            // WSA_IO_PENDING is the normal outcome for an overlapped receive
            // with no data available yet; the completion arrives later.
            if (RecvErr != WSA_IO_PENDING)
            {
                wprintf(L"WSARecv error %ld\n", static_cast<long>(RecvErr));
                // The receive was never started, so no completion will be
                // queued for it. Drop only this connection and keep listening.
                closesocket(NewSocket);
                GlobalFree(PerIoData);
                delete ConnectedSocketData;
                continue;
            }
        }
    }
    return 0;
}
// Starts with no completion port and no listener thread handle.
NetworkHandlerThread::NetworkHandlerThread()
    : m_CompletionPort(0),
      m_hListenThread(0)
{
}
// Performs no cleanup: neither handle held by the object is closed here.
NetworkHandlerThread::~NetworkHandlerThread() = default;
void NetworkHandlerThread::StartNetworkHandler()
{
int iResult = 0;
SYstem_INFO SystemInfo;
unsigned int i = 0;
//Start WSA
iResult = WSAStartup(MAKEWORD(2,2),&wsaData);
if (iResult != NO_ERROR) {
wprintf(L"WSAStartup() Failed with error: %d\n",iResult);
return;
}
//Start Completion Port
m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,0);
if (m_CompletionPort != NULL)
{
wprintf(L"Completion Port Created\n");
}
//Get # of system processors
GetSystemInfo(&SystemInfo);
//create Worker Threads for each processor.
for (i = 0; i < SystemInfo.dwNumberOfProcessors * THREADS_PER_PROCESSOR; i++)
{
HANDLE ThreadHandle;
// Create a server worker thread,and pass the
// completion port to the thread.
ThreadHandle = CreateThread(NULL,ServerWorkerThread,m_CompletionPort,NULL);
// Close the thread handle
if (ThreadHandle != NULL)
{
CloseHandle(ThreadHandle);
}
}
}
void NetworkHandlerThread::AddListenThread(int Port,ConfigHandler* pConfigHandle,void* ForwardHandle)
{
SOCKADDR_IN InternetAddr;
int iResult = 0;
LPLISTEN_SOCKET_DATA pListenSocketData = new LISTEN_SOCKET_DATA;
if (pListenSocketData == NULL)
{
return;
}
//Create the listener Socket
pListenSocketData->Socket = WSASocket(AF_INET,SOCK_STREAM,WSA_FLAG_OVERLAPPED);
if (pListenSocketData->Socket == INVALID_SOCKET)
{
wprintf(L"socket function Failed with error: %ld\n",WSAGetLastError());
WSACleanup();
return;
}
// Create a Event to handle Socket Accepts
pListenSocketData->hAcceptEvent = WSACreateEvent();
if (pListenSocketData->hAcceptEvent == WSA_INVALID_EVENT)
{
wprintf(L"WSACreateEvent() error %ld\n",WSAGetLastError());
closesocket(pListenSocketData->Socket);
return;
}
// Set the Event to Trigger on FD_ACCEPT (this occurs on socket connection attempts)
int nRet = WSAEventSelect(pListenSocketData->Socket,pListenSocketData->hAcceptEvent,FD_ACCEPT);
if (nRet == SOCKET_ERROR)
{
wprintf(L"WSAAsyncSelect() error %ld\n",WSAGetLastError());
closesocket(pListenSocketData->Socket);
return;
}
//Assign the Port Number
InternetAddr.sin_family = AF_INET;
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
InternetAddr.sin_port = htons(Port);
pListenSocketData->Port = Port;
pListenSocketData->IOCP = m_CompletionPort;
pListenSocketData->CfgHandle = pConfigHandle;
pListenSocketData->ForwardMessager = ForwardHandle;
//Bind the Socket to the Port
iResult = ::bind((pListenSocketData->Socket),(sockaddr*)&InternetAddr,sizeof(InternetAddr));
if (iResult == SOCKET_ERROR) {
wprintf(L"bind function Failed with error %d\n",WSAGetLastError());
iResult = closesocket(pListenSocketData->Socket);
if (iResult == SOCKET_ERROR)
wprintf(L"closesocket function Failed with error %d\n",WSAGetLastError());
WSACleanup();
return;
}
//Listen for incoming connection requests.
if (listen(pListenSocketData->Socket,SOMAXCONN) == SOCKET_ERROR)
{
wprintf(L"listen function Failed with error: %d\n",WSAGetLastError());
closesocket(pListenSocketData->Socket);
WSACleanup();
return;
}
wprintf(L"Listening on %ld",Port);
m_hListenThread = (HANDLE)CreateThread(NULL,// Security
0,// Stack size - use default
ListenThread,// Thread fn entry point
(void*)pListenSocketData,//Listen Socket Data
0,// Init flag
NULL); // Thread address
}
NetworkHandlerThread.h
#pragma once
#include <WinSock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
#include "ForwardMessageHandler.h"
#include "ConfigHandler.h"
#include "Type1MessageParser.h"
#include "Type2Message-Parser.h"
#include "ThreadUtilities.h"
// Size of the per-operation data buffer (PER_IO_OPERATION_DATA::cBuffer) and
// the length advertised for the initial receive.
#define DATA_BUFSIZE 8192
// Multiplier applied to the processor count when creating worker threads.
#define THREADS_PER_PROCESSOR 2
class NetworkHandlerThread
{
public:
WSADATA wsaData;
HANDLE m_CompletionPort;
HANDLE m_hListenThread;
public:
NetworkHandlerThread();
~NetworkHandlerThread();
void StartNetworkHandler();
void AddListenThread(int Port,void* ForwardHandle);
};
ThreadUtilities.h
#pragma once
#include <mutex>
#include "ConfigHandler.h"
using namespace std;
// Size of the cBuffer array in PER_IO_OPERATION_DATA. The same value is also
// defined in NetworkHandlerThread.h.
#define DATA_BUFSIZE 8192
// Worker threads created per processor. The same value is also defined in
// NetworkHandlerThread.h.
#define THREADS_PER_PROCESSOR 2
// A command string paired with a mutex.
// Names are qualified with std:: so the struct does not depend on a
// `using namespace std;` being in effect wherever this header is included.
typedef struct _THREAD_MESSAGE
{
    std::mutex cmd_mtx;   // presumably guards `command` - no use is visible here, confirm
    std::string command;
} THREAD_MESSAGE, *LPTHREAD_MESSAGE;
// State handed to a listener thread for one listening socket.
struct _LISTEN_SOCKET_DATA
{
    SOCKET Socket;             // Listening socket.
    int Port;                  // Port the socket listens on.
    HANDLE hAcceptEvent;       // Event registered for FD_ACCEPT on Socket.
    HANDLE IOCP;               // Completion port accepted sockets are attached to.
    VOID* ForwardMessager;     // Opaque handle copied to every accepted connection.
    ConfigHandler* CfgHandle;  // Configuration handle copied to every accepted connection.
    // Other information useful to be associated with the handle
};
typedef _LISTEN_SOCKET_DATA LISTEN_SOCKET_DATA;
typedef _LISTEN_SOCKET_DATA* LPLISTEN_SOCKET_DATA;
// Per-connection context; its address is used as the completion key for the
// accepted socket.
struct _CONNECTED_SOCKET_DATA
{
    SOCKET Socket;             // Accepted connection socket.
    int Port;                  // Port of the listener that accepted the connection.
    HANDLE IOCP;               // Completion port the socket is attached to.
    VOID* ForwardMessager;     // Opaque handle inherited from the listener data.
    ConfigHandler* CfgHandle;  // Configuration handle inherited from the listener data.
};
typedef _CONNECTED_SOCKET_DATA CONNECTED_SOCKET_DATA;
typedef _CONNECTED_SOCKET_DATA* LPCONNECTED_SOCKET_DATA;
// Values for PER_IO_OPERATION_DATA::OperationType; the worker thread branches
// on SEND and RECV.
// NOTE(review): "UNKNowN" has unusual capitalization - likely meant to be
// OPERATION_TYPE_UNKNOWN; confirm before renaming, since users of the macro
// may exist outside this file.
#define OPERATION_TYPE_UNKNowN 0
#define OPERATION_TYPE_SEND 1
#define OPERATION_TYPE_RECV 2
// Record describing one queued I/O operation (a receive or a send).
struct PER_IO_OPERATION_DATA
{
    // Must remain the first member: the worker thread receives a pointer to
    // this OVERLAPPED from the completion port and treats it as a pointer to
    // the whole record.
    OVERLAPPED overlapped;
    WSABUF Buffer;               // Winsock buffer descriptor; points into cBuffer.
    char cBuffer[DATA_BUFSIZE];  // Storage for the data being received or sent.
    int BufferLen;               // Number of valid bytes in cBuffer.
    int OperationType;           // One of the OPERATION_TYPE_* values.
    std::string PacketName;      // Name attached to the packet by the sender.
};
// Pointer alias. It has to name a pointer type: the code assigns NULL and the
// results of `new` / GlobalAlloc to it and dereferences it with `->`.
typedef PER_IO_OPERATION_DATA* LPPER_IO_OPERATION_DATA;
DataRecver.cpp:CompletionPort 和 ConnectedSocketData 由工作线程在处理 RECV 时传入并保存下来,供之后调用 SendMessage 时使用。
// Queues DataRecved for sending on a connection by posting a custom completion
// packet (OPERATION_TYPE_SEND) to the completion port; a worker thread picks
// it up from there.
//
// DataRecved          - payload; must fit in DATA_BUFSIZE bytes.
// PacketName          - name stored alongside the payload.
// CompletionPort      - port the worker threads are waiting on.
// ConnectedSocketData - connection context, delivered as the completion key.
//
// NOTE(review): the function returns void, so a message that is dropped
// (invalid arguments, oversized payload, or a failed post) is not reported to
// the caller. Consider returning a status.
void SendMessage(string DataRecved,string PacketName,HANDLE CompletionPort,LPCONNECTED_SOCKET_DATA ConnectedSocketData)
{
    if (CompletionPort == NULL || ConnectedSocketData == NULL)
    {
        return;
    }
    // cBuffer is a fixed DATA_BUFSIZE array; copying more would overrun it.
    if (DataRecved.length() > DATA_BUFSIZE)
    {
        return;
    }
    const DWORD PayloadLen = static_cast<DWORD>(DataRecved.length());

    LPPER_IO_OPERATION_DATA PerIOOperationData = new PER_IO_OPERATION_DATA;
    ZeroMemory(&(PerIOOperationData->overlapped), sizeof(OVERLAPPED));
    PerIOOperationData->BufferLen = static_cast<int>(PayloadLen);
    PerIOOperationData->OperationType = OPERATION_TYPE_SEND;
    DataRecved.copy(PerIOOperationData->cBuffer, PayloadLen, 0);
    // Describe the payload for Winsock; `new` leaves these fields uninitialized.
    PerIOOperationData->Buffer.buf = PerIOOperationData->cBuffer;
    PerIOOperationData->Buffer.len = PayloadLen;
    // PacketName is a std::string member, so assign it rather than copying
    // characters into it.
    PerIOOperationData->PacketName = PacketName;

    // Arguments: port, byte count, completion key, overlapped.
    if (!PostQueuedCompletionStatus(CompletionPort,
                                    PayloadLen,
                                    reinterpret_cast<ULONG_PTR>(ConnectedSocketData),
                                    &(PerIOOperationData->overlapped)))
    {
        // Nothing was queued, so no worker will ever release this record.
        delete PerIOOperationData;
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。