微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 std::future 和 std::async 的 C++ 多线程 TCP 服务器

如何解决:使用 std::future 和 std::async 的 C++ 多线程 TCP 服务器

我最近开始学习 C++,并且正在学习一些教程。我正在研究套接字,并决定作为一个副项目来创建一个小型多线程服务器,如下所示。

我试图在客户端数量达到 CLIENTS_MAX_NUM 后关闭服务器的侦听套接字,并在有客户端断开连接后重新打开它,但这会导致错误 10022 (WSAEINVAL),我不太确定自己哪里做错了。

如果您想重现错误,只需使用 telnet 连接,然后关闭客户端连接(按 Ctrl+],再输入 quit)。

任何帮助将不胜感激。

#include <iostream>
#include <vector>
#include <string>
#include <future>
#include <chrono>
#include <WS2tcpip.h>

#pragma comment(lib,"ws2_32.lib")

// Compile-time server settings.
static constexpr unsigned int PORT = 5000;              // TCP port the server socket is bound to
static constexpr unsigned int CLIENTS_MAX_NUM = 1;      // client count at which main() closes the server socket
static constexpr unsigned int CLIENTS_QUEUE_NUM = 10;   // backlog argument passed to listen()
// Server socket shared by the helper functions and main(). It starts out as
// INVALID_SOCKET (the Winsock "no socket" value) rather than the zero a
// default-initialised global would hold, so using it before
// open_server_socket() fails cleanly instead of touching handle 0.
SOCKET server_sock = INVALID_SOCKET;
// One future per client handler launched with std::async; main() erases
// entries once they report ready.
std::vector<std::future<void>> futures;
// NOTE(review): never locked in the code visible here — confirm whether it is
// still needed.
std::mutex mtx;

void initialize_winsock() {

    WSAData wsData;
    WORD ver = MAKEWORD(2,2);

    int wsResult = WSAStartup(ver,&wsData);

    if (wsResult != 0) {

        std::cerr << WSAGetLastError() << std::endl;

        WSACleanup();
        exit(EXIT_FAILURE);

    }

}

void bind_server_socket() {
    
    int keep_alive = 1;
    int re_use = 1;

    if (setsockopt(server_sock,SOL_SOCKET,SO_KEEPALIVE,(const char*)&keep_alive,sizeof(keep_alive)) == SOCKET_ERROR) {
        
        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }

    if (setsockopt(server_sock,SO_REUSEADDR,(const char*)&re_use,sizeof(re_use)) == SOCKET_ERROR) {

        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }

    sockaddr_in server;

    server.sin_family = AF_INET;
    server.sin_port = htons(PORT);
    server.sin_addr.S_un.S_addr = INADDR_ANY;

    memset(&server.sin_zero,8);

    if (bind(server_sock,(sockaddr*)&server,sizeof(sockaddr)) == SOCKET_ERROR) {

        std::cerr << WSAGetLastError() << std::endl;

        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }



}

// Creates a new IPv4/TCP socket and stores it in the global server_sock.
// `listening` is set to true when the socket was created and to false
// otherwise; note that the socket is only created here, it is neither bound
// nor put into the listening state by this function.
// If creation fails the Winsock error code is printed and the program exits.
void open_server_socket(bool &listening) {

    server_sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    listening = (server_sock != INVALID_SOCKET);

    if (listening) {
        return;
    }

    std::cerr << WSAGetLastError() << std::endl;

    WSACleanup();
    exit(EXIT_FAILURE);

}

void close_server_socket(bool &listening) {
    
    closesocket(server_sock);

    listening = false;

}

// Serves one connected client: logs who connected, then echoes every received
// chunk back until the peer closes the connection or a socket error occurs.
// The client socket is closed before returning.
//
// client_sock  connected socket returned by accept()
// client       peer address filled in by accept()
void handle_client(SOCKET client_sock,sockaddr_in client) {

    char buf[4096];
    char host[NI_MAXHOST] = {};
    char service[NI_MAXSERV] = {};

    //std::cout << std::this_thread::get_id() << std::endl;

    if (getnameinfo(reinterpret_cast<sockaddr*>(&client),sizeof(client),host,NI_MAXHOST,service,NI_MAXSERV,0) == 0) {

        std::cout << host << " connected on port " << service << std::endl;

    }
    else {

        // Name lookup failed: fall back to the numeric address and port.
        inet_ntop(AF_INET,&client.sin_addr,host,NI_MAXHOST);

        std::cout << host << " connected on port " << ntohs(client.sin_port) << std::endl;

    }

    bool connected = true;

    while (connected) {

        const int bytes_received = recv(client_sock,buf,static_cast<int>(sizeof(buf)),0);

        if (bytes_received == SOCKET_ERROR) {

            // Only this connection is affected: stop serving it. Winsock stays
            // initialised because other clients and the server socket use it.
            std::cerr << WSAGetLastError() << std::endl;
            break;

        }

        if (bytes_received == 0) {

            std::cout << "client disconnected" << std::endl;
            break;

        }

        // Echo exactly what was received. send() may accept fewer bytes than
        // requested, so keep going until the whole chunk is written.
        int bytes_sent = 0;

        while (bytes_sent < bytes_received) {

            const int written = send(client_sock,buf + bytes_sent,bytes_received - bytes_sent,0);

            if (written == SOCKET_ERROR) {

                std::cerr << WSAGetLastError() << std::endl;
                connected = false;
                break;

            }

            bytes_sent += written;

        }

    }

    closesocket(client_sock);

}

