[Question]: Errors: "non existent node" and "The following events are consumed but never produced: SynthesizeEvent" #17457

Open
balchja opened this issue Jan 8, 2025 · 3 comments
Labels
question Further information is requested

Comments


balchja commented Jan 8, 2025

Question Validation

  • I have searched both the documentation and discord for an answer.

Question

Hi. I'm trying to build a two-part RAG system that first scans a series of documents and summarizes the results. It then uses that structured summary to extract the appropriate codes from a table. There could be multiple findings, so I'm trying to combine the https://docs.llamaindex.ai/en/latest/examples/workflow/rag/ and https://docs.llamaindex.ai/en/stable/examples/workflow/multi_step_query_engine/ examples, but I am getting this error when I try to run it:

WorkflowValidationError: The following events are consumed but never produced: SynthesizeEvent

and this error when I try to visualize it with draw_all_possible_flows:

AssertionError: non existent node 'RetrieverEvent'

It works when the step returns a StopEvent instead, but I can't figure out why it won't save the results in a new event.

Here is my code:

import os, json
from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage,
)
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.workflow import (
    step,
    Context,
    Workflow,
    Event,
    StartEvent,
    StopEvent,
)
from llama_index.core.agent import ReActAgent
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.core.llms import LLM
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.postprocessor.llm_rerank import LLMRerank
from typing import Dict, List, Any, cast
from llama_index.core.schema import NodeWithScore
from dataclasses import dataclass, field
from llama_index.core.response_synthesizers import (
    ResponseMode,
    get_response_synthesizer,
)

from llama_index.core.schema import QueryBundle, TextNode
from llama_index.core.indices.query.query_transform.base import StepDecomposeQueryTransform
import re

class RetrieverEvent(Event):
    """Result of running retrieval"""
    nodes: list[NodeWithScore]

class SynthesizeEvent(Event):
    """Result of running synthesis over the retrieved nodes."""
    nodes: List[NodeWithScore]
    result: str
    
class QueryMultiStepEvent(Event):
    """
    Event containing results of a multi-step query process.

    Attributes:
        nodes (List[NodeWithScore]): List of nodes with their associated scores.
        source_nodes (List[NodeWithScore]): List of source nodes with their scores.
        final_response_metadata (Dict[str, Any]): Metadata associated with the final response.
    """

    nodes: List[NodeWithScore]
    source_nodes: List[NodeWithScore]
    final_response_metadata: Dict[str, Any]

class MultiStepRAGWorkflow(Workflow):
    """
    Multi-step RAG workflow:
      1) Extract information from note
      2) Use that information to retrieve codes.
    """
    def combine_queries(
        self,
        query_bundle: QueryBundle,
        prev_reasoning: str,
        index_summary: str,
        llm: LLM,
    ) -> QueryBundle:
        """Combine queries using StepDecomposeQueryTransform."""
        transform_metadata = {
            "prev_reasoning": prev_reasoning,
            "index_summary": index_summary,
        }
        return StepDecomposeQueryTransform(llm=llm)(
            query_bundle, metadata=transform_metadata
        )

    def default_stop_fn(self, stop_dict: Dict) -> bool:
        """Stop function for multi-step query combiner."""
        query_bundle = cast(QueryBundle, stop_dict.get("query_bundle"))
        if query_bundle is None:
            raise ValueError("Response must be provided to stop function.")

        return "none" in query_bundle.query_str.lower()
    
    # First step: first retrieval 
    @step(pass_context=True)
    async def extract(self, ctx: Context, ev: StartEvent) -> RetrieverEvent | None: 
        """Step 1: Retrieve information from notes."""
        init_query = ev.get("initial_query")
        index_patient_note = ev.get("index_note")

        if not init_query:
            return None
        
        print(f"Query the notes with: {init_query}")

        #store the query in the global context
        await ctx.set("initial_query", init_query)

        # get the index from the global context
        if index_patient_note is None:
            print("No index found in global context, load documents before querying")
            return None
        
        retriever = index_patient_note.as_retriever(similarity_top_k=3)
        nodes = await retriever.aretrieve(init_query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)


    @step
    async def synthesize(self, ctx: Context, ev: RetrieverEvent) -> SynthesizeEvent:
        """Return a streaming response using nodes."""
        llm = Settings.llm
        summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
        query = await ctx.get("initial_query", default="")

        response = await summarizer.asynthesize(query, nodes=ev.nodes)

        return SynthesizeEvent(nodes=ev.nodes, result=response)
    
    @step(pass_context=True)
    async def query_multistep(
        self, ctx: Context, ev: SynthesizeEvent
    ) -> QueryMultiStepEvent:
        """Execute multi-step query process."""
        prev_reasoning = ""
        cur_response = None
        should_stop = False
        cur_steps = 0

        # use response
        final_response_metadata: Dict[str, Any] = {"sub_qa": []}

        text_chunks = []
        source_nodes = []

        ais_query = ev.get("ais_query")
        await ctx.set("ais_query", ais_query)

        # Retrieve the synthesized result from the context
        synthesized_result = await ctx.get("results", default=None)


        llm = Settings.llm
        stop_fn = self.default_stop_fn

        # number of decomposition steps; assumed to arrive on the event
        num_steps = ev.get("num_steps")
        query_engine = ev.get("ais_query_engine")
        index_summary = ev.get("ais_index_summary")

        while not should_stop:
            if num_steps is not None and cur_steps >= num_steps:
                should_stop = True
                break
            elif should_stop:
                break

            combined_query_str = f"{ais_query}\n\nSynthesized Result: {synthesized_result}"
            updated_query_bundle = self.combine_queries(
                QueryBundle(query_str=combined_query_str),
                prev_reasoning,
                index_summary,
                llm,
            )

            print(
                f"Created query for the step - {cur_steps} is: {updated_query_bundle}"
            )

            stop_dict = {"query_bundle": updated_query_bundle}
            if stop_fn(stop_dict):
                should_stop = True
                break

            cur_response = query_engine.query(updated_query_bundle)

            # append to response builder
            cur_qa_text = (
                f"\nQuestion: {updated_query_bundle.query_str}\n"
                f"Answer: {cur_response!s}"
            )
            text_chunks.append(cur_qa_text)
            for source_node in cur_response.source_nodes:
                source_nodes.append(source_node)
            # update metadata
            final_response_metadata["sub_qa"].append(
                (updated_query_bundle.query_str, cur_response)
            )

            prev_reasoning += (
                f"- {updated_query_bundle.query_str}\n" f"- {cur_response!s}\n"
            )
            cur_steps += 1

        nodes = [
            NodeWithScore(node=TextNode(text=text_chunk))
            for text_chunk in text_chunks
        ]
        return QueryMultiStepEvent(
            nodes=nodes,
            source_nodes=source_nodes,
            final_response_metadata=final_response_metadata,
        )

    @step(pass_context=True)
    async def synthesize(
        self, ctx: Context, ev: QueryMultiStepEvent
    ) -> StopEvent:
        """Synthesize the response."""
        response_synthesizer = get_response_synthesizer()
        query_final = await ctx.get("final_query", default=None)
        final_response = await response_synthesizer.asynthesize(
            query=query_final,
            nodes=ev.nodes,
            additional_source_nodes=ev.source_nodes,
        )
        final_response.metadata = ev.final_response_metadata

        return StopEvent(result=final_response)   

Any advice? Thank you!

balchja added the question (Further information is requested) label on Jan 8, 2025

dosubot bot commented Jan 8, 2025

Hey there, @balchja! I'm here to help you with your coding conundrums. Let's squash those bugs together! 🤖

To resolve the errors you're encountering in your multi-step RAG workflow using LlamaIndex, follow these steps:

  1. SynthesizeEvent Error: Ensure that the SynthesizeEvent is produced by a step before it is consumed. In your code, the synthesize step should return a SynthesizeEvent, but it seems you are trying to return SynthesizeEvent_Injury, which is not defined. You should return SynthesizeEvent instead, like this:

    return SynthesizeEvent(nodes=ev.nodes, result=response)
  2. RetrieverEvent Error: The AssertionError for a non-existent node 'RetrieverEvent' suggests that the RetrieverEvent is not correctly defined or produced. Ensure that the extract_injury step correctly returns a RetrieverEvent and that this event is consumed by the next step, which is synthesize.

  3. Event Definitions: Make sure that all events are correctly defined and used within your workflow. If an event is not needed, remove its consumption from the workflow.

  4. draw_all_possible_flows: This function requires that all nodes and events are correctly defined and connected. Ensure that each step in your workflow has the necessary __step_config attributes, including accepted_events and return_types.

By ensuring that each event consumed by a step is produced by another step and that all events are correctly defined and used, you should be able to resolve these errors [1][2].
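
For illustration, here is a minimal sketch of a wiring that passes this validation (the step and event names are hypothetical, not taken from your workflow): every custom event that appears as a step's input is also the return type of some other step.

import asyncio

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

class RetrievedEvent(Event):
    """Produced by retrieve(), consumed by synthesize()."""
    text: str

class SynthesizedEvent(Event):
    """Produced by synthesize(), consumed by finalize()."""
    result: str

class MinimalWorkflow(Workflow):
    @step
    async def retrieve(self, ev: StartEvent) -> RetrievedEvent:
        # StartEvent in, RetrievedEvent out.
        return RetrievedEvent(text=ev.get("query", ""))

    @step
    async def synthesize(self, ev: RetrievedEvent) -> SynthesizedEvent:
        # RetrievedEvent is consumed here and produced above, so the
        # "consumed but never produced" check is satisfied for it.
        return SynthesizedEvent(result=f"summary of: {ev.text}")

    @step
    async def finalize(self, ev: SynthesizedEvent) -> StopEvent:
        # Exactly one step returns StopEvent to end the run.
        return StopEvent(result=ev.result)

async def _demo():
    wf = MinimalWorkflow(timeout=10)
    print(await wf.run(query="example question"))

asyncio.run(_demo())

Each step method in the sketch has a unique name and a unique input/return signature, which is what lets both the validator and draw_all_possible_flows build the complete event graph.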



balchja commented Jan 8, 2025

I had some typos in the original post which should now be corrected. @dosu, any advice now?


dosubot bot commented Jan 8, 2025

To address the errors in your multi-step RAG workflow, ensure the following:

  1. Correct Event Production and Consumption: Verify that each event consumed by a step is produced by a previous step. For example, ensure that SynthesizeEvent is correctly produced in the synthesize step before being consumed elsewhere.

  2. Event Definitions: Double-check that all events, such as RetrieverEvent and SynthesizeEvent, are correctly defined and used in your workflow. If an event is not needed, remove its consumption.

  3. Typo Corrections: Carefully review your code for any typos, especially in event names and function signatures, as these can lead to errors like "non existent node" and "events consumed but never produced."

  4. Workflow Design: Ensure your workflow design includes all necessary steps and events. Each step should produce the necessary events for subsequent steps.

  5. draw_all_possible_flows: This function requires all nodes and events to be correctly defined and connected. Ensure each step has the necessary attributes, including accepted_events and return_types.

By ensuring these aspects are correctly implemented, you should be able to resolve the errors you're encountering [1][2].
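
As a quick check on point 5, here is a hedged debugging sketch that prints what each step declares it accepts and returns. It assumes MultiStepRAGWorkflow is the class from your post and that steps expose a __step_config attribute with accepted_events and return_types fields, as described above; these are internal details and may differ across llama-index-core versions.

def dump_step_signatures(workflow_cls) -> None:
    # Walk the class body and report each decorated step's declared
    # input events and return types so mismatches are easy to spot.
    for name, member in vars(workflow_cls).items():
        config = getattr(member, "__step_config", None)
        if config is None:
            continue
        accepted = [getattr(e, "__name__", str(e)) for e in config.accepted_events]
        returned = [getattr(e, "__name__", str(e)) for e in config.return_types]
        print(f"{name}: accepts {accepted} -> returns {returned}")

dump_step_signatures(MultiStepRAGWorkflow)

An event that shows up only on the "accepts" side (or only on the "returns" side) of this listing is a likely source of both the WorkflowValidationError and the draw_all_possible_flows assertion.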

