Перейти к основному содержимому

Использование OpenAI API

В Compressa есть интегрированный API-слой, совместимый с Chat Completion API от OpenAI. Это означает, что разработчики могут использовать существующую клиентскую библиотеку от OpenAI (включая библиотеку на Langchain) и адаптировать текущий код с минимальными изменениями для работы с Compressa.

В зависимости от конфигурации Compressa (поле task в конфигурационном файле) будут доступны разные эндпойнты.

OpenAI API без стриминга

from openai import OpenAI

client = OpenAI(
base_url="http://localhost:5000/v1",
api_key="Ваш_user_API_ключ"
)

response = client.chat.completions.create(
model="Compressa-LLM",
messages=[
{"role": "system", "content": "Ты умеешь сочинять смешные анекдоты."},
{"role": "user", "content": "Напиши короткий и смешной анекдот про программиста."}
],
stream=False
)

print(response.choices[0].message.content)

Также вы можете включить опцию стриминга токенов, доступную в клиенте OpenAI

#pip install openai - если у вас еще нет этого пакета

from openai import OpenAI

client = OpenAI(
base_url="http://localhost:5000/v1",
api_key="Ваш_user_API_ключ"
)

completion = client.chat.completions.create(
model="Compressa-LLM",
messages=[
{"role": "system", "content": "Ты умеешь сочинять смешные анкедоты."},
{"role": "user", "content": "Напиши короткий и смешной анекдот про программиста."}
],
stream=True
)

for chunk in completion:
print(chunk.choices[0].delta)

#ChoiceDelta(content='Б', function_call=None, refusal=None, role='assistant', tool_calls=None)
#ChoiceDelta(content='ы', function_call=None, refusal=None, role='assistant', tool_calls=None)
#ChoiceDelta(content='л', function_call=None, refusal=None, role='assistant', tool_calls=None)
#ChoiceDelta(content=' у', function_call=None, refusal=None, role='assistant', tool_calls=None)
#...

Модель эмбеддингов

import requests

text = "Купил мужик шляпу, а она ему как раз"
url = "http://localhost:5000/v1/embeddings"

response = requests.post(
url,
json={"model": "Compressa-Embedding", "input": text},
)
print(response.status_code)

text_embedding = response.json()["data"][0]["embedding"]
print(f"Text embedding (first 10): {text_embedding[:10]}")

Модель реранжирования

import requests

url = "http://localhost:5000/v1/rerank"
headers = {"accept": "application/json","Content-Type": "application/json"}

payload = {
"query": "Кто написал Воину и Мир?",
"model": "Compressa-Rerank",
"documents": [
"Мир дикого запада - классный сериал",
"Фильм Война снимал Балабанов",
"Л.Н Толстой работал над романом Война и мир 6 лет"
]
}

response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
results = response.json()["results"]
print(results)

Модель Text To Speech

import requests

payload = {
"text": "Проснувшись однажды утром после беспокойного сна, Грегор Замза обнаружил, что он у себя в постели превратился в страшное насекомое.",
"speaker_id": "Zofija Kendrick",
"language_id": "ru"
}

url = "http://localhost:5000/api/tts"
response = requests.post(url, data=payload)

print(response.status_code)
if response.ok:
with open("test.wav", "wb") as f:
f.write(response.content)

Модель Authomatic Speech Recognition

Можно указать stream=True тогда распознанный текст будет отдаваться стримингом.

import requests
stream = True

url = f"http://localhost:5000/v1/audio/transcriptions/?stream={stream}"
audio_name = "test.wav"

response = requests.post(
url,
files={
"file": (audio_name, open(audio_name, "rb"), "audio/wav")
},
stream=stream
)

for line in response.iter_lines(decode_unicode=True):
print(line)

Также доступен реалтайм режим через Web Socket

Важно! Работа модели протестирована для sample rate 8000.

import wave
import os
import tempfile
import asyncio
import websockets
import librosa
import soundfile as sf

audio_name = "test.wav"
REQUIRED_SR = 8000

def prepare_audio(audio_name, required_sr):
with wave.open(audio_name, "rb") as wf:
sample_rate = wf.getframerate()
sample_width = wf.getsampwidth()
channels = wf.getnchannels()
num_frames = wf.getnframes()
duration = num_frames / sample_rate
if sample_rate != required_sr or channels != 1 or sample_width != 2:
print("Resampling audio...")
y, sr = librosa.load(audio_name, sr=None, mono=True)
y_resampled = librosa.resample(y, orig_sr=sr, target_sr=required_sr)
fd, temp_wav = tempfile.mkstemp(suffix=".wav")
os.close(fd)
sf.write(temp_wav, y_resampled, required_sr, subtype="PCM_16")
print(f"Saved resampled file as {temp_wav}")
return temp_wav
else:
return audio_name

chunk_duration_ms = 300
required_sr = REQUIRED_SR

audio_for_send = prepare_audio(audio_name, required_sr)

with wave.open(audio_for_send, "rb") as wf:
sample_rate = wf.getframerate()
sample_width = wf.getsampwidth()
channels = wf.getnchannels()
num_frames = wf.getnframes()
duration = num_frames / sample_rate
chunk_samples = int(sample_rate * (chunk_duration_ms / 1000))
chunk_bytes = chunk_samples * sample_width * channels
print(f"Recommended chunk size: {chunk_bytes} bytes ({chunk_samples} samples, {chunk_duration_ms} ms)")

CHUNK_SIZE = chunk_bytes
WS_URL = "ws://localhost:5000/v1/realtime/transcriptions/ws/"

async def send_audio(ws):
with open(audio_for_send, "rb") as f:
while True:
chunk = f.read(CHUNK_SIZE)
if not chunk:
break
await ws.send(chunk)
await asyncio.sleep(chunk_samples / sample_rate)
await ws.send(b"")

async def recv_results(ws):
try:
async for msg in ws:
if isinstance(msg, str):
try:
msg = json.loads(msg)
if msg.get("event") == "transcript":
print("TEXT:", msg["phrase"]["text"])
except Exception as e:
print("Parse error:", e)
except websockets.ConnectionClosed:
print("WS closed.")

async def main():
async with websockets.connect(WS_URL) as ws:
await asyncio.gather(
send_audio(ws),
recv_results(ws)
)

asyncio.run(main())

if audio_for_send != audio_name:
os.remove(audio_for_send)
print(f"Removed temporary file: {audio_for_send}")