微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

RxJava 接收来自多个源的数据包并写入到 OutputStream 中,每个数据包之间有延迟

如何解决RxJava 接收来自多个源的数据包并写入到 OutputStream 中,每个数据包之间有延迟

我有 Java TCP 客户端 Socket 读取 InputStream 并通过 RxJava PublishSubject 将数据包分发到应用程序的各个部分。这有效。

有时我也会写到 OutputStream。命令被转换成单个数据包(字节[])并推送到流上。为此,我使用

public void writetoSocket(byte[] packet) {
    Completable.fromAction(() -> {
         outputStream.write(packet);
         outputStream.flush();
    }).subscribeOn(Schedulers.io()).subscribe(); 
}

现在我要执行

    outputStream.write(packet);
    outputStream.flush();

满足以下条件

  1. 虽然源数据包是从多个地方(使用不同的命令)同时创建的,但对每个数据包执行上述操作,延迟为 50 毫秒。理想情况下,将数据包排队并延迟执行。
Example:
Place1: createCommand1(),Place2: createCommand1(),createCommand4()
Place3: createCommand1(),createCommand2(),.... createCommand10()

有什么方法可以使用 RxJava 实现这一点。提前致谢!

解决方法

您可以使用序列化的 PublishSubject 来收集字节,然后使用 concatMapCompletable 执行写入,然后延迟:

var subject = PublishSubject.<byte[]>create().toSerialized();

subject
  .concatMapCompletable(bytes -> 
       Completable.fromAction(() -> {
           outputStream.write(packet);
           outputStream.flush();
       })
       .subscribeOn(Schedulers.io())
       .andThen(Completable.timer(50,TimeUnit.MILLISECONDS))
   )
   .subscribe();

或者,如果您不介意始终将单个线程专用于发射,则可以在 doOnNext 中执行写入和睡眠:

var subject = PublishSubject.<byte[]>create().toSerialized();

subject
  .observeOn(Schedulers.io())
  .doOnNext(packet -> {
     outputStream.write(packet);
     outputStream.flush();
     Thread.sleep(50);
  })
  .subscribe();

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。