Using OpenAI API
Compressa has an integrated API layer compatible with OpenAI's Chat Completion API. This means developers can use the existing OpenAI client library (including the Langchain library) and adapt current code with minimal changes to work with Compressa.
Depending on the Compressa configuration (the `task` field in the configuration file), different endpoints will be available.
OpenAI API Without Streaming
- Python (OpenAI client)
- Python (OpenAI Langchain)
# Non-streaming chat completion against the Compressa OpenAI-compatible endpoint.
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:5000/v1",
    api_key="key",
)

chat_response = client.chat.completions.create(
    model="Compressa-LLM",
    stream=False,
    messages=[
        {"role": "system", "content": "You can write funny jokes."},
        {"role": "user", "content": "Write a short and funny joke about a programmer."},
    ],
)

# The full answer is available in one piece when stream=False.
print(chat_response.choices[0].message.content)
# pip install langchain langchain-openai openai
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

# Point the Langchain OpenAI wrapper at the Compressa endpoint.
chat_model = ChatOpenAI(
    model="Compressa-LLM",
    base_url="http://localhost:5000/v1",
    api_key="key",
)

conversation = [
    SystemMessage(content="You can write funny jokes."),
    HumanMessage(content="Write a short and funny joke about a programmer."),
]

reply = chat_model.invoke(conversation)
print(f"Model response: {reply.content}")
You can also enable token streaming option available in the OpenAI client
# pip install openai  (if you don't have the package yet)
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:5000/v1",
    api_key="key",
)

stream = client.chat.completions.create(
    model="Compressa-LLM",
    messages=[
        {"role": "system", "content": "You can write funny jokes."},
        {"role": "user", "content": "Write a short and funny joke about a programmer."},
    ],
    stream=True,
)

# With stream=True each chunk carries an incremental delta of the answer.
for chunk in stream:
    print(chunk.choices[0].delta)
# ChoiceDelta(content='A', function_call=None, refusal=None, role='assistant', tool_calls=None)
# ChoiceDelta(content=' p', function_call=None, refusal=None, role='assistant', tool_calls=None)
# ChoiceDelta(content='r', function_call=None, refusal=None, role='assistant', tool_calls=None)
# ChoiceDelta(content='o', function_call=None, refusal=None, role='assistant', tool_calls=None)
# ...
Embedding Model
import requests

# Request an embedding for a single text from the Compressa embedding endpoint.
text = "The quick brown fox jumps over the lazy dog"
url = "http://localhost:5000/v1/embeddings"

response = requests.post(
    url,
    json={"model": "Compressa-Embedding", "input": text},
)
print(response.status_code)
# Fail loudly on HTTP errors instead of raising a confusing KeyError below
# (consistent with the reranking example).
response.raise_for_status()

text_embedding = response.json()["data"][0]["embedding"]
print(f"Text embedding (first 10): {text_embedding[:10]}")
Reranking Model
import requests

# Ask the reranker to score each document's relevance to the query.
url = "http://localhost:5000/v1/rerank"
headers = {
    "accept": "application/json",
    "Content-Type": "application/json",
}
payload = {
    "query": "Who wrote War and Peace?",
    "model": "Compressa-Rerank",
    "documents": [
        "Westworld is a great series",
        "The film War was directed by Balabanov",
        "L.N. Tolstoy worked on the novel War and Peace for 6 years",
    ],
}

response = requests.post(url, json=payload, headers=headers)
# Raise on any HTTP error before touching the response body.
response.raise_for_status()

results = response.json()["results"]
print(results)
Text To Speech Model
import requests

# Synthesize speech from text; the endpoint returns WAV audio bytes.
payload = {
    "text": "One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin.",
    "speaker_id": "Zofija Kendrick",
    "language_id": "en",
}
url = "http://localhost:5000/api/tts"

# NOTE: data= sends the payload form-encoded (not JSON), as the endpoint expects.
response = requests.post(url, data=payload)
print(response.status_code)

if response.ok:
    # Persist the raw audio bytes.
    with open("test.wav", "wb") as wav_file:
        wav_file.write(response.content)
Automatic Speech Recognition Model
If you specify stream=True, the recognized text will be streamed.
import requests

# Upload a WAV file for transcription; with stream=True the server streams
# the recognized text back line by line.
stream = True
url = f"http://localhost:5000/v1/audio/transcriptions/?stream={stream}"
audio_name = "test.wav"