// Entry point: initialises Winsock, opens/binds/listens on the server socket
// and then accepts clients forever, running each one in handle_client() via
// std::async. When CLIENTS_MAX_NUM clients are connected the server socket is
// closed; it is re-opened once a client slot becomes free.
int main(int argc,const char* argv[]) {

    bool listening = false;

    // Puts the current server socket into the listening state, or reports the
    // error and terminates.
    const auto start_listening = [] {

        if (listen(server_sock,CLIENTS_QUEUE_NUM) == SOCKET_ERROR) {

            std::cerr << WSAGetLastError() << std::endl;

            closesocket(server_sock);
            WSACleanup();
            exit(EXIT_FAILURE);

        }

    };

    // Erases every finished handler from `futures`, waiting up to `patience`
    // on each unfinished one, and reports how many were removed. The iterator
    // returned by erase() is used so no element is skipped.
    const auto remove_finished_clients = [](std::chrono::milliseconds patience) {

        unsigned int removed = 0;

        for (auto it = futures.begin(); it != futures.end();) {

            if (it->wait_for(patience) == std::future_status::ready) {
                it = futures.erase(it);
                ++removed;
            }
            else {
                ++it;
            }

        }

        if (removed > 1) {
            std::cout << removed << " clients removed" << std::endl;
        }
        else if (removed) {
            std::cout << removed << " client removed" << std::endl;
        }

    };

    initialize_winsock();
    open_server_socket(listening);
    bind_server_socket();

    // -----------------------------------------------------------

    start_listening();

    std::cout << "listening for incoming connections on port " << PORT << std::endl;

    while (true) {

        // While the server socket is closed there is nothing to block on, so
        // wait briefly on the running handlers instead of spinning.
        remove_finished_clients(std::chrono::milliseconds(listening ? 0 : 50));

        if (futures.size() < CLIENTS_MAX_NUM && !listening) {

            std::cout << "re-opening server socket" << std::endl;

            // A freshly created socket must be bound and listening again
            // before accept() is called; otherwise accept() fails with
            // 10022 (WSAEINVAL).
            open_server_socket(listening);
            bind_server_socket();
            start_listening();

        }

        if (listening) {

            sockaddr_in client{};

            int client_size = sizeof(client);

            SOCKET client_sock = accept(server_sock,reinterpret_cast<sockaddr*>(&client),&client_size);

            if (client_sock == INVALID_SOCKET) {

                std::cerr << WSAGetLastError() << std::endl;

                closesocket(server_sock);
                WSACleanup();
                exit(EXIT_FAILURE);

            }

            // accept() may have blocked for a long time; drop handlers that
            // finished meanwhile so the capacity check counts live clients.
            remove_finished_clients(std::chrono::milliseconds(0));

            futures.emplace_back(std::async(std::launch::async,&handle_client,client_sock,client));

            if (futures.size() >= CLIENTS_MAX_NUM) {

                std::cout << "closing server socket" << std::endl;

                close_server_socket(listening);

            }

            std::cout << futures.size() << " clients connected" << std::endl;

        }

    }

    // ----------------------------------------------------------

    // Not reached while the accept loop above has no exit condition.
    std::cout << "bye!" << std::endl;

    WSACleanup();
    exit(EXIT_SUCCESS);

}

解决方法

经过一番挖掘,我发现了一些愚蠢的错误,以下是工作代码:

更新

还修复了遍历 vector 并删除已完成 future 的循环

#include <iostream>
#include <vector>
#include <string>
#include <future>
#include <chrono>
#include <WS2tcpip.h>

#pragma comment(lib,"ws2_32.lib")

