微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

当我使用客户端模拟器测试 IOCP Socket Worker 线程时,它的工作速度太慢

如何解决当我使用客户端模拟器测试 IOCP Socket Worker 线程时,它的工作速度太慢

我正在开发一个带有 iocp 套接字的 mmorpg。当我使用我的客户端模拟器进行测试时,在 70-80 次连接之后,套接字工作线程上的写入操作比用户延迟还慢。

这是我的工作线程;

typedef void(*OperationHandler)(Socket * s,uint32 len);

void HandleReadComplete(Socket * s,uint32 len);
void HandleWriteComplete(Socket * s,uint32 len);
void HandleShutdown(Socket * s,uint32 len);

static OperationHandler ophandlers[] =
{
    &HandleReadComplete,&HandleWriteComplete,&HandleShutdown
};

uint32 THREADCALL SocketMgr::SocketWorkerThread(void * lpParam){
SocketMgr *socketMgr = (SocketMgr *)lpParam;
HANDLE cp = socketMgr->GetCompletionPort();
DWORD len;
Socket * s = nullptr;
OverlappedStruct * ov = nullptr;
LPOVERLAPPED ol_ptr;

while (socketMgr->m_bWorkerThreadsActive)
{
    if (!GetQueuedCompletionStatus(cp,&len,(LPDWORD)&s,&ol_ptr,INFINITE))
    {
        if (s != nullptr)
            s->disconnect();
        continue;
    }

    ov = CONTAINING_RECORD(ol_ptr,OverlappedStruct,m_overlap);
    if (ov->m_event == SOCKET_IO_THREAD_SHUTDOWN)
    {
        delete ov;
        return 0;
    }

        if (ov->m_event < NUM_SOCKET_IO_EVENTS)
            ophandlers[ov->m_event](s,len);

}

return 0;}

这是写入完成事件处理程序;

    void HandleWriteComplete(Socket * s,uint32 len)
{
    if (s->IsDeleted()) {
        return;
    }

    s->m_writeEvent.Unmark();
    s->BurstBegin();                    // Lock
    s->GetWriteBuffer().Remove(len);
    TRACE("SOCK = %d removed = %d",s->GetFd(),len);
    if (s->GetWriteBuffer().GetContiguousBytes() > 0) {
        s->WriteCallback();
    }
    else {
        s->DecSendLock();
    }
    s->BurstEnd();                    // Unlock
}

