微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 boost::thread_group 使用多客户端服务器?

如何解决如何使用 boost::thread_group 使用多客户端服务器?

我想使用 boost::asio 和 boost::thread_group 制作多客户端服务器。

我试过了

//Listener.h 
#pragma once
#include <boost/asio/ip/tcp.hpp>
#include <boost/bind.hpp>
#include <memory>
#include <boost/asio/io_context.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/thread/thread.hpp>
#include "Session.h"
#define PORT_NUMBER 12880

class Listener
{
public:
    Listener(boost::asio::io_context& iosvc,boost::asio::ip::tcp::endpoint& ep);

    void Start_accept();


private:
    boost::asio::io_context &_iosvc;
    boost::asio::ip::tcp::acceptor _acceptor;
    boost::thread_group threadPool;
    void OnAcceptComplete(const boost::system::error_code& error,Session* session);
};

这是 Listener.cpp 文件

//Listener.cpp
#pragma once
#include <iostream>
#include "Listener.h"
#include "SessionManager.h"

Listener::Listener(boost::asio::io_context& iosvc,boost::asio::ip::tcp::endpoint& ep) :
    _acceptor(iosvc,ep),_iosvc(iosvc) {}


void Listener::Start_accept()
{
    //Create Client Session
    Session* new_session = clientsession::SessionManager::GetInstance()->GenerateSession(_iosvc);
    std::cout << "Listener thread ID : " << std::this_thread::get_id << '\n';
    std::cout << "Listening....." << "\n\n";
    _acceptor.async_accept(*(new_session->Socket()),boost::bind(&Listener::OnAcceptComplete,this,boost::asio::placeholders::error,new_session));
}
void Listener::OnAcceptComplete(const boost::system::error_code& error,Session* session)
{
    if (!error) 
    {
        std::cout << "accept completed" << '\n';

        threadPool.create_thread(boost::bind(&boost::asio::io_context::run,&(session->_iocontext)));
        session->_iocontext.post(boost::bind(&Session::StartSession,this)); // error '->*': Cannot convert from'T*' to'Session *'.
        
        threadPool.join_all();
    }

    else
    {
        std::cout << error.message() << '\n';
    }
    Start_accept();
}

但是 session->_iocontext.post(boost::bind(&Session::StartSession,this)); // 错误“->”:无法从“T”转换为“Session *”。

Server 的主要语句如下。

//Server.cpp
#include <iostream>
#include <boost/thread/thread.hpp>
#include "Listener.h"
#include "SessionManager.h"
using namespace boost::asio;
using namespace clientsession;
SessionManager* _sessionManager;

int main()
{
    try
    {
        boost::asio::io_context iocontext;
        ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"),PORT_NUMBER); //ipv4,포트번호
        Listener* _listener = new Listener(iocontext,ep);

        _listener->Start_accept();
        iocontext.run();

    }
    catch (std::exception& e)
    {
        std::cout << e.what() << '\n';
    }
    while (true)
    {
    }
    return 0;

}

我想做多线程服务器 [一个 io_context 侦听器(包括接受),多个 io_context 会话(用于客户端)]

换句话说 我想更改为每个客户端创建会话线程的结构。 我不知道我创建的代码是否正确。

N:1 通信是可能的,但会话处理是同步进行的。所以我想让一个线程成为一个会话并异步处理它。

解决方法

换句话说,我想改变为每个客户端创建会话线程的结构。

这是 ASIO 中的一种反模式。您可以,但请阅读:https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/overview/core/threads.html

在 Asio 中拥有单独线程的一种自然方法是每个线程也有一个 io_context。然而,一个自然的方法是为每个客户端设置一个strand(“逻辑线程”)和一个反映系统真实内核数量的固定线程池.

我不知道我创建的代码是否正确。

实际上你这样做,因为它不能编译。

    session->_iocontext.post(boost::bind(&Session::StartSession,this)); // error '->*': Cannot convert from'T*' to'Session *'.

this 在这里是 Listener*。当然,您不能将 Session 的方法绑定到 Listener 类的实例。您想要的是会话:

post(session->_iocontext,boost::bind(&Session::StartSession,session));