// Compile-time server settings.
static constexpr unsigned int PORT = 5000;             // TCP port the server socket is bound to
static constexpr unsigned int CLIENTS_MAX_NUM = 5;     // client count at which the server socket is closed
static constexpr unsigned int CLIENTS_QUEUE_NUM = 0;   // backlog argument passed to listen()

// Serves one connected client: logs who connected, then echoes every received
// chunk back until the peer closes the connection or a socket error occurs.
// The client socket is closed before returning.
//
// client_sock  connected socket returned by accept()
// client       peer address filled in by accept()
void handle_client(SOCKET client_sock,sockaddr_in client) {

    char buf[4096];
    char host[NI_MAXHOST] = {};
    char service[NI_MAXSERV] = {};

    //std::cout << std::this_thread::get_id() << std::endl;

    if (getnameinfo(reinterpret_cast<sockaddr*>(&client),sizeof(client),host,NI_MAXHOST,service,NI_MAXSERV,0) == 0) {

        std::cout << host << " connected on port " << service << std::endl;

    }
    else {

        // Name lookup failed: fall back to the numeric address and port.
        inet_ntop(AF_INET,&client.sin_addr,host,NI_MAXHOST);

        std::cout << host << " connected on port " << ntohs(client.sin_port) << std::endl;

    }

    bool connected = true;

    while (connected) {

        const int bytes_received = recv(client_sock,buf,static_cast<int>(sizeof(buf)),0);

        if (bytes_received == SOCKET_ERROR) {

            // Only this connection is affected: stop serving it. Winsock stays
            // initialised because other clients and the server socket use it.
            std::cerr << WSAGetLastError() << std::endl;
            break;

        }

        if (bytes_received == 0) {

            std::cout << "client disconnected" << std::endl;
            break;

        }

        // Echo exactly what was received. send() may accept fewer bytes than
        // requested, so keep going until the whole chunk is written.
        int bytes_sent = 0;

        while (bytes_sent < bytes_received) {

            const int written = send(client_sock,buf + bytes_sent,bytes_received - bytes_sent,0);

            if (written == SOCKET_ERROR) {

                std::cerr << WSAGetLastError() << std::endl;
                connected = false;
                break;

            }

            bytes_sent += written;

        }

    }

    closesocket(client_sock);

}

void initialize_winsock() {

    WSAData wsData;
    WORD ver = MAKEWORD(2,2);

    int wsResult = WSAStartup(ver,&wsData);

    if (wsResult != 0) {

        std::cerr << WSAGetLastError() << std::endl;

        WSACleanup();
        exit(EXIT_FAILURE);

    }

}

// Configures `server_sock` (SO_KEEPALIVE, SO_REUSEADDR) and binds it to PORT
// on all local IPv4 interfaces.
// Any failure prints the Winsock error code, closes the socket, shuts Winsock
// down and terminates the program.
void bind_server_socket(SOCKET &server_sock) {

    // Shared failure path. The error code is read before closesocket() so the
    // cleanup calls cannot overwrite it.
    const auto abort_startup = [&server_sock] {
        std::cerr << WSAGetLastError() << std::endl;

        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);
    };

    const int keep_alive = 1;
    const int re_use = 1;

    if (setsockopt(server_sock,SOL_SOCKET,SO_KEEPALIVE,reinterpret_cast<const char*>(&keep_alive),sizeof(keep_alive)) == SOCKET_ERROR) {
        abort_startup();
    }

    // SO_REUSEADDR is a SOL_SOCKET-level option; the level argument is required.
    if (setsockopt(server_sock,SOL_SOCKET,SO_REUSEADDR,reinterpret_cast<const char*>(&re_use),sizeof(re_use)) == SOCKET_ERROR) {
        abort_startup();
    }

    // Value-initialisation zeroes every field, including the sin_zero padding.
    sockaddr_in server{};

    server.sin_family = AF_INET;
    server.sin_port = htons(static_cast<u_short>(PORT));
    server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);

    if (bind(server_sock,reinterpret_cast<sockaddr*>(&server),sizeof(server)) == SOCKET_ERROR) {
        abort_startup();
    }

}

// Creates a new IPv4/TCP socket and stores it in `server_sock`.
// `accepting` is set to true when the socket was created and to false
// otherwise; the socket is only created here, it is neither bound nor put
// into the listening state by this function.
// If creation fails the Winsock error code is printed and the program exits.
void open_server_socket(SOCKET &server_sock,bool &accepting) {

    server_sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    accepting = (server_sock != INVALID_SOCKET);

    if (accepting) {
        return;
    }

    std::cerr << WSAGetLastError() << std::endl;

    WSACleanup();
    exit(EXIT_FAILURE);

}

