如何解决当我使用客户端模拟器测试 IOCP Socket Worker 线程时,它的工作速度太慢
我正在开发一个带有 iocp 套接字的 mmorpg。当我使用我的客户端模拟器进行测试时,在 70-80 次连接之后,套接字工作线程上的写入操作比用户延迟还慢。
这是我的工作线程;
typedef void(*OperationHandler)(Socket * s,uint32 len);
void HandleReadComplete(Socket * s,uint32 len);
void HandleWriteComplete(Socket * s,uint32 len);
void HandleShutdown(Socket * s,uint32 len);
static OperationHandler ophandlers[] =
{
&HandleReadComplete,&HandleWriteComplete,&HandleShutdown
};
uint32 THREADCALL SocketMgr::SocketWorkerThread(void * lpParam){
SocketMgr *socketMgr = (SocketMgr *)lpParam;
HANDLE cp = socketMgr->GetCompletionPort();
DWORD len;
Socket * s = nullptr;
OverlappedStruct * ov = nullptr;
LPOVERLAPPED ol_ptr;
while (socketMgr->m_bWorkerThreadsActive)
{
if (!GetQueuedCompletionStatus(cp,&len,(LPDWORD)&s,&ol_ptr,INFINITE))
{
if (s != nullptr)
s->disconnect();
continue;
}
ov = CONTAINING_RECORD(ol_ptr,OverlappedStruct,m_overlap);
if (ov->m_event == SOCKET_IO_THREAD_SHUTDOWN)
{
delete ov;
return 0;
}
if (ov->m_event < NUM_SOCKET_IO_EVENTS)
ophandlers[ov->m_event](s,len);
}
return 0;}
这是写入完成事件处理程序;
void HandleWriteComplete(Socket * s,uint32 len)
{
if (s->IsDeleted()) {
return;
}
s->m_writeEvent.Unmark();
s->BurstBegin(); // Lock
s->GetWriteBuffer().Remove(len);
TRACE("SOCK = %d removed = %d",s->GetFd(),len);
if (s->GetWriteBuffer().GetContiguousBytes() > 0) {
s->WriteCallback();
}
else {
s->DecSendLock();
}
s->BurstEnd(); // Unlock
}
WriteCallBack 函数;
void Socket::WriteCallback()
{
if (IsDeleted() || !IsConnected()) {
return;
}
// We don't want any writes going on while this is happening.
Guard lock(m_writeMutex);
if(writeBuffer.GetContiguousBytes())
{
DWORD w_length = 0;
DWORD flags = 0;
// attempt to push all the data out in a non-blocking fashion.
WSABUF buf;
buf.len = (ULONG)writeBuffer.GetContiguousBytes();
buf.buf = (char*)writeBuffer.GetBufferStart();
m_writeEvent.Mark();
m_writeEvent.Reset(SOCKET_IO_EVENT_WRITE_END);
TRACE("\n SOCK = %d aslında giden = %X THREADID = %d",GetFd(),buf.buf[4],GetCurrentThreadId());
int r = WSASend(m_fd,&buf,1,&w_length,flags,&m_writeEvent.m_overlap,0);
if (r == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
m_writeEvent.Unmark();
DecSendLock();
disconnect();
}
}
else
{
// Write operation is completed.
DecSendLock();
}
}
这是关于互斥锁的东西;
public:
/* Atomic wrapper functions for increasing read/write locks */
INLINE void IncSendLock() { Guard lock(m_writeMutex); ++m_writeLock; }
INLINE void DecSendLock() { Guard lock(m_writeMutex);
--m_writeLock; }
INLINE bool HasSendLock() {Guard lock(m_writeMutex); return (m_writeLock != 0); }
INLINE bool AcquireSendLock()
{
Guard lock(m_writeMutex);
if (m_writeLock != 0)
return false;
++m_writeLock;
return true;
}
private:
// Write lock,stops multiple write events from being posted.
uint32 m_writeLock;
std::recursive_mutex m_writeLockMutex;
这是读取事件处理程序;
void HandleReadComplete(Socket * s,uint32 len)
{
if (s->IsDeleted())
return;
s->m_readEvent.Unmark();
if (len)
{
s->GetReadBuffer().IncrementWritten(len);
s->OnRead();
s->SetupReadEvent();
}
else
{
// s->Delete(); // Queue deletion.
s->disconnect();
}
}
OnRead 函数;
void KOSocket::OnRead()
{
Packet pkt;
for (;;)
{
if (m_remaining == 0)
{
if (GetReadBuffer().GetSize() < 5) {
//TRACE("pkt returnzzz GetFd %d",GetFd());
return; //check for opcode as well
}
uint16 header = 0;
GetReadBuffer().Read(&header,2);
//printf("header : %X",header);//derle at k
if (header != 0x55AA)
{
TRACE("%s: Got packet without header 0x55AA,got 0x%X\n",GetRemoteIP().c_str(),header);
goto error_handler;
}
GetReadBuffer().Read(&m_remaining,2);
if (m_remaining == 0)
{
TRACE("%s: Got packet without an opcode,this should never happen.\n",GetRemoteIP().c_str());
goto error_handler;
}
}
if (m_remaining > GetReadBuffer().GetAllocatedSize())
{
TRACE("%s: Packet received which was %u bytes in size,maximum of %u.\n",m_remaining,GetReadBuffer().GetAllocatedSize());
goto error_handler;
}
if (m_remaining > GetReadBuffer().GetSize())
{
if (m_readTries > 4)
{
TRACE("%s: packet fragmentation count is over 4,disconnecting as they're probably up to something bad\n",GetRemoteIP().c_str());
goto error_handler;
}
m_readTries++;
return;
}
uint8 *in_stream = new uint8[m_remaining];
m_readTries = 0;
GetReadBuffer().Read(in_stream,m_remaining);
uint16 footer = 0;
GetReadBuffer().Read(&footer,2);
if (footer != 0xAA55
|| !DecryptPacket(in_stream,pkt))
{
TRACE("%s: Footer invalid (%X) or Failed to decrypt.\n",footer);
delete [] in_stream;
goto error_handler;
}
delete [] in_stream;
// Update the time of the last (valid) response from the client.
m_lastResponse = UNIXTIME2;
//TRACE("pkt:%d GetFd %d",pkt.Getopcode(),GetFd());
if (!HandlePacket(pkt))
{
TRACE("%s: Handler for packet %X returned false\n",pkt.Getopcode());
#ifndef _DEBUG
goto error_handler;
#endif
}
m_remaining = 0;
}
//TRACE("pkt return11 GetFd %d",GetFd());
return;
error_handler:
disconnect();
}
这就是我的服务器向客户端发送数据包的方式;
BurstBegin();
//TRACE("\n SOCK = %d FREE SPACE = %d ",GetWriteBuffer().GetSpace()/*,GetWriteBuffer().m_writeLock*/);
if (GetWriteBuffer().GetSpace() < size_t(len + 6))
{
size_t freespace = GetWriteBuffer().GetSpace();
BurstEnd();
disconnect();
return false;
}
TRACE("\n SOCK = %d gitmesi gereken paket = %X THREADID = %d",out_stream[0],GetCurrentThreadId());
r = BurstSend((const uint8*)"\xaa\x55",2);
if (r) r = BurstSend((const uint8*)&len,2);
if (r) r = BurstSend((const uint8*)out_stream,len);
if (r) r = BurstSend((const uint8*)"\x55\xaa",2);
if (r) BurstPush();
BurstEnd();
Worker 线程数根据处理器数量;
for(int i = 0; i < numberOfWorkerThreads; i++)
{
m_thread[i] = new Thread(SocketWorkerThread,this);
}
我在其上测试的服务器具有 Intel XEON E5-2630 v3 2.40 ghz(2 个处理器)
你们能帮我了解如何提高性能吗?例如:当客户端每 1.5 秒在地图中移动时,发送将数据包移动到服务器,如果成功,服务器会向该区域中的每个客户端发送移动数据包。
当客户端数量增加时,服务器开始放慢速度以将数据包发送回客户端。我的写入缓冲区已满(容量为 16384 字节)导致服务器无法在写入缓冲区内发送数据包。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。