# Use a context manager so the file handle is closed even if the request fails
# (the original left the handle open).
with open(audio_name, "rb") as audio_file:
    response = requests.post(
        url,
        files={
            "file": (audio_name, audio_file, "audio/wav")
        },
        stream=stream
    )

for line in response.iter_lines(decode_unicode=True):
    print(line)
Real-time mode is also available via Web Socket
Important! The model has been tested with a sample rate of 8000 Hz.
import asyncio
import json
import os
import tempfile
import wave

import librosa
import soundfile as sf
import websockets
# Input file and the sample rate the realtime ASR model expects (tested at 8000 Hz).
audio_name = "test.wav"
REQUIRED_SR = 8000
def prepare_audio(audio_name, required_sr):
    """Return a path to a WAV file in the format the realtime model expects.

    If *audio_name* is already mono, 16-bit PCM at *required_sr*, the original
    path is returned unchanged. Otherwise the audio is converted into a fresh
    temporary WAV file and that path is returned (the caller is responsible
    for deleting it).
    """
    with wave.open(audio_name, "rb") as wf:
        sample_rate = wf.getframerate()
        sample_width = wf.getsampwidth()
        channels = wf.getnchannels()
    if sample_rate == required_sr and channels == 1 and sample_width == 2:
        # Already in the required format — nothing to do.
        return audio_name
    print("Resampling audio...")
    # librosa loads the audio as float samples, downmixing to mono.
    y, sr = librosa.load(audio_name, sr=None, mono=True)
    y_resampled = librosa.resample(y, orig_sr=sr, target_sr=required_sr)
    # mkstemp keeps the file on disk after we close the descriptor.
    fd, temp_wav = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    sf.write(temp_wav, y_resampled, required_sr, subtype="PCM_16")
    print(f"Saved resampled file as {temp_wav}")
    return temp_wav
# Stream the audio in ~300 ms chunks, simulating a real-time microphone feed.
chunk_duration_ms = 300
required_sr = REQUIRED_SR
audio_for_send = prepare_audio(audio_name, required_sr)
# Read the (possibly resampled) file's parameters to size the chunks.
with wave.open(audio_for_send, "rb") as wf:
    sample_rate = wf.getframerate()
    sample_width = wf.getsampwidth()
    channels = wf.getnchannels()
    num_frames = wf.getnframes()
    duration = num_frames / sample_rate
# Bytes per chunk = samples per chunk * bytes per sample * channel count.
chunk_samples = int(sample_rate * (chunk_duration_ms / 1000))
chunk_bytes = chunk_samples * sample_width * channels
print(f"Recommended chunk size: {chunk_bytes} bytes ({chunk_samples} samples, {chunk_duration_ms} ms)")
CHUNK_SIZE = chunk_bytes
WS_URL = "ws://localhost:5000/v1/realtime/transcriptions/ws/"
async def send_audio(ws):
    """Feed the audio file to the websocket in real-time-sized chunks."""
    with open(audio_for_send, "rb") as audio:
        while chunk := audio.read(CHUNK_SIZE):
            await ws.send(chunk)
            # Pace the upload so it matches the audio's real duration.
            await asyncio.sleep(chunk_samples / sample_rate)
    # An empty frame signals end-of-stream to the server.
    await ws.send(b"")
async def recv_results(ws):
    """Print transcript events from the server until the socket closes."""
    try:
        async for raw in ws:
            if not isinstance(raw, str):
                continue  # skip non-text frames
            try:
                event = json.loads(raw)
                if event.get("event") == "transcript":
                    print("TEXT:", event["phrase"]["text"])
            except Exception as e:
                # Best-effort: malformed messages are reported, not fatal.
                print("Parse error:", e)
    except websockets.ConnectionClosed:
        print("WS closed.")
async def main():
    # Run the uploader and the reader concurrently over one connection.
    async with websockets.connect(WS_URL) as ws:
        await asyncio.gather(
            send_audio(ws),
            recv_results(ws)
        )
asyncio.run(main())
# Clean up the temporary resampled file, if prepare_audio created one.
if audio_for_send != audio_name:
    os.remove(audio_for_send)
    print(f"Removed temporary file: {audio_for_send}")