*** title: Graph description: Connect nodes into event processing pipelines. ----------------------------------------------------------- A **Graph** defines how nodes are connected. Events flow through the graph from parent nodes to child nodes, creating a processing pipeline for your agent logic. The Atoms graph is a **DAG** (Directed Acyclic Graph). Events can flow through multiple branches, but never in circles. *** ## How Graphs Work When you call `session.add_edge(parent, child)`, you're creating a connection. Events emitted by the parent via `send_event()` are automatically queued for the child. ``` User Input → [Root] → [Your Nodes] → [Sink] → User Output ``` The session automatically creates two special nodes: * **Root**: Entry point—receives events from the WebSocket * **Sink**: Exit point—sends events back to the WebSocket *** ## Building a Graph ### Step 1: Add Nodes ```python async def setup(session: AgentSession): logger = LoggerNode() agent = SalesAgent() analytics = AnalyticsNode() session.add_node(logger) session.add_node(agent) session.add_node(analytics) ``` **`session.add_node(node)`**: Registers a `Node` instance with the session. The node must inherit from the base `Node` class. ### Step 2: Connect with Edges ```python # Define the flow: Logger → Agent → Analytics session.add_edge(logger, agent) session.add_edge(agent, analytics) await session.start() ``` ### The Resulting Graph ``` [Root] → [Logger] → [Agent] → [Analytics] → [Sink] ``` *** ## Graph Patterns ```python Linear # [Root] → [A] → [B] → [C] → [Sink] # The simplest pattern—events flow sequentially. session.add_edge(node_a, node_b) session.add_edge(node_b, node_c) ``` ```python Fork (Branching) # [Root] → [Router] ─┬→ [Sales] → [Sink] # └→ [Support] → [Sink] # Events are copied to multiple children. session.add_edge(router, sales_agent) session.add_edge(router, support_agent) ``` ```python Join (Merging) # [Sales] ───┐ # ├→ [Aggregator] → [Sink] # [Support] ─┘ # Multiple streams merge into one. session.add_edge(sales_agent, aggregator) session.add_edge(support_agent, aggregator) ``` ```python Diamond # [Root] → [Splitter] ─┬→ [A] ─┐ # └→ [B] ─┴→ [Merger] → [Sink] # Parallel processing with a final merge. session.add_edge(splitter, path_a) session.add_edge(splitter, path_b) session.add_edge(path_a, merger) session.add_edge(path_b, merger) ``` *** ## Automatic Connections Nodes without explicit parents connect to **Root**. Nodes without explicit children connect to **Sink**. ```python # Just add one node with no edges: session.add_node(my_agent) # Automatically becomes: # [Root] -> [my_agent] -> [Sink] ``` This means a minimal agent only needs: ```python async def setup(session: AgentSession): session.add_node(SalesAgent()) await session.start() await session.wait_until_complete() ``` *** ## Cycle Detection Graphs **must not** contain cycles. The session validates this at startup: ```python # This will FAIL session.add_edge(node_a, node_b) session.add_edge(node_b, node_c) session.add_edge(node_c, node_a) # Creates a cycle! await session.start() # Raises: ValueError("Graph contains cycles") ``` Cycles would cause infinite event loops. The framework prevents this at startup, but design your graphs as DAGs from the start. *** ## Event Flow in Detail When a node calls `send_event()`: 1. The event is queued for each child node 2. Each child's `process_event()` is called asynchronously 3. Children can further propagate via their own `send_event()` ```python class ParentNode(Node): async def process_event(self, event): # Modify the event if needed event.metadata["processed_by"] = self.name # Queue for all children await self.send_event(event) ``` Each node has its own event queue. Multiple events can be queued while a node is processing, and they'll be handled in order. *** ## Custom Routing For dynamic routing, don't use `send_event()`—directly queue to specific children: ```python class RouterNode(Node): def __init__(self, sales_node, support_node): super().__init__(name="router") self.sales = sales_node self.support = support_node async def process_event(self, event): intent = self.classify(event) if intent == "sales": await self.sales.queue_event(event) elif intent == "support": await self.support.queue_event(event) else: # Fallback: send to all children normally await self.send_event(event) ``` *** ## Best Practices Deeply nested graphs increase latency. Events have to hop through every node. **Bad:** `A -> B -> C -> D -> E` (5 hops) **Good:** `Router -> [A, B, C, D, E]` (2 hops) You will thank yourself when reading logs. ```python # Good LoggerNode(name="input_logger") # Bad LoggerNode(name="log1") ``` If a node is sending to more than 3 children, it's usually better to have a dedicated Router node that decides where the event goes, rather than broadcasting to everyone. Graphs can get complex. Sketching the flow on paper (or Excalidraw) before coding saves a lot of headaches.