


import asyncio
import base64
import json
import wave
import websockets
# Credential placed verbatim after "Basic " in the Authorization header of the
# WebSocket handshake. Replace the placeholder before running.
# NOTE(review): presumably this must be the already Base64-encoded key — confirm
# against the API documentation.
API_KEY = "<YOUR_API_KEY>"
# WebSocket endpoint that stream_transcribe() connects to.
WS_URL = "wss://api.inworld.ai/stt/v1/transcribe:streamBidirectional"
async def stream_transcribe(wav_path: str = "audio.wav") -> None:
    """Stream a WAV file to the transcription WebSocket and print the results.

    The WAV file is read fully into memory, described to the server in a
    ``transcribeConfig`` message, sent as base64-encoded ~100 ms chunks paced
    at roughly real time, and followed by ``endTurn``. Transcription messages
    are printed as they arrive until a final one is received or no message
    arrives for 10 seconds, after which ``closeStream`` is sent.

    Args:
        wav_path: Path of the WAV file to transcribe. Defaults to
            ``"audio.wav"``.

    Raises:
        ValueError: If the WAV file is not 16-bit PCM, which is what the
            ``LINEAR16`` encoding declared in the config requires.
    """
    # Read the audio before connecting so a missing or invalid file fails
    # fast and no blocking file I/O happens while the socket is open.
    with wave.open(wav_path, "rb") as wf:
        sample_rate = wf.getframerate()
        channels = wf.getnchannels()
        sample_width = wf.getsampwidth()
        pcm = wf.readframes(wf.getnframes())

    if sample_width != 2:
        raise ValueError(
            f"{wav_path!r} has {8 * sample_width}-bit samples; "
            "LINEAR16 requires 16-bit PCM"
        )

    # Size chunks in whole frames (about 100 ms each) so a sample is never
    # split across two messages; max(1, ...) keeps the range step non-zero
    # for very low sample rates.
    frame_bytes = sample_width * channels
    chunk_bytes = max(1, sample_rate // 10) * frame_bytes

    headers = {"Authorization": f"Basic {API_KEY}"}
    async with websockets.connect(WS_URL, additional_headers=headers) as ws:
        # 1. Send transcription config. The sample rate and channel count
        #    come from the file itself so they always match the audio sent.
        await ws.send(json.dumps({
            "transcribeConfig": {
                "modelId": "inworld/inworld-stt-1",
                "audioEncoding": "LINEAR16",
                "sampleRateHertz": sample_rate,
                "numberOfChannels": channels,
                "language": "en-US",
            }
        }))

        # 2. Stream audio in ~100 ms chunks (base64-encoded), sleeping
        #    between sends to approximate real-time pacing.
        for offset in range(0, len(pcm), chunk_bytes):
            chunk = pcm[offset:offset + chunk_bytes]
            await ws.send(json.dumps({
                "audioChunk": {"content": base64.b64encode(chunk).decode()}
            }))
            await asyncio.sleep(0.1)

        # 3. Signal end of turn.
        await ws.send(json.dumps({"endTurn": {}}))

        # 4. Receive results until a final transcription arrives, or give up
        #    after 10 seconds without any message.
        while True:
            try:
                raw = await asyncio.wait_for(ws.recv(), timeout=10)
            except asyncio.TimeoutError:
                break
            msg = json.loads(raw)
            transcription = msg.get("result", {}).get("transcription", {})
            if not transcription:
                # Not a transcription message; keep waiting.
                continue
            is_final = transcription.get("isFinal")
            tag = "[FINAL]" if is_final else "[partial]"
            print(f"{tag} {transcription.get('transcript', '')}")
            if is_final:
                break

        # 5. Close the stream.
        await ws.send(json.dumps({"closeStream": {}}))
# Run the transcription only when executed as a script, so importing this
# module does not open a network connection as a side effect.
if __name__ == "__main__":
    asyncio.run(stream_transcribe())