如何解决RxJava 接收来自多个源的数据包并写入到 OutputStream 中,每个数据包之间有延迟
我有 Java TCP 客户端 Socket 读取 InputStream 并通过 RxJava PublishSubject 将数据包分发到应用程序的各个部分。这有效。
有时我也会写到 OutputStream。命令被转换成单个数据包(字节[])并推送到流上。为此,我使用
public void writetoSocket(byte[] packet) {
Completable.fromAction(() -> {
outputStream.write(packet);
outputStream.flush();
}).subscribeOn(Schedulers.io()).subscribe();
}
现在我要执行
outputStream.write(packet);
outputStream.flush();
满足以下条件
- 虽然源数据包是从多个地方(使用不同的命令)同时创建的,但对每个数据包执行上述操作,延迟为 50 毫秒。理想情况下,将数据包排队并延迟执行。
Example:
Place1: createCommand1(),Place2: createCommand1(),createCommand4()
Place3: createCommand1(),createCommand2(),.... createCommand10()
有什么方法可以使用 RxJava 实现这一点。提前致谢!
解决方法
您可以使用序列化的 PublishSubject
来收集字节,然后使用 concatMapCompletable
执行写入,然后延迟:
var subject = PublishSubject.<byte[]>create().toSerialized();
subject
.concatMapCompletable(bytes ->
Completable.fromAction(() -> {
outputStream.write(packet);
outputStream.flush();
})
.subscribeOn(Schedulers.io())
.andThen(Completable.timer(50,TimeUnit.MILLISECONDS))
)
.subscribe();
或者,如果您不介意始终将单个线程专用于发射,则可以在 doOnNext
中执行写入和睡眠:
var subject = PublishSubject.<byte[]>create().toSerialized();
subject
.observeOn(Schedulers.io())
.doOnNext(packet -> {
outputStream.write(packet);
outputStream.flush();
Thread.sleep(50);
})
.subscribe();
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。