diff --git a/src/app/acs/caller.py b/src/app/acs/caller.py index 40771b1..7fe9c12 100644 --- a/src/app/acs/caller.py +++ b/src/app/acs/caller.py @@ -5,36 +5,45 @@ from azure.core.messaging import CloudEvent from typing import List, Optional, Union, TYPE_CHECKING from azure.communication.callautomation import ( - PhoneNumberIdentifier) -from azure.communication.callautomation.aio import CallAutomationClient -from azure.communication.callautomation import ( + CallAutomationClient, + CallConnectionClient, + PhoneNumberIdentifier, MediaStreamingOptions, - AudioFormat, MediaStreamingTransportType, MediaStreamingContentType, + RecognizeInputType, + MicrosoftTeamsUserIdentifier, MediaStreamingAudioChannelType, - ) + CallInvite, + RecognitionChoice, + AudioFormat, + DtmfTone, + VoiceKind, + FileSource, + TextSource +) +from azure.communication.callautomation.aio import CallAutomationClient from azure.communication.phonenumbers import PhoneNumbersClient,PhoneNumberCapabilityType, PhoneNumberAssignmentType, PhoneNumberType, PhoneNumberCapabilities import json +from aiohttp import web import requests class OutboundCall: - target_number: str source_number: str acs_connection_string: str acs_callback_path: str - def __init__(self, acs_connection_string: str, acs_source_number: str, acs_callback_path: str): + def __init__(self, source_number:str, acs_connection_string: str, acs_callback_path: str): + self.source_number = source_number self.acs_connection_string = acs_connection_string - self.source_number = acs_source_number - self.acs_callback_path = acs_callback_path - + self.acs_callback_path = acs_callback_path + async def call(self, target_number: str): self.call_automation_client = CallAutomationClient.from_connection_string(self.acs_connection_string) self.target_participant = PhoneNumberIdentifier(target_number) self.source_caller = PhoneNumberIdentifier(self.source_number) - websocket_url = 'wss://' + self.acs_callback_path + '/realtime' + websocket_url = 'wss://' + self.acs_callback_path + '/realtime-acs' media_streaming_options = MediaStreamingOptions( transport_url=websocket_url, @@ -64,7 +73,7 @@ async def _outbound_call_handler(self, request): # Parsing callback events event = CloudEvent.from_dict(event_dict) call_connection_id = event.data['callConnectionId'] - print("%s event received for call connection id: %s", event.type, call_connection_id) + print(f"{event.type} event received for call connection id: {call_connection_id}") call_connection_client = self.call_automation_client.get_call_connection(call_connection_id) # target_participant = PhoneNumberIdentifier(self.target_number) if event.type == "Microsoft.Communication.CallConnected": @@ -81,6 +90,5 @@ async def _outbound_call_handler(self, request): async def _get_source_number(self): return self.source_number - def attach_to_app(self, app, path): app.router.add_post(path, self._outbound_call_handler) diff --git a/src/app/app.py b/src/app/app.py index 4e4282d..f6080de 100644 --- a/src/app/app.py +++ b/src/app/app.py @@ -122,6 +122,8 @@ async def index(request): return web.FileResponse(static_directory / 'index.html') async def call(request): + body = await request.json() + if (caller is not None): body = await request.json() print(body) diff --git a/src/app/backend/rtmt.py b/src/app/backend/rtmt.py index a17fea0..891af0a 100644 --- a/src/app/backend/rtmt.py +++ b/src/app/backend/rtmt.py @@ -3,7 +3,7 @@ import json from enum import Enum from typing import Any, Callable, Optional -from aiohttp import web +from aiohttp import WSMessage, web from azure.identity import DefaultAzureCredential, get_bearer_token_provider from azure.core.credentials import AzureKeyCredential @@ -69,7 +69,87 @@ def __init__(self, endpoint: str, deployment: str, credentials: AzureKeyCredenti self._token_provider = get_bearer_token_provider(credentials, "https://cognitiveservices.azure.com/.default") self._token_provider() # Warm up during startup so we have a token cached when the first request arrives - async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketResponse, server_ws: web.WebSocketResponse) -> Optional[str]: + def _acs_message_to_openai(self, msg_data_json: str) -> Optional[str]: + """ + Transforms websocket message data from Azure Communication Services (ACS) to the OpenAI Realtime API format. + Args: + msg_data_json (str): The JSON string containing the ACS message data. + Returns: + Optional[str]: The transformed message in the OpenAI Realtime API format, or None if the message kind is not recognized. + This is needed to plug the Azure Communication Services audio stream into the OpenAI Realtime API. + Both APIs have different message formats, so this function acts as a bridge between them. + This method decides, if the given message is relevant for the OpenAI Realtime API, and if so, it is transformed to the OpenAI Realtime API format. + """ + message = json.loads(msg_data_json) + updated_message = msg_data_json + + # Initial message from Azure Communication Services. + # Set the initial configuration for the OpenAI Realtime API by sending a session.update message. + if message["kind"] == "AudioMetadata": + oai_message = { + "type": "session.update", + "session": { + "tool_choice": "auto" if len(self.tools) > 0 else "none", + "tools": [tool.schema for tool in self.tools.values()], + "turn_detection": { + "type": 'server_vad', + "threshold": 0.7, # Adjust if necessary + "prefix_padding_ms": 300, # Adjust if necessary + "silence_duration_ms": 500 # Adjust if necessary + }, + } + } + + if self.system_message is not None: + oai_message["session"]["instructions"] = self.system_message + if self.temperature is not None: + oai_message["session"]["temperature"] = self.temperature + if self.max_tokens is not None: + oai_message["session"]["max_response_output_tokens"] = self.max_tokens + if self.disable_audio is not None: + oai_message["session"]["disable_audio"] = self.disable_audio + + updated_message = json.dumps(oai_message) + + # Message from Azure Communication Services with audio data. + # Transform the message to the OpenAI Realtime API format. + elif message["kind"] == "AudioData": + oai_message = { + "type": "input_audio_buffer.append", + "audio": message["audioData"]["data"] + } + updated_message = json.dumps(oai_message) + + return updated_message + + def _openai_message_to_acs(self, msg_data_json: str) -> Optional[str]: + """ + Transforms websocket message data from the OpenAI Realtime API format into the Azure Communication Services (ACS) format. + Args: + msg_data_json (str): The JSON string containing the message data from the OpenAI Realtime API. + Returns: + Optional[str]: A JSON string containing the transformed message in ACS format, or None if the message type is not handled. + This is needed to plug the OpenAI Realtime API audio stream into Azure Communication Services. + Both APIs have different message formats, so this function acts as a bridge between them. + This method decides, if the given message is relevant for the ACS, and if so, it is transformed to the ACS format. + """ + message = json.loads(msg_data_json) + updated_message = None + + # Message from the OpenAI Realtime API with audio data. + # Transform the message to the Azure Communication Services format. + if message["type"] == "response.audio.delta": + acs_message = { + "kind": "AudioData", + "audioData": { + "data": message["delta"] + } + } + updated_message = json.dumps(acs_message) + + return updated_message + + async def _process_message_to_client(self, msg: WSMessage, client_ws: web.WebSocketResponse, server_ws: web.WebSocketResponse, is_acs_audio_stream: bool) -> Optional[str]: message = json.loads(msg.data) updated_message = msg.data if message is not None: @@ -119,14 +199,16 @@ async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketRes } }) if result.destination == ToolResultDirection.TO_CLIENT: - # TODO: this will break clients that don't know about this extra message, rewrite - # this to be a regular text message with a special marker of some sort - await client_ws.send_json({ - "type": "extension.middle_tier_tool_response", - "previous_item_id": tool_call.previous_id, - "tool_name": item["name"], - "tool_result": result.to_text() - }) + # Only send extra messages to clients that are not ACS audio streams + if is_acs_audio_stream == False: + # TODO: this will break clients that don't know about this extra message, rewrite + # this to be a regular text message with a special marker of some sort + await client_ws.send_json({ + "type": "extension.middle_tier_tool_response", + "previous_item_id": tool_call.previous_id, + "tool_name": item["name"], + "tool_result": result.to_text() + }) updated_message = None case "response.done": @@ -137,18 +219,32 @@ async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketRes }) if "response" in message: replace = False - for i, output in enumerate(reversed(message["response"]["output"])): + outputs = message["response"]["output"] + for output in reversed(outputs): if output["type"] == "function_call": - message["response"]["output"].pop(i) + outputs.remove(output) replace = True if replace: updated_message = json.dumps(message) + # Transform the message to the Azure Communication Services format, + # if it comes from the OpenAI realtime stream. + if is_acs_audio_stream and updated_message is not None: + updated_message = self._openai_message_to_acs(updated_message) + return updated_message - async def _process_message_to_server(self, msg: str, ws: web.WebSocketResponse) -> Optional[str]: + async def _process_message_to_server(self, msg: WSMessage, ws: web.WebSocketResponse, is_acs_audio_stream) -> Optional[str]: message = json.loads(msg.data) updated_message = msg.data + + # Transform the message to the OpenAI Realtime API format first, + # if it comes from the Azure Communication Services audio stream. + if (is_acs_audio_stream): + data = self._acs_message_to_openai(msg.data) + message = json.loads(data) + updated_message = data + if message is not None: match message["type"]: case "session.update": @@ -168,7 +264,7 @@ async def _process_message_to_server(self, msg: str, ws: web.WebSocketResponse) return updated_message - async def _forward_messages(self, ws: web.WebSocketResponse): + async def _forward_messages(self, ws: web.WebSocketResponse, is_acs_audio_stream: bool): async with aiohttp.ClientSession(base_url=self.endpoint) as session: params = { "api-version": "2024-10-01-preview", "deployment": self.deployment } headers = {} @@ -182,7 +278,7 @@ async def _forward_messages(self, ws: web.WebSocketResponse): async def from_client_to_server(): async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: - new_msg = await self._process_message_to_server(msg, ws) + new_msg = await self._process_message_to_server(msg, ws, is_acs_audio_stream) if new_msg is not None: await target_ws.send_str(new_msg) else: @@ -191,7 +287,7 @@ async def from_client_to_server(): async def from_server_to_client(): async for msg in target_ws: if msg.type == aiohttp.WSMsgType.TEXT: - new_msg = await self._process_message_to_client(msg, ws, target_ws) + new_msg = await self._process_message_to_client(msg, ws, target_ws, is_acs_audio_stream) if new_msg is not None: await ws.send_str(new_msg) else: @@ -206,8 +302,15 @@ async def from_server_to_client(): async def _websocket_handler(self, request: web.Request): ws = web.WebSocketResponse() await ws.prepare(request) - await self._forward_messages(ws) + await self._forward_messages(ws, False) + return ws + + async def _websocket_handler_acs(self, request: web.Request): + ws = web.WebSocketResponse() + await ws.prepare(request) + await self._forward_messages(ws, True) return ws def attach_to_app(self, app, path): app.router.add_get(path, self._websocket_handler) + app.router.add_get(path + "-acs", self._websocket_handler_acs) diff --git a/src/app/static/app.js b/src/app/static/app.js index 7557293..1550c85 100644 --- a/src/app/static/app.js +++ b/src/app/static/app.js @@ -139,7 +139,6 @@ function onToggleListening() { function onCallButton() { const phonenumber = document.getElementById('phonenumber').value; - const callDetails = { target_number: phonenumber }; @@ -312,4 +311,4 @@ window.onload = function() { } ) .catch(error => console.error('Error:', error)); -}; \ No newline at end of file +};