微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

带有高速公路 python 的 websocket 中继

如何解决带有高速公路 python 的 websocket 中继

我正在尝试使用 Autobahn python 构建一个 websocket 服务器,该服务器充当 IBM Watson 文本转语音服务的中间人或中继。我已经设法使用队列从客户端接收流音频并将其转发到 Watson,并且我正在接收作为 JSON 数据的回传假设从 Watson 到我的服务器,但我不确定如何转发该 JSON 数据到客户端。 Watson 转录端回调和 Autobahn 客户端回调似乎是独立存在的,我无法从另一个回调中调用例程,也无法从另一个回调中访问数据。

我需要设置某种共享的短信队列吗?我相信它应该很简单,但我认为问题可能是我对“self”关键字缺乏理解,它似乎隔离了两个例程。也很感激任何关于理解“自我”的资源。

# For Watson
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback,AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For Autobahn
from autobahn.twisted.websocket import WebSocketServerProtocol,\
    WebSocketServerFactory
from twisted.internet import reactor

try:
    from Queue import Queue,Full
except ImportError:
    from queue import Queue,Full

###############################################
#### Initalize queue to store the recordings ##
###############################################
CHUNK = 1024
BUF_MAX_SIZE = CHUNK * 10
# Buffer to store audio
q = Queue(maxsize=int(round(BUF_MAX_SIZE / CHUNK)))
# Create an instance of AudioSource
audio_source = AudioSource(q,True,True)

###############################################
#### Prepare Speech to Text Service ########
###############################################

# initialize speech to text service
authenticator = IAMAuthenticator('secretapikeycanttellyou')
speech_to_text = SpeechToTextV1(authenticator=authenticator)

# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    def __init__(self):
        RecognizeCallback.__init__(self)

    def on_transcription(self,transcript):
        print(transcript)

    def on_connected(self):
        print('Connection was successful')

    def on_error(self,error):
        print('Error received: {}'.format(error))

    def on_inactivity_timeout(self,error):
        print('Inactivity timeout: {}'.format(error))

    def on_listening(self):
        print('Service is listening')

    def on_hypothesis(self,hypothesis):
        print(hypothesis)
        #self.sendMessage(hypothesis,isBinary = false)
        # HOW TO FORWARD THIS TO CLIENT?

    def on_data(self,data):
        print(data)
        #self.sendMessage(data,isBinary = false)
        # HOW TO FORWARD THIS TO CLIENT?

    def on_close(self):
        print("Connection closed")

# define callback for client-side websocket in Autobahn
class MyServerProtocol(WebSocketServerProtocol):

    def onConnect(self,request):
        print("Client connecting: {0}".format(request.peer))

    def onopen(self):
        print("WebSocket connection open.")
        recognize_thread = Thread(target=recognize_using_weboscket,args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onMessage(self,payload,isBinary):
        if isBinary:
            # put audio in queue
            q.put(payload)
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

        # echo back message verbatim
        self.sendMessage(payload,isBinary)
    
    def onClose(self,wasClean,code,reason):
        print("WebSocket connection closed: {0}".format(reason))
        
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_weboscket(*args):
    mycallback = MyRecognizeCallback()
    speech_to_text.recognize_using_websocket(audio=audio_source,content_type='audio/l16; rate=16000',recognize_callback=mycallback,interim_results=True)

if __name__ == '__main__':

    factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    factory.protocol = MyServerProtocol

    reactor.listenTCP(9001,factory)
    reactor.run()

看来我需要弥合 MyRecognizeCallback()MyServerProtocol()间的差距。还请让我知道这是否是我正在努力完成的一个糟糕的实现。我知道有更简单的方法来中继 websocket 数据,但我想熟悉 websocket API/音频流和文本消息,因为最终我想从等式中删除 Watson 并使用我自己的转录算法。

解决方法

根据答案 here,似乎我从 MyServerProtocol().sendMessage(u"this is a message2".encode('utf8')) 调用 main 的努力实际上是在创建 MyServerProtocol 的一个新的且不相关的实例,而不是将消息传递到现有连接中.我能够使用 here 描述的方法将新消息发送到打开的 websocket 连接中。

这是我的最终代码,还需要做一些工作,但相关的定义是broadcast_message。为了使此方法起作用,还需要“订阅”自己到 websocket onConnect 和“取消订阅”onClose

from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback,AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For autobahn
import json
from autobahn.twisted.websocket import WebSocketServerProtocol,\
    WebSocketServerFactory
from twisted.internet import reactor

try:
    from Queue import Queue,Full
except ImportError:
    from queue import Queue,Full

###############################################
#### Initalize queue to store the recordings ##
###############################################
CHUNK = 1024
# Note: It will discard if the websocket client can't consumme fast enough
# So,increase the max size as per your choice
BUF_MAX_SIZE = CHUNK * 10
# Buffer to store audio
q = Queue(maxsize=int(round(BUF_MAX_SIZE / CHUNK)))
# Create an instance of AudioSource
audio_source = AudioSource(q,True,True)

###############################################
#### Prepare Speech to Text Service ########
###############################################

# initialize speech to text service
authenticator = IAMAuthenticator('secretapikey')
speech_to_text = SpeechToTextV1(authenticator=authenticator)

# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    def __init__(self):
        RecognizeCallback.__init__(self)

    def on_transcription(self,transcript):
        # Forward to client
        MyServerProtocol.broadcast_message(transcript)

    def on_connected(self):
        print('Connection was successful')

    def on_error(self,error):
        # Forward to client
        MyServerProtocol.broadcast_message('Error received: {}'.format(error))

    def on_inactivity_timeout(self,error):
        # Forward to client
        MyServerProtocol.broadcast_message('Inactivity timeout: {}'.format(error))

    def on_listening(self):
        print('Service is listening')

    def on_hypothesis(self,hypothesis):
        # Forward to client
        MyServerProtocol.broadcast_message(hypothesis)

    def on_data(self,data):
        # Forward to client
        MyServerProtocol.broadcast_message(data)

    def on_close(self):
        print("Connection closed")
        MyServerProtocol.broadcast_message("Connection closed")

class MyServerProtocol(WebSocketServerProtocol):
    connections = list()

    def onConnect(self,request):
        print("Client connecting: {0}".format(request.peer))
        self.connections.append(self)
        # Start recognizer on connection
        recognize_thread = Thread(target=recognize_using_weboscket,args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onOpen(self):
        print("WebSocket connection open.")

    def onMessage(self,payload,isBinary):
        if isBinary:
            # Put incoming audio into the queue
            try:
                q.put(payload)
            except Full:
                pass # discard
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

    @classmethod
    def broadcast_message(cls,data):
        payload = json.dumps(data,ensure_ascii = False).encode('utf8')
        for c in set(cls.connections):
            reactor.callFromThread(cls.sendMessage,c,payload)
      

    def onClose(self,wasClean,code,reason):
        print("WebSocket connection closed: {0}".format(reason))
        self.connections.remove(self)
  
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_weboscket(*args):
    mycallback = MyRecognizeCallback()
    speech_to_text.recognize_using_websocket(audio=audio_source,content_type='audio/l16; rate=16000',recognize_callback=mycallback,interim_results=True)

if __name__ == '__main__':

factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    factory.protocol = MyServerProtocol

    reactor.listenTCP(9001,factory)
    reactor.run()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?