如何解决从StreamObservable gRPC服务发布Spring-web-flux
我是反应式编程的新手,我只是在尝试新事物。
我正在尝试使用spring-web-flux
创建一个流控制器,该流控制器将仅桥接从gRPC服务存根流式传输的数据。
我正在创建自己的StreamObserver
/ Publisher
类,同时观察gRPC服务并通过HTTP请求进行流传输:
public class StreamObserverPublisher implements StreamObserver<RPCResponse>,Publisher<Integer> {
private Subscriber<? super Integer> subscriber;
@Override
public void onNext(RPCResponse response) {
System.out.println("Publishing: " + response.getValue());
subscriber.onNext(response.getValue());
}
@Override
public void onError(Throwable throwable) {
System.out.println("Observer error");
subscriber.onError(throwable);
}
@Override
public void onCompleted() {
System.out.println("Observer completed");
subscriber.onComplete();
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
this.subscriber = subscriber;
this.subscriber.onSubscribe(new Subscription() {
@Override
public void request(long l) {
System.out.println("Subscription request: " + l);
}
@Override
public void cancel() {
System.out.println("Subscription cancel");
}
});
}
}
然后在我的控制器中,我只是以Publisher
的形式返回此自定义Flux
:
@RestController
public class MyController {
...
@GetMapping(value="/endpoint",produces=MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Integer> stream() {
...
StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher();
Flux<Integer> flux = Flux.from(streamObserverPublisher);
grpcServiceStub.myMethod(request,streamObserverPublisher);
flux.subscribe();
return flux;
}
}
不过,当我调用此API时,我从Flux
尝试发布时收到错误消息:
java.lang.IllegalStateException: Unexpected item.
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:190) ~[spring-web-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
我认为我的订阅有问题,这似乎导致WriteBarrier
进入错误状态,但是我不知道发生了什么。有什么想法吗?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。