
How to stream responses from Assistants API? The quickstart example doesn't seem to be working #1792

Open
Defozo opened this issue Oct 12, 2024 · 9 comments
Labels: bug (Something isn't working)

Comments


Defozo commented Oct 12, 2024

Confirm this is an issue with the Python library and not an underlying OpenAI API

  • This is an issue with the Python library

Describe the bug

I get ERROR:api.routes:Error in event generator: 'StreamingEventHandler' object has no attribute '_AssistantEventHandler__stream' when I use the example code to enable streaming from the Assistants API. Additionally, the `create_and_stream` method in the `AsyncRuns` class is deprecated.

To Reproduce

Use the example code from https://platform.openai.com/docs/assistants/quickstart:

from typing_extensions import override
from openai import AssistantEventHandler
 
# First, we create an EventHandler class to define
# how we want to handle the events in the response stream.
 
class EventHandler(AssistantEventHandler):    
  @override
  def on_text_created(self, text) -> None:
    print(f"\nassistant > ", end="", flush=True)
      
  @override
  def on_text_delta(self, delta, snapshot):
    print(delta.value, end="", flush=True)
      
  def on_tool_call_created(self, tool_call):
    print(f"\nassistant > {tool_call.type}\n", flush=True)
  
  def on_tool_call_delta(self, delta, snapshot):
    if delta.type == 'code_interpreter':
      if delta.code_interpreter.input:
        print(delta.code_interpreter.input, end="", flush=True)
      if delta.code_interpreter.outputs:
        print(f"\n\noutput >", flush=True)
        for output in delta.code_interpreter.outputs:
          if output.type == "logs":
            print(f"\n{output.logs}", flush=True)
 
# Then, we use the `stream` SDK helper 
# with the `EventHandler` class to create the Run 
# and stream the response.
 
with client.beta.threads.runs.stream(
  thread_id=thread.id,
  assistant_id=assistant.id,
  instructions="Please address the user as Jane Doe. The user has a premium account.",
  event_handler=EventHandler(),
) as stream:
  stream.until_done()
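
For completeness: the snippet assumes `client`, `assistant`, and `thread` are created earlier in the quickstart. A minimal sketch of that setup (the model name and message content here are illustrative, not necessarily the exact docs values):

import openai

client = openai.OpenAI()  # reads OPENAI_API_KEY from the environment

# Illustrative assistant; the model name is an assumption.
assistant = client.beta.assistants.create(
    name="Math Tutor",
    instructions="You are a personal math tutor. Write and run code to answer math questions.",
    tools=[{"type": "code_interpreter"}],
    model="gpt-4o",
)

thread = client.beta.threads.create()
client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content="I need to solve the equation 3x + 11 = 14. Can you help me?",
)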

Code snippets

No response

OS

Docker container: FROM python:3.9

Python version

Python v3.9

Library version

v1.51.2

Defozo added the bug (Something isn't working) label on Oct 12, 2024
@TheVic489

Same issue

@shawnxstatis

Same issue. Is OpenAI letting ChatGPT write their API documentation and hallucinating? wth lol

@hendersonick

Did anyone find a solution to this? I am also getting an "AttributeError: 'EventHandler' object has no attribute '_AssistantEventHandler__stream'" error...

@RobertCraigie
Collaborator

I cannot reproduce this; does anyone have a complete example snippet? Running this snippet works for me.


hendersonick commented Jan 16, 2025

@RobertCraigie Here is the code I am trying to run. Hoping to use a websocket to deliver the stream to my front end:

Back-end:

class MessageRequest(BaseModel):
    message: str
    num_msg: int


class EventHandler(AssistantEventHandler):
    def __init__(self, websocket: WebSocket):
        self.websocket = websocket

    @override
    async def on_text_created(self, text) -> None:
        await self.websocket.send_text("\nassistant > ")

    @override
    async def on_text_delta(self, delta, snapshot):
        await self.websocket.send_text(delta.value)

    async def on_tool_call_created(self, tool_call):
        await self.websocket.send_text(f"\nassistant > {tool_call.type}\n")

    async def on_tool_call_delta(self, delta, snapshot):
        if delta.type == 'code_interpreter':
            if delta.code_interpreter.input:
                await self.websocket.send_text(delta.code_interpreter.input)
            if delta.code_interpreter.outputs:
                await self.websocket.send_text("\n\noutput >")
                for output in delta.code_interpreter.outputs:
                    if output.type == "logs":
                        await self.websocket.send_text(f"\n{output.logs}")

@app.get("/")
def read_root():
    return {"message": "Hello, FastAPI!"}


@app.websocket("/chat")
async def chat_with_assistant(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            data = await websocket.receive_json()
            user_input = data['message']
            num_msg = data['num_msg']

            global t_id
            
            if num_msg == 0:
                thread = client.beta.threads.create()
                t_id = thread.id

            message = client.beta.threads.messages.create(
                thread_id=t_id,
                role="user",
                content= user_input
            )

            event_handler = EventHandler(websocket)


            with client.beta.threads.runs.stream(
                thread_id=t_id,
                assistant_id=assistant_id,
                instructions="Please address the user as Jane Doe. The user has a premium account.",
                event_handler=event_handler,
            ) as stream:
                await stream.until_done()

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Front-end:

interface Message {
  text: string;
  isUser: boolean;
}

export function Chat() {
  const [messages, setMessages] = useState<Message[]>([])
  const [input, setInput] = useState('')
  const [numMessages, setNumMessages] = useState(0)
  const [isConnected, setIsConnected] = useState(false)
  const textareaRef = useRef<HTMLTextAreaElement>(null)
  const scrollAreaRef = useRef<HTMLDivElement>(null)
  const socketRef = useRef<WebSocket | null>(null)

  useEffect(() => {
    socketRef.current = new WebSocket('ws://localhost:8000/chat')


    socketRef.current.onopen = () => {
      console.log('WebSocket connection established')
      setIsConnected(true)

    }

    socketRef.current.onmessage = (event) => {
      const newMessage = { text: event.data, isUser: false }
      setMessages((prev) => [...prev, newMessage])
    }

    socketRef.current.onclose = () => {
      console.log('WebSocket connection closed')
      setIsConnected(false)
    }

    socketRef.current.onerror = (error) => {
      console.error('WebSocket error:', error)
    }

    return () => {
      if (socketRef.current) {
        // socketRef.current.close()
      }
    }
  }, [])

  const handleSend = () => {

    if (input.trim() && isConnected && socketRef.current) {

      const userMessage = { text: input, isUser: true }
      setMessages((prev) => [...prev, userMessage])

      
      socketRef.current.send(JSON.stringify({
        message: input,
        num_msg: numMessages
      }))

      setInput('')
      setNumMessages((prev) => prev + 1)

    }
  }

lmk if you have any other questions or see any obvious flaws in my implementation.


Defozo commented Jan 16, 2025

I’m currently using this code as a workaround. Please note that it requires modification before it will run: it was copied directly from my project, so it references some helpers that are not defined here.

import logging
import time

import openai
from openai import AssistantEventHandler
from typing_extensions import override

logger = logging.getLogger(__name__)

try:
    client = openai.OpenAI(api_key=api_key)
    logger.info("OpenAI client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize OpenAI client: {str(e)}")

# Create a custom event handler for streaming
class StreamHandler(AssistantEventHandler):
    def __init__(self, message_placeholder):
        super().__init__()
        self.message_placeholder = message_placeholder
        self.full_response = ""
        
    @override
    def on_text_created(self, text) -> None:
        logger.info("Starting new response")
        self.full_response = ""
        
    @override
    def on_text_delta(self, delta, snapshot) -> None:
        self.full_response += delta.value
        self.message_placeholder.markdown(self.full_response)
        logger.info(f"Response delta: {delta.value}")
        
    def get_full_response(self) -> str:
        return self.full_response

def process_message_stream(thread_id: str, message: str, message_placeholder, assistant_id: str) -> tuple[str, list]:
    """Process a message and stream the assistant's response. Returns (response, annotations)."""
    try:
        # Add message to thread
        client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=message
        )
        logger.info(f"Message added to thread {thread_id}")

        # Show thinking indicator
        message_placeholder.markdown("*Thinking...*")

        # Create run with retries
        max_retries = 3
        retry_delay = 5
        run = None
        
        for attempt in range(max_retries):
            try:
                run = client.beta.threads.runs.create(
                    thread_id=thread_id,
                    assistant_id=assistant_id,
                    stream=True
                )
                logger.info(f"Created streaming run with assistant {assistant_id}")
                break
            except openai.RateLimitError as e:
                if attempt == max_retries - 1:
                    error_msg = "Rate limit exceeded. Please try again in a few minutes."
                    logger.error(f"Rate limit error after {max_retries} attempts: {str(e)}")
                    message_placeholder.error(error_msg)
                    return error_msg, []
                logger.warning(f"Rate limit hit, attempt {attempt + 1}/{max_retries}. Waiting {retry_delay} seconds...")
                message_placeholder.markdown(f"*Rate limit reached. Retrying in {retry_delay} seconds... ({attempt + 1}/{max_retries})*")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
            except Exception as e:
                logger.error(f"Error creating run: {str(e)}")
                message_placeholder.error("Failed to create run. Please try again.")
                return "Error creating run", []
        
        if run is None:
            error_msg = "Failed to create run after retries"
            logger.error(error_msg)
            message_placeholder.error(error_msg)
            return error_msg, []
        
        full_response = ""
        first_token = True
        
        # Process the stream with rate limit handling
        try:
            for chunk in run:
                if chunk.event == "thread.message.delta":
                    if hasattr(chunk.data.delta, 'content') and chunk.data.delta.content:
                        content_delta = chunk.data.delta.content[0].text.value
                        logger.info(f"Received content delta: {content_delta}")
                        
                        if content_delta:
                            full_response += content_delta
                            if first_token:
                                first_token = False
                            message_placeholder.markdown(full_response)
                            logger.info(f"Updated response: {full_response}")
                
                elif chunk.event == "thread.run.completed":
                    logger.info("Stream completed")
                    break
                elif chunk.event == "thread.run.failed":
                    logger.info(f"Run failed. Full data: {chunk}")
                    
                    # Extract error details from the failed run
                    if hasattr(chunk.data, 'last_error'):
                        error_code = getattr(chunk.data.last_error, 'code', 'unknown')
                        error_message = getattr(chunk.data.last_error, 'message', 'Unknown error')
                        
                        if error_code == 'rate_limit_exceeded':
                            error_msg = f"Rate limit exceeded. {error_message}"
                        else:
                            error_msg = f"Stream failed: {error_code} - {error_message}"
                    else:
                        error_msg = "Stream failed with an unknown error"
                    
                    logger.error(f"Run failed with error: {error_msg}")
                    message_placeholder.error(error_msg)
                    return error_msg, []
            
            # Get the complete message with annotations after streaming is done
            messages = client.beta.threads.messages.list(
                thread_id=thread_id,
                order="desc",
                limit=1
            )
            
            annotations = []
            if messages.data:
                message = messages.data[0]
                if hasattr(message.content[0].text, 'annotations'):
                    annotations = message.content[0].text.annotations
                    logger.info(f"Found annotations: {annotations}")
                    
                    # Clean the full response by removing citation markers
                    for annotation in annotations:
                        if hasattr(annotation, 'text'):
                            citation_text = annotation.text
                            full_response = full_response.replace(citation_text, '')
                            logger.info(f"Removed citation text: {citation_text}")
            
            return full_response.strip(), annotations
            
        except openai.RateLimitError as e:
            error_msg = "Rate limit exceeded during streaming. Please try again in a few minutes."
            logger.error(f"Rate limit error during streaming: {str(e)}")
            message_placeholder.error(error_msg)
            return error_msg, []

    except Exception as e:
        logger.error(f"Error processing message: {str(e)}")
        return f"Error processing request: {str(e)}", []

@hendersonick

I am getting this error:

ERROR: Exception in ASGI application
Traceback (most recent call last):
  File "/Users/nickhenderson/Projects/react-website/run.py", line 152, in chat_with_assistant
    with client.beta.threads.runs.stream(
  File "/Users/nickhenderson/Projects/react-website/venv/lib/python3.11/site-packages/openai/lib/streaming/_assistants.py", line 447, in __enter__
    self.__event_handler._init(self.__stream)
  File "/Users/nickhenderson/Projects/react-website/venv/lib/python3.11/site-packages/openai/lib/streaming/_assistants.py", line 59, in _init
    if self.__stream:
       ^^^^^^^^^^^^^
AttributeError: 'EventHandler' object has no attribute '_AssistantEventHandler__stream'

Are we having a similar issue? I am hoping to use a websocket to deliver the stream to my front end.
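
For what it's worth, the name-mangled attribute in that traceback, `_AssistantEventHandler__stream`, is created in `AssistantEventHandler.__init__`, so a subclass that overrides `__init__` without calling `super().__init__()` never gets it, which matches the `EventHandler(websocket)` class in the earlier snippet. A minimal sketch of a fix, switching to the async client so the `await websocket.send_text(...)` callbacks are actually awaited (`t_id` and `assistant_id` are carried over from that snippet):

from typing_extensions import override
from openai import AsyncOpenAI, AsyncAssistantEventHandler

client = AsyncOpenAI()

class EventHandler(AsyncAssistantEventHandler):
    def __init__(self, websocket):
        super().__init__()  # required: the base class initializes its private __stream state here
        self.websocket = websocket

    @override
    async def on_text_delta(self, delta, snapshot):
        await self.websocket.send_text(delta.value)

# Inside the websocket endpoint:
async with client.beta.threads.runs.stream(
    thread_id=t_id,
    assistant_id=assistant_id,
    event_handler=EventHandler(websocket),
) as stream:
    await stream.until_done()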


Defozo commented Jan 16, 2025

I also have this handler:

import logging

import openai
from openai import AssistantEventHandler
from typing_extensions import override

logger = logging.getLogger(__name__)

# Create a custom event handler for streaming
class StreamHandler(AssistantEventHandler):
    def __init__(self, message_placeholder):
        super().__init__()
        self.message_placeholder = message_placeholder
        self.full_response = ""
        
    @override
    def on_text_created(self, text) -> None:
        logger.info("Starting new response")
        self.full_response = ""
        
    @override
    def on_text_delta(self, delta, snapshot) -> None:
        self.full_response += delta.value
        self.message_placeholder.markdown(self.full_response)
        logger.info(f"Response delta: {delta.value}")
        
    def get_full_response(self) -> str:
        return self.full_response

I've updated my previous message.


hendersonick commented Jan 16, 2025

If anyone could help, I would greatly appreciate it:

https://community.openai.com/t/error-trying-to-stream-assistant-api-with-websocket/1092353
