微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在并行迭代器中执行计算,然后将其提供给单线程迭代器?

如何解决在并行迭代器中执行计算,然后将其提供给单线程迭代器?

考虑以下示例:

use rayon::prelude::*;

fn do_something_expensive_returning_lots_of_data(x: u32) -> u32 {
    x
}

fn main() {
    let very_large_array = [1,2,3,4];
    let mut h = std::collections::HashSet::new();
    very_large_array.par_iter().for_each({
        |x| {
            let c = do_something_expensive_returning_lots_of_data(*x);
            h.insert(c);
        }
    });
}

我收到以下错误

error[E0596]: cannot borrow `h` as mutable,as it is a captured variable in a `Fn` closure
  --> src/main.rs:13:13
   |
13 |             h.insert(c);
   |             ^ cannot borrow as mutable

我的目的是只让 do_something_expensive_returning_lots_of_data 以多线程方式执行,然后一旦执行,就有一个带有调用结果的单线程迭代器,这样我就可以安全地改变 h。人造丝可以吗?

解决方法

如果您想改变并行代码中的哈希图,您需要进行 Arc<Mutex<T>> 舞蹈。为什么不像这样在 HashSet 中收集结果:

use rayon::prelude::*;

fn do_something_expensive_returning_lots_of_data(x: u32) -> u32 {
    x
}

fn main() {
    let very_large_array = [1,2,3,4];
    let h = very_large_array
        .par_iter()
        .map(|x| do_something_expensive_returning_lots_of_data(*x))
        .collect::<std::collections::HashSet<_>>();
}

(评论后编辑)如果您想在值到达时对其进行处理,我建议使用这样的 mpsc 通道:

use rayon::prelude::*;
use std::sync::mpsc::sync_channel;

fn do_something_expensive_returning_lots_of_data(x: u32) -> u32 {
    x
}

fn main() {
    let (sender,receiver) = sync_channel(1024); // choose appropriate buffer size
    let very_large_array = [1,4];
    rayon::join(
        move || {
            very_large_array.par_iter().for_each(|x| {
                sender
                    .send(do_something_expensive_returning_lots_of_data(*x))
                    .unwrap()
            })
        },move || {
            while let Ok(x) = receiver.recv() {
                println!("{}",x)
            }
        },);
}

(Playground)

,

这个非常简单:不是在计算过程中一次向 HashSet 添加元素,而是将数据与您的计算进行映射,然后将其收集到 HashSet 中。

use rayon::prelude::*;

fn do_something_expensive_returning_lots_of_data(x: u32) -> u32 {
    x
}

fn main() {
    let very_large_array = [1u32,4];
    let h = very_large_array
        .into_par_iter()
        .map(do_something_expensive_returning_lots_of_data)
        .collect::<std::collections::HashSet<_>>();
    println!("{:?}",h);
}

Playground

认识到 Rayon 具有所有 std::iter 组件的并行模拟,这一点很重要。如果您需要扩展现有的 HashSet,您可以使用 par_extend 方法。

use rayon::prelude::*;

fn do_something_expensive_returning_lots_of_data(x: u32) -> u32 {
    x
}

fn main() {
    let very_large_array = [1u32,4];
    let mut h = std::collections::HashSet::new();
    h.par_extend(
        very_large_array
            .into_par_iter()
            .map(do_something_expensive_returning_lots_of_data),);
    println!("{:?}",h);
}

playground

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。