如何解决Python 线程问题破坏音频队列
我正在编写一个 Python 脚本,该脚本将使用 Autobahn Python 托管 websockets 服务器并充当中继,将流式传输到服务器的音频发送到 IBM Watson 进行转录,然后转发从Watson 到客户端浏览器。
下面的脚本适用于单个会话,直到刷新/重新连接 websocket 或 IBM Watson 服务超时(不活动 30 秒后)。 当用户刷新浏览器并重新建立 websocket 时,Watson 转录将恢复,但现在返回的转录突然完全是胡言乱语。
我认为问题在于 Watson 识别线程 (recognize_thread
) 没有正确关闭,因此我实际上是在创建多个线程来覆盖同一个队列。我尝试了各种方法来终止 onClose
上的识别线程但没有成功(因为我在 recognize_thread.start()
上启动 onConnect()
我似乎无法访问 recognize_thread
到 .join()
或 .kill()
事件中的 onClose()
)。我还读到,在 Python 中突然杀死一个线程无论如何都不是最佳做法。
最终目标是服务器可以在自己的线程上接受和托管多个同时发生的用户会话,接收多个不同的音频流,将音频存储在自己独特的队列实例中,并将该音频流式传输到自己的Watson 识别的独特实例。但我是 Python 线程的新手,可以使用一些关于如何进行此操作的指示。
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback,AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For autobahn
import json
from autobahn.twisted.websocket import WebSocketServerProtocol,\
WebSocketServerFactory
from twisted.internet import reactor
try:
from Queue import Queue,Full
except ImportError:
from queue import Queue,Full
###############################################
#### Initalize queue to store the recordings ##
###############################################
CHUNK = 1024
# Note: It will discard if the websocket client can't consumme fast enough
# So,increase the max size as per your choice
BUF_MAX_SIZE = CHUNK * 10
# Buffer to store audio
q = Queue(maxsize=int(round(BUF_MAX_SIZE / CHUNK)))
# Create an instance of AudioSource
audio_source = AudioSource(q,True,True)
###############################################
#### Prepare Speech to Text Service ########
###############################################
# initialize speech to text service
authenticator = IAMAuthenticator('secretkey')
speech_to_text = SpeechToTextV1(authenticator=authenticator)
# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
def __init__(self):
RecognizeCallback.__init__(self)
def on_transcription(self,transcript):
# Forward to client
MyServerProtocol.broadcast_message(transcript)
def on_connected(self):
print('Connection was successful')
def on_error(self,error):
# Forward to client
MyServerProtocol.broadcast_message('Error received: {}'.format(error))
def on_inactivity_timeout(self,error):
# Forward to client
MyServerProtocol.broadcast_message('Inactivity timeout: {}'.format(error))
def on_listening(self):
print('Service is listening')
def on_hypothesis(self,hypothesis):
# Forward to client
MyServerProtocol.broadcast_message(hypothesis)
def on_data(self,data):
# Forward to client
MyServerProtocol.broadcast_message(data)
def on_close(self):
print("Connection closed")
MyServerProtocol.broadcast_message("Connection closed")
class MyServerProtocol(WebSocketServerProtocol):
connections = list()
def onConnect(self,request):
print("Client connecting: {0}".format(request.peer))
self.connections.append(self)
# Start recognizer on connection
recognize_thread = Thread(target=recognize_using_weboscket,args=())
recognize_thread.daemon = True
recognize_thread.start()
def onOpen(self):
print("WebSocket connection open.")
def onMessage(self,payload,isBinary):
if isBinary:
# Put incoming audio into the queue
try:
q.put(payload)
except Full:
pass # discard
else:
print("Text message received: {0}".format(payload.decode('utf8')))
@classmethod
def broadcast_message(cls,data):
payload = json.dumps(data,ensure_ascii = False).encode('utf8')
for c in set(cls.connections):
reactor.callFromThread(cls.sendMessage,c,payload)
def onClose(self,wasClean,code,reason):
print("WebSocket connection closed: {0}".format(reason))
self.connections.remove(self)
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_weboscket(*args):
mycallback = MyRecognizeCallback()
speech_to_text.recognize_using_websocket(audio=audio_source,content_type='audio/l16; rate=16000',recognize_callback=mycallback,interim_results=True)
if __name__ == '__main__':
factory = WebSocketServerFactory("ws://127.0.0.1:9001")
factory.protocol = MyServerProtocol
reactor.listenTCP(9001,factory)
reactor.run()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。