结合promise与websocket的发布/订阅模式实践

结合promise与websocket的发布/订阅模式实践

本文初衷

最近恰好在公司做了一个聊天室系统,所以在系统中做了一下对websocket进行的promise化改造,所以想写篇文章总结一下,如果大家有什么更好的方法或者心得感悟,欢迎交流

技术栈

dva + protobuf
考虑到protobuf对websocket并没什么本质影响,所以本文就不涉及了

业务场景

基于websocket的聊天室系统

开发痛点

  1. 可能存在按顺序触发的请求
    eg. 删除req---确认删除rsp---刷新页面req---刷新页面rsp
    但由于并非所有的删除操作后都会刷新页面,所以考虑是不是可以使用发布订阅模式来模拟类似promise的流式操作
  2. 存在同一类型请求短时间内多次触发时,如何寻找每一条回复信息的发射源,考虑可以使用promise池+唯一识别码token来实现
  3. 由于dva的异步操作是基于redux-saga的,所以如果可以用promise完成与websocket的互动,那么对于effects中使用yield控制异步流程,也会是一个很好的体验

实现原理

首先,这一套系统的一切前提是请求的唯一标识符token,前端发送给服务器之后,服务器必须要把这个token跟数据放在一起发回来才行

本系统的实现原理是

对websocket的send方法进行封装
发送阶段
1. send执行时,先生成一个promise,及其唯一token
2. 将promise的resolve, reject,token,及其他需要的信息放入一个对象,然后推入一个promise池中
3. 执行websocket的send方法
4. return 这个promise

接收阶段
1. 收到回复消息时,先从promise池中对token进行匹配
2. 根据回复的状态决定执行resolve或reject
3. 其他需要的操作

实现代码

// 每一个实例都只能open一条socket线路,用锁机制防止重复open
// 本例中不使用心跳检测,为了方便,只要close是非主动触发且前端能捕捉到的(如浏览器主动断开,服务器主动断开),都会进行自动重连
export class MyWebSocket {
    constructor(url) {
        this.url = url;
    
        // close来源判断及后续操作
        this.closeConfig = {
            resolve: null,closing: false
        }
        // promise池
        this.promisePool = [];
    }
    
    tokenCheck(req,rsp) {
    // 此处根据自己的数据结构进行tokenCheck的判断,返回一个boolean
    }
    
    open() {
        return new Promise((resolve,reject) => {
            if (typeof this._websocket === 'undefined') {
                this._websocket = new WebSocket(this.url);
                this._websocket.open = (e) => {
                    resolve({e,ws: this});
                };
                this._websocket.onerror = (e) => {
                    reject(e);
                }
            }
            this._websocket.onclose = (e) => {
                // 非主动close
                if (!this.closeConfig.closing) {
                    console.log('reconnect');     
                    // 对应的重连操作     
                }
                // 若手动close,恢复初始状态
                this.closeConfig.closing = false;
            }
            
            this._websocket.onmessage = (e) => {
                this.promisePool = this.promisePool.filter((item) => {
                if (this.tokenCheck(req,rsp) {
                  req.resolve(rsp);
                  return false;
                }
                return true;
              })
            };
        });
    }
    
    close() {
        this.closeConfig.closing = true;
        this._websocket.close();        
    }
    // token包含在content中
    send(name,content) {
        return new Promise((resolve,reject) => {
          this.promisePool.push({
            content,resolve,reject,name
          });
          this._websocket.send({name,content});
    });
}

大概流程就是这样,具体的样例如下

*test () {
    const ws = new MyWebSocket('www.mywebsocket.com');
    yield ws.open();
    yield ws.send(.....).then(()=>{...});
    yield ws.send(.....).then(()=>{...});
}

本文呢大概就是这么多了,如果有什么错误或者遗漏的地方还请大家多多指教

v0.0.2

采取了评论大佬的建议,将promise池从数组改为对象,直接将token做为key,查询起来也非常方便

export class MyWebSocket {
    constructor(url) {
        this.url = url;
    
        // close来源判断及后续操作
        this.closeConfig = {
            resolve: null,closing: false
        }
        // promise池
        this.promisePool = {};
    }
    
    tokenCheck(req,ws: this});
                };
                this._websocket.onerror = (e) => {
                    reject(e);
                }
            }
            this._websocket.onclose = (e) => {
                // 非主动close
                if (!this.closeConfig.closing) {
                    console.log('reconnect');     
                    // 对应的重连操作     
                }
                // 若手动close,恢复初始状态
                this.closeConfig.closing = false;
            }
            
            this._websocket.onmessage = (e) => {         
                const key = e.content.token;    
                const req = this.promisePool[key]
                req.resolve(e);
                delete this.promisePool[key];
            };
        });
    }
    
    close() {
        this.closeConfig.closing = true;
        this._websocket.close();
    }
    // token包含在content中
    send(name,reject) => {
          this.promisePool[content.token] = {
            content,name
          };
          this._websocket.send({name,content});
    });
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