// Closes `server_sock` (if it holds an open socket) and clears `accepting`.
// A closesocket() failure is reported on stderr but is not fatal.
void close_server_socket(SOCKET &server_sock,bool &accepting) {

    if (server_sock != INVALID_SOCKET) {

        if (closesocket(server_sock) == SOCKET_ERROR) {
            std::cerr << WSAGetLastError() << std::endl;
        }

        // Forget the handle so it cannot be used again after being closed.
        server_sock = INVALID_SOCKET;

    }

    accepting = false;

}

// Removes from `futures` every entry whose result is already available.
// Entries that are not ready yet are kept, in their original order; the
// check never blocks (zero-length wait).
void clear_futures(std::vector<std::future<void>> &futures) {

    const auto no_wait = std::chrono::milliseconds(0);

    for (auto cursor = futures.begin(); cursor != futures.end();) {

        const bool finished = cursor->wait_for(no_wait) == std::future_status::ready;

        // erase() hands back the iterator following the removed element.
        cursor = finished ? futures.erase(cursor) : cursor + 1;

    }

}

// Accept loop of the server. Puts `server_sock` into the listening state and
// then accepts clients forever, running each in handle_client() via
// std::async and storing the resulting future in `futures`.
//
// When CLIENTS_MAX_NUM handlers are running the server socket is closed
// (`accepting` becomes false). While it is closed, finished handlers are
// removed from `futures`; as soon as a slot is free the socket is re-opened,
// re-bound and put back into the listening state.
//
// Any listen()/accept() failure prints the Winsock error code and terminates
// the program. The function does not return.
void wait_for_connections(std::vector<std::future<void>> &futures,SOCKET &server_sock,bool& accepting) {

    // Puts the current server socket into the listening state, or reports the
    // error and terminates.
    const auto start_listening = [&server_sock] {

        if (listen(server_sock,CLIENTS_QUEUE_NUM) == SOCKET_ERROR) {

            std::cerr << WSAGetLastError() << std::endl;

            closesocket(server_sock);
            WSACleanup();
            exit(EXIT_FAILURE);

        }

        std::cout << "accepting for incoming connections on port " << PORT << std::endl;

    };

    start_listening();

    while (true) {

        if (!accepting) {

            // The server socket is closed, so accept() cannot be used to
            // block. Wait briefly on a running handler instead of spinning,
            // then drop the handlers that have finished; without this sweep
            // the client count would never fall and the socket would never
            // be re-opened.
            if (!futures.empty()) {
                futures.front().wait_for(std::chrono::milliseconds(50));
            }

            clear_futures(futures);

            if (futures.size() >= CLIENTS_MAX_NUM) {
                continue;
            }

            std::cout << "re-opening server socket" << std::endl;

            // Re-open in place (no recursion, so the stack does not grow with
            // every re-open). A new socket must be bound and listening before
            // accept() may be called on it.
            open_server_socket(server_sock,accepting);
            bind_server_socket(server_sock);
            start_listening();

        }

        sockaddr_in client{};

        int client_size = sizeof(client);

        SOCKET client_sock = accept(server_sock,reinterpret_cast<sockaddr*>(&client),&client_size);

        if (client_sock == INVALID_SOCKET) {

            std::cerr << WSAGetLastError() << std::endl;

            closesocket(server_sock);
            WSACleanup();
            exit(EXIT_FAILURE);

        }

        // accept() may have blocked for a long time; drop handlers that
        // finished meanwhile so the capacity check counts live clients.
        clear_futures(futures);

        futures.emplace_back(std::async(std::launch::async,&handle_client,client_sock,client));

        if (futures.size() >= CLIENTS_MAX_NUM) {

            std::cout << "closing server socket" << std::endl;

            close_server_socket(server_sock,accepting);

        }

        std::cout << futures.size() << " clients connected" << std::endl;

    }

}

// Entry point: initialises Winsock, creates and binds the server socket and
// hands control to the accept loop in wait_for_connections().
int main(int argc,const char* argv[]) {

    std::vector<std::future<void>> futures;
    bool accepting = false;
    SOCKET server_sock = INVALID_SOCKET;

    initialize_winsock();
    open_server_socket(server_sock,accepting);
    bind_server_socket(server_sock);
    // wait_for_connections() takes the futures, the server socket and the flag.
    wait_for_connections(futures,server_sock,accepting);

    std::cout << "bye!" << std::endl;

    WSACleanup();
    exit(EXIT_SUCCESS);

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。