In the rapidly evolving world of large language models (LLMs), managing complex multi-step workflows can quickly become challenging. Traditional linear processing falls short when dealing with intricate tasks that require conditional execution, dependency management, and parallel processing. Today, I'll walk you through an innovative solution: a flexible, asynchronous Graph Executor designed to streamline LLM workflows.
The Challenge of Complex LLM Workflows
Modern language model applications often require:
- Chained prompts with interdependent results
- Parallel task execution
- Robust error handling
- Dynamic workflow management
My Graph Executor addresses these challenges by treating workflows as directed graphs, where each node represents a specific task or transformation.
Core Architecture: Nodes, Graphs, and Execution
Node Structure
At the heart of the system is the `Node` class, which encapsulates:
- A unique identifier
- Input data
- Prompt template
- JSON schema for response validation
- Dependencies
- Next nodes in the workflow
```python
from typing import Any, Dict, List

from pydantic import BaseModel


class Node(BaseModel):
    id: str                       # Unique identifier within the graph
    input: Dict[str, Any]         # Input data for the prompt
    prompt: str                   # Prompt template sent to the LLM
    json_schema: Dict[str, Any]   # Schema used to validate the response
    next_nodes: List[str]         # Nodes triggered after this one completes
    dependencies: List[str] = []  # Nodes that must finish first
```
Asynchronous Execution Strategy
The `GraphExecutor` leverages Python's `asyncio` to provide:
- Concurrent node execution
- Dependency resolution
- Thread-safe operations
```python
async def _execute_subgraph(self, node: Node):
    # Wait for all declared dependencies to complete
    await self._wait_for_dependencies(node)

    # Execute the node itself
    await self.execute_node(node)

    # Trigger next nodes in parallel
    tasks = [
        self._execute_subgraph(self.graph.nodes[next_id])
        for next_id in node.next_nodes
    ]
    await asyncio.gather(*tasks)
```
Key Features
1. Dependency Management
The executor intelligently handles node dependencies, ensuring that prerequisite nodes complete before dependent nodes begin.
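The waiting logic itself isn't shown above; one minimal way to implement it is with a per-node `asyncio.Event`. The `completed` dictionary below is an assumption about the internals, not code from the repository:

```python
import asyncio
from typing import Dict


class GraphExecutor:
    def __init__(self, graph: "Graph"):
        self.graph = graph
        # Hypothetical bookkeeping: one Event per node, set once it finishes
        self.completed: Dict[str, asyncio.Event] = {
            node_id: asyncio.Event() for node_id in graph.nodes
        }

    async def _wait_for_dependencies(self, node: Node) -> None:
        # Block until every prerequisite node has signalled completion;
        # execute_node would call self.completed[node.id].set() on success
        await asyncio.gather(
            *(self.completed[dep].wait() for dep in node.dependencies)
        )
```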
2. Validation and Error Handling
Each node includes a JSON schema for strict output validation:
```python
from jsonschema import ValidationError, validate


def validate_output(self, output: Dict[str, Any]) -> Any:
    try:
        validate(instance=output, schema=self.json_schema)
        return True
    except ValidationError as e:
        # Surface the failure to the caller instead of raising
        return {"error": str(e)}
```
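To see this in action, here is a hypothetical round trip, assuming `validate_output` is a method on `Node`, as its use of `self.json_schema` suggests:

```python
node = Node(
    id="demo",
    input={},
    prompt="Identify colors in the text",
    json_schema={
        "type": "object",
        "properties": {
            "colours": {
                "type": "array",
                "items": {"type": "string"},
                "minItems": 5,
            }
        },
    },
    next_nodes=[],
)

# Too few items: fails the minItems constraint, returns an error dict
print(node.validate_output({"colours": ["red", "blue"]}))

# A conforming output returns True
print(node.validate_output({"colours": ["red", "blue", "green", "teal", "ochre"]}))
```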
3. Retry Mechanism
Built-in retry logic helps manage transient errors, such as rate limits:
```python
async def execute_node(self, node: Node, retry: int = 1):
    try:
        response = await self._call_llm(node.prompt)
        return response
    except Exception:
        if retry > 0:
            # Pause briefly before retrying, e.g. after a rate-limit error
            await asyncio.sleep(1)
            return await self.execute_node(node, retry - 1)
        raise
```
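One design note on this sketch: the fixed one-second pause is the simplest possible backoff. When rate limits are the dominant failure mode, an exponential delay, for example `await asyncio.sleep(2 ** attempts)` where `attempts` counts failures so far (a hypothetical variable, not part of the code above), is a common refinement.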
Real-World Example: Color Analysis Workflow
Let's examine a practical use case analyzing colors in a text:
```python
nodes = {
    "node_1": Node(
        id="node_1",
        input={"text": "A village with colorful houses..."},
        prompt="Identify colors in the text",
        json_schema={
            "type": "object",
            "properties": {
                "colours": {
                    "type": "array",
                    "items": {"type": "string"},
                    "minItems": 5,
                }
            },
        },
        next_nodes=["node_2", "node_3"],
    ),
    # Additional nodes for further color analysis
}
```
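Wiring these nodes into a graph and kicking off execution might look like the following. The `Graph` constructor is an assumption about the public API, and a real implementation would likely wrap `_execute_subgraph` in a public `run` method:

```python
import asyncio


async def main() -> None:
    # "Graph" as a simple container for the node map is an assumption
    graph = Graph(nodes=nodes)
    executor = GraphExecutor(graph)
    # Kick off execution from the root; next_nodes fan out from there
    await executor._execute_subgraph(graph.nodes["node_1"])


asyncio.run(main())
```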
Conclusion
The Graph Executor is a powerful abstraction for managing complex LLM workflows. By treating workflows as directed graphs, we gain flexibility, scalability, and reliability that a linear pipeline cannot easily match.
Interested in contributing or exploring the full implementation? Check out the GitHub Repository.