微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

rxjs流解析器用于NATS

如何解决rxjs流解析器用于NATS

亲爱的rxjs专家,在stackoverflow ...

我一直在使用rxjs为nats.io client protocol实现一个非常基本的流解析器。我不确定我在性能或使用方面是否做对了。我正在使用webSocket主题连接到ws网关进行nat。

我正在将传入流映射为使用正则表达式准备操作名称和参数。 然后我订阅此流,将消息推送到另一个名为protocoll $$的主题

protocoll $$主题实现了基本功能过滤和订阅,以使PING消息以PONG响应以保持活动状态。

然后创建另一个名为messages $$的主题,该主题会过滤用于“ MSG”操作的协议$$主题,然后对其进行转换。

在应用程序中,用户可以进一步订阅主题(这反过来又创建了一个新的Obserable,订阅了消息$$主题并过滤了subscrptionId)。

我的问题是。我做对了吗,我可以改进吗?

import { Observable,Subject } from "rxjs"
import { filter,map } from 'rxjs/operators'
import { webSocket,WebSocketSubject } from "rxjs/webSocket"

interface NatsOptions {
  url: string
  options: Connectionoptions
}

interface Connectionoptions {
  verbose: boolean // false
  pedantic: boolean // false
  lang: string
  version: string
}

export interface Protocol {
  operation: string,args?: string
}

export interface Msg<T> {
  sid: number
  subject: string
  replyTo: string
  size: number
  // Todo: Move encoded and modularize
  payload: T
}

const msgPattern = /^(?<subject>[^\s]+)\s+(?<sid>[^\s]+)\s+(?<reply>([^\s\r\n]+)[^\S\r\n]+)?(?<length>\d+)\r\n(?<payload>\S+)\r\n/gi
const protocolPattern = /^([^\s]+)(\s+([\S\s]*))?/gi

@Injectable({
  providedIn: "root",})
export class NatsService {
  private nats$: WebSocketSubject<any>
  private subscriptions: Array<string> = [];
  public protocol$$: Subject<Protocol>;
  public messages$$: Subject<Msg<any>>;

  constructor(@Inject("natsOptions") private options: NatsOptions) {

    this.nats$ = webSocket({ url: options.url,serializer: (m) => m,deserializer: (m) => m })

    this.connect({
      pedantic: false,verbose: false,...options.options,lang: "typescript",version: "0.0.1"
    })
  }

  private async connect(options: Connectionoptions) {
    this.protocol$$ = new Subject<Protocol>()
    this.messages$$ = new Subject<Msg<any>>();
    this.nats$.pipe(
      map((event: MessageEvent) => {
        const matches: Array<Array<any>> = Array.from(event.data.matchAll(protocolPattern))
        if (matches.length > 0) {
          const groups: Array<any> = matches[0]
          const operation = groups[1]
          let args
          if (groups.length) {
            args = groups[3]
          }
          return { operation,args }
        }
        return { operation: event.data }
      })
    ).subscribe(protocol => {
      this.protocol$$.next(protocol);
    })

    this.protocol$$.pipe(
      filter((x: Protocol) => x.operation == 'PING')
    ).subscribe(x => this.send('PONG'))

    this.protocol$$.pipe(
      filter((p) => {
        return p.operation == 'MSG'
      })
    ).subscribe((p) => {
      const matches: Array<Array<any>> = Array.from(p.args?.matchAll(msgPattern))
      if (matches.length > 0) {
        const groups = matches[0]
        let msg: Msg<any> = {
          subject: groups[1],sid: Number(groups[2]),replyTo: groups[3],size: Number(groups[5]),// Todo: Make encoder configurable and modularize
          payload: JSON.parse(groups[6])
        }
        this.messages$$.next(msg);
      }
    })

    this.nats$.next(`CONNECT ${JSON.stringify(options)}\r\n`)
  }
  private send(command: string) {
    this.nats$.next(`${command}\r\n`)
  }


  public from<T>(subject: string,group?: string) {
    // check wether subscription already exist. If so,just reuse
    const sid = this.subscriptions.push(subject);
    let subject$ = new Observable<Msg<T>>((observer) => {
      this.messages$$.pipe(
        filter(msg => msg.sid == sid),).subscribe(msg => observer.next(msg))
      
      return () => {
        this.send(`UNSUB ${sid}`)
      }
    })
    //We don't support groups yet
    this.send(`SUB ${subject} ${group} ${sid}`)
    return subject$
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。