微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

有没有办法在 websocket 连接中捕获关闭事件?

如何解决有没有办法在 websocket 连接中捕获关闭事件?

我正在我的平台中实现 WebSockets 连接。我在前端使用 Angular 10,在 SockJS Client 1.5.0 上使用 StompJS 2.3.3 来建立与后端的连接。

我创建了一个 WebSockets 服务来管理连接,它正在连接并且工作正常。

这是我创建的服务:

export class WebSocketsService implements OnDestroy {
    /**
    * The authentication service to get the token from.
    */
    private authenticationService: AuthenticationService;

    private serverUrl = environment['ceNotificationsServer'];
    private socket;
    private state: BehaviorSubject<SocketClientState>;

    public constructor(authenticationService: AuthenticationService) {
    this.authenticationService = authenticationService;

    const client_id = environment['ceNotificationsClientId'];
    const token = this.authenticationService.getToken();

    const url = this.serverUrl + '/websocket' + '?clientId=' + client_id + '&Authorization=' + token;
    const ws = new SockJS(url);
    this.socket = StompJS.over(ws);
    this.socket.reconnect_delay = 5000;
    this.socket.debug = () => {
    };
    this.state = new BehaviorSubject<SocketClientState>(SocketClientState.ATTEMPTING);

    this.socket.connect({},() => {
        this.state.next(SocketClientState.CONNECTED);
        console.log('Connection to the socket is open.');
    });
    }

    /**
    * Establish the connection to the websocket.
    */
    connect(cfg: { reconnect: boolean } = {reconnect: false}): Observable<any> {
    return new Observable(observer => {
        this.state.pipe(
        cfg.reconnect ? this.reconnect : o => o,filter(state => state === SocketClientState.CONNECTED))
        .subscribe(() => {
            observer.next(this.socket);
        });
    });
    }

    message(queue: String): Observable<any> {
    return this.connect({reconnect: true})
        .pipe(switchMap(client => {
            return new Observable<any>(observer => {
            client.subscribe(queue,message => {
                observer.next(JSON.parse(message.body));
            });
            });
        }),retrywhen((errors) => errors.pipe(delay(5))));
    }

    reconnect(observable: Observable<any>): Observable<any> {
    return observable.pipe(
        retrywhen(errors => errors.pipe(
        tap(val => console.log('Trying to reconnect to the socket',val)),delayWhen(_ => timer(5000))
        ))
    );
    }

    /**
    * Close the connection to the websocket.
    */
    close() {
    if (this.socket) {
        this.socket.complete();
        this.state.next(SocketClientState.CLOSED);
        console.log('Connection to the socket is closed');
        this.socket = null;
    }
    }

    ngOnDestroy() {
    this.close();
    }
}

export enum SocketClientState {
    ATTEMPTING,CONNECTED,CLOSED
}

在我的 Angular 组件中,我添加了此代码订阅 WebSockets 队列并获取一些通知以填充我的通知托盘:

const subscription1 = this.websocketsService.message('/messages')
    .subscribe(outdatedProfiles => {
        const notification: Notification = outdatedProfiles;
        notification.message = 'notificationNotSyncedProviders';
        this.notifications.addNotification(notification);
    });
this.subscriptionsManager.add(subscription1);

我的问题是,当我失去连接(如果 wifi 断开连接)时,它不会再次重新连接。它会捕获错误,但不会捕获连接关闭事件。

我尝试了 this 方法

public constructor(authenticationService: AuthenticationService) {
    this.socket.connect({},() => {...});
    this.socket.onclose = function(event) {
        console.log("WebSocket is closed Now.");
        this.connect({reconnect: true});
    };
}

但它不起作用。我已阅读文档,但似乎无法在连接关闭时找到重新连接问题的答案。有什么想法吗?

解决方法

正如@BizzyBob 建议的那样,我认为更好的方法是使用 @stomp/ng2-stompjs,它取决于 rx-stomp 和 stompjs 较新版本,并且可以更轻松地完成任务。

  1. 安装@stomp/ng2-stompjs

    npm i @stomp/ng2-stompjs

  2. 为 rx-stom 服务创建配置文件 (my-rxstomp.config.ts)

    
    export const myRxStompConfig: InjectableRxStompConfig = {
      // Which server?
      brokerURL: 'ws://127.0.0.1:8080/',// Wait in milliseconds before attempting auto reconnect
      // Set to 0 to disable
      // Typical value 500 (500 milli seconds)
      reconnectDelay: 200
    };
    
    
  3. 在app.module.ts的provider部分添加RxStomp服务的配置

     {
       provide: InjectableRxStompConfig,useValue: myRxStompConfig,},{
       provide: RxStompService,useFactory: rxStompServiceFactory,deps: [InjectableRxStompConfig],

];

  1. 在你的组件中注入 RxStomp 服务并订阅连接状态的改变 用于在连接关闭时做某事。 如果您只想在连接关闭时重新连接,则无需订阅 连接状态,配置字段 重新连接延迟:200 将为您重新连接。

    constructor(private rxStompService: RxStompService) { }
    
    ngOnInit() {
      this.rxStompService.connectionState$.subscribe(next => {
        console.log('Connection State',RxStompState[next]);
        if(next === RxStompState.CLOSED) {
          // Do something
        }
      });
    }
    

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。