微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何实现来自未来功能的流

如何解决如何实现来自未来功能的流

为了理解流的工作原理,我试图实现一个使用 random.org 的无限数生成器。我做的第一件事是实现一个版本,我将调用一个名为 get_number 的异步函数,它会填充一个缓冲区并返回下一个可能的数字:


struct RandomGenerator {
    buffer: Vec<u8>,position: usize,}

impl RandomGenerator {
    pub fn new() -> RandomGenerator {
        Self {
            buffer: Vec::new(),position: 0,}
    }

    pub async fn get_number(&mut self) -> u8 {
        self.fill_buffer().await;

        let value = self.buffer[self.position];
        self.position += 1;

        value
    }

    async fn fill_buffer(&mut self) {
        if self.buffer.is_empty() || self.is_buffer_depleted() {
            let new_numbers = self.fetch_numbers().await;
            drop(replace(&mut self.buffer,new_numbers));
            self.position = 0;
        }
    }

    fn is_buffer_depleted(&self) -> bool {
        self.buffer.len() >= self.position
    }

    async fn fetch_numbers(&mut self) -> Vec<u8> {
        let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
        let numbers = response.text().await.unwrap();
        numbers
            .lines()
            .map(|line| line.trim().parse::<u8>().unwrap())
            .collect()
    }
}

通过这个实现,我可以在循环中调用函数 get_number 并获得我想要的任意数量的数字,但这个想法是有迭代器,所以我可以调用一堆组合函数,比如 taketake_while 等。

但是当我尝试实现 Stream 时,问题开始出现: 我的第一次尝试是使用一个结构来保存对生成器的引用

struct RandomGeneratorStream<'a> {
    generator: &'a mut RandomGenerator,}

然后我实现了以下流

impl<'a> Stream for RandomGeneratorStream<'a> {
    type Item = u8;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,cx: &mut std::task::Context<'_>,) -> std::task::Poll<Option<Self::Item>> {
        let f = self.get_mut().generator.get_number();
        pin_mut!(f);
        f.poll_unpin(cx).map(Some)
    }
}

但是调用它只会挂起进程

generator.into_stream().take(18).collect::<Vec<u8>>().await

在接下来的尝试中,我尝试使用 pin_mut 在流结构上保持未来的状态!但最终在一生中遇到了许多错误而无法解决它们。 在这种情况下可以做什么? 这是一个没有流的工作代码

use std::mem::replace;
struct RandomGenerator {
    buffer: Vec<u8>,new_numbers));
            self.position = 0;
        }
    }

    fn is_buffer_depleted(&self) -> bool {
        self.buffer.len() >= self.position
    }

    async fn fetch_numbers(&mut self) -> Vec<u8> {
        let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
        let numbers = response.text().await.unwrap();
        numbers
            .lines()
            .map(|line| line.trim().parse::<u8>().unwrap())
            .collect()
    }
}

#[tokio::main]
async fn main() {
    let mut generator = RandomGenerator::new();
    dbg!(generator.get_number().await);
}

在这里您可以找到第一个工作示例的链接(我没有调用 random.org,我使用了 Cursor,因为 dns 解析在操场上不起作用)https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=730eaf1f7db842877d3f3e7ca1c6d2a5

我最后一次尝试流,你可以在这里找到https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=de0b212ee70865f6ac6c19430cd952cd

解决方法

在接下来的尝试中,我尝试使用 pin_mut 在流结构上保持未来的状态!但最终却出现了许多终生无法解决的错误。

您走在正确的轨道上,您需要坚持未来才能使 poll_next 正常工作。

不幸的是,您会遇到具有可变引用的障碍。您保留一个 &mut RandomGenerator 以便重复使用它,但未来本身也必须保留一个 &mut RandomGenerator 才能完成它的工作。这将违反可变引用的排他性。不管你怎么剪都可能会遇到这个问题。

FutureStream 的更好方法是遵循建议 here 并使用 futures::stream::unfold

fn as_stream<'a>(&'a mut self) -> impl Stream<Item = u8> + 'a {
    futures::stream::unfold(self,|rng| async {
        let number = rng.get_number().await;
        Some((number,rng))
    })
}

playground 上查看。

这不一定能帮助您更多地了解流,但提供的功能通常比手动滚动更好。这避免了上述多重可变引用问题的关键原因是,生成未来的函数取得可变引用的所有权,然后在完成后将其归还。这样一次只有一个存在。即使您自己实现了 Stream,您也必须使用类似的机制。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。