Skip to content

Commit

Permalink
docs: add examples + guidance on Realtime API support
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertCraigie committed Dec 17, 2024
1 parent 307dd90 commit 0178824
Show file tree
Hide file tree
Showing 5 changed files with 493 additions and 1 deletion.
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,67 @@ We recommend that you always instantiate a client (e.g., with `client = OpenAI()
- It's harder to mock for testing purposes
- It's not possible to control cleanup of network connections

## Realtime API beta

The Realtime API enables you to build low-latency, multi-modal conversational experiences. It currently supports text and audio as both input and output, as well as [function calling](https://platform.openai.com/docs/guides/function-calling) through a WebSocket connection.

Under the hood the SDK uses the [`websockets`](https://websockets.readthedocs.io/en/stable/) library to manage connections.

The Realtime API works through a combination of client-sent events and server-sent events. Clients can send events to do things like update session configuration or send text and audio inputs. Server events confirm when audio responses have completed, or when a text response from the model has been received. A full event reference can be found [here](https://platform.openai.com/docs/api-reference/realtime-client-events) and a guide can be found [here](https://platform.openai.com/docs/guides/realtime).

Basic text-based example:

```py
import asyncio
from openai import AsyncOpenAI

async def main():
    # Minimal text-only round trip: configure the session, send one user
    # message, request a response, and print the streamed text.
    client = AsyncOpenAI()

    # The connection is scoped to this `async with` block.
    async with client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as connection:
        # Restrict the session to the text modality.
        await connection.session.update(session={'modalities': ['text']})

        # Add a user message to the conversation...
        await connection.conversation.item.create(
            item={
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": "Say hello!"}],
            }
        )
        # ...then ask for a response to be generated.
        await connection.response.create()

        # Iterate over server-sent events as they arrive.
        async for event in connection:
            if event.type == 'response.text.delta':
                # Print each text fragment immediately, without a newline.
                print(event.delta, flush=True, end="")

            elif event.type == 'response.text.done':
                # Terminate the printed line once the text is complete.
                print()

            elif event.type == "response.done":
                # The whole response has finished; stop reading events.
                break

asyncio.run(main())
```

However, the real magic of the Realtime API is handling audio inputs / outputs; see this example [TUI script](https://github.com/stainless-sdks/openai-python/blob/robert/realtime-docs-preview/examples/realtime/push_to_talk_app.py) for a fully fledged example.

### Realtime error handling

Whenever an error occurs, the Realtime API will send an [`error` event](https://platform.openai.com/docs/guides/realtime/realtime-api-beta#handling-errors) and the connection will stay open and remain usable. This means you need to handle it yourself, as *no errors are raised directly* by the SDK when an `error` event comes in.

```py
client = AsyncOpenAI()

async with client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as connection:
    ...
    async for event in connection:
        # Errors arrive as ordinary events rather than raised exceptions,
        # so they have to be checked for explicitly while iterating.
        if event.type == 'error':
            print(event.error.type)
            print(event.error.code)
            print(event.error.event_id)
            print(event.error.message)
```

## Using types

Nested request parameters are [TypedDicts](https://docs.python.org/3/library/typing.html#typing.TypedDict). Responses are [Pydantic models](https://docs.pydantic.dev) which also provide helper methods for things like:
Expand Down
142 changes: 142 additions & 0 deletions examples/realtime/audio_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from __future__ import annotations

import io
import base64
import asyncio
import threading
from typing import Callable, Awaitable

import numpy as np
import pyaudio
import sounddevice as sd
from pydub import AudioSegment

from openai.resources.beta.realtime.realtime import AsyncRealtimeConnection

# Duration of one output block in seconds; multiplied by SAMPLE_RATE to get
# the output stream's blocksize.
CHUNK_LENGTH_S = 0.05  # 50ms
# Sample rate in Hz used for both the input and output streams (24kHz).
SAMPLE_RATE = 24000
# NOTE(review): not referenced by the code visible in this file -- presumably
# kept for PyAudio-based callers; confirm before removing.
FORMAT = pyaudio.paInt16
# Mono audio.
CHANNELS = 1

# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false


def audio_to_pcm16_base64(audio_bytes: bytes) -> bytes:
    """Decode an encoded audio blob and convert it to mono PCM16 at SAMPLE_RATE.

    The input is parsed with ``AudioSegment.from_file``; the source format's
    properties are printed for diagnostics before conversion.

    Note: despite the function name, no base64 encoding is applied here. The
    value returned is the ``raw_data`` of the converted segment, so callers
    that need base64 must encode the result themselves.
    """
    source = AudioSegment.from_file(io.BytesIO(audio_bytes))
    print(f"Loaded audio: {audio.frame_rate=} {audio.channels=} {audio.sample_width=} {audio.frame_width=}".replace("audio.", "audio.") if False else _describe_loaded_audio(source))

    # Convert step by step: target sample rate, then channel count, then a
    # sample width of 2 bytes (16-bit samples).
    resampled = source.set_frame_rate(SAMPLE_RATE)
    mono = resampled.set_channels(CHANNELS)
    pcm16 = mono.set_sample_width(2)
    return pcm16.raw_data


def _describe_loaded_audio(audio) -> str:
    """Build the diagnostic line describing the decoded audio's properties."""
    return f"Loaded audio: {audio.frame_rate=} {audio.channels=} {audio.sample_width=} {audio.frame_width=}"


class AudioPlayerAsync:
    """Queue-backed player for PCM16 mono audio using a sounddevice output stream.

    Chunks are appended with ``add_data`` and drained by ``callback``, which
    is registered as the output stream's callback. ``queue`` and
    ``_frame_count`` are shared between the two sides, so every access to
    them goes through ``lock``.
    """

    def __init__(self):
        # Initialise all shared state before the stream exists so the callback
        # can never observe a partially constructed player.
        self.queue = []
        self.lock = threading.Lock()
        self.playing = False
        self._frame_count = 0
        self.stream = sd.OutputStream(
            callback=self.callback,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype=np.int16,
            blocksize=int(CHUNK_LENGTH_S * SAMPLE_RATE),
        )

    def callback(self, outdata, frames, time, status):  # noqa
        """Fill ``outdata`` with the next ``frames`` samples from the queue.

        Queued chunks are consumed in order; a chunk that is only partially
        needed has its remainder left at the head of the queue. If the queue
        runs dry, the rest of the block is padded with zeros (silence). Only
        real (non-padding) samples are added to the frame counter.
        """
        pieces = []
        filled = 0

        with self.lock:
            while filled < frames and self.queue:
                head = self.queue[0]
                needed = frames - filled
                if len(head) > needed:
                    # Take just what is needed and keep the remainder queued.
                    pieces.append(head[:needed])
                    self.queue[0] = head[needed:]
                    filled += needed
                else:
                    pieces.append(self.queue.pop(0))
                    filled += len(head)

            self._frame_count += filled

        # Pad with silence when there was not enough queued audio.
        if filled < frames:
            pieces.append(np.zeros(frames - filled, dtype=np.int16))

        data = np.concatenate(pieces) if pieces else np.empty(0, dtype=np.int16)
        outdata[:] = data.reshape(-1, 1)

    def reset_frame_count(self):
        """Reset the count of played frames to zero."""
        # Taken under the lock so a reset cannot be lost in the middle of the
        # callback's read-modify-write of the counter.
        with self.lock:
            self._frame_count = 0

    def get_frame_count(self):
        """Return the number of queued frames written out since the last reset."""
        with self.lock:
            return self._frame_count

    def add_data(self, data: bytes):
        """Queue a chunk of PCM16 single-channel audio and start playback if idle."""
        with self.lock:
            # bytes is pcm16 single channel audio data, convert to numpy array
            samples = np.frombuffer(data, dtype=np.int16)
            self.queue.append(samples)
            if not self.playing:
                self.start()

    def start(self):
        """Mark the player as playing and start the output stream."""
        self.playing = True
        self.stream.start()

    def stop(self):
        """Stop the output stream and discard any audio still queued."""
        self.playing = False
        self.stream.stop()
        with self.lock:
            self.queue = []

    def terminate(self):
        """Close the output stream."""
        self.stream.close()


async def send_audio_worker_sounddevice(
    connection: AsyncRealtimeConnection,
    should_send: Callable[[], bool] | None = None,
    start_send: Callable[[], Awaitable[None]] | None = None,
):
    """Stream microphone audio to a realtime connection until interrupted.

    Audio is read from a sounddevice input stream in fixed-size chunks and
    sent as base64-encoded ``input_audio_buffer.append`` events.

    Args:
        connection: Connection the audio events are sent on.
        should_send: Optional predicate polled once per chunk. While it
            returns true, chunks are sent; when it turns false after audio
            has been sent, the buffer is committed and a response requested.
            If omitted, every chunk is sent.
        start_send: Optional coroutine function awaited once right before the
            first chunk of each sending run.

    The loop runs forever; a ``KeyboardInterrupt`` ends it quietly. The input
    stream is always stopped and closed on exit.
    """
    sent_audio = False

    device_info = sd.query_devices()
    print(device_info)

    # 0.02 s worth of frames per read.
    read_size = int(SAMPLE_RATE * 0.02)

    stream = sd.InputStream(
        channels=CHANNELS,
        samplerate=SAMPLE_RATE,
        dtype="int16",
    )

    try:
        # Started inside the try block so that a failure to start still goes
        # through the cleanup in `finally` instead of leaking the stream.
        stream.start()

        while True:
            if stream.read_available < read_size:
                # Not enough audio captured yet; yield to other tasks.
                await asyncio.sleep(0)
                continue

            data, _ = stream.read(read_size)

            sending = should_send() if should_send else True
            if sending:
                if not sent_audio and start_send:
                    await start_send()
                await connection.send(
                    {"type": "input_audio_buffer.append", "audio": base64.b64encode(data).decode("utf-8")}
                )
                sent_audio = True

            elif sent_audio:
                # Sending just stopped: commit what was buffered and ask for
                # a response.
                print("Done, triggering inference")
                await connection.send({"type": "input_audio_buffer.commit"})
                await connection.send({"type": "response.create", "response": {}})
                sent_audio = False

            await asyncio.sleep(0)

    except KeyboardInterrupt:
        # Deliberate: Ctrl-C is the expected way to end this worker.
        pass
    finally:
        stream.stop()
        stream.close()
Loading

0 comments on commit 0178824

Please sign in to comment.