如何解决通过Node.js流管道传输/抽取的数据顺序
我正在尝试编写一个nodejs代码,该代码读取(音频)文件并将内容流式传输到远程服务(对话框)。我在确保发送到流的块的顺序方面遇到麻烦。 在大多数情况下,一切似乎都以正确的顺序进行,但有时,数据似乎是以乱序方式发送的。
伪代码:
for (var i = 0; i < numFiles; ++i) {
await sendData(fs.createReadStream(filenames[i]),i);
}
...
async function sendData(inputDataStream,chunkIndex) {
await inputDataStream
.pipe(new Transform({
objectMode: true,transform: (obj,_,next) => {
console.log('Sending chunk ' + chunkIndex);
next(null,<some data>);
}
}),{end: false})
.pipe(outputStream,{end: false});
}
我看到有时候“发送块...”的打印顺序不正确。
问:有什么办法可以避免这个问题?
另一个问题是,尽管在大多数情况下,每个块都是连续发送的,但有时,一些块将被拆分并以较小的子块发送(即使每个文件都不大)。 [我在同一组文件上重复了多次实验]
问:有什么方法可以控制块大小? (我在这里做错了什么?)问:这是因为远程服务无法处理传输速率吗?如果是这样,我应该如何应对呢?
[我也尝试过使用pump(),但仍然观察到相同的行为] 预先感谢。
解决方法
对于 Dialogflow,我使用了以下 pump
方法,并且工作正常。
await pump(
fs.createReadStream(filename),new Transform({
objectMode: true,transform: (obj,_,next) => {
next(null,{inputAudio: obj});
},}),detectStream
);
}
参考:link
到目前为止,pump
没有遇到任何问题。
此外,我还遇到了另一个用例,其中使用 WebSocket 连接从流端点接收音频,然后使用该音频进行意图检测。 (我已经在 Dialogflow ES 和 CX 中使用了这个)。
es 示例:
function getDialogflowStream() {
let sessionClient = new dialogflow.SessionsClient();
let sessionPath = sessionClient.projectAgentSessionPath(
projectId,sessionID,);
// First Request
let initialStreamRequest = {
session: sessionPath,queryInput: {
audioConfig: {
audioEncoding: encoding,sampleRateHertz: sampleRateHertz,languageCode: languageCode,},singleUtterance: true,};
const detectStream = sessionClient
.streamingDetectIntent()
.on('error',error => {
console.error(error);
writeFlag = false;
detectStream.end();
})
.on('data',data => {
if (data.recognitionResult) {
console.log(
`Intermediate transcript: ${data.recognitionResult.transcript}`
);
} else {
console.log(
`Query results: ${data.queryResult}`
);
}
});
// Write the initial stream request to config for audio input.
detectStream.write(initialStreamRequest);
return detectStream;
}
const wss = new WebSocket.Server({
port,handleProtocols: (protocols,req) => {
return 'dialogflow.stream';
}
});
wss.on('connection',(ws,req) => {
console.log(`received connection from ${req.connection.remoteAddress}`);
let dialogflowStreamer = getDialogflowStream();
ws.on('message',(message) => {
if (typeof message === 'string') {
console.log(`received message: ${message}`);
console.log(`UUID: ${calluuid}`);
} else if (message instanceof Buffer) {
// Transform message and write to detect
dialogflowStreamer.write({ inputAudio: message });
}
});
ws.on('close',(code,reason) => {
console.log(`socket closed ${code}:${reason}`);
dialogflowStreamer.end();
sessionID = uuid.v4();
});
});
还要确保输入配置中的采样率和编码与音频文件相同,因为我遇到了不同的问题。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。