react 中的高阶组件主要是对于 hooks 之前的类组件来说的,如果组件之中有复用的代码,需要重新创建一个父类,父类中存储公共代码,返回子类,同时把公用属性...
我们上一节了解了组件的更新机制,但是只是停留在表层上,例如我们的 setState 函数式同步执行的,我们的事件处理直接绑定在了 dom 元素上,这些都跟 re...
我们上一节了解了 react 的虚拟 dom 的格式,如何把虚拟 dom 转为真实 dom 进行挂载。其实函数是组件和类组件也是在这个基础上包裹了一层,一个是调...
react 本身提供了克隆组件的方法,但是平时开发中可能很少使用,可能是不了解。我公司的项目就没有使用,但是在很多三方库中都有使用。本小节我们来学习下如果使用该...
mobx 是一个简单可扩展的状态管理库,中文官网链接。小编在接触 react 就一直使用 mobx 库,上手简单不复杂。
我们在平常的开发中不可避免的会有很多列表渲染逻辑,在 pc 端可以使用分页进行渲染数限制,在移动端可以使用下拉加载更多。但是对于大量的列表渲染,特别像有实时数据...
本小节开始前,我们先答复下一个同学的问题。上一小节发布后,有小伙伴后台来信问到:‘小编你只讲了类组件中怎么使用 ref,那在函数式组件中怎么使用呢?’。确实我们...
上一小节我们了解了固定高度的滚动列表实现,因为是固定高度所以容器总高度和每个元素的 size、offset 很容易得到,这种场景也适合我们常见的大部分场景,例如...
上一小节我们处理了 setState 的批量更新机制,但是我们有两个遗漏点,一个是源码中的 setState 可以传入函数,同时 setState 可以传入第二...
我们知道 react 进行页面渲染或者刷新的时候,会从根节点到子节点全部执行一遍,即使子组件中没有状态的改变,也会执行。这就造成了性能不必要的浪费。之前我们了解...
在平时工作中的某些场景下,你可能想在整个组件树中传递数据,但却不想手动地通过 props 属性在每一层传递属性,contextAPI 应用而生。
楼主最近入职新单位了,恰好新单位使用的技术栈是 react,因为之前一直进行的是 vue2/vue3 和小程序开发,对于这些技术栈实现机制也有一些了解,最少面试...
我们上一节了了解了函数式组件和类组件的处理方式,本质就是处理基于 babel 处理后的 type 类型,最后还是要处理虚拟 dom。本小节我们学习下组件的更新机...
前面几节我们学习了解了 react 的渲染机制和生命周期,本节我们正式进入基本面试必考的核心地带 -- diff 算法,了解如何优化和复用 dom 操作的,还有...
我们在之前已经学习过 react 生命周期,但是在 16 版本中 will 类的生命周期进行了废除,虽然依然可以用,但是需要加上 UNSAFE 开头,表示是不安...
上一小节我们学习了 react 中类组件的优化方式,对于 hooks 为主流的函数式编程,react 也提供了优化方式 memo 方法,本小节我们来了解下它的用...
开源不易,感谢你的支持,❤ star me if you like concent ^_^
hel-micro,模块联邦sdk化,免构建、热更新、工具链无关的微模块方案 ,欢迎关注与了解
本文主题围绕concent的setup和react的五把钩子来展开,既然提到了setup就离不开composition api这个关键词,准确的说setup是由...
ReactsetState的执行是异步还是同步官方文档是这么说的setState()doesnotalwaysimmediatelyupdatethecomponent.Itmaybatchordefertheupdateuntillater.Thismakesreadingthis.staterightaftercallingsetState()apotentialpitfall.Instead,usecom