Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Phone Call Support via ACS #8

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions src/app/acs/caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,45 @@
from azure.core.messaging import CloudEvent
from typing import List, Optional, Union, TYPE_CHECKING
from azure.communication.callautomation import (
PhoneNumberIdentifier)
from azure.communication.callautomation.aio import CallAutomationClient
from azure.communication.callautomation import (
CallAutomationClient,
CallConnectionClient,
PhoneNumberIdentifier,
MediaStreamingOptions,
AudioFormat,
MediaStreamingTransportType,
MediaStreamingContentType,
RecognizeInputType,
MicrosoftTeamsUserIdentifier,
MediaStreamingAudioChannelType,
)
CallInvite,
RecognitionChoice,
AudioFormat,
DtmfTone,
VoiceKind,
FileSource,
TextSource
)
from azure.communication.callautomation.aio import CallAutomationClient
from azure.communication.phonenumbers import PhoneNumbersClient,PhoneNumberCapabilityType, PhoneNumberAssignmentType, PhoneNumberType, PhoneNumberCapabilities
import json
from aiohttp import web
import requests

class OutboundCall:
target_number: str
source_number: str
acs_connection_string: str
acs_callback_path: str

def __init__(self, acs_connection_string: str, acs_source_number: str, acs_callback_path: str):
def __init__(self, source_number:str, acs_connection_string: str, acs_callback_path: str):
self.source_number = source_number
self.acs_connection_string = acs_connection_string
self.source_number = acs_source_number
self.acs_callback_path = acs_callback_path

self.acs_callback_path = acs_callback_path

async def call(self, target_number: str):
self.call_automation_client = CallAutomationClient.from_connection_string(self.acs_connection_string)
self.target_participant = PhoneNumberIdentifier(target_number)
self.source_caller = PhoneNumberIdentifier(self.source_number)

websocket_url = 'wss://' + self.acs_callback_path + '/realtime'
websocket_url = 'wss://' + self.acs_callback_path + '/realtime-acs'

