如何在 C++ 20 中执行链式协程

如何解决如何在 C++ 20 中执行链式协程

我正在尝试链接协程。 Foo2 实际上会异步。一旦 Foo2 恢复,代码应该按照“恢复 Foo2”和“恢复 Foo1”的顺序执行(如 2 继续)。有些细节我不是很清楚。首先,当 co_await b 挂起时,它是否立即将一个 promise 对象返回给调用者?然后 co_await Foo2() 发生。此时我需要挂起但不想触发线程 t(run)。在某个地方,我认为我需要在 Foo1() 中的 co_await 之前将 Foo2() 中的 promise/awaiter 包装起来。

void run(std::coroutine_handle<> h)
{
  std::cout<<std::this_thread::get_id()<<" "<<"in Run\n";
  std::this_thread::sleep_for (std::chrono::seconds(5));
  h.resume();
}

template<typename T>
struct task{

  struct promise_type {
    T val_;
    task get_return_object() { return {.h_ = std::coroutine_handle<promise_type>::from_promise(*this)}; }
    std::suspend_never initial_suspend() { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    void return_value(T val) { val_ = val; }
    void unhandled_exception() {}
  };

  bool await_ready() { return false; }
  void await_suspend(std::coroutine_handle<> h)
  {
    std::thread t(run,h);
    t.detach();
  }

  void await_resume() { }

  std::coroutine_handle<promise_type> h_;
};

template<typename T>
task<T> Foo2()
{ 
  std::cout<<std::this_thread::get_id()<<" "<<"in Foo2\n";
  task<T> b;
  co_await b;
  std::cout<<std::this_thread::get_id()<<" resume Foo2\n";
}

template<typename T>
task<T> Foo1()
{
  std::cout<<std::this_thread::get_id()<<" "<<"in Foo1\n";
  co_await Foo2<T>();
  std::cout<<std::this_thread::get_id()<<" resume Foo1\n";
}

int main()
{
  Foo1<int>();
  std::cout<<std::this_thread::get_id()<<" ""main end\n";  
  std::this_thread::sleep_for (std::chrono::seconds(30));
}

解决方法

是否立即返回一个promise对象

Promise 对象永远不会返回。 Promise 对象被创建并存储在协程框架内。与协程框架一起销毁。

返回给调用者的是协程的“返回对象”。它是从协程第一次挂起(或完成)时从 promise::get_return_object() 函数返回的对象创建的。 (promise::get_return_object() 在协程的主体开始执行之前被调用)

此时我需要暂停...

为了等待另一个协程的完成,当前的协程需要暂停并存储在某个地方,以便在等待的协程完成后恢复。

它可以存储在负责自旋协程(类似 io_service)的某个上下文中,也可以存储在等待的协程中。

这是一个可等待的异步(和单线程)task<T> 协程示例。

它将暂停的协程存储在等待的协程的协程承诺中,并在计算出协程值后恢复它。

#include <coroutine>
#include <optional>

#include <iostream>
#include <thread>

#include <chrono>
#include <queue>
#include <vector>

// basic coroutine single-threaded async task example

template<typename T>
struct task_promise_type;

// simple single-threaded timer for coroutines
void submit_timer_task(std::coroutine_handle<> handle,std::chrono::seconds timeout);

template<typename T>
struct task;

template<typename T>
struct task_promise_type
{
    // value to be computed
    // when task is not completed (coroutine didn't co_return anything yet) value is empty
    std::optional<T> value;

    // corouine that awaiting this coroutine value
    // we need to store it in order to resume it later when value of this coroutine will be computed
    std::coroutine_handle<> awaiting_coroutine;

    // task is async result of our coroutine
    // it is created before execution of the coroutine body
    // it can be either co_awaited inside another coroutine
    // or used via special interface for extracting values (is_ready and get)
    task<T> get_return_object();

    // there are two kinds of coroutines:
    // 1. eager - that start its execution immediately
    // 2. lazy - that start its execution only after 'co_await'ing on them
    // here I used eager coroutine task
    // eager: do not suspend before running coroutine body
    std::suspend_never initial_suspend()
    {
        return {};
    }

    // store value to be returned to awaiting coroutine or accessed through 'get' function
    void return_value(T val)
    {
        value = std::move(val);
    }

    void unhandled_exception()
    {
        // alternatively we can store current exeption in std::exception_ptr to rethrow it later
        std::terminate();
    }

    // when final suspend is executed 'value' is already set
    // we need to suspend this coroutine in order to use value in other coroutine or through 'get' function
    // otherwise promise object would be destroyed (together with stored value) and one couldn't access task result
    // value
    auto final_suspend() noexcept
    {
        // if there is a coroutine that is awaiting on this coroutine resume it
        struct transfer_awaitable
        {
            std::coroutine_handle<> awaiting_coroutine;

            // always stop at final suspend
            bool await_ready() noexcept
            {
                return false;
            }
            std::coroutine_handle<> await_suspend(std::coroutine_handle<task_promise_type> h) noexcept
            {
                // resume awaiting coroutine or if there is no coroutine to resume return special coroutine that do
                // nothing
                return awaiting_coroutine ? awaiting_coroutine : std::noop_coroutine();
            }
            void await_resume() noexcept {}
        };
        return transfer_awaitable{awaiting_coroutine};
    }

