微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

从 Flux<String> 发射直到满足条件之一?

如何解决从 Flux<String> 发射直到满足条件之一?

我得到一个 InputStream,它代表一个资源(很可能是一个 .txt 文件),我的任务是解析该文件的行。

我必须使我的程序尽可能可修改,这意味着今天我正在从文件中读取数据,但明天数据可能会通过 TCP 流从世界的另一端到达。不应假设新线路到达的速度。

考虑到这一点,我决定尝试 asynchronous/reactive 方法(如果有人能提供一些很好的资源来解释差异,那就太好了)。

我在 pom.xml添加

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
        <version>2.4.4</version>
    </dependency>

此外,我必须读取/解析文件,直到处理了 100 行或直到我开始处理后已经过去 30 秒。因此,无论满足哪个条件,都应停止处理。

到目前为止,我已经:

    String absolutePath = "C:\\Users\\User\\Desktop\\input.json"; 
    InputStream is = new FileInputStream(absolutePath);
    
    
    Flux<String> stringFlux = Flux.using(
            () -> new BufferedReader(new InputStreamReader(is,StandardCharsets.UTF_8)).lines().limit(100),Flux::fromStream,Stream::close
    ).take(100);
    
    stringFlux
    .subscribe(s -> System.out.println(s + " " + new Date()),(error) -> System.out.println(error),() -> System.out.println("complete Chanel"));

我特意留下了 .limit(100).take(100),因为我不确定哪个更好(也许两者都错了?)。另外,我不知道如何设置时间限制。

解决方法

我必须读取/解析文件直到处理了 100 行

您在这里概述了两个明智的选择 - 您要么限制底层流,要么限制流量本身。通常我主张尽可能对源应用限制(在这种情况下就是流),但是如果您希望能够轻松地换出该源并保持限制不变,那么应用 {{1} } 在外部通量上可能是更好的选择。

这真的取决于你,这里没有硬性规定——选择一个或另一个。

...或者直到我开始处理后 30 秒过去了。

There's another overload of take() that uses Duration。只需使用它,这样您的通量就以 take(100) 结尾。

(与问题不完全相关,但作为一般说明 - .take(100).take(Duration.ofSeconds(30)) 是一种更简洁的方式来阅读文件,而不是使用 Files.lines() 如今。)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。