为什么在crossbeam_channel :: select旁边调用tokio :: spawn会有延迟?

如何解决为什么在crossbeam_channel :: select旁边调用tokio :: spawn会有延迟?

我正在创建一个任务,它将产生其他任务。其中一些会花费一些时间,因此无法等待,但可以并行运行:

src / main.rs

<a href="@System.Uri.UnescapeDataString(Url.RouteUrl("Product",new {SeName = Model.SeName}))">@Model.Name</a>

我注意到了奇怪的行为。输出

use crossbeam::crossbeam_channel::{bounded,select};

#[tokio::main]
async fn main() {
    let (s,r) = bounded::<usize>(1);

    tokio::spawn(async move {
        let mut counter = 0;
        loop {
            let loop_id = counter.clone();
            tokio::spawn(async move { // why this one was not fired?
                println!("inner task {}",loop_id);
            }); // .await.unwrap(); - solves issue,but this is long task which cannot be awaited
            println!("loop {}",loop_id);
            select! {
                recv(r) -> rr => {
                    // match rr {
                    //     Ok(ee) => {
                    //         println!("received from channel {}",loop_id);
                    //         tokio::spawn(async move {
                    //             println!("received from channel task {}",loop_id);
                    //         });
                    //     },//     Err(e) => println!("{}",e),// };
                },// more recv(some_channel) -> 
            }
            counter = counter + 1;
        }
    });

    // let s_clone = s.clone();
    // tokio::spawn(async move {
    //     s_clone.send(2).unwrap();
    // });

    loop {
        // rest of the program
    }
}

我希望它也能输出loop 0

如果我向channel发送一个值,输出将是:

inner task 0

缺少loop 0 inner task 0 loop 1

为什么inner task 1产生一个延迟循环?

我第一次注意到“从通道任务接收到”这种行为会延迟一个循环,但是当我减少代码以准备示例时,这种情况就开始发生在“内部任务”中。值得一提的是,如果我将第二个inner task写给另一个,则只有最后一个会出现此问题。调用tokio::spawntokio::spawn时应该注意些什么?是什么原因导致这一循环延迟?

Cargo.toml依赖项

select!

Rust 1.46,Windows 10

解决方法

select!被禁止,并且tokio::spawn say的文档:

生成的任务可能在当前线程上执行,或者可能被发送到其他线程以执行。

在这种情况下,select!“未来”实际上是一个阻塞函数,并且spawn不使用新线程(无论是在第一次调用还是在循环内的线程)。 因为您没有告诉tokio您将要阻塞,所以tokio认为不需要另一个线程(从tokio的角度来看,您只有3个永远不会阻塞的期货,那么为什么您仍然需要另一个线程?)。 / p>

解决方案是将tokio::task::spawn_blocking用于select!闭包(它将不再是将来,因此async move {}现在是move || {})。 现在,tokio将知道此函数实际上已阻塞,并将其移至另一个线程(同时将所有实际的期货保留在其他执行线程中)。

use crossbeam::crossbeam_channel::{bounded,select};

#[tokio::main]
async fn main() {
    let (s,r) = bounded::<usize>(1);

    tokio::task::spawn_blocking(move || {
        // ...
    });

    loop {
        // rest of the program
    }
}

Link to playground

另一种可能的解决方案是使用非阻塞通道,例如tokio::sync::mpsc,您可以在其上使用await并获得预期的行为,例如直接使用{{1}的playground example }或recv().await,例如:

tokio::select!

Link to playground

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?