微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

扩展 gRPC 双向流聊天服务

如何解决扩展 gRPC 双向流聊天服务

我正在使用双向流在 gRPC java 中起草聊天服务。简化流程如下,

  1. 用户加入聊天服务时,用户StreamObserver 将存储在聊天室存储库中,即服务器中包含 HashMap 的简单 userId - StreamObserver
  2. 一段时间后,当用户发送聊天消息时,服务器收到请求并通过迭代存储在聊天室存储库中的 StreamObserver调用 {{1} } 方法

当只有 1 个服务器存在时,这可以正常工作,但是一旦扩展到多个服务器,客户端的 onNext 将存储在特定服务器中,并且不会存在于其他服务器中,因为 gRPC 打开单个 HTTP 连接到最初连接的服务器。

我需要通过将所有 StreamObserver 分散在服务器周围来向同一聊天室中的所有用户发送消息,有人对这种情况有很好的经验吗?我尝试将 StreamObserver 存储在单个存储中,但由于它不可序列化,因此我无法将其存储在像 redis 这样的共享存储中。

解决方法

我使用 gRPC 和 3 个具有负载平衡的服务器实现了聊天。实现负载平衡的第一件事是使用 defaultLoadBalancingPolicyround_robin。就我而言,我使用了 MultiAddressNameResolverFactory 政策。并使用带有三个服务器的主机和端口的 public class ChatClientAlice { private NameResolver.Factory nameResolverFactory; private ManagedChannel channel; public static void main(String[] args) { ChatClientAlice chatClientAlice = new ChatClientAlice(); chatClientAlice.createChannel(); chatClientAlice.runBiDiStreamChat(); chatClientAlice.closeChannel(); } private void createChannel() { nameResolverFactory = new MultiAddressNameResolverFactory( new InetSocketAddress("localhost",50000),new InetSocketAddress("localhost",50001),50002) ); channel = ManagedChannelBuilder.forTarget("service") .nameResolverFactory(nameResolverFactory) .defaultLoadBalancingPolicy("round_robin") .usePlaintext() .build(); } private void closeChannel() { channel.shutdown(); } private void runBiDiStreamChat() { System.out.println("creating Bidirectional stream stub for Alice"); EchoServiceGrpc.EchoServiceStub asyncClient = EchoServiceGrpc.newStub(channel); CountDownLatch latch = new CountDownLatch(1); StreamObserver<EchoRequest> requestObserver = asyncClient.echoBiDi(new StreamObserver<EchoResponse>() { @Override public void onNext(EchoResponse value) { System.out.println("chat: " + value.getMessage()); } @Override public void onError(Throwable t) { latch.countDown(); } @Override public void onCompleted() { latch.countDown(); } }); Stream.iterate(0,i -> i + 1) .limit(10) .forEach(integer -> { String msg = "Hello,I am " + ChatClientAlice.class.getSimpleName() + "! I am sending stream message number " + integer + "."; System.out.println("Alice says: " + msg); EchoRequest request = EchoRequest.newBuilder() .setMessage(msg) .build(); requestObserver.onNext(request); // throttle the stream try { Thread.sleep(5000); } catch (InterruptedException e) { } }); requestObserver.onCompleted(); System.out.println("Alice BiDi stream is done."); try { // wait for the time set on the stream + the throttle latch.await((5000 * 20),TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } } 创建通道。在这里,我为 Alice 创建了一个客户端聊天。然后你复制这个类并为 Bob 创建一个客户端聊天。这应该已经完成​​了您要求的负载平衡。

StreamObserver

在服务器服务上,每次收到来自新客户端的新请求时,您都必须使用单例来存储 responseObserver.onNext(response);。您将迭代所有观察者并将消息发送给所有 singletonObservers.getObservers().forEach(....,而不是将消息返回给单个观察者 public class ChatServiceImpl extends EchoServiceGrpc.EchoServiceImplBase { private final String name; private final SingletlonChatStreamObserver singletonObservers; ChatServiceImpl(String name) { this.name = name; this.singletonObservers = SingletlonChatStreamObserver.getInstance(); } @Override public StreamObserver<EchoRequest> echoBiDi(StreamObserver<EchoResponse> responseObserver) { System.out.println("received bidirectional call"); singletonObservers.addObserver(responseObserver); System.out.println("added responseObserver to the pool of observers: " + singletonObservers.getObservers().size()); StreamObserver<EchoRequest> requestObserver = new StreamObserver<EchoRequest>() { @Override public void onNext(EchoRequest value) { String msg = value.getMessage(); System.out.println("received message: " + msg); EchoResponse response = EchoResponse.newBuilder() .setMessage(msg) .build(); // do not send messages to a single observer but to all observers on the pool // responseObserver.onNext(response); // observers.foreach... singletonObservers.getObservers().forEach(observer -> { observer.onNext(response); }); } @Override public void onError(Throwable t) { // observers.remove(responseObserver); singletonObservers.getObservers().remove(responseObserver); System.out.println("removed responseObserver to the pool of observers"); } @Override public void onCompleted() { // do not complete messages to a single observer but to all observers on the pool // responseObserver.onCompleted(); // observers.foreach singletonObservers.getObservers().forEach(observer -> { observer.onCompleted(); }); // observers.remove(responseObserver); System.out.println("removed responseObserver to the pool of observers"); } }; return requestObserver; } } 。虽然这与负载平衡策略无关,但我认为值得发布,因为如果您没有很好地实施它,您的客户端将不会收到来自其他客户端的消息。

SingletlonChatStreamObserver

这是我的 public class SingletlonChatStreamObserver implements Serializable { private static volatile SingletlonChatStreamObserver singletonSoleInstance; private static volatile ArrayList<StreamObserver<EchoResponse>> observers; private SingletlonChatStreamObserver() { //Prevent form the reflection api. if (singletonSoleInstance != null) { throw new RuntimeException("Use getInstance() method to get the single instance of this class."); } } public static SingletlonChatStreamObserver getInstance() { if (singletonSoleInstance == null) { //if there is no instance available... create new one synchronized (SingletlonChatStreamObserver.class) { if (singletonSoleInstance == null) { observers = new ArrayList<StreamObserver<EchoResponse>>(); singletonSoleInstance = new SingletlonChatStreamObserver(); } } } return singletonSoleInstance; } //Make singleton from serializing and deserialize operation. protected SingletlonChatStreamObserver readResolve() { return getInstance(); } public void addObserver(StreamObserver<EchoResponse> streamObserver) { observers.add(streamObserver); } public ArrayList<StreamObserver<EchoResponse>> getObservers() { return observers; } } ,所有 3 个服务器都只有一个对象:

  def default_url_options
    { locale: I18n.locale }
  end

我将在我的 explore-grpc 项目中提交完整代码。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。