media_streaming_options = MediaStreamingOptions(
transport_url=websocket_url,
Expand Down Expand Up @@ -64,7 +73,7 @@ async def _outbound_call_handler(self, request):
# Parsing callback events
event = CloudEvent.from_dict(event_dict)
call_connection_id = event.data['callConnectionId']
print("%s event received for call connection id: %s", event.type, call_connection_id)
print(f"{event.type} event received for call connection id: {call_connection_id}")
call_connection_client = self.call_automation_client.get_call_connection(call_connection_id)
# target_participant = PhoneNumberIdentifier(self.target_number)
if event.type == "Microsoft.Communication.CallConnected":
Expand All @@ -81,6 +90,5 @@ async def _outbound_call_handler(self, request):
async def _get_source_number(self) -> str:
    """Return the source phone number stored on this instance."""
    return self.source_number


def attach_to_app(self, app, path):
    """
    Register this object's outbound-call callback handler on the given app.

    Args:
        app: Application object exposing a ``router`` with ``add_post``.
        path: URL path on which POST requests are routed to the handler.
    """
    router = app.router
    router.add_post(path, self._outbound_call_handler)
2 changes: 2 additions & 0 deletions src/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ async def index(request):
return web.FileResponse(static_directory / 'index.html')

async def call(request):
body = await request.json()

if (caller is not None):
body = await request.json()
print(body)
Expand Down
137 changes: 120 additions & 17 deletions src/app/backend/rtmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
from enum import Enum
from typing import Any, Callable, Optional
from aiohttp import web
from aiohttp import WSMessage, web
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from azure.core.credentials import AzureKeyCredential

Expand Down Expand Up @@ -69,7 +69,87 @@ def __init__(self, endpoint: str, deployment: str, credentials: AzureKeyCredenti
self._token_provider = get_bearer_token_provider(credentials, "https://cognitiveservices.azure.com/.default")
self._token_provider() # Warm up during startup so we have a token cached when the first request arrives

async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketResponse, server_ws: web.WebSocketResponse) -> Optional[str]:
def _acs_message_to_openai(self, msg_data_json: str) -> Optional[str]:
    """
    Translate one Azure Communication Services (ACS) websocket message into the
    OpenAI Realtime API format.

    ACS and the OpenAI Realtime API use different message formats; this method
    is the ACS -> OpenAI half of the bridge between them.

    Args:
        msg_data_json (str): The JSON string containing the ACS message data.
    Returns:
        Optional[str]: For an "AudioMetadata" message, a JSON "session.update"
        message built from this instance's configuration. For an "AudioData"
        message, a JSON "input_audio_buffer.append" message carrying the audio
        payload. For every other payload (unknown kind, missing "kind", or a
        non-object JSON value) the original text is returned unchanged.
    Raises:
        json.JSONDecodeError: If msg_data_json is not valid JSON.
        KeyError: If an "AudioData" message has no audioData.data entry.
    """
    message = json.loads(msg_data_json)

    # Tolerate payloads without a "kind" discriminator instead of raising:
    # they are passed through untouched like any other unrecognized message.
    kind = message.get("kind") if isinstance(message, dict) else None

    # Initial message from Azure Communication Services.
    # Set the initial configuration for the OpenAI Realtime API by sending a session.update message.
    if kind == "AudioMetadata":
        session = {
            "tool_choice": "auto" if len(self.tools) > 0 else "none",
            "tools": [tool.schema for tool in self.tools.values()],
            "turn_detection": {
                "type": 'server_vad',
                "threshold": 0.7,  # Adjust if necessary
                "prefix_padding_ms": 300,  # Adjust if necessary
                "silence_duration_ms": 500  # Adjust if necessary
            },
        }

        # Optional settings are only included when they are configured.
        if self.system_message is not None:
            session["instructions"] = self.system_message
        if self.temperature is not None:
            session["temperature"] = self.temperature
        if self.max_tokens is not None:
            session["max_response_output_tokens"] = self.max_tokens
        if self.disable_audio is not None:
            session["disable_audio"] = self.disable_audio

        return json.dumps({"type": "session.update", "session": session})

    # Message from Azure Communication Services with audio data.
    # Transform the message to the OpenAI Realtime API format.
    if kind == "AudioData":
        return json.dumps({
            "type": "input_audio_buffer.append",
            "audio": message["audioData"]["data"]
        })

    return msg_data_json

def _openai_message_to_acs(self, msg_data_json: str) -> Optional[str]:
    """
    Translate one OpenAI Realtime API websocket message into the Azure
    Communication Services (ACS) format.

    ACS and the OpenAI Realtime API use different message formats; this method
    is the OpenAI -> ACS half of the bridge between them.

    Args:
        msg_data_json (str): The JSON string containing the message data from the OpenAI Realtime API.
    Returns:
        Optional[str]: A JSON "AudioData" message for a "response.audio.delta"
        input, or None when the message type is not handled (including
        payloads that carry no "type" at all).
    Raises:
        json.JSONDecodeError: If msg_data_json is not valid JSON.
        KeyError: If a "response.audio.delta" message has no "delta" entry.
    """
    message = json.loads(msg_data_json)

    # Anything that is not an audio delta is dropped. A payload without a
    # "type" key is treated as "not handled" rather than raising KeyError.
    if not isinstance(message, dict) or message.get("type") != "response.audio.delta":
        return None

    # Message from the OpenAI Realtime API with audio data.
    # Transform the message to the Azure Communication Services format.
    acs_message = {
        "kind": "AudioData",
        "audioData": {
            "data": message["delta"]
        }
    }
    return json.dumps(acs_message)

async def _process_message_to_client(self, msg: WSMessage, client_ws: web.WebSocketResponse, server_ws: web.WebSocketResponse, is_acs_audio_stream: bool) -> Optional[str]:
message = json.loads(msg.data)
updated_message = msg.data
if message is not None:
Expand Down Expand Up @@ -119,14 +199,16 @@ async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketRes
}
})
if result.destination == ToolResultDirection.TO_CLIENT:
# TODO: this will break clients that don't know about this extra message, rewrite
# this to be a regular text message with a special marker of some sort
await client_ws.send_json({
"type": "extension.middle_tier_tool_response",
"previous_item_id": tool_call.previous_id,
"tool_name": item["name"],
"tool_result": result.to_text()
})
# Only send extra messages to clients that are not ACS audio streams
if is_acs_audio_stream == False:
# TODO: this will break clients that don't know about this extra message, rewrite
# this to be a regular text message with a special marker of some sort
await client_ws.send_json({
"type": "extension.middle_tier_tool_response",
"previous_item_id": tool_call.previous_id,
"tool_name": item["name"],
"tool_result": result.to_text()
})
updated_message = None

