微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在 Tokio 运行时上下文中从异步方法调用的非异步方法中等待未来?

如何解决如何在 Tokio 运行时上下文中从异步方法调用的非异步方法中等待未来?

我正在使用 Tokio 1.1 来执行异步操作。我有一个带有 asyncmain #[tokio::main],所以我已经在使用运行时了。

main 调用一个异步方法,我希望在未来成为 await (具体来说,我从数据融合数据帧中收集)。这种非异步方法一个特征指定的签名,该特征返回一个结构,而不是一个 Future<Struct>。据我所知,我无法将其标记为异步。

如果我尝试调用 df.collect().await;,我会得到

只允许在 async 函数和块中

来自编译器的错误,指出我在其中调用 await方法不是 async

如果我尝试从这样的新运行时block_on未来:

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());

我遇到运行时恐慌:

无法从运行时内启动运行时。发生这种情况是因为一个函数(如 block_on)试图在当前线程被用于驱动异步任务时阻塞当前线程。

如果我尝试 futures::executor::block_on(df.collect()).unwrap();,我会遇到新的运行时恐慌:

“当前未在 Tokio 0.2.x 运行时上运行。”

这很奇怪,因为我使用的是 Tokio v1.1。

这感觉比它应该的更难。我在异步上下文中,感觉编译器应该知道这一点并允许我从方法调用 .await - 唯一的代码路径从 async 块内调用方法。有没有一种简单的方法可以做到这一点,但我缺少这种方法

解决方法

我在异步上下文中,感觉编译器应该知道这一点并允许我从方法中调用 .await

无论您是否在运行时的上下文中,在同步函数中await 从根本上是不可能的。 await 被转换为屈服点,async 函数被转换为状态机,利用这些屈服点执行异步计算。如果不将您的函数标记为 async,这种转换是不可能的。

如果我正确理解你的问题,你有以下代码:

#[tokio::main]
async fn main() {
    let foo = Foo {};
    foo.bar()
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}

问题是您不能在 df.collect 内等待 bar,因为它没有标记为 async。如果您可以修改 Trait 的签名,那么您可以使用 How can I define an async method in a trait? 中提到的变通方法使 Trait::bar 成为异步方法。

如果您无法更改 Trait 的签名,那么您有问题。异步函数应该永远花费很长时间而不到达 .await。如 What is the best approach to encapsulate blocking I/O in future-rs? 中所述,您可以在转换为非异步代码时使用 spawn_blocking

#[tokio::main]
async fn main() {
    let foo = Foo {};
    tokio::task::spawn_blocking(move || foo.bar()).await
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}

现在您需要一种无需等待即可运行 df.collect 完成的方法。您提到您尝试创建嵌套运行时来解决此问题:

如果我尝试从新的运行时阻止未来......我会感到恐慌

但是,tokio 不允许您创建嵌套的运行时。您可以创建一个新的、独立运行时,如How can I create a Tokio runtime inside another Tokio runtime中所述。但是,生成嵌套运行时的效率会很低。

您可以获取当前运行时的句柄,而不是生成新的运行时:

let handle = Handle::current();

进入运行时上下文:

handle.enter();

然后使用 futures::executor::block_on 运行 Future 以完成:

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        let handle = Handle::current();
        handle.enter();
        futures::executor::block_on(df.collect())
    }
}

进入 tokio 运行时上下文将解决您之前遇到的错误:

如果我尝试 futures::executor::block_on(df.collect()).unwrap();,我会得到一个新的运行时恐慌 not currently running on a Tokio 0.2.x runtime

如果可以,我会敦促您尽量避免这样做。最佳解决方案是将 Trait::bar 标记为 async,将 await 标记为正常。任何其他解决方案,包括上面提到的解决方案,都涉及阻塞当前线程,直到给定的未来完成。

感谢@AliceRyhl 的解释

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。