WriteCallBack 函数;

    void Socket::WriteCallback()
{
    if (IsDeleted() || !IsConnected()) {
        return;
    }

    // We don't want any writes going on while this is happening.
    Guard lock(m_writeMutex);
    if(writeBuffer.GetContiguousBytes())
    {
        DWORD w_length = 0;
        DWORD flags = 0;

        // attempt to push all the data out in a non-blocking fashion.
        WSABUF buf;
        buf.len = (ULONG)writeBuffer.GetContiguousBytes();
        buf.buf = (char*)writeBuffer.GetBufferStart();

        
        m_writeEvent.Mark();
        m_writeEvent.Reset(SOCKET_IO_EVENT_WRITE_END);

        TRACE("\n SOCK = %d aslında giden = %X THREADID = %d",GetFd(),buf.buf[4],GetCurrentThreadId());
        int r = WSASend(m_fd,&buf,1,&w_length,flags,&m_writeEvent.m_overlap,0);
        if (r == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
        {
            m_writeEvent.Unmark();
            DecSendLock();
            disconnect();
        }

        
    }
    else
    {
        // Write operation is completed.
        
        DecSendLock();
    }
}

这是关于互斥锁的东西;

public:
    /* Atomic wrapper functions for increasing read/write locks */
    INLINE void IncSendLock() { Guard lock(m_writeMutex); ++m_writeLock; }
    INLINE void DecSendLock() { Guard lock(m_writeMutex); 
    --m_writeLock; }
    INLINE bool HasSendLock() {Guard lock(m_writeMutex); return (m_writeLock != 0); }
    INLINE bool AcquireSendLock()
    {
        Guard lock(m_writeMutex);
        if (m_writeLock != 0)
            return false;
        ++m_writeLock;
        return true;
    }
private:
    // Write lock,stops multiple write events from being posted.
    uint32 m_writeLock;
    std::recursive_mutex m_writeLockMutex;

这是读取事件处理程序;

    void HandleReadComplete(Socket * s,uint32 len)
{
    if (s->IsDeleted())
        return;

    s->m_readEvent.Unmark();
    if (len)
    {
        s->GetReadBuffer().IncrementWritten(len);
        s->OnRead();
        s->SetupReadEvent();
    }
    else
    {
        // s->Delete();   // Queue deletion.
        s->disconnect();
    }
}

OnRead 函数;

void KOSocket::OnRead()
{
    Packet pkt;

    for (;;)
    {
        

        if (m_remaining == 0)
        {
            if (GetReadBuffer().GetSize() < 5) {
                //TRACE("pkt returnzzz GetFd %d",GetFd());
                return; //check for opcode as well
            }


            uint16 header = 0;
            GetReadBuffer().Read(&header,2);
            //printf("header : %X",header);//derle at k
            if (header != 0x55AA)
            {
                TRACE("%s: Got packet without header 0x55AA,got 0x%X\n",GetRemoteIP().c_str(),header);
                goto error_handler;
            }

            GetReadBuffer().Read(&m_remaining,2);
            if (m_remaining == 0)
            {
                TRACE("%s: Got packet without an opcode,this should never happen.\n",GetRemoteIP().c_str());
                goto error_handler;
            }
        }

        if (m_remaining > GetReadBuffer().GetAllocatedSize())
        {
            TRACE("%s: Packet received which was %u bytes in size,maximum of %u.\n",m_remaining,GetReadBuffer().GetAllocatedSize());
            goto error_handler;
        }

        if (m_remaining > GetReadBuffer().GetSize())
        {
            if (m_readTries > 4)
            {
                TRACE("%s: packet fragmentation count is over 4,disconnecting as they're probably up to something bad\n",GetRemoteIP().c_str());
                goto error_handler;
            }
            m_readTries++;
            return;
        }

        uint8 *in_stream = new uint8[m_remaining];

        m_readTries = 0;
        GetReadBuffer().Read(in_stream,m_remaining);

        uint16 footer = 0;
        GetReadBuffer().Read(&footer,2);

        

        if (footer != 0xAA55
            || !DecryptPacket(in_stream,pkt))
        {
            TRACE("%s: Footer invalid (%X) or Failed to decrypt.\n",footer);
            delete [] in_stream;
            goto error_handler;
        }

        delete [] in_stream;

        // Update the time of the last (valid) response from the client.
        m_lastResponse = UNIXTIME2;

        //TRACE("pkt:%d GetFd %d",pkt.Getopcode(),GetFd());
        if (!HandlePacket(pkt))
        {
            TRACE("%s: Handler for packet %X returned false\n",pkt.Getopcode());
#ifndef _DEBUG
            goto error_handler;
#endif
        }

        m_remaining = 0;
        
    }
    //TRACE("pkt return11 GetFd %d",GetFd());
    return;

error_handler:
    disconnect();
}

这就是我的服务器向客户端发送数据包的方式;

BurstBegin();
//TRACE("\n SOCK = %d FREE SPACE = %d ",GetWriteBuffer().GetSpace()/*,GetWriteBuffer().m_writeLock*/);
if (GetWriteBuffer().GetSpace() < size_t(len + 6))
{
    size_t freespace = GetWriteBuffer().GetSpace();
    BurstEnd();
    disconnect();
    return false;
}
TRACE("\n SOCK = %d gitmesi gereken paket = %X THREADID = %d",out_stream[0],GetCurrentThreadId());
r = BurstSend((const uint8*)"\xaa\x55",2);
if (r) r = BurstSend((const uint8*)&len,2);
if (r) r = BurstSend((const uint8*)out_stream,len);
if (r) r = BurstSend((const uint8*)"\x55\xaa",2);
if (r) BurstPush();
BurstEnd();

Worker 线程数根据处理器数量

    for(int i = 0; i < numberOfWorkerThreads; i++)
{
m_thread[i] = new Thread(SocketWorkerThread,this);
}

我在其上测试的服务器具有 Intel XEON E5-2630 v3 2.40 ghz(2 个处理器)

你们能帮我了解如何提高性能吗?例如:当客户端每 1.5 秒在地图中移动时,发送将数据包移动到服务器,如果成功,服务器会向该区域中的每个客户端发送移动数据包。

当客户端数量增加时,服务器开始放慢速度以将数据包发送回客户端。我的写入缓冲区已满(容量为 16384 字节)导致服务器无法在写入缓冲区内发送数据包。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。