case "response.done":
Expand All @@ -137,18 +219,32 @@ async def _process_message_to_client(self, msg: str, client_ws: web.WebSocketRes
})
if "response" in message:
replace = False
for i, output in enumerate(reversed(message["response"]["output"])):
outputs = message["response"]["output"]
for output in reversed(outputs):
if output["type"] == "function_call":
message["response"]["output"].pop(i)
outputs.remove(output)
replace = True
if replace:
updated_message = json.dumps(message)

# Transform the message to the Azure Communication Services format,
# if it comes from the OpenAI realtime stream.
if is_acs_audio_stream and updated_message is not None:
updated_message = self._openai_message_to_acs(updated_message)

return updated_message

async def _process_message_to_server(self, msg: str, ws: web.WebSocketResponse) -> Optional[str]:
async def _process_message_to_server(self, msg: WSMessage, ws: web.WebSocketResponse, is_acs_audio_stream) -> Optional[str]:
message = json.loads(msg.data)
updated_message = msg.data

# Transform the message to the OpenAI Realtime API format first,
# if it comes from the Azure Communication Services audio stream.
if (is_acs_audio_stream):
data = self._acs_message_to_openai(msg.data)
message = json.loads(data)
updated_message = data

if message is not None:
match message["type"]:
case "session.update":
Expand All @@ -168,7 +264,7 @@ async def _process_message_to_server(self, msg: str, ws: web.WebSocketResponse)

return updated_message

async def _forward_messages(self, ws: web.WebSocketResponse):
async def _forward_messages(self, ws: web.WebSocketResponse, is_acs_audio_stream: bool):
async with aiohttp.ClientSession(base_url=self.endpoint) as session:
params = { "api-version": "2024-10-01-preview", "deployment": self.deployment }
headers = {}
Expand All @@ -182,7 +278,7 @@ async def _forward_messages(self, ws: web.WebSocketResponse):
async def from_client_to_server():
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
new_msg = await self._process_message_to_server(msg, ws)
new_msg = await self._process_message_to_server(msg, ws, is_acs_audio_stream)
if new_msg is not None:
await target_ws.send_str(new_msg)
else:
Expand All @@ -191,7 +287,7 @@ async def from_client_to_server():
async def from_server_to_client():
async for msg in target_ws:
if msg.type == aiohttp.WSMsgType.TEXT:
new_msg = await self._process_message_to_client(msg, ws, target_ws)
new_msg = await self._process_message_to_client(msg, ws, target_ws, is_acs_audio_stream)
if new_msg is not None:
await ws.send_str(new_msg)
else:
Expand All @@ -206,8 +302,15 @@ async def from_server_to_client():
async def _websocket_handler(self, request: web.Request):
    """
    Accept a websocket connection from a regular (non-ACS) client and bridge it.

    Args:
        request (web.Request): The incoming HTTP request to upgrade to a websocket.
    Returns:
        web.WebSocketResponse: The prepared websocket, returned once forwarding ends.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # is_acs_audio_stream=False: messages are forwarded without any ACS format
    # translation. The forwarder is invoked exactly once, with the flag it requires.
    await self._forward_messages(ws, False)
    return ws

async def _websocket_handler_acs(self, request: web.Request):
    """
    Accept a websocket connection carrying an ACS audio stream and bridge it.

    Args:
        request (web.Request): The incoming HTTP request to upgrade to a websocket.
    Returns:
        web.WebSocketResponse: The prepared websocket, returned once forwarding ends.
    """
    acs_socket = web.WebSocketResponse()
    await acs_socket.prepare(request)
    # True marks the connection as an ACS audio stream for the forwarder.
    await self._forward_messages(acs_socket, True)
    return acs_socket

def attach_to_app(self, app, path):
    """
    Register both websocket endpoints on the given app.

    Args:
        app: Application object exposing a ``router`` with ``add_get``.
        path: URL path for the regular websocket handler; the ACS handler is
            registered on the same path with "-acs" appended.
    """
    acs_path = path + "-acs"
    routes = (
        (path, self._websocket_handler),
        (acs_path, self._websocket_handler_acs),
    )
    for route_path, handler in routes:
        app.router.add_get(route_path, handler)
3 changes: 1 addition & 2 deletions src/app/static/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ function onToggleListening() {

function onCallButton() {
const phonenumber = document.getElementById('phonenumber').value;

const callDetails = {
target_number: phonenumber
};
Expand Down Expand Up @@ -312,4 +311,4 @@ window.onload = function() {
}
)
.catch(error => console.error('Error:', error));
};
};