混搭选择!和 Rust 中的异步调用

如何解决混搭选择!和 Rust 中的异步调用

我正在构建一个小应用程序,它应该以不同的时间间隔安排两个任务(基于 rusoto AWS SDK):每 X 秒运行一个任务,每 Y 秒运行另一个任务。

我发现 crate crossbeam 提供了一个滴答计时器和一个 select! 宏,并将它们放在一起,如下所示:

fn main() -> Result<(),Error> {
  let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
  let rt = Runtime::new().unwrap();
  let tick_a = tick(Duration::from_secs(60));
  let tick_b = tick(Duration::from_secs(30));

  loop {
    select! {
      recv(tick_a) -> _ => {
        rt.block_on(cloudwatch_client.put_metric_data( /* ... */ ));
      },/* similar for tick_b */
    }
  }
}

这可以编译,但是程序会因 thread 'main' panicked at 'not currently running on the Tokio runtime.' 而出现混乱。通过分析回溯,这似乎来自 Rusoto 调用

在这里错过了什么?有没有办法使这项工作?在 Rust 中是否有更好的方法来处理任务的间隔调度?

请注意,this 问题似乎没有解决我的问题。该问题从使用 futures::executor::block_on 函数开始,并通过使用由 tokio block_on 实现的 Runtime 方法解决。我已经在 block_on 中使用了 Runtime 方法

解决方法

线程“main”因“当前未在 Tokio 运行时上运行”而恐慌。

如果特定库所需的 tokio 运行时版本未主动运行,则会显示此错误 - 因为每个主要版本使用不同的线程局部变量,并且可以包含 1 个以上的库主要版本相同的构建。

在您的情况下,您可能运行的是 Tokio 0.3 运行时,但 rusoto 需要 Tokio 0.2 运行时。当 rusoto 然后尝试通过 Tokio 0.2(也包含在构建中)执行 IO 时,该程序检测到没有运行时处于活动状态并产生错误。

要解决此问题,请确保在您的项目中仅使用一个 tokio 版本。您可能需要通过 0.2 将 tokio 降级到 Cargo.toml,因为可能没有更新的 rusoto 版本可用。

另外一个不相关的建议:

除了将 crossbeam 用于计时器之外,您还可以在 tokio 运行时内运行“整个事情”:您可以使用 tokio::select!和 tokio 计时器,用于在此处使用横梁执行您的操作。

https://docs.rs/tokio/0.2.24/tokio/time/fn.interval.htmlhttps://docs.rs/tokio/0.2.24/tokio/macro.select.html(其中包含与您的用例类似的示例)

,

CloudWatchClient::put_metric_data 返回一个 RusotoFuture ,它有一个 sync 方法可以做你想要的。所以,而不是:

let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
let rt = Runtime::new().unwrap();
rt.block_on(cloudwatch_client.put_metric_data(/* ... */));

你可以这样做:

let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
cloudwatch_client.put_metric_data(/* ... */).sync();

Which is also what the official docs recommend

,

请注意:

请注意,从 v0.43.0 开始,Rusoto 使用 Rust 的 std::future::Future 和 Tokio 0.2 生态系统。从 v0.46.0 开始,Rusoto 使用 Tokio 1.0 生态系统。

对于简单的 S3 调用,我建议使用以下依赖项:

[dependencies]  
rusoto_core = "0.46.0"                            
rusoto_s3 = "0.46.0"                                                    
tokio = {version = "1.0",features = ["full"]}  

调用可以这样实现:

use rusoto_core::Region;
use rusoto_s3::{S3,S3Client};

#[tokio::main]
async fn main() {
    let region = Region::UsEast1;
    let client = S3Client::new(region);

    match client.list_buckets().await {
        Ok(output) => match output.buckets {
            Some(bucket_list) => {
                println!("Buckets in S3:");

                for bucket in bucket_list {
                    println!("{:?}",bucket.name);
                }
            }
            None => println!("No buckets!"),},Err(error) => {
            println!("Error: {:?}",error);
        }
    }
}

设置凭据以避免凭据错误。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?