如何解决如何在销毁boost :: asio实体时消除崩溃? 以下两种情况对我们很有趣:
注意!!!问题是针对boost::asio library
领域的专家的。不幸的是,我无法使代码更紧凑,它包含描述问题的最少数量。该代码是人为创建的示例。崩溃的地方众所周知,并在注释中有描述,旨在说明崩溃!!! NO need
有关代码调试的任何帮助...
问题是关于如何设计asio服务器,而不是-它崩溃的地方!!!
此示例接近官方boost :: asio文档中的“聊天服务器”设计。但是,与官方示例不同,在官方示例中,仅动态创建/销毁连接类的对象,而在我的示例中,服务器及其连接类实体都是动态创建/销毁的。我确信这种模式的实现应该在asio爱好者中广为人知,下面描述的问题应该已经有人解决了。
请查看代码。 在这里,CAsioServer和CAsioConnection的实体是动态创建和销毁的。
#include <map>
#include <array>
#include <set>
#include <vector>
#include <deque>
#include <thread>
#include <iostream>
#include <asio.hpp>
#include <iomanip>
class CAsioConnection
: public std::enable_shared_from_this<CAsioConnection>
{
public:
using PtrType = std::shared_ptr<CAsioConnection>;
CAsioConnection(asio::ip::tcp::socket socket,std::set<CAsioConnection::PtrType>& connections)
: socket_(std::move(socket)),connections_(connections)
{
std::cout << "-- CAsioConnection is creating,socket: " << socket_.native_handle() << "\n";
}
virtual ~CAsioConnection()
{
std::cout << "-- CAsioConnection is destroying,socket: " << socket_.native_handle() << "\n";
}
void read() { do_read(); }
private:
void do_read(void)
{
uint8_t buff[3];
asio::async_read(socket_,asio::buffer(buff,3),[this](std::error_code ec,std::size_t /*length*/) {
if (!ec)
{
do_read();
}
else
{
std::cout << "-- CAsioConnection::do_read() error : " << ec.message() << "\n";
// Here is the crash N2
connections_.erase(shared_from_this());
// Crash may be fixed by the code below
//if (ec.value() != 1236) // (winerror.h) #define ERROR_CONNECTION_ABORTED 1236L
// connections_.erase(shared_from_this());
}
});
}
asio::ip::tcp::socket socket_;
std::set<CAsioConnection::PtrType>& connections_;
};
class CAsioServer
: public std::enable_shared_from_this<CAsioServer>
{
public:
using PtrType = std::shared_ptr<CAsioServer>;
CAsioServer(int port,asio::io_context& io,const asio::ip::tcp::endpoint& endpoint)
: port_(port),acceptor_(io,endpoint)
{
std::cout << "-- CAsioServer is creating,port: " << port_ << "\n";
}
virtual ~CAsioServer()
{
std::cout << "-- CAsioServer is destroying,port: " << port_ << "\n";
}
int port(void) { return port_; }
void accept(void) { do_accept(); }
private:
void do_accept()
{
acceptor_.async_accept([this](std::error_code ec,asio::ip::tcp::socket socket) {
if (!ec)
{
std::cout << "-- CAsioServer::do_accept() connection to socket: " << socket.native_handle() << "\n";
auto c = std::make_shared<CAsioConnection>(std::move(socket),connections_);
connections_.insert(c);
c->read();
}
else
{
// Here is the crash N1
std::cout << "-- CAsioServer::do_accept() error : " << ec.message() << "\n";
// Crash may be fixed by the code below
//if (ec.value() == 995) // (winerror.h) #define ERROR_OPERATION_ABORTED 995L
// return;
}
// Actually here is the crash N1 )),but the fix is above...
do_accept();
});
}
int port_;
asio::ip::tcp::acceptor acceptor_;
std::set<CAsioConnection::PtrType> connections_;
};
//*****************************************************************************
class CTcpBase
{
public:
CTcpBase()
{
// heart beat timer to keep it alive
do_heart_beat();
t_ = std::thread([this] {
std::cout << "-- io context is RUNNING!!!\n";
io_.run();
std::cout << "-- io context has been STOPED!!!\n";
});
}
virtual ~CTcpBase()
{
io_.stop();
if (t_.joinable())
t_.join();
}
void add_server(int port)
{
io_.post([this,port]
{
for (auto s : servers_)
if (port == s->port())
return;
auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(),port);
auto s = std::make_shared<CAsioServer>(port,io_,endpoint);
s->accept();
servers_.insert(s);
});
}
void remove_server(int port)
{
io_.post([this,port]
{
for (auto s : servers_)
if (port == s->port())
{ servers_.erase(s); return; }
});
}
private:
void do_heart_beat(void)
{
std::cout << "-- beat\n";
auto timer = std::make_shared<asio::steady_timer>(io_,asio::chrono::milliseconds(3000));
timer->async_wait([timer,this](const asio::error_code& ec) {
do_heart_beat();
});
}
asio::io_context io_;
std::thread t_;
std::set<CAsioServer::PtrType> servers_;
};
//*****************************************************************************
int main(void)
{
CTcpBase tcp_base;
std::cout << "CONNECT the server to port 502\n";
tcp_base.add_server(502);
std::this_thread::sleep_for(std::chrono::seconds(20));
std::cout << "REMOVE the server from port 502\n";
tcp_base.remove_server(502);
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}
假设CTcpBase::add_server()
和CTcpBase::remove_server()
将由来自不同线程的外部客户端调用。而asio上下文在其自己的线程中处理它。
让我们考虑以下两种情况:
- 启动应用程序,然后等待半分钟。
崩溃发生在
CAsioServer::do_accept()
中,请参见下面的输出。 Debug Console Output - 启动应用程序。由任何外部客户端连接到端口502,并等待不到20秒。
崩溃发生在
CAsioConnection::do_read()
中,请参见下面的输出。 Debug Console Output
似乎当其类的实体已经销毁时,asio框架会调用延迟的asio::async_read()
和acceptor_.async_accept()
处理程序。
我已通过错误检查修复了处理程序,但该解决方案似乎并不可靠。谁知道还可能存在其他错误和情形……有时,当客户端断开连接时,我需要清除在connection_
处设置的asio::async_read()
,如何确定服务器或连接对象仍然有效?…
有什么方法可以请求boost :: asio框架来防止为已破坏的对象调用推迟的处理程序吗?或者如何通过错误代码识别(be 100% sure)
,表明该对象已被破坏?还是在asio范围内还有其他解决方案或设计模式-如何在一个运行的线程中处理无数个创建/销毁的服务器及其连接,而无需互斥和其他东西...
解决方法
首先检查您的io_service
是否严格运行 单线程。从代码中看不到。如果不是,则共享状态(如connections_
)需要同步访问。
实际上,您可以采用accept循环的形式进行逻辑处理,但是要利用这一点,您应该在其中进行对
connections_
的所有访问,例如,
- 我们在其中拥有会话列表的位置直接举行会话,根本不需要共享指针:How to pass a boost asio tcp socket to a thread for sending heartbeat to client or server
- 或我们在此处共享指针的地方,并在会话列表中存储 弱指针 ,可以从accept循环内部“收集垃圾”:{{ 3}}
更新
-
buff
是一个局部变量,由于未在async_read操作的整个时间内都无效,因此导致未定义行为。 -
通常,
shared_from_this
习惯用法和并保留一个已经由 指示的共享指针容器并没有多大意义一生。您的问题似乎是,有时
CAsioServer
被简单地破坏了,这意味着connections_
的所有元素都被释放了,那时它们的CAsioConnection
对象可能被破坏了。它还会破坏CAsioServer
。每当销毁一个Asio对象时,任何待处理的异步操作都将失败,并显示
asio::error:operation_aborted
,这实际上意味着您已做出响应。但是,当调用完成处理程序时,该对象已经变得无效。在ASIO - How to stop simple coroutine based server?中,我刚刚注意到缺少一个关键要素: 您从未在任何完成处理程序中捕获/绑定指向
CAsioConnection
的共享指针 。这是非常不习惯的。
相反,您使用共享指针来控制生存期。如果您还需要一个连接列表,则将其设为弱指针列表,以便仅遵守生存期。
变化点:
-
无需使服务器enable_shared_from_this
-
connections_
应该包含弱指针,甚至没有所有者指针。弱指针显然在这里要安全得多。实际上,您可以选择删除该容器,因为似乎没有任何容器在使用它。在下面的示例中,我选择保留该示例,以便您可以看到它的实际效果。 -
在完成处理程序中捕获
shared_from_this
,以确保对象在触发时仍然有效:asio::async_read(socket_,asio::buffer(buff,3),[this,self=shared_from_this()](error_code ec,std::size_t /*length*/) {
简体
注意,我之所以选择std::list
,是因为它消除了对等式/排序的需求(请参阅std::owner_less<>
),由于对内部容器的引用存储方式越来越麻烦CAsioConnection
类-使其与循环相关(在实例化CAsioConnection
类之前,owner_less<>
类型尚未完成)。我只是选择了(不需要?)复杂性。
#include <boost/asio.hpp>
#include <iostream>
#include <list>
#include <memory>
namespace asio = boost::asio;
using error_code = boost::system::error_code; // compat
class CAsioConnection : public std::enable_shared_from_this<CAsioConnection> {
public:
using PtrType = std::shared_ptr<CAsioConnection>;
CAsioConnection(asio::ip::tcp::socket socket) : socket_(std::move(socket)) {
log(__FUNCTION__);
}
~CAsioConnection() { log(__FUNCTION__); }
void read() { do_read(); }
private:
void log(std::string_view msg) const {
error_code ec;
std::clog << msg << ",socket: " << socket_.remote_endpoint(ec) << "\n";
}
uint8_t buff[256];
void do_read() {
asio::async_read(socket_,asio::buffer(buff),self = shared_from_this()](error_code ec,std::size_t length) {
if (!ec) {
log(__FUNCTION__ + (" length: " + std::to_string(length)));
do_read();
} else {
log(__FUNCTION__ + (" error: " + ec.message()));
}
});
}
asio::ip::tcp::socket socket_;
};
class CAsioServer {
public:
CAsioServer(asio::io_context& io,const asio::ip::tcp::endpoint& endpoint)
: acceptor_(io,endpoint) { log(__FUNCTION__); }
~CAsioServer() { log(__FUNCTION__); }
int port() const { return acceptor_.local_endpoint().port(); }
void accept() { do_accept(); }
private:
void do_accept() {
acceptor_.async_accept([this](error_code ec,asio::ip::tcp::socket socket) {
if (!ec) {
auto c = std::make_shared<CAsioConnection>(std::move(socket));
connections_.push_back(c);
c->read();
} else {
log(__FUNCTION__ + (" error: " + ec.message()));
}
connections_.remove_if(std::mem_fn(&WeakPtr::expired));
if (acceptor_.is_open())
do_accept();
});
}
void log(std::string_view msg) const {
std::clog << msg << ",port: " << port() << "\n";
}
asio::ip::tcp::acceptor acceptor_;
using WeakPtr = std::weak_ptr<CAsioConnection>;
std::list<WeakPtr> connections_;
};
int main() {
boost::asio::io_context io;
CAsioServer server(io,{ {},7878 });
server.accept();
io.run_for(std::chrono::seconds(10));
}
输出:
./a.out& sleep 1; nc -w 1 127.0.0.1 7878 < main.cpp
CAsioServer,port: 7878
CAsioConnection,socket: 127.0.0.1:50628
operator() length: 256,socket: 127.0.0.1:50628
operator() error: End of file,socket: 127.0.0.1:50628
~CAsioConnection,socket: 127.0.0.1:50628
~CAsioServer,port: 7878
,
小学,我亲爱的沃森
问题的关键–我非常信任人
我应该提到我使用非增强版Asio ver。 1.18.0,以及VS2017和Win10。因此,以下所有说明都与Asio的Windows部分有关。 posix的实现很有可能会有所不同。
最初实现的主要思想是:-仅通过从适当的set<>
集合中添加/删除服务器/连接对象即可控制服务器/连接对象的数量。
以下文本描述了为什么不付出额外努力就无法正常工作。
根据Asio文档:
~basic_stream_socket();
此函数破坏套接字,取消 与套接字关联的所有未完成的异步操作为 如果通过调用取消。
我的错误是认为取消异步操作将在析构函数范围内执行,同时调用异步处理程序。
这很有趣,我想,如果应该在对象销毁阶段拒绝异步处理程序,为什么要在异步处理程序中使用self
指针。 正确的答案–异步处理程序不会被拒绝))。
实际上,异步处理程序将在之后被调用,届时类实体将被销毁。
发生了什么事
- 销毁服务器或连接类时:
::closesocket()
中的套接字句柄将调用WinSock2~basic_stream_socket()
。 - 在
iocontext.run()
内的下一次迭代中:win_iocp_io_context::do_one()
调用::GetQueuedCompletionStatus()
以获取异步操作结果并启动与被破坏的套接字相关联的异步处理程序。
以下两种情况对我们很有趣:
- 套接字等待数据。
- 套接字正在销毁(例如在连接类析构函数内部)。
- 发生错误的异步处理程序被调用。
在这种情况下,即使类已被破坏,我们也可能检查错误代码并关闭异步处理程序。不好,但是可行的解决方案,我在问题中的代码中演示过。
- 套接字获取一些数据。异步处理程序尚未启动。
- 套接字正在销毁(例如在连接类析构函数内部)。
- 启动了异步处理程序没有错误!灾难。
在这种情况下,错误代码无法挽救我们。崩溃发生了。 因此,检查异步处理程序内部错误代码的方法不起作用。
下面的代码通过为服务器和连接类引入hasta_la_vista()
方法来解决所有问题。不是超级优雅,而是钢筋混凝土解决方案:
#include <map>
#include <array>
#include <set>
#include <vector>
#include <deque>
#include <thread>
#include <iostream>
#include <asio.hpp>
#include <iomanip>
class CAsioConnection
: public std::enable_shared_from_this<CAsioConnection>
{
public:
using PtrType = std::shared_ptr<CAsioConnection>;
CAsioConnection(asio::ip::tcp::socket socket,std::set<CAsioConnection::PtrType>& connections)
: socket_(std::move(socket)),connections_(connections),destroying_in_progress(false)
{
std::cout << "-- CAsioConnection is creating\n";
}
virtual ~CAsioConnection()
{
std::cout << "-- CAsioConnection is destroying\n";
}
void read() { do_read(); }
void hasta_la_vista(void)
{
destroying_in_progress = true;
std::error_code ec;
socket_.cancel(ec);
}
private:
void do_read(void)
{
auto self(shared_from_this());
asio::async_read(socket_,self](std::error_code ec,std::size_t /*length*/) {
if (destroying_in_progress)
return;
if (!ec)
{
do_read();
}
else
{
std::cout << "-- CAsioConnection::do_read() error : (" << ec.value() << ") " << ec.message() << "\n";
hasta_la_vista();
connections_.erase(shared_from_this());
}
});
}
uint8_t buff[3];
asio::ip::tcp::socket socket_;
bool destroying_in_progress;
std::set<CAsioConnection::PtrType>& connections_;
};
//*****************************************************************************
class CAsioServer
: public std::enable_shared_from_this<CAsioServer>
{
public:
using PtrType = std::shared_ptr<CAsioServer>;
CAsioServer(int port,asio::io_context& io,const asio::ip::tcp::endpoint& endpoint)
: port_(port),destroying_in_progress(false),acceptor_(io,endpoint)
{
std::cout << "-- CAsioServer is creating,port: " << port_ << "\n";
}
virtual ~CAsioServer()
{
for (auto c : connections_)
{
c->hasta_la_vista();
}
std::cout << "-- CAsioServer is destroying,port: " << port_ << "\n";
}
int port(void) { return port_; }
void accept(void) { do_accept(); }
void hasta_la_vista(void)
{
destroying_in_progress = true;
std::error_code ec;
acceptor_.cancel(ec);
}
private:
void do_accept()
{
auto self(shared_from_this());
acceptor_.async_accept([this,asio::ip::tcp::socket socket) {
if (destroying_in_progress)
return;
if (!ec)
{
std::cout << "-- CAsioServer::do_accept() connection to socket: " << socket.native_handle() << "\n";
auto c = std::make_shared<CAsioConnection>(std::move(socket),connections_);
connections_.insert(c);
c->read();
}
else
{
std::cout << "-- CAsioServer::do_accept() error : (" << ec.value() << ") "<< ec.message() << "\n";
}
do_accept();
});
}
int port_;
bool destroying_in_progress;
asio::ip::tcp::acceptor acceptor_;
std::set<CAsioConnection::PtrType> connections_;
};
//*****************************************************************************
class CTcpBase
{
public:
CTcpBase()
{
// heart beat timer to keep it alive
do_heart_beat();
t_ = std::thread([this] {
std::cout << "-- io context is RUNNING!!!\n";
io_.run();
std::cout << "-- io context has been STOPED!!!\n";
});
}
virtual ~CTcpBase()
{
io_.stop();
if (t_.joinable())
t_.join();
}
void add_server(int port)
{
io_.post([this,port] {
for (auto& s : servers_)
if (port == s->port())
return;
auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(),port);
auto s = std::make_shared<CAsioServer>(port,io_,endpoint);
s->accept();
servers_.insert(s);
});
}
void remove_server(int port)
{
io_.post([this,port] {
for (auto s : servers_)
if (port == s->port())
{
s->hasta_la_vista();
servers_.erase(s);
return;
}
});
}
private:
void do_heart_beat(void)
{
std::cout << "-- beat\n";
auto timer = std::make_shared<asio::steady_timer>(io_,asio::chrono::milliseconds(3000));
timer->async_wait([timer,this](const std::error_code& ec) {
do_heart_beat();
});
}
asio::io_context io_;
std::thread t_;
std::set<CAsioServer::PtrType> servers_;
};
//*****************************************************************************
int main(void)
{
CTcpBase tcp_base;
std::cout << "CONNECT the server to port 502\n";
tcp_base.add_server(502);
std::this_thread::sleep_for(std::chrono::seconds(20));
std::cout << "REMOVE the server from port 502\n";
tcp_base.remove_server(502);
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。