微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

异步 Rust 中的多线程 - 为什么我的代码无法并行化?

如何解决异步 Rust 中的多线程 - 为什么我的代码无法并行化?

我试图通过运行以下函数来故意用尽 API 限制(900 次调用):

#[get("/exhaust")]
pub async fn exhaust(_pool: web::Data<PgPool>,config: web::Data<Arc<Settings>>) -> impl Responder {
    let mut handles = vec![];

    for i in 1..900 {
        let inner_config = config.clone();
        let handle = thread::spawn(move || async move {
            println!("running thread {}",i);
            get_single_tweet(inner_config.as_ref().deref(),"1401287393228038149")
                .await
                .unwrap();
        });
        handles.push(handle);
    }

    for h in handles {
        h.join().unwrap().await;
    }

    HttpResponse::Ok()

我的机器有 16 个内核,所以我希望上面的运行速度比单线程函数快 16 倍,但事实并非如此。事实上,它的运行速度与单线程版本一样慢。

这是为什么?我错过了什么?

注意:move || async move 部分对我来说看起来有点奇怪,但我是按照编译器的建议到达那里的。由于async closures being unstable,它不会让我将异步放在第一步。这可能是问题吗?

解决方法

此代码确实会同步运行您的 async 块。 async 块创建了一个实现 Future 的类型,但要知道的一件事是 Future 不会自己开始运行,它们必须是 await -ed 或提供给执行程序运行。

使用返回 thread::spawn 的闭包调用 Future 不会执行它们;线程只是创建 async 块并返回。因此,在 async 循环中 await 之前不会实际执行 handles 块,这将按顺序处理期货。

解决此问题的一种方法是使用 join_all 板条箱中的 futures 同时运行它们。

let mut futs = vec![];

for i in 1..900 {
    let inner_config = config.clone();
    futs.push(async move {
        println!("running thread {}",i);
        get_single_tweet(inner_config.as_ref().deref(),"1401287393228038149")
            .await
            .unwrap();
    });
}

futures::future::join_all(futs).await;
,

问题在于您以某种方式混合了多线程和异步,这会导致所有工作都是连续的:您的所有线程所做的都是调用 get_single_tweet,这显然是一个 async 函数。>

现在在像 Javascript 这样的语言中,get_single_tweet 会创建一个 task,它会返回一个 promise 象征着任务的实现并尽快运行可能。

这不是 Rust 的工作方式(或许多其他语言,顺便说一句,Python 的行为更像 Rust,而不是 Javascript)。在 Rust 中,get_single_tweet 只是创建了一个 未来,它实际上并不任何事情,未来必须是 polled 才能发生: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=b26b47e62e46b66b60844aabc2ea7be1

此轮询何时进行?当 await-ing 的动态链到达事件循环的顶部时。

这里的未来是在线程中创建的,然后从线程返回,然后在从 await 获取时进行 join-ed,因此您的获取不是在线程中运行,而是在在此处运行:

    for h in handles {
        h.join().unwrap().await;
    }

这是完全连续的。

现在要解决这个问题,你基本上有两个选择,但重要的是要保持你的领域:

  • 如果你想使用线程,那么它们内部的调用应该阻塞
  • 否则,您可能想使用 task::spawn(或您选择的运行时中的任何等效项)而不是使用 thread::spawn,确保您使用的是多线程运行时可能是一个好主意但在这里我假设 get_single_tweet 正在执行 IO 工作(例如从 Twitter 获取内容),因此单线程运行时可能已经产生了大部分好处

task::spawn 将(顾名思义)创建一个 任务 并返回一个句柄,这更接近于 Javascript 的 async 函数的行为。 >

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?