How can I periodically (re)start/stop non-blocking websocket IO using asyncio in Python 3 code?

Problem description

In my live phone speech recognition project, Python's asyncio and websockets modules are used to exchange data between client and server asynchronously. The audio stream to be recognized reaches the client from inside a PBX channel (Asterisk PBX handles that) via a local wav file that accumulates all data from the moment the call is answered until the hangup event. While the conversation is in progress, an async producer pushes chunks of the call recording (each no larger than 16 kB) to an asyncio queue, so that a consumer coroutine can write the data to a buffer before sending it to the recognition engine server (my pick is a Vosk instance with the Kaldi engine, https://github.com/alphacep/vosk-asterisk, designed to connect via a websocket interface). Once the buffer exceeds a specific capacity (for example, 288 kB), the data should be flushed to recognition by the send function and returned (as a transcript of the speech) by recv. Real-time recognition matters here, so I need to guarantee that socket operations like recv will not halt both coroutines throughout the websocket session (they should be able to keep the queue-based data flow going until the hangup event). Let's take a look at the whole program. First of all, there is a main function where an event loop is instantiated along with a couple of tasks:

import logging
import asyncio
import time
from concurrent.futures._base import CancelledError

from .transcription import Transcriber, get_record_size_info

logging.basicConfig(level=logging.DEBUG)
record_file_name = '/var/spool/asterisk/monitor/callrecord.wav'

def main():
    transcriber = Transcriber()

    logging.getLogger('asyncio').setLevel(logging.ERROR)
    logging.getLogger('asyncio.coroutines').setLevel(logging.ERROR)
    logging.getLogger('websockets.server').setLevel(logging.ERROR)
    logging.getLogger('websockets.protocol').setLevel(logging.ERROR)

    loop = asyncio.get_event_loop()
    time.sleep(2)

    prod_task = loop.create_task(transcriber.run_producer(transcriber._queue))
    consum_task = loop.create_task(transcriber.run_consumer(transcriber._queue))

    tasks = [prod_task, consum_task]

    executed, remaining = loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION))
    logging.debug('Tasks completed: %s', executed)
    logging.debug('Tasks in progress: %s', remaining)

    for task in remaining:
        logging.info('Dropping task %s: %s', task, task.cancel())
    try:
        loop.run_until_complete(asyncio.gather(*remaining))
    except CancelledError:
        for running_task in remaining:
            logging.debug('Task dropped %s: %s', running_task, running_task.cancelled())

    loop.stop()
    loop.close()

if __name__ == '__main__':
    main()

The producer/consumer implementations are given below:

from queue import Queue
from concurrent.futures._base import CancelledError
from pathlib import Path

import logging
import asyncio
import websockets
import json

ASR_WS_ADDRESS = 'ws://127.0.0.1:2700'

class Transcriber:

    def __init__(self):
        self._queue = asyncio.Queue()
        self._buffer = b''
        self._current_record_size = 0 # terminate reading from wav file if current size of record is equal to total payload
        self._record_file_name = '/var/spool/asterisk/monitor/callrecord.wav'
        self._total_payload = 0 # total of bytes written to buffer since call recording started

    async def run_producer(self, qu):
        with open(self._record_file_name, 'rb') as record:
            print('call record file size: ' + str(get_record_size_info(self._record_file_name)))
            self._current_record_size = get_record_size_info(self._record_file_name)
            while True:
                await asyncio.sleep(0.5)
                chunk = record.read(16000)
                qu.put_nowait(chunk)
                qsize = qu.qsize()

    async def run_consumer(self, qu):
        while True:
            data = await qu.get()
            await asyncio.sleep(1)
            self._buffer += data
            self._current_record_size = get_record_size_info(self._record_file_name)
            print('now buffer contains : ' + str(len(self._buffer)) + ' bytes')
            print('current record size: ' + str(self._current_record_size) + ' bytes')
            print('current total payload: ' + str(self._total_payload) + ' bytes')

            if len(self._buffer) >= 288000:
                await self.do_recognition()
                self._total_payload += len(data)
                self._buffer = b''
            elif len(data) == 0 and self._current_record_size == self._total_payload:
                print('looks like recording is complete...')
                await self.do_recognition()
                self._queue._queue.clear() # remove items from queue before loop gets close
                self._queue._finished.set()
                self._queue._unfinished_tasks = 0
                raise Exception('cancel both tasks and close loop')
            else:
                self._total_payload += len(data)
                continue

    async def do_recognition(self):
        async with websockets.connect(ASR_WS_ADDRESS) as ws:
            logging.debug('Sending %s to Vosk-hosted Kaldi engine', len(self._buffer))
            await ws.send(self._buffer)

            response = json.loads(await ws.recv())
            try:
                result = response['partial']
                if len(result) > 0:
                    print('\n')
                    print(result + '\n')
            except KeyError:
                result = response['text']
                if len(result) > 0:
                    print('\n')
                    print(result + '\n')

def get_record_size_info(record_file_name):
    return Path(record_file_name).stat().st_size

Here is the problem I've been wrapping my head around for a few days: how can I run the do_recognition method in a non-blocking manner, to avoid the 2-3 second stall once recv execution starts? The longer the call, the more speech recognition requests I need to fire, so an essentially blocking program is disastrous for real-time performance. Because execution repeatedly stops and resumes in my case, none of the solutions I've seen on SO so far (specifically, 1, 2, 3) solves this issue, so I'm looking for any pointers on how to deal with it. Please share some ideas about which workarounds could provide the improvements I want; my own experience with asyncio is far from sufficient to tune the above code effectively.

Recommended answer

If I understand the issue correctly, you probably want to replace await self.do_recognition() with asyncio.create_task(self.do_recognition()) to make do_recognition execute in the background. If you need to support Python 3.6 and earlier, you can use loop.create_task(...) or asyncio.ensure_future(...), all of which in this case do the same thing.
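
A minimal, self-contained sketch of the difference, assuming Python 3.7+ (for asyncio.run and asyncio.create_task). The real websocket round trip is replaced by a hypothetical fake_recognition coroutine that only sleeps, so it runs without a Vosk server; the point is that the consumer keeps receiving chunks while earlier recognitions are still in flight. The set of pending tasks is kept because the event loop holds only weak references to tasks, so a task with no other reference may be garbage-collected before it finishes.

```python
import asyncio
import time

async def fake_recognition(chunk_id, log):
    # Hypothetical stand-in for do_recognition(): the websocket
    # send()/recv() round trip is simulated with a 0.2 s sleep.
    await asyncio.sleep(0.2)
    log.append(f"recognized {chunk_id}")

async def consumer(n_chunks, log):
    pending = set()  # strong references keep background tasks alive
    for i in range(n_chunks):
        await asyncio.sleep(0.05)  # simulated arrival of the next chunk
        log.append(f"received {i}")
        # Schedule recognition in the background instead of awaiting it,
        # so the loop goes straight back to waiting for the next chunk.
        task = asyncio.create_task(fake_recognition(i, log))
        pending.add(task)
        task.add_done_callback(pending.discard)
    # On hangup, wait for recognitions that are still running.
    await asyncio.gather(*pending)

def run_demo(n_chunks=3):
    log = []
    start = time.monotonic()
    asyncio.run(consumer(n_chunks, log))
    return log, time.monotonic() - start
```

With three chunks, all three "received" entries are logged before the first "recognized" entry, and the run takes roughly 0.35 s instead of about 0.75 s for a version that awaits each recognition before reading the next chunk.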

When doing that you'll also need to extract the value of self._buffer and pass it to do_recognition as parameter, so that it can send the buffer contents independently of the new data that arrives.
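
A sketch of that hand-off, trimmed down to the buffering logic from the question. It assumes Python 3.7+, uses a hypothetical BufferingConsumer class in place of Transcriber, and again replaces the websocket calls with a sleep. Because the consumer resets self._buffer right after scheduling the task, a background do_recognition that read self._buffer itself would find it already emptied or refilled; binding the bytes to a local name first gives each task its own snapshot.

```python
import asyncio

FLUSH_SIZE = 288000  # threshold used in the question

class BufferingConsumer:
    """Hypothetical, trimmed-down version of the question's Transcriber."""

    def __init__(self):
        self._buffer = b''
        self.sent_sizes = []  # what each recognition request actually sent
        self._tasks = set()   # strong references to in-flight recognitions

    async def do_recognition(self, payload):
        # payload is this request's own bytes object; resetting
        # self._buffer afterwards cannot change it.
        await asyncio.sleep(0.05)  # stands in for ws.send()/ws.recv()
        self.sent_sizes.append(len(payload))

    def feed(self, data):
        # Must be called while the event loop is running (create_task needs it).
        self._buffer += data
        if len(self._buffer) >= FLUSH_SIZE:
            payload, self._buffer = self._buffer, b''  # hand off, then reset
            task = asyncio.create_task(self.do_recognition(payload))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def drain(self):
        # Wait for any recognitions still in flight.
        await asyncio.gather(*self._tasks)

async def run_demo():
    consumer = BufferingConsumer()
    for _ in range(40):
        consumer.feed(b'\x00' * 16000)  # 16000-byte chunks, as in the question
        await asyncio.sleep(0)          # give background tasks a chance to run
    await consumer.drain()
    return consumer.sent_sizes, len(consumer._buffer)
```

Running asyncio.run(run_demo()) returns ([288000, 288000], 64000): both full flushes were sent intact even though the buffer was reset and refilled while they were in flight, and the last 64000 bytes remain buffered for the end-of-call flush.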

Two notes unrelated to the question:

  • The code is accessing internal implementation attributes of queue, which should be avoided in production code because it can stop working at any point, even in a bugfix release of Python. Attributes that begin with _ like _finished and _unfinished_tasks are not covered by backward compatibility guarantees and can be removed, renamed, or change meaning without notice.

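  • You can import CancelledError from the top-level asyncio package, which exposes it publicly. You don't need to refer to the internal concurrent.futures._base module, which in Python 3.7 and earlier just happens to be where the class is defined by the implementation. Since Python 3.8, asyncio.CancelledError is a separate class that inherits from BaseException rather than from concurrent.futures.CancelledError, so catching the concurrent.futures class no longer catches cancelled asyncio tasks at all.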
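
Both notes can be addressed together. The following is a minimal sketch under stated assumptions (Python 3.7+): the consumer simply returns when it sees an empty chunk, a simplified and hypothetical stand-in for the question's end-of-recording check, so it never needs to raise an exception or touch queue attributes whose names start with _. Main-style code then cancels whatever is still running and catches the cancellation via the public asyncio.CancelledError. Leftover queue items are simply discarded along with the queue.

```python
import asyncio

async def run_producer(queue, chunks):
    # Like the question's producer: polls forever and pushes empty chunks
    # once the (simulated) recording has no new data.
    it = iter(chunks)
    while True:
        await asyncio.sleep(0.01)
        queue.put_nowait(next(it, b''))

async def run_consumer(queue, received):
    while True:
        data = await queue.get()
        if not data:       # simplified end-of-recording test (assumption)
            return         # finish normally: no exception, no queue internals
        received.append(data)

async def run_demo():
    queue = asyncio.Queue()
    received = []
    prod = asyncio.create_task(run_producer(queue, [b'a', b'b', b'c']))
    cons = asyncio.create_task(run_consumer(queue, received))
    done, pending = await asyncio.wait({prod, cons},
                                       return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    try:
        await asyncio.gather(*pending)
    except asyncio.CancelledError:  # public name, not concurrent.futures._base
        pass
    return received, prod.cancelled(), cons.cancelled()
```

Here asyncio.run(run_demo()) returns ([b'a', b'b', b'c'], True, False): the consumer finished on its own and the endless producer was cancelled cleanly.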

09-02 17:03