Designing an Asynchronous Graph Executor for LLM Workflows

6 November 2024

Applications built on large language models (LLMs) often involve multi-step workflows that quickly become hard to manage. Linear processing falls short when tasks require conditional execution, dependency management, and parallel processing. In this post, I'll walk through one solution: a flexible, asynchronous Graph Executor designed to streamline LLM workflows.

The Challenge of Complex LLM Workflows

Modern language model applications often require:

  • Chained prompts with interdependent results
  • Parallel task execution
  • Robust error handling
  • Dynamic workflow management

My Graph Executor addresses these challenges by treating workflows as directed graphs, where each node represents a specific task or transformation.

Core Architecture: Nodes, Graphs, and Execution

Node Structure

At the heart of the system is the Node class, which encapsulates:

  • A unique identifier
  • Input data
  • Prompt template
  • JSON schema for response validation
  • Dependencies
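  • Next nodes in the workflow

from typing import Any, Dict, List

from pydantic import BaseModel, Field

class Node(BaseModel):
    id: str                        # unique identifier
    input: Dict[str, Any]          # input data for the prompt
    prompt: str                    # prompt template
    json_schema: Dict[str, Any]    # schema used to validate the response
    next_nodes: List[str]          # IDs of the nodes to trigger afterwards
    dependencies: List[str] = Field(default_factory=list)  # IDs that must finish first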

Asynchronous Execution Strategy

The GraphExecutor leverages Python's asyncio to provide:

  • Concurrent node execution
  • Dependency resolution
  • Thread-safe operations

import asyncio

async def _execute_subgraph(self, node: Node):
    # Wait until every node listed in node.dependencies has finished
    await self._wait_for_dependencies(node)

    # Run this node
    await self.execute_node(node)

    # Run all successor subgraphs concurrently.
    # Note: a node listed in next_nodes of several parents is reached once per parent.
    tasks = [self._execute_subgraph(self.graph.nodes[next_id])
             for next_id in node.next_nodes]
    await asyncio.gather(*tasks)
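
The snippet above leaves out `_wait_for_dependencies`. One way it could work is sketched below, assuming one `asyncio.Event` per node that is set when the node finishes. The `MiniNode` and `MiniExecutor` classes, the `done`, `started`, and `order` attributes, and the placeholder body of `execute_node` are all hypothetical and not taken from the repository. The `started` set is an extra guard so that a node with several parents runs only once.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class MiniNode:
    # Hypothetical stand-in for Node, reduced to the graph wiring.
    id: str
    next_nodes: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)


class MiniExecutor:
    def __init__(self, nodes: Dict[str, MiniNode]):
        self.nodes = nodes
        # One event per node, set once that node has finished.
        self.done: Dict[str, asyncio.Event] = {n: asyncio.Event() for n in nodes}
        self.started: Set[str] = set()
        self.order: List[str] = []  # completion order, for inspection

    async def _wait_for_dependencies(self, node: MiniNode) -> None:
        # Block until every prerequisite has signalled completion.
        await asyncio.gather(*(self.done[d].wait() for d in node.dependencies))

    async def execute_node(self, node: MiniNode) -> None:
        await asyncio.sleep(0)  # placeholder for the real LLM call
        self.order.append(node.id)
        self.done[node.id].set()

    async def _execute_subgraph(self, node: MiniNode) -> None:
        if node.id in self.started:  # already scheduled by another parent
            return
        self.started.add(node.id)
        await self._wait_for_dependencies(node)
        await self.execute_node(node)
        await asyncio.gather(
            *(self._execute_subgraph(self.nodes[n]) for n in node.next_nodes)
        )


async def run_diamond() -> List[str]:
    # "a" fans out to "b" and "c"; "d" depends on both and must run last, once.
    nodes = {
        "a": MiniNode("a", next_nodes=["b", "c"]),
        "b": MiniNode("b", next_nodes=["d"], dependencies=["a"]),
        "c": MiniNode("c", next_nodes=["d"], dependencies=["a"]),
        "d": MiniNode("d", dependencies=["b", "c"]),
    }
    executor = MiniExecutor(nodes)  # created inside the running event loop
    await executor._execute_subgraph(nodes["a"])
    return executor.order
```

Calling `asyncio.run(run_diamond())` returns the completion order, with "a" first and "d" last. Because all tasks share one event loop, the set and list updates need no extra locking in this sketch.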

Key Features

1. Dependency Management

The executor tracks each node's dependencies and waits for all prerequisite nodes to complete before a dependent node begins.
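
Waiting on prerequisites only works if the dependency lists are sound: a cycle, or a reference to a node that does not exist, could leave a node waiting indefinitely. As a rough sketch of a pre-flight check, and not part of the original executor, the standard library's `graphlib` (Python 3.9+) can verify the graph before anything runs. The `check_dependencies` function and its input shape (node ID mapped to a list of prerequisite IDs) are assumptions made for illustration.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def check_dependencies(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return one valid execution order, or raise ValueError on a bad graph."""
    # Every dependency must name a node that actually exists.
    for node_id, deps in dependencies.items():
        missing = [d for d in deps if d not in dependencies]
        if missing:
            raise ValueError(f"{node_id} depends on unknown nodes: {missing}")
    try:
        # static_order() yields prerequisites before their dependents.
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # The second element of args holds the nodes forming the cycle.
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc
```

For example, `check_dependencies({"node_1": [], "node_2": ["node_1"], "node_3": ["node_1"]})` returns an order that starts with "node_1".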

2. Validation and Error Handling

Each node includes a JSON schema for strict output validation:

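from jsonschema import ValidationError, validate

def validate_output(self, output: Dict[str, Any]) -> Any:
    # Returns True when the output matches the node's schema,
    # otherwise a dict carrying the validation error message.
    try:
        validate(instance=output, schema=self.json_schema)
        return True
    except ValidationError as e:
        return {"error": str(e)}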
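
`validate_output` expects a dict, so raw model text has to be parsed before it can be validated. Here is a minimal sketch of that step, assuming the model replies with a JSON string. The `parse_llm_json` helper is hypothetical and simply mirrors the `{"error": ...}` shape used above.

```python
import json
from typing import Any, Dict


def parse_llm_json(raw: str) -> Dict[str, Any]:
    """Parse model text into a dict, or return an {"error": ...} dict."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        return {"error": f"invalid JSON: {e}"}
    # A schema of type "object" needs a dict, not a list or a bare value.
    if not isinstance(parsed, dict):
        return {"error": f"expected a JSON object, got {type(parsed).__name__}"}
    return parsed
```

Returning an error dict instead of raising keeps parse failures and schema failures in the same shape, so a caller can treat both the same way.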

3. Retry Mechanism

Built-in retry logic helps manage transient errors, such as rate limits:

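async def execute_node(self, node: Node, retry: int = 1):
    try:
        # Send the node's prompt to the model
        response = await self._call_llm(node.prompt)
        return response
    except Exception:
        if retry > 0:
            # Pause briefly, then try again with one fewer retry left
            await asyncio.sleep(1)
            return await self.execute_node(node, retry - 1)
        # Out of retries: re-raise with the original traceback
        raise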
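
The version above waits a fixed second between attempts. For rate limits it is common to wait longer after each failure. The sketch below shows exponential backoff with jitter as one alternative; it is not the author's implementation, and `retry_with_backoff`, `retries`, and `base_delay` are hypothetical names.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    retries: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Await call(), retrying up to `retries` times with growing delays."""
    for attempt in range(retries + 1):
        try:
            return await call()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            # The delay doubles on each attempt; jitter spreads out retries
            # so that concurrent nodes do not all hit the API at once.
            delay = base_delay * (2 ** attempt) * random.uniform(0.5, 1.0)
            await asyncio.sleep(delay)
    raise AssertionError("unreachable")
```

A loop is used here instead of recursion so the attempt count and the delay stay in one place. In real use it would make sense to catch only the exceptions known to be transient instead of every `Exception`.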

Real-World Example: Color Analysis Workflow

Let's examine a practical use case, identifying the colors mentioned in a text:

nodes = {
    "node_1": Node(
        id="node_1",
        input={"text": "A village with colorful houses..."},
        prompt="Identify colors in the text",
        json_schema={
            "type": "object",
            "properties": {
                "colours": {
                    "type": "array",
                    "items": {"type": "string"},
                    "minItems": 5,
                }
            }
        },
        next_nodes=["node_2", "node_3"]
    ),
    # Additional nodes for further color analysis
}

Conclusion

The Graph Executor is a useful abstraction for managing complex LLM workflows. Treating workflows as directed graphs makes dependencies explicit, lets independent nodes run concurrently, and keeps validation and retries in one place.


Interested in contributing or exploring the full implementation? Check out the GitHub Repository


Jesse Doka