Перейти к основному содержимому

ASR

Данный модудь позволяет распознавать аудио в формате wav и возвращать ответ в виде текста.

Есть 3 режима работы:

  • оффлайн - загрузка файла и ответ текстом после полного окончания распознавания
  • стриминг - загрузка файла и ответ текстовыми чанками в режиме стриминга
  • реалтайм стриминг - загрузка аудио чанками и возврат ответа чанками (WebSocket)

Пример использования

Модель Automatic Speech Recognition

Можно указать stream=True, тогда распознанный текст будет отдаваться стримингом.

import requests

stream = True
url = "https://compressa-api.mil-team.ru/v1/audio/transcriptions"
headers = {"Authorization": "TOKEN_1"}
audio_name = "test.wav"

response = requests.post(
url,
headers=headers,
files={"file": (audio_name, open(audio_name, "rb"), "audio/wav")},
stream=stream,
)

for line in response.iter_lines(decode_unicode=True):
print(line)

Также доступен реалтайм режим через WebSocket. Важно! Работа модели протестирована для sample rate 8000.


import wave
import os
import tempfile
import asyncio
import websockets
import librosa
import soundfile as sf
import json

audio_name = "test.wav"
REQUIRED_SR = 8000

def prepare_audio(audio_name, required_sr):
with wave.open(audio_name, "rb") as wf:
sample_rate = wf.getframerate()
sample_width = wf.getsampwidth()
channels = wf.getnchannels()
num_frames = wf.getnframes()
duration = num_frames / sample_rate

if sample_rate != required_sr or channels != 1 or sample_width != 2:
print("Resampling audio...")
y, sr = librosa.load(audio_name, sr=None, mono=True)
y_resampled = librosa.resample(y, orig_sr=sr, target_sr=required_sr)
fd, temp_wav = tempfile.mkstemp(suffix=".wav")
os.close(fd)
sf.write(temp_wav, y_resampled, required_sr, subtype="PCM_16")
print(f"Saved resampled file as {temp_wav}")
return temp_wav
else:
return audio_name

chunk_duration_ms = 300
required_sr = REQUIRED_SR

audio_for_send = prepare_audio(audio_name, required_sr)

with wave.open(audio_for_send, "rb") as wf:
sample_rate = wf.getframerate()
sample_width = wf.getsampwidth()
channels = wf.getnchannels()
num_frames = wf.getnframes()
duration = num_frames / sample_rate
chunk_samples = int(sample_rate * (chunk_duration_ms / 1000))
chunk_bytes = chunk_samples * sample_width * channels
print(
f"Recommended chunk size: {chunk_bytes} bytes "
f"({chunk_samples} samples, {chunk_duration_ms} ms)"
)

CHUNK_SIZE = chunk_bytes
WS_URL = "https://compressa-api.mil-team.ru/v1/realtime/transcriptions/?model_id=t-tech/T-one"
headers = {"Authorization": "TOKEN_1"}

async def send_audio(ws):
with open(audio_for_send, "rb") as f:
while True:
chunk = f.read(CHUNK_SIZE)
if not chunk:
break
await ws.send(chunk)
await asyncio.sleep(chunk_samples / sample_rate)
await ws.send(b"")

async def recv_results(ws):
try:
async for msg in ws:
if isinstance(msg, str):
try:
data = json.loads(msg)
if data.get("event") == "transcript":
print("TEXT:", data["phrase"]["text"])
except Exception as e:
print("Parse error:", e)
except websockets.ConnectionClosed:
print("WS closed.")

async def main():
async with websockets.connect(WS_URL, additional_headers=headers) as ws:
await asyncio.gather(send_audio(ws), recv_results(ws))

asyncio.run(main())


if audio_for_send != audio_name:
os.remove(audio_for_send)
print(f"Removed temporary file: {audio_for_send}")