如何解决如何将 ParallelIterator 转换回顺序迭代器?
我正在遍历数据库中数 GB 的输入项。在每个输入项上,我正在做一些 cpu 密集型处理,产生一个或多个新的输出项,总共数十 GB。然后将输出项存储在另一个数据库表中。
通过使用 Rayon 进行并行处理,我获得了不错的加速。但是,数据库 API 不是线程安全的;它是 Send
但不是 Sync
,因此 I/O 必须被序列化。
理想情况下,我只想写:
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.ser_bridge() // End parallelism. This function does not exist.
.for_each(|output_item| {
output_database.write_item(output_item);
});
基本上我想要par_bridge()
的反面;在调用它的线程上运行的东西,从每个线程读取项目,并连续生成它们。但在目前的 Rayon 实现中,这似乎并不存在。我不确定这是因为理论上不可能,还是它不适合库的当前设计。
输出太大,无法先将其全部收集到 Vec
中;它需要直接流式传输到数据库中。
顺便说一下,我没有和 Rayon 结婚;如果有其他更合适的板条箱,我很乐意切换。
解决方法
您可以将输出数据库包装在 Arc<Mutex>
中以防止并行访问:
let output_database = Arc::new (Mutex::new (output_database));
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.for_each_with (output_database,|output_database,output_item| {
output_database.lock().write_item(output_item);
});
,
我认为顺序无关紧要,因此您不需要对输出数据进行排序。
您可以使用 mpsc::channel
将您的数据从 for_each
闭包传输到您的数据库 API,例如
use std::sync::mpsc;
let (tx,rx) = mpsc::channel();
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.for_each(move |output_item| {
tx.send(output_item).unwrap();
});
在第二个线程中,您可以使用 rx
变量接收数据并将其写入数据库。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。