如何解决协程、std::jthread、MSVC 和崩溃
我的代码在 Archlinux 下使用 GCC 10.2 似乎没有任何问题。 我在几次迭代中使用了 valgrind,但没有检测到任何内存问题。
我制作了一些互斥锁和条件变量帮助器来简化我的代码。 这是我的代码:
#include <atomic>
#include <condition_variable>
#include <coroutine>
#include <deque>
#include <iostream>
#include <optional>
#include <shared_mutex>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
#define FWD(x) std::forward<decltype(x)>(x)
template <typename... Ts> void display(Ts... ts) {
static std::mutex m;
m.lock();
std::cout << "this_thread: " << std::this_thread::get_id() << ": ";
((std::cout << ts << " "),...);
std::cout << std::endl;
m.unlock();
}
template <typename T,typename Lock> struct LockedValue {
LockedValue(T &value,Lock &&lock)
: m_value{value},m_lock{std::move(lock)} {}
T &operator*() { return m_value; }
T *operator->() { return &m_value; }
void lock() { m_lock.lock(); }
void unlock() { m_lock.unlock(); }
private:
T &m_value;
Lock m_lock;
};
template <typename T> class Mutex {
public:
template <typename... Args> Mutex(Args &&...args) : m_value{FWD(args)...} {}
auto lock() { return LockedValue{m_value,std::unique_lock{m_mutex}}; }
auto lock() const { return LockedValue{m_value,std::shared_lock{m_mutex}}; }
private:
friend class ConditionVariable;
T m_value;
mutable std::shared_mutex m_mutex;
};
class ConditionVariable {
public:
void notifyOne() { m_cond.notify_one(); }
void notifyAll() { m_cond.notify_all(); }
template <typename F,typename Args>
void wait(F f,const Mutex<Args> &mutexes) {
auto lock = mutexes.lock();
m_cond.wait(lock,[&] { return f(*lock); });
}
template <typename F,std::stop_token st,st,[&] { return f(*lock); });
}
private:
std::condition_variable_any m_cond;
};
struct Awaiter {
public:
Awaiter() {}
template <typename... Args>
Awaiter(std::coroutine_handle<Args...> handle) : m_handle{handle} {}
Awaiter(const Awaiter &) = delete;
Awaiter(Awaiter &&a) : m_handle{a.m_handle} { a.m_handle = nullptr; }
void resume() { m_handle(); }
private:
std::coroutine_handle<> m_handle = nullptr;
};
class Thread {
struct Awaitable {
Thread &thread;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> handle) {
thread.addAwaiter({handle});
}
void await_resume() {}
};
public:
Thread(std::string name) : m_name{std::move(name)} {
m_thread = std::jthread([this](std::stop_token st) { run(st); });
}
void addAwaiter(Awaiter &&awaiter) {
m_awaiters.lock()->push_back(std::move(awaiter));
m_conditionVariable.notifyOne();
}
auto id() { return m_thread.get_id(); }
Awaitable operator co_await() { return {*this}; }
private:
void run(std::stop_token st) {
while (!st.stop_requested()) {
m_conditionVariable.wait(&Thread::hasAwaiters,m_awaiters);
std::optional<Awaiter> awaiter;
{
auto awaiters = m_awaiters.lock();
if (!awaiters->empty()) {
awaiter.emplace(std::move(awaiters->front()));
awaiters->pop_front();
}
}
if (awaiter)
awaiter->resume();
}
}
static bool hasAwaiters(const std::deque<Awaiter> &awaiters) {
return !awaiters.empty();
}
private:
std::string m_name;
Mutex<std::deque<Awaiter>> m_awaiters;
ConditionVariable m_conditionVariable;
std::jthread m_thread;
};
struct task {
struct promise_type {
task get_return_object() { return {}; }
std::suspend_never initial_suspend() noexcept { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() noexcept {}
~promise_type() {}
};
~task() {}
};
std::atomic_int x = 0;
std::atomic_int done = 0;
task f(Thread &thread1,Thread &thread2) {
co_await thread1;
++x;
co_await thread2;
++x;
++done;
}
using namespace std::chrono_literals;
static auto N_ITER = 1;
static auto loop = 10'000'000;
int main() {
std::ios_base::sync_with_stdio(false);
display("Single task");
for (int i = 0; i < loop; ++i) {
Thread thread1{"A"};
Thread thread2{"B"};
x = 0;
done = 0;
for (int j = 0; j < N_ITER; ++j)
f(thread1,thread2);
int j = 0;
while (done < N_ITER)
;
if (x != N_ITER * 2) {
std::cout << "error at ith" << i << std::endl;
}
if (i % 1'000'000 == 0)
std::cout << i << std::endl;
}
N_ITER *= 500;
display("Middle of task");
for (int i = 0; i < loop; ++i) {
Thread thread1{"A"};
Thread thread2{"B"};
x = 0;
done = 0;
for (int j = 0; j < N_ITER; ++j)
f(thread1,thread2);
int j = 0;
while (done < N_ITER)
;
if (x != N_ITER * 2) {
std::cout << "error at ith" << i << std::endl;
}
if (i % 1'000'000 == 0)
std::cout << i << std::endl;
}
// Lot of task
std::cout << "Now lot of task" << std::endl;
loop /= 1000;
N_ITER *= 1000;
for (int i = 0; i < loop; ++i) {
Thread thread1{"A"};
Thread thread2{"B"};
x = 0;
done = 0;
for (int j = 0; j < N_ITER; ++j)
f(thread1,thread2);
int j = 0;
while (done < N_ITER)
;
if (x != N_ITER * 2) {
std::cout << "error at ith" << i << std::endl;
}
if (i % 1000 == 0)
std::cout << i << std::endl;
}
return 0;
}
这是代码的想法: 我创建了 2 个线程,我启动了一个协程,我在第一个线程中执行了第一部分,在第二个线程中执行了第二部分。
在主线程中,我等待结束,然后删除线程,然后重新执行。
正如我所说,在 linux gcc 下,我没有任何问题,但是,在 MSVC 上,我遇到了一些奇怪的问题。当我整夜启动测试时,我没有任何崩溃。但是,有时,当我在计算机上执行某些操作时,例如在 microsoft edge 上阅读文档时,“测试代码”会崩溃,有时与“忙时互斥量被破坏”有关。
所以我不知道这是我代码中的错误,还是其他地方的错误(可能是 MSVC)。自然,我认为我的代码有问题,但实际上,我不知道我在哪里做错了...
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。