如何解决使用期货和异步的 C++ 多线程 tcp 服务器
我最近开始学习 C++,并且正在学习一些教程。我正在研究套接字,并决定作为一个副项目来创建一个小型多线程服务器,如下所示。
我试图在 CLIENTS_MAX_NUM
到达后关闭服务器侦听套接字,然后在套接字断开连接后重新打开它,但是这给了我一个错误 10022 (WSAEINVAL),我不太确定我是什么做错了。
如果您想重现错误,只需使用 telnet 连接并关闭客户端连接(ctrl+] ,退出)。
任何帮助将不胜感激。
#include <iostream>
#include <vector>
#include <string>
#include <future>
#include <chrono>
#include <WS2tcpip.h>
#pragma comment(lib,"ws2_32.lib")
static constexpr const unsigned int PORT = 5000;
static constexpr const unsigned int CLIENTS_MAX_NUM = 1;
static constexpr const unsigned int CLIENTS_QUEUE_NUM = 10;
SOCKET server_sock;
std::vector<std::future<void>> futures;
std::mutex mtx;
void initialize_winsock() {
WSAData wsData;
WORD ver = MAKEWORD(2,2);
int wsResult = WSAStartup(ver,&wsData);
if (wsResult != 0) {
std::cerr << WSAGetLastError() << std::endl;
WSACleanup();
exit(EXIT_FAILURE);
}
}
void bind_server_socket() {
int keep_alive = 1;
int re_use = 1;
if (setsockopt(server_sock,SOL_SOCKET,SO_KEEPALIVE,(const char*)&keep_alive,sizeof(keep_alive)) == SOCKET_ERROR) {
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
if (setsockopt(server_sock,SO_REUSEADDR,(const char*)&re_use,sizeof(re_use)) == SOCKET_ERROR) {
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
sockaddr_in server;
server.sin_family = AF_INET;
server.sin_port = htons(PORT);
server.sin_addr.S_un.S_addr = INADDR_ANY;
memset(&server.sin_zero,8);
if (bind(server_sock,(sockaddr*)&server,sizeof(sockaddr)) == SOCKET_ERROR) {
std::cerr << WSAGetLastError() << std::endl;
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
}
void open_server_socket(bool &listening) {
server_sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if (server_sock == INVALID_SOCKET) {
std::cerr << WSAGetLastError() << std::endl;
listening = false;
WSACleanup();
exit(EXIT_FAILURE);
}
listening = true;
}
void close_server_socket(bool &listening) {
closesocket(server_sock);
listening = false;
}
void handle_client(SOCKET client_sock,sockaddr_in client) {
char buf[4096];
char host[NI_MAXHOST];
char service[NI_MAXHOST];
memset(host,NI_MAXHOST);
memset(service,NI_MAXHOST);
//std::cout << std::this_thread::get_id() << std::endl;
if (getnameinfo((sockaddr*)&client,sizeof(client),host,NI_MAXHOST,service,NI_MAXSERV,0) == 0) {
std::cout << host << " connected on port " << service << std::endl;
}
else {
inet_ntop(AF_INET,&client.sin_addr,NI_MAXHOST);
std::cout << host << " connected on port " << ntohs(client.sin_port) << std::endl;
}
while (true) {
memset(&buf,4096);
const int bytes_received = recv(client_sock,buf,4096,0);
if (bytes_received == SOCKET_ERROR) {
std::cerr << WSAGetLastError() << std::endl;
WSACleanup();
}
else if (bytes_received == 0) {
std::cout << "client disconnected" << std::endl;
break;
}
else {
send(client_sock,bytes_received + 1,0);
}
}
}
int main(int argc,const char* argv[]) {
bool listening = false;
initialize_winsock();
open_server_socket(listening);
bind_server_socket();
// -----------------------------------------------------------
if (listen(server_sock,CLIENTS_QUEUE_NUM) == SOCKET_ERROR) {
std::cerr << WSAGetLastError() << std::endl;
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
else {
std::cout << "listening for incoming connections on port " << PORT << std::endl;
while (true) {
unsigned int removed = 0;
for (int i = 0; i < futures.size(); i++) {
auto status = futures.at(i).wait_for(std::chrono::milliseconds(0));
if (status == std::future_status::ready) {
futures.erase(futures.begin() + i);
removed++;
}
}
if (removed > 1) {
std::cout << removed << " clients removed" << std::endl;
}
else if (removed) {
std::cout << removed << " client removed" << std::endl;
}
if (futures.size() < CLIENTS_MAX_NUM && !listening) {
std::cout << "re-opening server socket" << std::endl;
open_server_socket(listening);
// BOOM <--- 10022 (WSAEINVAL) - https://docs.microsoft.com/en-us/windows/win32/winsock/windows-sockets-error-codes-2
}
if (listening) {
sockaddr_in client;
memset(&client.sin_zero,8);
int client_size = sizeof(client);
SOCKET client_sock = accept(server_sock,(sockaddr*)&client,&client_size);
if (client_sock == INVALID_SOCKET) {
std::cerr << WSAGetLastError() << std::endl;
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
else {
futures.emplace_back(std::async(std::launch::async,&handle_client,client_sock,client));
if (futures.size() >= CLIENTS_MAX_NUM && listening) {
std::cout << "closing server socket" << std::endl;
close_server_socket(listening);
}
std::cout << futures.size() << " clients connected" << std::endl;
}
}
}
}
// ----------------------------------------------------------
std::cout << "bye!" << std::endl;
WSACleanup();
exit(EXIT_SUCCESS);
}
解决方法
经过一番挖掘,我发现了一些愚蠢的错误,以下是工作代码:
更新
还修复了向量循环
#include <iostream>
#include <vector>
#include <string>
#include <future>
#include <chrono>
#include <WS2tcpip.h>
#pragma comment(lib,"ws2_32.lib")
static constexpr const unsigned int PORT = 5000;
static constexpr const unsigned int CLIENTS_MAX_NUM = 5;
static constexpr const unsigned int CLIENTS_QUEUE_NUM = 0;
void handle_client(SOCKET client_sock,sockaddr_in client) {
char buf[4096];
char host[NI_MAXHOST];
char service[NI_MAXHOST];
memset(host,NI_MAXHOST);
memset(service,NI_MAXHOST);
//std::cout << std::this_thread::get_id() << std::endl;
if (getnameinfo((sockaddr*)&client,sizeof(client),host,NI_MAXHOST,service,NI_MAXSERV,0) == 0) {
std::cout << host << " connected on port " << service << std::endl;
}
else {
inet_ntop(AF_INET,&client.sin_addr,NI_MAXHOST);
std::cout << host << " connected on port " << ntohs(client.sin_port) << std::endl;
}
while (true) {
memset(&buf,4096);
const int bytes_received = recv(client_sock,buf,4096,0);
if (bytes_received == SOCKET_ERROR) {
std::cerr << WSAGetLastError() << std::endl;
WSACleanup();
}
else if (bytes_received == 0) {
std::cout << "client disconnected" << std::endl;
break;
}
else {
send(client_sock,bytes_received + 1,0);
}
}
}
void initialize_winsock() {
WSAData wsData;
WORD ver = MAKEWORD(2,2);
int wsResult = WSAStartup(ver,&wsData);
if (wsResult != 0) {
std::cerr << WSAGetLastError() << std::endl;
WSACleanup();
exit(EXIT_FAILURE);
}
}
void bind_server_socket(SOCKET &server_sock) {
int keep_alive = 1;
int re_use = 1;
if (setsockopt(server_sock,SOL_SOCKET,SO_KEEPALIVE,(const char*)&keep_alive,sizeof(keep_alive)) == SOCKET_ERROR) {
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
if (setsockopt(server_sock,SO_REUSEADDR,(const char*)&re_use,sizeof(re_use)) == SOCKET_ERROR) {
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
sockaddr_in server;
server.sin_family = AF_INET;
server.sin_port = htons(PORT);
server.sin_addr.S_un.S_addr = INADDR_ANY;
memset(&server.sin_zero,8);
if (bind(server_sock,(sockaddr*)&server,sizeof(sockaddr)) == SOCKET_ERROR) {
std::cerr << WSAGetLastError() << std::endl;
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
}
void open_server_socket(SOCKET &server_sock,bool &accepting) {
server_sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if (server_sock == INVALID_SOCKET) {
std::cerr << WSAGetLastError() << std::endl;
accepting = false;
WSACleanup();
exit(EXIT_FAILURE);
}
accepting = true;
}
void close_server_socket(SOCKET &server_sock,bool &accepting) {
closesocket(server_sock);
accepting = false;
}
void clear_futures(std::vector<std::future<void>> &futures) {
std::vector<std::future<void>>::iterator it = futures.begin();
while (it != futures.end()) {
auto status = (*it).wait_for(std::chrono::milliseconds(0));
if (status == std::future_status::ready) {
it = futures.erase(it);
}
else {
++it;
}
}
}
void wait_for_connections(std::vector<std::future<void>> &futures,SOCKET &server_sock,bool& accepting) {
if (listen(server_sock,CLIENTS_QUEUE_NUM) == SOCKET_ERROR) {
std::cerr << WSAGetLastError() << std::endl;
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
else {
std::cout << "accepting for incoming connections on port " << PORT << std::endl;
while (true) {
if (futures.size() < CLIENTS_MAX_NUM && !accepting) {
std::cout << "re-opening server socket" << std::endl;
open_server_socket(server_sock,accepting);
bind_server_socket(server_sock);
wait_for_connections(futures,server_sock,accepting);
break;
}
if (accepting) {
sockaddr_in client;
memset(&client.sin_zero,8);
int client_size = sizeof(client);
SOCKET client_sock = accept(server_sock,(sockaddr*)&client,&client_size);
if (client_sock == INVALID_SOCKET) {
std::cerr << WSAGetLastError() << std::endl;
closesocket(server_sock);
WSACleanup();
exit(EXIT_FAILURE);
}
clear_futures(futures);
futures.emplace_back(std::async(std::launch::async,&handle_client,client_sock,client));
if (futures.size() >= CLIENTS_MAX_NUM && accepting) {
std::cout << "closing server socket" << std::endl;
close_server_socket(server_sock,accepting);
}
std::cout << futures.size() << " clients connected" << std::endl;
}
}
}
}
int main(int argc,const char* argv[]) {
std::vector<std::future<void>> futures;
bool accepting = false;
SOCKET server_sock;
initialize_winsock();
open_server_socket(server_sock,accepting);
bind_server_socket(server_sock);
wait_for_connections(futures,accepting);
std::cout << "bye!" << std::endl;
WSACleanup();
exit(EXIT_SUCCESS);
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。