Skip to content

Commit

Permalink
mypy fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cjunkin committed Jan 17, 2025
1 parent a09fac4 commit 1af0228
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 125 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,104 +1,137 @@
import json
from typing import Any, Callable, Dict, Optional, Union

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace import Link, Span, SpanContext, Status, StatusCode

from autogen import ConversableAgent
from autogen import ConversableAgent # type: ignore


class AutogenInstrumentor:
def __init__(self):
def __init__(self) -> None:
self.tracer = trace.get_tracer(__name__)
self._original_generate = None
self._original_initiate_chat = None
self._original_execute_function = None
self._original_generate: Optional[Callable[..., Any]] = None
self._original_initiate_chat: Optional[Callable[..., Any]] = None
self._original_execute_function: Optional[Callable[..., Any]] = None

def _safe_json_dumps(self, obj):
def _safe_json_dumps(self, obj: Any) -> str:
try:
return json.dumps(obj)
except (TypeError, ValueError):
return json.dumps(str(obj))

def instrument(self):
def instrument(self) -> "AutogenInstrumentor":
# Save original methods
self._original_generate = ConversableAgent.generate_reply
self._original_initiate_chat = ConversableAgent.initiate_chat
self._original_execute_function = ConversableAgent.execute_function

instrumentor = self

def wrapped_generate(self, messages=None, sender=None, **kwargs):
def wrapped_generate(
agent_self: ConversableAgent,
messages: Optional[Any] = None,
sender: Optional[str] = None,
**kwargs: Any,
) -> Any:
span: Optional[Span] = None
try:
current_context = trace.get_current_span().get_span_context()
current_span = trace.get_current_span()
current_context: SpanContext = current_span.get_span_context()

with instrumentor.tracer.start_as_current_span(
self.__class__.__name__,
context=trace.set_span_in_context(trace.get_current_span()),
links=[trace.Link(current_context)],
agent_self.__class__.__name__,
context=trace.set_span_in_context(current_span),
links=[Link(current_context)],
) as span:
span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, "AGENT")
span.set_attribute(
SpanAttributes.INPUT_VALUE, instrumentor._safe_json_dumps(messages)
SpanAttributes.INPUT_VALUE,
instrumentor._safe_json_dumps(messages),
)
span.set_attribute(SpanAttributes.INPUT_MIME_TYPE, "application/json")
span.set_attribute("agent.type", self.__class__.__name__)
span.set_attribute("agent.type", agent_self.__class__.__name__)

response = instrumentor._original_generate(
self, messages=messages, sender=sender, **kwargs
)
if instrumentor._original_generate is not None:
response = instrumentor._original_generate(
agent_self, messages=messages, sender=sender, **kwargs
)
else:
# Fallback or raise an error if needed
response = None

span.set_attribute(
SpanAttributes.OUTPUT_VALUE, instrumentor._safe_json_dumps(response)
SpanAttributes.OUTPUT_VALUE,
instrumentor._safe_json_dumps(response),
)
span.set_attribute(SpanAttributes.OUTPUT_MIME_TYPE, "application/json")

return response

except Exception as e:
if "span" in locals():
if span is not None:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(e)
raise

def wrapped_initiate_chat(self, recipient, *args, **kwargs):
def wrapped_initiate_chat(
agent_self: ConversableAgent, recipient: Any, *args: Any, **kwargs: Any
) -> Any:
span: Optional[Span] = None
try:
message = kwargs.get("message", args[0] if args else None)
current_context = trace.get_current_span().get_span_context()
current_span = trace.get_current_span()
current_context: SpanContext = current_span.get_span_context()

with instrumentor.tracer.start_as_current_span(
"Autogen",
context=trace.set_span_in_context(trace.get_current_span()),
links=[trace.Link(current_context)],
context=trace.set_span_in_context(current_span),
links=[Link(current_context)],
) as span:
span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, "AGENT")
span.set_attribute(
SpanAttributes.INPUT_VALUE, instrumentor._safe_json_dumps(message)
SpanAttributes.INPUT_VALUE,
instrumentor._safe_json_dumps(message),
)
span.set_attribute(SpanAttributes.INPUT_MIME_TYPE, "application/json")

result = instrumentor._original_initiate_chat(self, recipient, *args, **kwargs)
if instrumentor._original_initiate_chat is not None:
result = instrumentor._original_initiate_chat(
agent_self, recipient, *args, **kwargs
)
else:
result = None

if hasattr(result, "chat_history") and result.chat_history:
last_message = result.chat_history[-1]["content"]
span.set_attribute(
SpanAttributes.OUTPUT_VALUE, instrumentor._safe_json_dumps(last_message)
SpanAttributes.OUTPUT_VALUE,
instrumentor._safe_json_dumps(last_message),
)
else:
span.set_attribute(
SpanAttributes.OUTPUT_VALUE, instrumentor._safe_json_dumps(result)
SpanAttributes.OUTPUT_VALUE,
instrumentor._safe_json_dumps(result),
)

span.set_attribute(SpanAttributes.OUTPUT_MIME_TYPE, "application/json")

return result

except Exception as e:
if "span" in locals():
if span is not None:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(e)
raise

