ASR
This module allows you to recognize audio in wav format and return a text response.
There are 3 operating modes:
- offline - file upload and text response after complete recognition
- streaming - file upload and text chunk response in streaming mode
- real-time streaming - audio upload in chunks and chunk response (WebSocket)
Usage Example
- ASR
Automatic Speech Recognition Model
If you set stream=True, the recognized text is returned as a stream of chunks instead of a single response.
# Offline / streaming transcription over HTTP.
# With stream=True the server returns recognized text chunk by chunk;
# with stream=False the full transcript arrives after recognition completes.
import requests

stream = True
url = "https://compressa-api.mil-team.ru/v1/audio/transcriptions"
headers = {"Authorization": "TOKEN_1"}
audio_name = "test.wav"

# Open the audio file in a context manager so the handle is closed
# deterministically (the original left it open until garbage collection).
with open(audio_name, "rb") as audio_file:
    response = requests.post(
        url,
        headers=headers,
        files={"file": (audio_name, audio_file, "audio/wav")},
        stream=stream,
    )
    # Iterate server lines as they arrive (streamed mode) or all at once.
    for line in response.iter_lines(decode_unicode=True):
        print(line)
Real-time mode is also available via WebSocket. Important: the model has been tested with a sample rate of 8000 Hz.
import wave
import os
import tempfile
import asyncio
import websockets
import librosa
import soundfile as sf
import json
# Input audio file and the sample rate the model has been tested with (Hz).
audio_name = "test.wav"
REQUIRED_SR = 8000
def prepare_audio(audio_name, required_sr):
    """Return a path to a WAV file matching the ASR model's expected format.

    If *audio_name* is already mono, 16-bit PCM at *required_sr* Hz, its own
    path is returned unchanged. Otherwise the audio is decoded, downmixed to
    mono, resampled, and written to a temporary WAV file whose path is
    returned — the caller is responsible for deleting that temp file.

    Args:
        audio_name: Path to the input WAV file.
        required_sr: Target sample rate in Hz.

    Returns:
        Path to a WAV file at required_sr, mono, 16-bit PCM.
    """
    with wave.open(audio_name, "rb") as wf:
        sample_rate = wf.getframerate()
        sample_width = wf.getsampwidth()  # bytes per sample; 2 == 16-bit PCM
        channels = wf.getnchannels()

    # Already in the expected format — nothing to do.
    if sample_rate == required_sr and channels == 1 and sample_width == 2:
        return audio_name

    print("Resampling audio...")
    # librosa decodes and downmixes to mono; sr=None preserves the native rate.
    y, sr = librosa.load(audio_name, sr=None, mono=True)
    y_resampled = librosa.resample(y, orig_sr=sr, target_sr=required_sr)
    # mkstemp returns an open fd; close it so soundfile can reopen the path.
    fd, temp_wav = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    sf.write(temp_wav, y_resampled, required_sr, subtype="PCM_16")
    print(f"Saved resampled file as {temp_wav}")
    return temp_wav
# Derive a chunk size corresponding to ~300 ms of audio so the upload
# proceeds at roughly real-time pace.
chunk_duration_ms = 300
required_sr = REQUIRED_SR
audio_for_send = prepare_audio(audio_name, required_sr)

with wave.open(audio_for_send, "rb") as wf:
    sample_rate = wf.getframerate()
    sample_width = wf.getsampwidth()
    channels = wf.getnchannels()
    num_frames = wf.getnframes()
    duration = num_frames / sample_rate

chunk_samples = int(sample_rate * (chunk_duration_ms / 1000))
chunk_bytes = chunk_samples * sample_width * channels
print(
    f"Recommended chunk size: {chunk_bytes} bytes "
    f"({chunk_samples} samples, {chunk_duration_ms} ms)"
)
CHUNK_SIZE = chunk_bytes

# BUG FIX: websockets.connect() accepts only ws:// or wss:// URIs; the
# original "https://" scheme would raise InvalidURI before connecting.
WS_URL = "wss://compressa-api.mil-team.ru/v1/realtime/transcriptions/?model_id=t-tech/T-one"
headers = {"Authorization": "TOKEN_1"}
async def send_audio(ws):
    """Stream the prepared audio file over the WebSocket in real-time pace.

    Reads CHUNK_SIZE-byte chunks, sleeping for each chunk's play duration
    between sends, then sends an empty frame to signal end of audio.
    """
    with open(audio_for_send, "rb") as audio:
        while chunk := audio.read(CHUNK_SIZE):
            await ws.send(chunk)
            # Pace uploads to match real-time playback of one chunk.
            await asyncio.sleep(chunk_samples / sample_rate)
    # Empty frame tells the server the audio stream is finished.
    await ws.send(b"")
async def recv_results(ws):
    """Print transcript text from incoming JSON messages until the socket closes."""
    try:
        async for message in ws:
            # Only text frames carry JSON events; skip binary frames.
            if not isinstance(message, str):
                continue
            try:
                payload = json.loads(message)
                if payload.get("event") == "transcript":
                    print("TEXT:", payload["phrase"]["text"])
            except Exception as err:
                # Best-effort: report malformed messages and keep listening.
                print("Parse error:", err)
    except websockets.ConnectionClosed:
        print("WS closed.")
async def main():
    """Open the realtime WebSocket and run uploader and receiver concurrently."""
    async with websockets.connect(WS_URL, additional_headers=headers) as ws:
        uploader = send_audio(ws)
        receiver = recv_results(ws)
        await asyncio.gather(uploader, receiver)
# Run the realtime session; always remove the temporary resampled file,
# even if the session fails (the original skipped cleanup on error).
try:
    asyncio.run(main())
finally:
    if audio_for_send != audio_name:
        os.remove(audio_for_send)
        print(f"Removed temporary file: {audio_for_send}")