如何解决为 websocket 提升堆栈协程,如何发布一个函数并从另一个线程恢复执行 用asio异步操作实现select使用异步事件实现选择使用异步锁存器实现选择
int main()
{
tcp::socket socket(iocp);
acceptor.async_accept(socket,yield[ec]);
if (ec)
fail(ec,"accept");
else
boost::asio::spawn(acceptor.get_executor(),std::bind(&do_session,websocket::stream<beast::tcp_stream>(std::move(socket)),std::placeholders::_1));
... iocp run
}
void do_session(websocket::stream<beast::tcp_stream>& ws,net::yield_context yield)
{
while(ws.is_open())
{
ws.async_read(buffer,yield[ec]);
... process the buffer
... execute posted callbacks
}
}
void another_thread()
{
while(isAppNotExit)
{
post_to_specified_coroutine(ws,[]() { ... do in courutine same thread });
}
}
我需要在任何线程中发布一个函数来让指定的协程运行该函数,即上面“执行发布的回调”的代码部分。但是这个任务下发后,协程可能会处于 async_read 或 async_write 状态。是否可以发布一个类似数据的事件并让 async_read 或 async_write 函数立即返回?
解决方法
我想问题的本质是这样的:在 2 个通道上使用 select
:一个容量为 1 的通道和一个(可能)无限容量的通道。
用asio异步操作实现select
编写一个 asio 异步操作来等待多个(两个)事物。
(asio 异步操作模板:c++ - How to wait for a function to return with Boost:::Asio? - Stack Overflow)。
受互斥锁保护的状态:
- a
std::optional<read_result>
- a
std::vector<functor>
- a
bool
(是否有正在进行的 async_read) - a
std::optional<completion handler>
您的async_wait_for_2_things
:
- 从完成令牌 (
yield[ec]
) 中获取完成处理程序(一个可调用的,可以恢复您的协程); - 锁定互斥锁(使用guard);
- 如果有来自another_thread的pending functor,取出来,发布完成处理程序;
- else 如果有待处理的 read_result,将其取出,发布完成处理程序;
- 否则,如果有一个正在进行的 async_read(布尔值为真),则存储完成处理程序(如果已经存储了一个完成处理程序,则抛出“不可能发生”);
- else(没有挂起的函子,没有挂起的 read_result,async_read 还没有启动),存储完成处理程序(如果已经存储了一个完成处理程序,抛出“不可能发生”),将 bool 设置为 true(如果bool 已经为真,抛出“不能发生”),调用 async_read;
- 解锁互斥锁;
async_read 的回调:
- 锁定互斥锁(使用guard);
- 将 bool 设置为 false(如果 bool 已经为 false,则抛出“不可能发生”);
- 如果有完成处理程序,将其取出,发布;
- 否则,存储read_result(如果已经存储了read_result,则抛出“不可能发生”);
- 解锁互斥锁;
another_thread 发布函子的代码:
- 锁定互斥锁(使用guard);
- 如果有完成处理程序,将其取出,发布;
- 否则,存储函子;
- 解锁互斥锁;
使用异步事件实现选择
- async_read(使用回调重载)的 lambda 完成处理程序:存储结果,通知 asynchronous_event;
- another_thread:存储函子,通知异步事件;
- do_session:异步等待 asynchronous_event,加载结果或函子;
- asynchronous_event 的数据位于受互斥锁保护的
std::pair<std::optional<read_result>,std::vector<functor>>
中;
使用定时器实现异步事件:c++ - Why does Boost.Asio not support an event-based interface? - Stack Overflow。
这不适用,因为“异步事件”不是“异步条件变量”,它不能:
- 在异步等待中以原子方式 释放互斥锁和块
(可能的顺序:do_session 释放互斥锁,然后发布函子,然后通知事件(cancel_one
),然后 do_session 等待事件(timer_.async_wait(yield[ec]);
)并永远阻塞)
使用异步锁存器实现选择
- async_read(使用回调重载)的lambda handler:①存储结果并重置asynchronous_latch_producer,②通知asynchronous_latch_consumer,等待asynchronous_latch_producer(,⑥wake up);
- another_thread:①存储函子并重置asynchronous_latch_producer,②通知asynchronous_latch_consumer,等待asynchronous_latch_producer(,⑥wake up);
- do_session:等待asynchronous_latch_consumer(,③wake up),④加载结果或函子并重置asynchronous_latch_consumer,⑤通知asynchronous_latch_producer;
- asynchronous_latch_consumer 和 asynchronous_latch_producer 的数据在一个
std::pair<std::optional<read_result>,std::vector<functor>>
中;
使用定时器实现异步锁存器:c++ - Cancelling boost asio deadline timer safely - Stack Overflow。修改异步事件实现以获取异步闩锁:在构造函数和 reset
、.expires_at(Timer::clock_type::time_point::max())
中;在notify_all_one_shot
,.expires_at(Timer::clock_type::time_point::min())
。
这不适用,因为其中一个生产者可能会永远阻塞。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。