    // there are multiple ways to add co_await into coroutines
    // I used `await_transform`

    // use `co_await std::chrono::seconds{n}` to wait specified amount of time
    auto await_transform(std::chrono::seconds duration)
    {
        struct timer_awaitable
        {
            std::chrono::seconds duration;
            // always suspend
            bool await_ready()
            {
                return false;
            }

            // h is a handler for current coroutine which is suspended
            void await_suspend(std::coroutine_handle<task_promise_type> h)
            {
                // submit suspended coroutine to be resumed after timeout
                submit_timer_task(h,duration);
            }
            void await_resume() {}
        };

        return timer_awaitable{duration};
    }

    // also we can await other task<T>
    template<typename U>
    auto await_transform(task<U>& task)
    {
        if (!task.handle) {
            throw std::runtime_error("coroutine without promise awaited");
        }
        if (task.handle.promise().awaiting_coroutine) {
            throw std::runtime_error("coroutine already awaited");
        }

        struct task_awaitable
        {
            std::coroutine_handle<task_promise_type<U>> handle;

            // check if this task already has value computed
            bool await_ready()
            {
                return handle.promise().value.has_value();
            }

            // h - is a handle to coroutine that calls co_await
            // store coroutine handle to be resumed after computing task value
            void await_suspend(std::coroutine_handle<> h)
            {
                handle.promise().awaiting_coroutine = h;
            }

            // when ready return value to a consumer
            auto await_resume()
            {
                return std::move(*(handle.promise().value));
            }
        };

        return task_awaitable{task.handle};
    }
};

template<typename T>
struct task
{
    // declare promise type
    using promise_type = task_promise_type<T>;

    task(std::coroutine_handle<promise_type> handle) : handle(handle) {}

    task(task&& other) : handle(std::exchange(other.handle,nullptr)) {}

    task& operator=(task&& other)
    {
        if (handle) {
            handle.destroy();
        }
        handle = other.handle;
    }

    ~task()
    {
        if (handle) {
            handle.destroy();
        }
    }

    // interface for extracting value without awaiting on it

    bool is_ready() const
    {
        if (handle) {
            return handle.promise().value.has_value();
        }
        return false;
    }

    T get()
    {
        if (handle) {
            return std::move(*handle.promise().value);
        }
        throw std::runtime_error("get from task without promise");
    }

    std::coroutine_handle<promise_type> handle;
};

template<typename T>
task<T> task_promise_type<T>::get_return_object()
{
    return {std::coroutine_handle<task_promise_type>::from_promise(*this)};
}

// simple timers

// stored timer tasks
struct timer_task
{
    std::chrono::steady_clock::time_point target_time;
    std::coroutine_handle<> handle;
};

// comparator
struct timer_task_before_cmp
{
    bool operator()(const timer_task& left,const timer_task& right) const
    {
        return left.target_time > right.target_time;
    }
};

std::priority_queue<timer_task,std::vector<timer_task>,timer_task_before_cmp> timers;

void submit_timer_task(std::coroutine_handle<> handle,std::chrono::seconds timeout)
{
    timers.push(timer_task{std::chrono::steady_clock::now() + timeout,handle});
}

// timer loop
void loop()
{
    while (!timers.empty()) {
        auto& timer = timers.top();
        // if it is time to run a coroutine
        if (timer.target_time < std::chrono::steady_clock::now()) {
            auto handle = timer.handle;
            timers.pop();
            handle.resume();
        } else {
            std::this_thread::sleep_until(timer.target_time);
        }
    }
}

// example

using namespace std::chrono_literals;

task<int> wait_n(int n)
{
    std::cout << "before wait " << n << '\n';
    co_await std::chrono::seconds(n);
    std::cout << "after wait " << n << '\n';
    co_return n;
}

task<int> test()
{
    for (auto c : "hello world\n") {
        std::cout << c;
        co_await 1s;
    }

    std::cout << "test step 1\n";
    auto w3 = wait_n(3);
    std::cout << "test step 2\n";
    auto w2 = wait_n(2);
    std::cout << "test step 3\n";
    auto w1 = wait_n(1);
    std::cout << "test step 4\n";
    auto r = co_await w2 + co_await w3;
    std::cout << "awaiting already computed coroutine\n";
    co_return co_await w1 + r;
}

// main can't be a coroutine and usually need some sort of looper (io_service or timer loop in this example )
int main()
{
    // do something

    auto result = test();

    // execute deferred coroutines
    loop();

    std::cout << "result: " << result.get();
}

输出:

hello world
test step 1
before wait 3
test step 2
before wait 2
test step 3
before wait 1
test step 4
after wait 1
after wait 2
after wait 3
awaiting already computed coroutine
result: 6

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res