接下来:

  • threadPool.join_all(); 出现在 OnAcceptedComplete 中,这意味着您甚至会在考虑接受任何其他连接之前等待所有线程。这意味着您实际上最好使用单线程阻塞服务器。

  • 你需要调用std::this_thread::get_id:添加括号std::this_thread::get_id()(让我想起node addon with cluster get process id returns same id for all forked process's

  • 所有原始指针和没有 newdelete 将导致内存泄漏或过时指针的噩梦。

    • 为什么监听器不在堆栈上?

        Listener _listener(iocontext,ep);
        _listener.Start_accept();
      
    • 为什么 Socket() 不返回引用?现在你只能像

      一样使用它
        tcp::socket _socket;
        auto* Socket() { return &_socket; }
        // ...
        _acceptor.async_accept(*(new_session->Socket()),

      哦。更简单:

        auto& Socket() { return _socket; }
        // ....
        _acceptor.async_accept(new_session->Socket(),
  • 您将 StartSession 发布到 io 服务上:

     post(/*some service*/,session));
    

    确实,这仅在该线程的私有服务使用链时才有意义。对于链,它似乎仍然是多余的,因为这总是发生在新接受的会话中,因此没有其他线程可能知道它。

    例外情况是当您依赖线程本地存储时?

    或者您对 GenerateSession 的实现没有产生唯一的会话?

最小修复

这是我的最小修复。我建议不要使用多线程(反正你是在做异步 IO)。

#define PORT_NUMBER 12880
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iomanip>
#include <iostream>
#include <memory>
using boost::asio::ip::tcp;
using boost::system::error_code;

struct Session : std::enable_shared_from_this<Session> {
    template <typename Executor>
    Session(Executor executor) : _socket(make_strand(executor)) {}

    auto& Socket() { return _socket; }

    void StartSession() {
        std::cout << "Session: " << this << " StartSession" << std::endl;
        async_read(_socket,boost::asio::dynamic_buffer(received),[self = shared_from_this()](auto ec,size_t tx) { self->on_received(ec,tx); });
    };

  private:
    tcp::socket _socket;
    std::string received;
    void on_received(error_code ec,size_t /*bytes_transferred*/) {
        std::cout << "Session: " << this << " on_received " << ec.message() << " " << std::quoted(received) << std::endl;
    }
};

using SessionPtr = std::shared_ptr<Session>;

namespace clientsession {
    struct SessionManager {
        static SessionManager* GetInstance() {
            static SessionManager instance;
            return &instance;
        }
        template <typename... T> SessionPtr GenerateSession(T&&... args) {
            return std::make_shared<Session>(std::forward<T>(args)...);
        }
    };
} // namespace clientsession

class Listener {
    public:
        Listener(boost::asio::io_context& iosvc,tcp::endpoint ep)
            : _iosvc(iosvc),_acceptor(iosvc,ep) {}

        void Start_accept() {
            // Create Client Session
            auto new_session = clientsession::SessionManager::GetInstance()
                ->GenerateSession(_iosvc.get_executor());

            std::cout << "Listening.....\n\n";
            _acceptor.async_accept(new_session->Socket(),boost::bind(&Listener::OnAcceptComplete,this,boost::asio::placeholders::error,new_session));
        }

    private:
        boost::asio::io_context& _iosvc;
        tcp::acceptor _acceptor;
        void OnAcceptComplete(error_code error,SessionPtr session) {
            if (!error) {
                std::cout << "accept completed\n";
                session->StartSession();
                Start_accept();
            } else {
                std::cout << error.message() << '\n';
            }
        }
};

int main() {
    try {
        boost::asio::io_context iocontext;
        Listener listener(iocontext,{{},PORT_NUMBER}); // ipv4,port number
        listener.Start_accept();

        iocontext.run();
    } catch (std::exception& e) {
        std::cout << e.what() << '\n';
    }
}

在使用一堆并发客户端进行测试时:

for a in {1..10};
do
    (sleep 1.$RANDOM; echo -n "hellow world $RANDOM") |
         netcat -w 2 localhost 12880&
done; 
time wait

打印出类似的东西

Listening.....

accept completed
Session: 0x56093453c1b0 StartSession
Listening.....

accept completed
Session: 0x56093453de60 StartSession
Listening.....

accept completed
Session: 0x56093453e3c0 StartSession
Listening.....

accept completed
Session: 0x56093453e920 StartSession
Listening.....

accept completed
Session: 0x56093453ee80 StartSession
Listening.....

accept completed
Session: 0x56093453f3e0 StartSession
Listening.....

accept completed
Session: 0x56093453f940 StartSession
Listening.....

accept completed
Session: 0x56093453fea0 StartSession
Listening.....

accept completed
Session: 0x560934540400 StartSession
Listening.....

accept completed
Session: 0x560934540960 StartSession
Listening.....

Session: 0x56093453f940 on_received End of file "hellow world 10149"
Session: 0x56093453fea0 on_received End of file "hellow world 22492"
Session: 0x560934540400 on_received End of file "hellow world 29539"
Session: 0x56093453c1b0 on_received End of file "hellow world 20494"
Session: 0x56093453ee80 on_received End of file "hellow world 24735"
Session: 0x56093453de60 on_received End of file "hellow world 8071"
Session: 0x560934540960 on_received End of file "hellow world 27606"
Session: 0x56093453e920 on_received End of file "hellow world 534"
Session: 0x56093453e3c0 on_received End of file "hellow world 21676"
Session: 0x56093453f3e0 on_received End of file "hellow world 24362"

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?