如何解决如何在 Tokio 运行时上下文中从异步方法调用的非异步方法中等待未来?
我正在使用 Tokio 1.1 来执行异步操作。我有一个带有 async
的 main
#[tokio::main]
,所以我已经在使用运行时了。
main
调用一个非异步方法,我希望在未来成为 await
(具体来说,我从数据融合数据帧中收集)。这种非异步方法有一个特征指定的签名,该特征返回一个结构,而不是一个 Future<Struct>
。据我所知,我无法将其标记为异步。
如果我尝试调用 df.collect().await;
,我会得到
只允许在 async
函数和块中
来自编译器的错误,指出我在其中调用 await
的方法不是 async
。
如果我尝试从这样的新运行时block_on
未来:
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
.block_on(df.collect());
我遇到运行时恐慌:
无法从运行时内启动运行时。发生这种情况是因为一个函数(如 block_on
)试图在当前线程被用于驱动异步任务时阻塞当前线程。
如果我尝试 futures::executor::block_on(df.collect()).unwrap();
,我会遇到新的运行时恐慌:
“当前未在 Tokio 0.2.x 运行时上运行。”
这很奇怪,因为我使用的是 Tokio v1.1。
这感觉比它应该的更难。我在异步上下文中,感觉编译器应该知道这一点并允许我从方法内调用 .await
- 唯一的代码路径从 async
块内调用此方法。有没有一种简单的方法可以做到这一点,但我缺少这种方法?
解决方法
我在异步上下文中,感觉编译器应该知道这一点并允许我从方法中调用 .await
无论您是否在运行时的上下文中,在同步函数中await
从根本上是不可能的。 await
被转换为屈服点,async
函数被转换为状态机,利用这些屈服点执行异步计算。如果不将您的函数标记为 async
,这种转换是不可能的。
如果我正确理解你的问题,你有以下代码:
#[tokio::main]
async fn main() {
let foo = Foo {};
foo.bar()
}
impl Trait for Foo {
fn bar(df: DataFrame) -> Vec<Data> {
df.collect().await
}
}
问题是您不能在 df.collect
内等待 bar
,因为它没有标记为 async
。如果您可以修改 Trait
的签名,那么您可以使用 How can I define an async method in a trait? 中提到的变通方法使 Trait::bar
成为异步方法。
如果您无法更改 Trait
的签名,那么您有问题。异步函数应该永远花费很长时间而不到达 .await
。如 What is the best approach to encapsulate blocking I/O in future-rs? 中所述,您可以在转换为非异步代码时使用 spawn_blocking
:
#[tokio::main]
async fn main() {
let foo = Foo {};
tokio::task::spawn_blocking(move || foo.bar()).await
}
impl Trait for Foo {
fn bar(df: DataFrame) -> Vec<Data> {
df.collect().await
}
}
现在您需要一种无需等待即可运行 df.collect
完成的方法。您提到您尝试创建嵌套运行时来解决此问题:
如果我尝试从新的运行时阻止未来......我会感到恐慌
但是,tokio 不允许您创建嵌套的运行时。您可以创建一个新的、独立运行时,如How can I create a Tokio runtime inside another Tokio runtime中所述。但是,生成嵌套运行时的效率会很低。
您可以获取当前运行时的句柄,而不是生成新的运行时:
let handle = Handle::current();
进入运行时上下文:
handle.enter();
然后使用 futures::executor::block_on
运行 Future 以完成:
impl Trait for Foo {
fn bar(df: DataFrame) -> Vec<Data> {
let handle = Handle::current();
handle.enter();
futures::executor::block_on(df.collect())
}
}
进入 tokio 运行时上下文将解决您之前遇到的错误:
如果我尝试 futures::executor::block_on(df.collect()).unwrap();,我会得到一个新的运行时恐慌 not currently running on a Tokio 0.2.x runtime
如果可以,我会敦促您尽量避免这样做。最佳解决方案是将 Trait::bar
标记为 async
,将 await
标记为正常。任何其他解决方案,包括上面提到的解决方案,都涉及阻塞当前线程,直到给定的未来完成。
感谢@AliceRyhl 的解释
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。