def wrapped_execute_function(self, func_call, call_id=None, verbose=False):
def wrapped_execute_function(
agent_self: ConversableAgent,
func_call: Union[str, Dict[str, Any]],
call_id: Optional[str] = None,
verbose: bool = False,
) -> Any:
span: Optional[Span] = None
try:
current_context = trace.get_current_span().get_span_context()
current_span = trace.get_current_span()
current_context: SpanContext = current_span.get_span_context()

# Handle both dictionary and string inputs
if isinstance(func_call, str):
Expand All @@ -109,26 +142,29 @@ def wrapped_execute_function(self, func_call, call_id=None, verbose=False):

with instrumentor.tracer.start_as_current_span(
f"{function_name}",
context=trace.set_span_in_context(trace.get_current_span()),
links=[trace.Link(current_context)],
context=trace.set_span_in_context(current_span),
links=[Link(current_context)],
) as span:
span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, "TOOL")
span.set_attribute(SpanAttributes.TOOL_NAME, function_name)

# Record input
span.set_attribute(
SpanAttributes.INPUT_VALUE, instrumentor._safe_json_dumps(func_call)
SpanAttributes.INPUT_VALUE,
instrumentor._safe_json_dumps(func_call),
)
span.set_attribute(SpanAttributes.INPUT_MIME_TYPE, "application/json")

# Record tool-specific attributes
if hasattr(self, "_function_map") and function_name in self._function_map:
func = self._function_map[function_name]
if hasattr(func, "__annotations__"):
span.set_attribute(
SpanAttributes.TOOL_PARAMETERS,
instrumentor._safe_json_dumps(func.__annotations__),
)
# If the agent stores a function map, you can store annotations
if hasattr(agent_self, "_function_map"):
function_map = getattr(agent_self, "_function_map", {})
if function_name in function_map:
func = function_map[function_name]
if hasattr(func, "__annotations__"):
span.set_attribute(
SpanAttributes.TOOL_PARAMETERS,
instrumentor._safe_json_dumps(func.__annotations__),
)

# Record function call details
if isinstance(func_call, dict):
Expand All @@ -143,34 +179,42 @@ def wrapped_execute_function(self, func_call, call_id=None, verbose=False):
span.set_attribute(SpanAttributes.TOOL_CALL_FUNCTION_NAME, function_name)

# Execute function
result = instrumentor._original_execute_function(
self, func_call, call_id=call_id, verbose=verbose
)
if instrumentor._original_execute_function is not None:
result = instrumentor._original_execute_function(
agent_self, func_call, call_id=call_id, verbose=verbose
)
else:
result = None

# Record output
span.set_attribute(
SpanAttributes.OUTPUT_VALUE, instrumentor._safe_json_dumps(result)
SpanAttributes.OUTPUT_VALUE,
instrumentor._safe_json_dumps(result),
)
span.set_attribute(SpanAttributes.OUTPUT_MIME_TYPE, "application/json")

return result

except Exception as e:
if "span" in locals():
if span is not None:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(e)
raise

# Replace methods
# Replace methods on ConversableAgent with wrapped versions
ConversableAgent.generate_reply = wrapped_generate
ConversableAgent.initiate_chat = wrapped_initiate_chat
ConversableAgent.execute_function = wrapped_execute_function

return self

def uninstrument(self):
"""Restore original behavior"""
if self._original_generate and self._original_initiate_chat:
def uninstrument(self) -> "AutogenInstrumentor":
"""Restore original behavior."""
if (
self._original_generate
and self._original_initiate_chat
and self._original_execute_function
):
ConversableAgent.generate_reply = self._original_generate
ConversableAgent.initiate_chat = self._original_initiate_chat
ConversableAgent.execute_function = self._original_execute_function
Expand All @@ -181,14 +225,14 @@ def uninstrument(self):


class SpanAttributes:
OPENINFERENCE_SPAN_KIND = "openinference.span.kind"
INPUT_VALUE = "input.value"
INPUT_MIME_TYPE = "input.mime_type"
OUTPUT_VALUE = "output.value"
OUTPUT_MIME_TYPE = "output.mime_type"
TOOL_NAME = "tool.name"
TOOL_ARGS = "tool.args"
TOOL_KWARGS = "tool.kwargs"
TOOL_PARAMETERS = "tool.parameters"
TOOL_CALL_FUNCTION_ARGUMENTS = "tool_call.function.arguments"
TOOL_CALL_FUNCTION_NAME = "tool_call.function.name"
OPENINFERENCE_SPAN_KIND: str = "openinference.span.kind"
INPUT_VALUE: str = "input.value"
INPUT_MIME_TYPE: str = "input.mime_type"
OUTPUT_VALUE: str = "output.value"
OUTPUT_MIME_TYPE: str = "output.mime_type"
TOOL_NAME: str = "tool.name"
TOOL_ARGS: str = "tool.args"
TOOL_KWARGS: str = "tool.kwargs"
TOOL_PARAMETERS: str = "tool.parameters"
TOOL_CALL_FUNCTION_ARGUMENTS: str = "tool_call.function.arguments"
TOOL_CALL_FUNCTION_NAME: str = "tool_call.function.name"

0 comments on commit 1af0228

Please sign in to comment.