---
title: Graph
description: Connect nodes into event processing pipelines.
---
A **Graph** defines how nodes are connected. Events flow through the graph from parent nodes to child nodes, creating a processing pipeline for your agent logic.
The Atoms graph is a **DAG** (Directed Acyclic Graph). Events can flow through multiple branches, but never in circles.
***
## How Graphs Work
Calling `session.add_edge(parent, child)` creates a directed connection from `parent` to `child`. Events emitted by the parent via `send_event()` are automatically queued for the child.
```
User Input → [Root] → [Your Nodes] → [Sink] → User Output
```
The session automatically creates two special nodes:
* **Root**: Entry point—receives events from the WebSocket
* **Sink**: Exit point—sends events back to the WebSocket
***
## Building a Graph
### Step 1: Add Nodes
```python
async def setup(session: AgentSession):
    # Create the node instances
    logger = LoggerNode()
    agent = SalesAgent()
    analytics = AnalyticsNode()

    # Register each node with the session
    session.add_node(logger)
    session.add_node(agent)
    session.add_node(analytics)
```
**`session.add_node(node)`**: Registers a `Node` instance with the session. The node must inherit from the base `Node` class.
### Step 2: Connect with Edges
```python
# Inside setup(), after the add_node() calls.
# Define the flow: Logger → Agent → Analytics
session.add_edge(logger, agent)
session.add_edge(agent, analytics)

# Start the session once the graph is wired
await session.start()
```
### The Resulting Graph
```
[Root] → [Logger] → [Agent] → [Analytics] → [Sink]
```
***
## Graph Patterns
```python Linear
# [Root] → [A] → [B] → [C] → [Sink]
# The simplest pattern—events flow sequentially.
session.add_edge(node_a, node_b)
session.add_edge(node_b, node_c)
```
```python Fork (Branching)
# [Root] → [Router] ─┬→ [Sales] → [Sink]
#                    └→ [Support] → [Sink]
# Events are copied to multiple children.
session.add_edge(router, sales_agent)
session.add_edge(router, support_agent)
```
```python Join (Merging)
# [Sales] ───┐
#            ├→ [Aggregator] → [Sink]
# [Support] ─┘
# Multiple streams merge into one.
session.add_edge(sales_agent, aggregator)
session.add_edge(support_agent, aggregator)
```
```python Diamond
# [Root] → [Splitter] ─┬→ [A] ─┐
#                      └→ [B] ─┴→ [Merger] → [Sink]
# Parallel processing with a final merge.
session.add_edge(splitter, path_a)
session.add_edge(splitter, path_b)
session.add_edge(path_a, merger)
session.add_edge(path_b, merger)
```
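To compare how many events each node receives under these patterns, here is a toy broadcast model. It assumes only what this page states (every child of a node gets a copy of each event the node sends). `count_deliveries` and the node names are hypothetical and not part of Atoms.

```python
from collections import defaultdict, deque


def count_deliveries(edges, start):
    """Count how many event copies reach each node when `start` emits one
    event and every node forwards each copy it receives to all its children.

    Assumes `edges` describes a DAG; a cycle would make this loop forever.
    """
    children = defaultdict(list)
    for parent, child in edges:
        children[parent].append(child)

    received = defaultdict(int)
    pending = deque([start])  # each entry is a node holding one copy to forward
    while pending:
        node = pending.popleft()
        for child in children[node]:
            received[child] += 1
            pending.append(child)
    return dict(received)


diamond = [
    ("splitter", "path_a"),
    ("splitter", "path_b"),
    ("path_a", "merger"),
    ("path_b", "merger"),
]
deliveries = count_deliveries(diamond, "splitter")
print(deliveries)
```

In this model the merger in a diamond sees two copies of the original event, one per path, so a join node may need to combine or deduplicate them. Whether a real graph behaves this way depends on how each node forwards events.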
***
## Automatic Connections
Nodes without explicit parents connect to **Root**. Nodes without explicit children connect to **Sink**.
```python
# Just add one node with no edges:
session.add_node(my_agent)
# Automatically becomes:
# [Root] -> [my_agent] -> [Sink]
```
This means a minimal agent only needs:
```python
async def setup(session: AgentSession):
    session.add_node(SalesAgent())
    await session.start()
    await session.wait_until_complete()
```
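The auto-wiring rule can be sketched as a small function over node names and edges. This illustrates the rule as stated in this section and is not the session's actual code; `auto_wire`, `"root"`, and `"sink"` are hypothetical names.

```python
def auto_wire(nodes, edges, root="root", sink="sink"):
    """Return `edges` plus the implicit Root/Sink edges.

    Nodes with no incoming edge get `root` as a parent; nodes with no
    outgoing edge get `sink` as a child.
    """
    has_parent = {child for _, child in edges}
    has_child = {parent for parent, _ in edges}

    wired = list(edges)
    for node in nodes:
        if node not in has_parent:
            wired.append((root, node))
        if node not in has_child:
            wired.append((node, sink))
    return wired


# A single node with no edges ends up between Root and Sink.
single = auto_wire(["my_agent"], [])
print(single)

# A three-node chain gains one implicit edge at each end.
chain = auto_wire(
    ["logger", "agent", "analytics"],
    [("logger", "agent"), ("agent", "analytics")],
)
print(chain)
```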
***
## Cycle Detection
Graphs **must not** contain cycles. The session validates this at startup:
```python
# This will FAIL
session.add_edge(node_a, node_b)
session.add_edge(node_b, node_c)
session.add_edge(node_c, node_a) # Creates a cycle!
await session.start()
# Raises: ValueError("Graph contains cycles")
```
Cycles would cause infinite event loops. The framework prevents this at startup, but design your graphs as DAGs from the start.
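For intuition about the startup check, a cycle can be detected with Kahn's topological sort: repeatedly remove nodes that have no remaining parents, and if some nodes can never be removed, the graph contains a cycle. The sketch below is one standard approach and is not necessarily how Atoms implements its validation; `has_cycle` is a hypothetical helper.

```python
from collections import defaultdict, deque


def has_cycle(edges):
    """Return True if the directed graph given by (parent, child) pairs
    contains a cycle, using Kahn's algorithm."""
    children = defaultdict(list)
    in_degree = defaultdict(int)
    nodes = set()
    for parent, child in edges:
        children[parent].append(child)
        in_degree[child] += 1
        nodes.update((parent, child))

    # Start from nodes that have no parents.
    ready = deque(node for node in nodes if in_degree[node] == 0)
    removed = 0
    while ready:
        node = ready.popleft()
        removed += 1
        for child in children[node]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                ready.append(child)

    # Any node left over still has a parent that could never be removed.
    return removed != len(nodes)


print(has_cycle([("a", "b"), ("b", "c"), ("c", "a")]))
print(has_cycle([("a", "b"), ("b", "c")]))
```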
***
## Event Flow in Detail
When a node calls `send_event()`:
1. The event is queued for each child node
2. Each child's `process_event()` is called asynchronously
3. Children can further propagate via their own `send_event()`
```python
class ParentNode(Node):
    async def process_event(self, event):
        # Modify the event if needed
        event.metadata["processed_by"] = self.name
        # Queue for all children
        await self.send_event(event)
```
Each node has its own event queue. Multiple events can be queued while a node is processing, and they'll be handled in order.
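The queueing behavior described here can be imitated with plain `asyncio`. The sketch below is a toy model for illustration only: `ToyNode` is a hypothetical class, not the Atoms `Node`, and it assumes each node drains a FIFO queue one event at a time.

```python
import asyncio


class ToyNode:
    """Toy node with its own FIFO queue; not the Atoms `Node` class."""

    def __init__(self, name):
        self.name = name
        self.children = []
        self.queue = asyncio.Queue()
        self.seen = []  # events this node has processed, in order

    async def send_event(self, event):
        # Step 1: queue the event for each child.
        for child in self.children:
            await child.queue.put(event)

    async def process_event(self, event):
        self.seen.append(event)
        # Step 3: propagate to this node's own children.
        await self.send_event(event)

    async def run(self):
        # Step 2: handle queued events one at a time, in arrival order.
        while True:
            event = await self.queue.get()
            await self.process_event(event)
            self.queue.task_done()


async def demo():
    parent, child = ToyNode("parent"), ToyNode("child")
    parent.children.append(child)
    workers = [asyncio.create_task(node.run()) for node in (parent, child)]

    # Queue several events before any of them has been processed.
    for event in ("first", "second", "third"):
        await parent.queue.put(event)

    # Wait until both queues are drained, then stop the workers.
    await parent.queue.join()
    await child.queue.join()
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return child.seen


result = asyncio.run(demo())
print(result)
```

The child processes the three events in the order the parent forwarded them, even though all three were queued before processing began.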
***
## Custom Routing
For dynamic routing, skip `send_event()` and queue the event directly on the specific child that should receive it:
```python
class RouterNode(Node):
    def __init__(self, sales_node, support_node):
        super().__init__(name="router")
        self.sales = sales_node
        self.support = support_node

    async def process_event(self, event):
        # classify() is not shown here; it is expected to return an intent string
        intent = self.classify(event)
        if intent == "sales":
            # Queue only on the sales branch
            await self.sales.queue_event(event)
        elif intent == "support":
            # Queue only on the support branch
            await self.support.queue_event(event)
        else:
            # Fallback: send to all children normally
            await self.send_event(event)
```
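The `classify` method is not defined in the example above. As a placeholder, a keyword-based classifier might look like the sketch below. Everything in it is hypothetical: the function name, the keyword sets, and the assumption that the user's text is available as a plain string. A production agent would more likely use an LLM or a trained intent model.

```python
# Hypothetical keyword sets; tune these for your own domain.
SALES_KEYWORDS = {"price", "pricing", "buy", "quote", "discount"}
SUPPORT_KEYWORDS = {"error", "broken", "help", "refund", "bug"}


def classify_text(text):
    """Return "sales", "support", or "unknown" based on keyword matches."""
    # Normalize: split on whitespace, strip trailing punctuation, lowercase.
    words = {word.strip(".,!?").lower() for word in text.split()}
    sales_hits = len(words & SALES_KEYWORDS)
    support_hits = len(words & SUPPORT_KEYWORDS)
    if sales_hits > support_hits:
        return "sales"
    if support_hits > sales_hits:
        return "support"
    # No match or a tie: let the caller fall back to broadcasting.
    return "unknown"


print(classify_text("What is the price?"))
print(classify_text("I get an error, please help!"))
print(classify_text("Hello there"))
```

Returning `"unknown"` on a tie keeps the router's fallback branch in charge of ambiguous input instead of guessing.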
***
## Best Practices
Deeply nested graphs increase latency, because each event has to pass through every node on its path.
**Bad:** `A -> B -> C -> D -> E` (each event passes through 5 nodes)
**Good:** `Router -> [A, B, C, D, E]` (each event passes through 2 nodes)
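To check how deep a graph is before deploying it, the longest path can be computed with a memoized depth-first search. This is a generic DAG utility rather than an Atoms API; `max_depth` is a hypothetical helper, and it counts the nodes along the longest path.

```python
from collections import defaultdict
from functools import lru_cache


def max_depth(edges):
    """Return the number of nodes on the longest path of a DAG given as
    (parent, child) pairs. Assumes the graph has no cycles."""
    children = defaultdict(list)
    nodes = set()
    for parent, child in edges:
        children[parent].append(child)
        nodes.update((parent, child))

    @lru_cache(maxsize=None)
    def depth_from(node):
        # A node with no children is a path of one node.
        return 1 + max((depth_from(c) for c in children[node]), default=0)

    return max((depth_from(n) for n in nodes), default=0)


chain = [("A", "B"), ("B", "C"), ("C", "D"), ("D", "E")]
fan_out = [("Router", name) for name in "ABCDE"]
print(max_depth(chain), max_depth(fan_out))
```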
Give every node a descriptive name. You will thank yourself when reading logs.
```python
# Good
LoggerNode(name="input_logger")
# Bad
LoggerNode(name="log1")
```
If a node is sending to more than 3 children, it's usually better to have a dedicated Router node that decides where the event goes, rather than broadcasting to everyone.
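A quick way to spot nodes that exceed this fan-out guideline is to count children per parent in your edge list. The helper below is a hypothetical sketch that works on plain `(parent, child)` name pairs, not on Atoms objects; the default `limit=3` mirrors the rule of thumb above.

```python
from collections import Counter


def wide_nodes(edges, limit=3):
    """Return parents with more than `limit` children, sorted by name."""
    fan_out = Counter(parent for parent, _ in edges)
    return sorted(node for node, count in fan_out.items() if count > limit)


edges = [
    ("hub", "a"),
    ("hub", "b"),
    ("hub", "c"),
    ("hub", "d"),
    ("a", "e"),
]
print(wide_nodes(edges))
```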
Graphs can get complex. Sketching the flow on paper (or Excalidraw) before coding saves a lot of headaches.