如何解决如何使用 boost::thread_group 使用多客户端服务器?
我想使用 boost::asio 和 boost::thread_group 制作多客户端服务器。
我试过了
//Listener.h
#pragma once
#include <boost/asio/ip/tcp.hpp>
#include <boost/bind.hpp>
#include <memory>
#include <boost/asio/io_context.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/thread/thread.hpp>
#include "Session.h"
#define PORT_NUMBER 12880
class Listener
{
public:
Listener(boost::asio::io_context& iosvc,boost::asio::ip::tcp::endpoint& ep);
void Start_accept();
private:
boost::asio::io_context &_iosvc;
boost::asio::ip::tcp::acceptor _acceptor;
boost::thread_group threadPool;
void OnAcceptComplete(const boost::system::error_code& error,Session* session);
};
这是 Listener.cpp 文件
//Listener.cpp
#pragma once
#include <iostream>
#include "Listener.h"
#include "SessionManager.h"
Listener::Listener(boost::asio::io_context& iosvc,boost::asio::ip::tcp::endpoint& ep) :
_acceptor(iosvc,ep),_iosvc(iosvc) {}
void Listener::Start_accept()
{
//Create Client Session
Session* new_session = clientsession::SessionManager::GetInstance()->GenerateSession(_iosvc);
std::cout << "Listener thread ID : " << std::this_thread::get_id << '\n';
std::cout << "Listening....." << "\n\n";
_acceptor.async_accept(*(new_session->Socket()),boost::bind(&Listener::OnAcceptComplete,this,boost::asio::placeholders::error,new_session));
}
void Listener::OnAcceptComplete(const boost::system::error_code& error,Session* session)
{
if (!error)
{
std::cout << "accept completed" << '\n';
threadPool.create_thread(boost::bind(&boost::asio::io_context::run,&(session->_iocontext)));
session->_iocontext.post(boost::bind(&Session::StartSession,this)); // error '->*': Cannot convert from'T*' to'Session *'.
threadPool.join_all();
}
else
{
std::cout << error.message() << '\n';
}
Start_accept();
}
但是 session->_iocontext.post(boost::bind(&Session::StartSession,this)); // 错误“->”:无法从“T”转换为“Session *”。
Server 的主要语句如下。
//Server.cpp
#include <iostream>
#include <boost/thread/thread.hpp>
#include "Listener.h"
#include "SessionManager.h"
using namespace boost::asio;
using namespace clientsession;
SessionManager* _sessionManager;
int main()
{
try
{
boost::asio::io_context iocontext;
ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"),PORT_NUMBER); //ipv4,포트번호
Listener* _listener = new Listener(iocontext,ep);
_listener->Start_accept();
iocontext.run();
}
catch (std::exception& e)
{
std::cout << e.what() << '\n';
}
while (true)
{
}
return 0;
}
我想做多线程服务器 [一个 io_context 侦听器(包括接受),多个 io_context 会话(用于客户端)]
换句话说 我想更改为每个客户端创建会话线程的结构。 我不知道我创建的代码是否正确。
N:1 通信是可能的,但会话处理是同步进行的。所以我想让一个线程成为一个会话并异步处理它。
解决方法
换句话说,我想改变为每个客户端创建会话线程的结构。
这是 ASIO 中的一种反模式。您可以,但请阅读:https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/overview/core/threads.html
在 Asio 中拥有单独线程的一种自然方法是每个线程也有一个 io_context
。然而,一个更自然的方法是为每个客户端设置一个strand(“逻辑线程”)和一个反映系统真实内核数量的固定线程池.
我不知道我创建的代码是否正确。
实际上你这样做,因为它不能编译。
session->_iocontext.post(boost::bind(&Session::StartSession,this)); // error '->*': Cannot convert from'T*' to'Session *'.
this
在这里是 Listener*
。当然,您不能将 Session
的方法绑定到 Listener
类的实例。您想要的是会话:
post(session->_iocontext,boost::bind(&Session::StartSession,session));
接下来:
-
threadPool.join_all();
出现在OnAcceptedComplete
中,这意味着您甚至会在考虑接受任何其他连接之前等待所有线程。这意味着您实际上最好使用单线程阻塞服务器。 -
你需要调用
std::this_thread::get_id
:添加括号std::this_thread::get_id()
(让我想起node addon with cluster get process id returns same id for all forked process's) -
所有原始指针和没有
new
的delete
将导致内存泄漏或过时指针的噩梦。-
为什么监听器不在堆栈上?
Listener _listener(iocontext,ep); _listener.Start_accept();
-
为什么
一样使用它Socket()
不返回引用?现在你只能像tcp::socket _socket; auto* Socket() { return &_socket; } // ... _acceptor.async_accept(*(new_session->Socket()),
哦。更简单:
auto& Socket() { return _socket; } // .... _acceptor.async_accept(new_session->Socket(),
-
等
-
-
您将
StartSession
发布到 io 服务上:post(/*some service*/,session));
确实,这仅在该线程的私有服务或使用链时才有意义。对于链,它似乎仍然是多余的,因为这总是发生在新接受的会话中,因此没有其他线程可能知道它。
例外情况是当您依赖线程本地存储时?
或者您对
GenerateSession
的实现没有产生唯一的会话?
最小修复
这是我的最小修复。我建议不要使用多线程(反正你是在做异步 IO)。
#define PORT_NUMBER 12880
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iomanip>
#include <iostream>
#include <memory>
using boost::asio::ip::tcp;
using boost::system::error_code;
struct Session : std::enable_shared_from_this<Session> {
template <typename Executor>
Session(Executor executor) : _socket(make_strand(executor)) {}
auto& Socket() { return _socket; }
void StartSession() {
std::cout << "Session: " << this << " StartSession" << std::endl;
async_read(_socket,boost::asio::dynamic_buffer(received),[self = shared_from_this()](auto ec,size_t tx) { self->on_received(ec,tx); });
};
private:
tcp::socket _socket;
std::string received;
void on_received(error_code ec,size_t /*bytes_transferred*/) {
std::cout << "Session: " << this << " on_received " << ec.message() << " " << std::quoted(received) << std::endl;
}
};
using SessionPtr = std::shared_ptr<Session>;
namespace clientsession {
struct SessionManager {
static SessionManager* GetInstance() {
static SessionManager instance;
return &instance;
}
template <typename... T> SessionPtr GenerateSession(T&&... args) {
return std::make_shared<Session>(std::forward<T>(args)...);
}
};
} // namespace clientsession
class Listener {
public:
Listener(boost::asio::io_context& iosvc,tcp::endpoint ep)
: _iosvc(iosvc),_acceptor(iosvc,ep) {}
void Start_accept() {
// Create Client Session
auto new_session = clientsession::SessionManager::GetInstance()
->GenerateSession(_iosvc.get_executor());
std::cout << "Listening.....\n\n";
_acceptor.async_accept(new_session->Socket(),boost::bind(&Listener::OnAcceptComplete,this,boost::asio::placeholders::error,new_session));
}
private:
boost::asio::io_context& _iosvc;
tcp::acceptor _acceptor;
void OnAcceptComplete(error_code error,SessionPtr session) {
if (!error) {
std::cout << "accept completed\n";
session->StartSession();
Start_accept();
} else {
std::cout << error.message() << '\n';
}
}
};
int main() {
try {
boost::asio::io_context iocontext;
Listener listener(iocontext,{{},PORT_NUMBER}); // ipv4,port number
listener.Start_accept();
iocontext.run();
} catch (std::exception& e) {
std::cout << e.what() << '\n';
}
}
在使用一堆并发客户端进行测试时:
for a in {1..10};
do
(sleep 1.$RANDOM; echo -n "hellow world $RANDOM") |
netcat -w 2 localhost 12880&
done;
time wait
打印出类似的东西
Listening.....
accept completed
Session: 0x56093453c1b0 StartSession
Listening.....
accept completed
Session: 0x56093453de60 StartSession
Listening.....
accept completed
Session: 0x56093453e3c0 StartSession
Listening.....
accept completed
Session: 0x56093453e920 StartSession
Listening.....
accept completed
Session: 0x56093453ee80 StartSession
Listening.....
accept completed
Session: 0x56093453f3e0 StartSession
Listening.....
accept completed
Session: 0x56093453f940 StartSession
Listening.....
accept completed
Session: 0x56093453fea0 StartSession
Listening.....
accept completed
Session: 0x560934540400 StartSession
Listening.....
accept completed
Session: 0x560934540960 StartSession
Listening.....
Session: 0x56093453f940 on_received End of file "hellow world 10149"
Session: 0x56093453fea0 on_received End of file "hellow world 22492"
Session: 0x560934540400 on_received End of file "hellow world 29539"
Session: 0x56093453c1b0 on_received End of file "hellow world 20494"
Session: 0x56093453ee80 on_received End of file "hellow world 24735"
Session: 0x56093453de60 on_received End of file "hellow world 8071"
Session: 0x560934540960 on_received End of file "hellow world 27606"
Session: 0x56093453e920 on_received End of file "hellow world 534"
Session: 0x56093453e3c0 on_received End of file "hellow world 21676"
Session: 0x56093453f3e0 on_received End of file "hellow world 24362"
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。