From f35e0dfb4ac261981be0e968d5687479ff57ff36 Mon Sep 17 00:00:00 2001 From: Robin-Manuel Thiel Date: Wed, 4 Dec 2024 22:18:04 +0100 Subject: [PATCH 1/5] fix: Replace xmlHttp with fetch in the frontend Use fetch in the frontend to send a JSON body and use a modern JS stack --- src/app/static/app.js | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/app/static/app.js b/src/app/static/app.js index d1cfdc1..f6398fd 100644 --- a/src/app/static/app.js +++ b/src/app/static/app.js @@ -139,18 +139,16 @@ function onToggleListening() { function onCallButton() { const phonenumber = document.getElementById('phonenumber').value; - - const callDetails = { - number: phonenumber - }; - - - theUrl = window.location.href + "call"; - var xmlHttp = new XMLHttpRequest(); - xmlHttp.open( "POST", theUrl, false ); - xmlHttp.send( callDetails ); - reportDiv.textContent = xmlHttp.responseText; + theUrl = window.location.href + "call"; + fetch(theUrl, { + method : "POST", + body : JSON.stringify({ + number: phonenumber + }) + }) + .then(response => reportDiv.textContent = response.json()) + .catch(error => console.error('Error:', error)); } toggleButton.addEventListener('click', onToggleListening); From cf3dc863b3ee0e90c40c059c96577d29fdc7c5e4 Mon Sep 17 00:00:00 2001 From: Robin-Manuel Thiel Date: Wed, 4 Dec 2024 22:19:11 +0100 Subject: [PATCH 2/5] feat: Pass previously hardcoded target phone number as request body --- src/app/acs/caller.py | 11 ++++++----- src/app/app.py | 8 ++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/app/acs/caller.py b/src/app/acs/caller.py index f7ee856..bdaa408 100644 --- a/src/app/acs/caller.py +++ b/src/app/acs/caller.py @@ -16,23 +16,22 @@ FileSource, TextSource) import json +from aiohttp import web import requests class OutboundCall: - target_number: str source_number: str acs_connection_string: str acs_callback_path: str - def __init__(self, target_number: str, source_number:str, acs_connection_string: str, acs_callback_path: str): - self.target_number = target_number + def __init__(self, source_number:str, acs_connection_string: str, acs_callback_path: str): self.source_number = source_number self.acs_connection_string = acs_connection_string self.acs_callback_path = acs_callback_path - async def call(self): + async def call(self, target_number: str): self.call_automation_client = CallAutomationClient.from_connection_string(self.acs_connection_string) - self.target_participant = PhoneNumberIdentifier(self.target_number) + self.target_participant = PhoneNumberIdentifier(target_number) self.source_caller = PhoneNumberIdentifier(self.source_number) call_connection_properties = self.call_automation_client.create_call(self.target_participant, self.acs_callback_path, @@ -52,6 +51,8 @@ async def _outbound_call_handler(self, request): print("Call connected") print(call_connection_id) + return web.Response(status=200) + def attach_to_app(self, app, path): app.router.add_post(path, self._outbound_call_handler) diff --git a/src/app/app.py b/src/app/app.py index 43c313a..97eed51 100644 --- a/src/app/app.py +++ b/src/app/app.py @@ -53,12 +53,10 @@ async def create_app(): rtmt = RTMiddleTier(llm_endpoint, llm_deployment, llm_credential) - if (os.environ.get("ACS_TARGET_NUMBER") is not None and - os.environ.get("ACS_SOURCE_NUMBER") is not None and + if (os.environ.get("ACS_SOURCE_NUMBER") is not None and os.environ.get("ACS_CONNECTION_STRING") is not None and os.environ.get("ACS_CALLBACK_PATH") is not None): caller = OutboundCall( - os.environ.get("ACS_TARGET_NUMBER"), os.environ.get("ACS_SOURCE_NUMBER"), os.environ.get("ACS_CONNECTION_STRING"), os.environ.get("ACS_CALLBACK_PATH"), @@ -117,8 +115,10 @@ async def index(request): return web.FileResponse(static_directory / 'index.html') async def call(request): + body = await request.json() + if (caller is not None): - await caller.call() + await caller.call(body['number']) return web.Response(text="Created outbound call") else: return web.Response(text="Outbound calling is not configured") From 1688ac5c8b71de6c3b35ac5b0c4ce2ea3bd3168c Mon Sep 17 00:00:00 2001 From: Robin-Manuel Thiel Date: Fri, 6 Dec 2024 17:47:08 +0100 Subject: [PATCH 3/5] feat: Add implementation for ACS outbound phone calls --- src/app/acs/caller.py | 28 ++++++-- src/app/app.py | 4 +- src/app/backend/rtmt.py | 137 +++++++++++++++++++++++++++++++++++----- src/app/static/app.js | 9 +-- 4 files changed, 151 insertions(+), 27 deletions(-) diff --git a/src/app/acs/caller.py b/src/app/acs/caller.py index bdaa408..43d3923 100644 --- a/src/app/acs/caller.py +++ b/src/app/acs/caller.py @@ -7,10 +7,15 @@ CallAutomationClient, CallConnectionClient, PhoneNumberIdentifier, + MediaStreamingOptions, + MediaStreamingTransportType, + MediaStreamingContentType, RecognizeInputType, MicrosoftTeamsUserIdentifier, + MediaStreamingAudioChannelType, CallInvite, RecognitionChoice, + AudioFormat, DtmfTone, VoiceKind, FileSource, @@ -23,19 +28,32 @@ class OutboundCall: source_number: str acs_connection_string: str acs_callback_path: str + media_streaming_configuration: MediaStreamingOptions - def __init__(self, source_number:str, acs_connection_string: str, acs_callback_path: str): + def __init__(self, source_number:str, acs_connection_string: str, acs_callback_path: str, acs_media_streaming_websocker_path: str): self.source_number = source_number self.acs_connection_string = acs_connection_string self.acs_callback_path = acs_callback_path + self.media_streaming_configuration = MediaStreamingOptions( + transport_url=acs_media_streaming_websocker_path, + transport_type=MediaStreamingTransportType.WEBSOCKET, + content_type=MediaStreamingContentType.AUDIO, + audio_channel_type=MediaStreamingAudioChannelType.MIXED, + start_media_streaming=True, + enable_bidirectional=True, + audio_format=AudioFormat.PCM24_K_MONO + ) async def call(self, target_number: str): self.call_automation_client = CallAutomationClient.from_connection_string(self.acs_connection_string) self.target_participant = PhoneNumberIdentifier(target_number) self.source_caller = PhoneNumberIdentifier(self.source_number) - call_connection_properties = self.call_automation_client.create_call(self.target_participant, - self.acs_callback_path, - source_caller_id_number=self.source_caller) + self.call_automation_client.create_call( + self.target_participant, + self.acs_callback_path, + media_streaming=self.media_streaming_configuration, + source_caller_id_number=self.source_caller + ) async def _outbound_call_handler(self, request): print("Outbound call handler") @@ -44,7 +62,7 @@ async def _outbound_call_handler(self, request): # Parsing callback events event = CloudEvent.from_dict(event_dict) call_connection_id = event.data['callConnectionId'] - print("%s event received for call connection id: %s", event.type, call_connection_id) + print(f"{event.type} event received for call connection id: {call_connection_id}") call_connection_client = self.call_automation_client.get_call_connection(call_connection_id) # target_participant = PhoneNumberIdentifier(self.target_number) if event.type == "Microsoft.Communication.CallConnected": diff --git a/src/app/app.py b/src/app/app.py index 97eed51..79e4189 100644 --- a/src/app/app.py +++ b/src/app/app.py @@ -55,11 +55,13 @@ async def create_app(): if (os.environ.get("ACS_SOURCE_NUMBER") is not None and os.environ.get("ACS_CONNECTION_STRING") is not None and - os.environ.get("ACS_CALLBACK_PATH") is not None): + os.environ.get("ACS_CALLBACK_PATH") is not None and + os.environ.get("ACS_MEDIA_STREAMING_WEBSOCKET_PATH") is not None): caller = OutboundCall( os.environ.get("ACS_SOURCE_NUMBER"), os.environ.get("ACS_CONNECTION_STRING"), os.environ.get("ACS_CALLBACK_PATH"), + os.environ.get("ACS_MEDIA_STREAMING_WEBSOCKET_PATH") ) caller.attach_to_app(app, "/acs") diff --git a/src/app/backend/rtmt.py b/src/app/backend/rtmt.py index a17fea0..891af0a 100644 --- a/src/app/backend/rtmt.py +++ b/src/app/backend/rtmt.py @@ -3,7 +3,7 @@ import json from enum import Enum from typing import Any, Callable, Optional -from aiohttp import web +from aiohttp import WSMessage, web from azure.identity import DefaultAzureCredential, get_bearer_token_provider from azure.core.credentials import AzureKeyCredential @@ -69,7 +69,87 @@ def __init__(self, endpoint: str, deployment: str, credentials: AzureKeyCredenti self._token_provider = get_bearer_token_provider(credentials, "https://cognitiveservices.azure.com/.default") self._token_provider() # Warm up during startup so we have a token cached when the first request arrives - async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketResponse, server_ws: web.WebSocketResponse) -> Optional[str]: + def _acs_message_to_openai(self, msg_data_json: str) -> Optional[str]: + """ + Transforms websocket message data from Azure Communication Services (ACS) to the OpenAI Realtime API format. + Args: + msg_data_json (str): The JSON string containing the ACS message data. + Returns: + Optional[str]: The transformed message in the OpenAI Realtime API format, or None if the message kind is not recognized. + This is needed to plug the Azure Communication Services audio stream into the OpenAI Realtime API. + Both APIs have different message formats, so this function acts as a bridge between them. + This method decides, if the given message is relevant for the OpenAI Realtime API, and if so, it is transformed to the OpenAI Realtime API format. + """ + message = json.loads(msg_data_json) + updated_message = msg_data_json + + # Initial message from Azure Communication Services. + # Set the initial configuration for the OpenAI Realtime API by sending a session.update message. + if message["kind"] == "AudioMetadata": + oai_message = { + "type": "session.update", + "session": { + "tool_choice": "auto" if len(self.tools) > 0 else "none", + "tools": [tool.schema for tool in self.tools.values()], + "turn_detection": { + "type": 'server_vad', + "threshold": 0.7, # Adjust if necessary + "prefix_padding_ms": 300, # Adjust if necessary + "silence_duration_ms": 500 # Adjust if necessary + }, + } + } + + if self.system_message is not None: + oai_message["session"]["instructions"] = self.system_message + if self.temperature is not None: + oai_message["session"]["temperature"] = self.temperature + if self.max_tokens is not None: + oai_message["session"]["max_response_output_tokens"] = self.max_tokens + if self.disable_audio is not None: + oai_message["session"]["disable_audio"] = self.disable_audio + + updated_message = json.dumps(oai_message) + + # Message from Azure Communication Services with audio data. + # Transform the message to the OpenAI Realtime API format. + elif message["kind"] == "AudioData": + oai_message = { + "type": "input_audio_buffer.append", + "audio": message["audioData"]["data"] + } + updated_message = json.dumps(oai_message) + + return updated_message + + def _openai_message_to_acs(self, msg_data_json: str) -> Optional[str]: + """ + Transforms websocket message data from the OpenAI Realtime API format into the Azure Communication Services (ACS) format. + Args: + msg_data_json (str): The JSON string containing the message data from the OpenAI Realtime API. + Returns: + Optional[str]: A JSON string containing the transformed message in ACS format, or None if the message type is not handled. + This is needed to plug the OpenAI Realtime API audio stream into Azure Communication Services. + Both APIs have different message formats, so this function acts as a bridge between them. + This method decides, if the given message is relevant for the ACS, and if so, it is transformed to the ACS format. + """ + message = json.loads(msg_data_json) + updated_message = None + + # Message from the OpenAI Realtime API with audio data. + # Transform the message to the Azure Communication Services format. + if message["type"] == "response.audio.delta": + acs_message = { + "kind": "AudioData", + "audioData": { + "data": message["delta"] + } + } + updated_message = json.dumps(acs_message) + + return updated_message + + async def _process_message_to_client(self, msg: WSMessage, client_ws: web.WebSocketResponse, server_ws: web.WebSocketResponse, is_acs_audio_stream: bool) -> Optional[str]: message = json.loads(msg.data) updated_message = msg.data if message is not None: @@ -119,14 +199,16 @@ async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketRes } }) if result.destination == ToolResultDirection.TO_CLIENT: - # TODO: this will break clients that don't know about this extra message, rewrite - # this to be a regular text message with a special marker of some sort - await client_ws.send_json({ - "type": "extension.middle_tier_tool_response", - "previous_item_id": tool_call.previous_id, - "tool_name": item["name"], - "tool_result": result.to_text() - }) + # Only send extra messages to clients that are not ACS audio streams + if is_acs_audio_stream == False: + # TODO: this will break clients that don't know about this extra message, rewrite + # this to be a regular text message with a special marker of some sort + await client_ws.send_json({ + "type": "extension.middle_tier_tool_response", + "previous_item_id": tool_call.previous_id, + "tool_name": item["name"], + "tool_result": result.to_text() + }) updated_message = None case "response.done": @@ -137,18 +219,32 @@ async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketRes }) if "response" in message: replace = False - for i, output in enumerate(reversed(message["response"]["output"])): + outputs = message["response"]["output"] + for output in reversed(outputs): if output["type"] == "function_call": - message["response"]["output"].pop(i) + outputs.remove(output) replace = True if replace: updated_message = json.dumps(message) + # Transform the message to the Azure Communication Services format, + # if it comes from the OpenAI realtime stream. + if is_acs_audio_stream and updated_message is not None: + updated_message = self._openai_message_to_acs(updated_message) + return updated_message - async def _process_message_to_server(self, msg: str, ws: web.WebSocketResponse) -> Optional[str]: + async def _process_message_to_server(self, msg: WSMessage, ws: web.WebSocketResponse, is_acs_audio_stream) -> Optional[str]: message = json.loads(msg.data) updated_message = msg.data + + # Transform the message to the OpenAI Realtime API format first, + # if it comes from the Azure Communication Services audio stream. + if (is_acs_audio_stream): + data = self._acs_message_to_openai(msg.data) + message = json.loads(data) + updated_message = data + if message is not None: match message["type"]: case "session.update": @@ -168,7 +264,7 @@ async def _process_message_to_server(self, msg: str, ws: web.WebSocketResponse) return updated_message - async def _forward_messages(self, ws: web.WebSocketResponse): + async def _forward_messages(self, ws: web.WebSocketResponse, is_acs_audio_stream: bool): async with aiohttp.ClientSession(base_url=self.endpoint) as session: params = { "api-version": "2024-10-01-preview", "deployment": self.deployment } headers = {} @@ -182,7 +278,7 @@ async def _forward_messages(self, ws: web.WebSocketResponse): async def from_client_to_server(): async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: - new_msg = await self._process_message_to_server(msg, ws) + new_msg = await self._process_message_to_server(msg, ws, is_acs_audio_stream) if new_msg is not None: await target_ws.send_str(new_msg) else: @@ -191,7 +287,7 @@ async def from_client_to_server(): async def from_server_to_client(): async for msg in target_ws: if msg.type == aiohttp.WSMsgType.TEXT: - new_msg = await self._process_message_to_client(msg, ws, target_ws) + new_msg = await self._process_message_to_client(msg, ws, target_ws, is_acs_audio_stream) if new_msg is not None: await ws.send_str(new_msg) else: @@ -206,8 +302,15 @@ async def from_server_to_client(): async def _websocket_handler(self, request: web.Request): ws = web.WebSocketResponse() await ws.prepare(request) - await self._forward_messages(ws) + await self._forward_messages(ws, False) + return ws + + async def _websocket_handler_acs(self, request: web.Request): + ws = web.WebSocketResponse() + await ws.prepare(request) + await self._forward_messages(ws, True) return ws def attach_to_app(self, app, path): app.router.add_get(path, self._websocket_handler) + app.router.add_get(path + "-acs", self._websocket_handler_acs) diff --git a/src/app/static/app.js b/src/app/static/app.js index f6398fd..3472153 100644 --- a/src/app/static/app.js +++ b/src/app/static/app.js @@ -139,13 +139,14 @@ function onToggleListening() { function onCallButton() { const phonenumber = document.getElementById('phonenumber').value; - + const callDetails = { + number: phonenumber + }; + theUrl = window.location.href + "call"; fetch(theUrl, { method : "POST", - body : JSON.stringify({ - number: phonenumber - }) + body : JSON.stringify(callDetails) }) .then(response => reportDiv.textContent = response.json()) .catch(error => console.error('Error:', error)); From a6efe144b927d6b0618fa733866ccecad074e729 Mon Sep 17 00:00:00 2001 From: Robin-Manuel Thiel Date: Fri, 6 Dec 2024 17:58:53 +0100 Subject: [PATCH 4/5] fix: Introduce dedicated realtime path for acs --- src/app/acs/caller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/app/acs/caller.py b/src/app/acs/caller.py index 282f8fb..5f20a5b 100644 --- a/src/app/acs/caller.py +++ b/src/app/acs/caller.py @@ -48,7 +48,7 @@ async def call(self, target_number: str): self.target_participant = PhoneNumberIdentifier(target_number) self.source_caller = PhoneNumberIdentifier(self.source_number) - websocket_url = 'wss://' + self.acs_callback_path + '/realtime' + websocket_url = 'wss://' + self.acs_callback_path + '/realtime-acs' media_streaming_options = MediaStreamingOptions( transport_url=websocket_url, From 347eaf5acef2115bc19ea77acc2dfd5ff1604958 Mon Sep 17 00:00:00 2001 From: Robin-Manuel Thiel Date: Fri, 6 Dec 2024 18:05:13 +0100 Subject: [PATCH 5/5] chore: remove import duplicates --- src/app/acs/caller.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/app/acs/caller.py b/src/app/acs/caller.py index 5f20a5b..7fe9c12 100644 --- a/src/app/acs/caller.py +++ b/src/app/acs/caller.py @@ -20,13 +20,8 @@ DtmfTone, VoiceKind, FileSource, - TextSource, - MediaStreamingOptions, - AudioFormat, - MediaStreamingTransportType, - MediaStreamingContentType, - MediaStreamingAudioChannelType, - PhoneNumberIdentifier) + TextSource +) from azure.communication.callautomation.aio import CallAutomationClient from azure.communication.phonenumbers import PhoneNumbersClient,PhoneNumberCapabilityType, PhoneNumberAssignmentType, PhoneNumberType, PhoneNumberCapabilities import json