如何解决从 Flux<String> 发射直到满足条件之一?
我得到一个 InputStream
,它代表一个资源(很可能是一个 .txt
文件),我的任务是解析该文件的行。
我必须使我的程序尽可能可修改,这意味着今天我正在从文件中读取数据,但明天数据可能会通过 TCP 流从世界的另一端到达。不应假设新线路到达的速度。
考虑到这一点,我决定尝试 asynchronous
/reactive
方法(如果有人能提供一些很好的资源来解释差异,那就太好了)。
我在 pom.xml
中添加:
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.4.4</version>
</dependency>
此外,我必须读取/解析文件,直到处理了 100 行或直到我开始处理后已经过去 30 秒。因此,无论满足哪个条件,都应停止处理。
到目前为止,我已经:
String absolutePath = "C:\\Users\\User\\Desktop\\input.json";
InputStream is = new FileInputStream(absolutePath);
Flux<String> stringFlux = Flux.using(
() -> new BufferedReader(new InputStreamReader(is,StandardCharsets.UTF_8)).lines().limit(100),Flux::fromStream,Stream::close
).take(100);
stringFlux
.subscribe(s -> System.out.println(s + " " + new Date()),(error) -> System.out.println(error),() -> System.out.println("complete Chanel"));
我特意留下了 .limit(100)
和 .take(100)
,因为我不确定哪个更好(也许两者都错了?)。另外,我不知道如何设置时间限制。
解决方法
我必须读取/解析文件直到处理了 100 行
您在这里概述了两个明智的选择 - 您要么限制底层流,要么限制流量本身。通常我主张尽可能对源应用限制(在这种情况下就是流),但是如果您希望能够轻松地换出该源并保持限制不变,那么应用 {{1} } 在外部通量上可能是更好的选择。
这真的取决于你,这里没有硬性规定——选择一个或另一个。
...或者直到我开始处理后 30 秒过去了。
There's another overload of take() that uses Duration。只需使用它,这样您的通量就以 take(100)
结尾。
(与问题不完全相关,但作为一般说明 - .take(100).take(Duration.ofSeconds(30))
是一种更简洁的方式来阅读文件,而不是使用 Files.lines()
如今。)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。