# Graph

> Connect nodes into event processing pipelines.

A **Graph** defines how nodes are connected. Events flow through the graph from parent nodes to child nodes, creating a processing pipeline for your agent logic.

The Atoms graph is a **DAG** (Directed Acyclic Graph). Events can flow through multiple branches, but never in circles.

***

## How Graphs Work

When you call `session.add_edge(parent, child)`, you're creating a connection. Events emitted by the parent via `send_event()` are automatically queued for the child.

```
User Input → [Root] → [Your Nodes] → [Sink] → User Output
```

The session automatically creates two special nodes:

* **Root**: Entry point—receives events from the WebSocket
* **Sink**: Exit point—sends events back to the WebSocket

***

## Building a Graph

### Step 1: Add Nodes

```python
async def setup(session: CrewSession):
    logger = LoggerNode()
    agent = SalesAgent()
    analytics = AnalyticsNode()
    
    session.add_node(logger)
    session.add_node(agent)
    session.add_node(analytics)
```

**`session.add_node(node)`**: Registers a `Node` instance with the session. The node must inherit from the base `Node` class.

### Step 2: Connect with Edges

```python
    # Continuing inside setup(): define the flow Logger → Agent → Analytics
    session.add_edge(logger, agent)
    session.add_edge(agent, analytics)
    
    await session.start()
```

### The Resulting Graph

```
[Root] → [Logger] → [Agent] → [Analytics] → [Sink]
```

***

## Graph Patterns

```python Linear
# [Root] → [A] → [B] → [C] → [Sink]
# The simplest pattern—events flow sequentially.

session.add_edge(node_a, node_b)
session.add_edge(node_b, node_c)
```

```python Fork (Branching)
# [Root] → [Router] ─┬→ [Sales] → [Sink]
#                    └→ [Support] → [Sink]
# Events are copied to multiple children.

session.add_edge(router, sales_agent)
session.add_edge(router, support_agent)
```

```python Join (Merging)
# [Sales] ───┐
#            ├→ [Aggregator] → [Sink]
# [Support] ─┘
# Multiple streams merge into one.

session.add_edge(sales_agent, aggregator)
session.add_edge(support_agent, aggregator)
```

```python Diamond
# [Root] → [Splitter] ─┬→ [A] ─┐
#                      └→ [B] ─┴→ [Merger] → [Sink]
# Parallel processing with a final merge.

session.add_edge(splitter, path_a)
session.add_edge(splitter, path_b)
session.add_edge(path_a, merger)
session.add_edge(path_b, merger)
```
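One consequence of forks and joins is worth spelling out: if every node forwards each event to all of its children, a node at the bottom of a diamond receives one copy per incoming path. The sketch below counts deliveries over a plain edge list. It does not use the Atoms SDK; `count_deliveries` and the node names are hypothetical, and it assumes each node forwards every event unchanged.

```python
from collections import Counter, defaultdict, deque


def count_deliveries(edges, start):
    """Count how many times each node receives one event injected at `start`.

    Toy model: assumes every node forwards the event to all of its children.
    """
    children = defaultdict(list)
    for parent, child in edges:
        children[parent].append(child)

    received = Counter()
    pending = deque([start])
    while pending:
        node = pending.popleft()
        received[node] += 1
        pending.extend(children[node])
    return received


diamond = [
    ("root", "splitter"),
    ("splitter", "path_a"),
    ("splitter", "path_b"),
    ("path_a", "merger"),
    ("path_b", "merger"),
    ("merger", "sink"),
]

received = count_deliveries(diamond, "root")
print(received["merger"])  # 2: one copy via path_a, one via path_b
```

Under that assumption, a merger that should emit once per input needs its own logic to combine or deduplicate the copies it receives.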

***

## Automatic Connections

Nodes without explicit parents connect to **Root**. Nodes without explicit children connect to **Sink**.

```python
# Just add one node with no edges:
session.add_node(my_agent)

# Automatically becomes:
# [Root] → [my_agent] → [Sink]
```

This means a minimal agent only needs:

```python
async def setup(session: CrewSession):
    session.add_node(SalesAgent())
    await session.start()
    await session.wait_until_complete()
```
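The auto-wiring rule can be sketched as a small function over an edge list. This is a toy illustration of the rule as stated, not the SDK's implementation; `auto_wire` and the string node names are hypothetical.

```python
def auto_wire(nodes, edges, root="root", sink="sink"):
    """Return `edges` plus Root/Sink connections for unattached nodes."""
    has_parent = {child for _, child in edges}
    has_child = {parent for parent, _ in edges}

    wired = []
    # Nodes with no explicit parent are fed by Root.
    for node in nodes:
        if node not in has_parent:
            wired.append((root, node))
    wired.extend(edges)
    # Nodes with no explicit child drain into Sink.
    for node in nodes:
        if node not in has_child:
            wired.append((node, sink))
    return wired


single = auto_wire(["my_agent"], [])
print(single)
# [('root', 'my_agent'), ('my_agent', 'sink')]

chain = auto_wire(["logger", "agent"], [("logger", "agent")])
print(chain)
# [('root', 'logger'), ('logger', 'agent'), ('agent', 'sink')]
```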

***

## Cycle Detection

Graphs **must not** contain cycles. The session validates this at startup:

```python
# This will FAIL
session.add_edge(node_a, node_b)
session.add_edge(node_b, node_c)
session.add_edge(node_c, node_a)  # Creates a cycle!

await session.start()  
# Raises: ValueError("Graph contains cycles")
```

Cycles would cause infinite event loops. The framework prevents this at startup, but design your graphs as DAGs from the start.
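To check a planned layout before wiring it into a session, one standard approach is Kahn's algorithm: repeatedly remove nodes that have no remaining incoming edges, and report a cycle if any nodes are left over. The sketch below is an assumption about how such a check could look, not the SDK's own validation code; `has_cycle` is a hypothetical helper that works on plain name pairs.

```python
from collections import defaultdict, deque


def has_cycle(edges):
    """Return True if the directed graph described by `edges` contains a cycle."""
    children = defaultdict(list)
    in_degree = defaultdict(int)
    nodes = set()
    for parent, child in edges:
        children[parent].append(child)
        in_degree[child] += 1
        nodes.update((parent, child))

    # Start from nodes that have no incoming edges.
    ready = deque(n for n in nodes if in_degree[n] == 0)
    removed = 0
    while ready:
        node = ready.popleft()
        removed += 1
        for child in children[node]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                ready.append(child)

    # Any node that was never removed sits on, or downstream of, a cycle.
    return removed != len(nodes)


print(has_cycle([("a", "b"), ("b", "c"), ("c", "a")]))  # True
print(has_cycle([("a", "b"), ("b", "c")]))              # False
```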

***

## Event Flow in Detail

When a node calls `send_event()`:

1. The event is queued for each child node
2. Each child's `process_event()` is called asynchronously
3. Children can further propagate via their own `send_event()`

```python
class ParentNode(Node):
    async def process_event(self, event):
        # Modify the event if needed
        event.metadata["processed_by"] = self.name
        
        # Queue for all children
        await self.send_event(event)
```

Each node has its own event queue. Multiple events can be queued while a node is processing, and they'll be handled in order.
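The per-node queue behavior can be modeled with `asyncio.Queue`. The sketch below is a simplified stand-in, not the Atoms `Node` class: `ToyNode`, its `run()` loop, and the `None` stop sentinel are all assumptions made for illustration. It shows three events queued on a parent arriving at the child in the same order.

```python
import asyncio


class ToyNode:
    """Hypothetical stand-in for a node with its own FIFO event queue."""

    def __init__(self, name):
        self.name = name
        self.children = []
        self.queue = asyncio.Queue()
        self.seen = []

    async def queue_event(self, event):
        await self.queue.put(event)

    async def send_event(self, event):
        # Queue the event for every child.
        for child in self.children:
            await child.queue_event(event)

    async def run(self):
        # Handle queued events one at a time, in arrival order.
        while True:
            event = await self.queue.get()
            if event is None:  # sentinel used by this sketch to stop workers
                await self.send_event(None)
                return
            self.seen.append(event)
            await self.send_event(event)


async def main():
    parent, child = ToyNode("parent"), ToyNode("child")
    parent.children.append(child)

    workers = [asyncio.create_task(node.run()) for node in (parent, child)]
    for event in ("e1", "e2", "e3"):
        await parent.queue_event(event)
    await parent.queue_event(None)
    await asyncio.gather(*workers)
    return child.seen


child_seen = asyncio.run(main())
print(child_seen)  # ['e1', 'e2', 'e3']
```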

***

## Custom Routing

`send_event()` broadcasts to every child. To route dynamically, call `queue_event()` on the chosen child instead:

```python
class RouterNode(Node):
    def __init__(self, sales_node, support_node):
        super().__init__(name="router")
        self.sales = sales_node
        self.support = support_node
    
    async def process_event(self, event):
        # classify() is a placeholder: implement your own intent detection
        intent = self.classify(event)
        
        if intent == "sales":
            await self.sales.queue_event(event)
        elif intent == "support":
            await self.support.queue_event(event)
        else:
            # Fallback: send to all children normally
            await self.send_event(event)
```
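The `classify()` method is not defined in the example above. A minimal keyword-based sketch follows. The function name, the keyword sets, and the idea that the user's text is available as a plain string are all assumptions; in practice you might call an LLM or a trained intent model instead.

```python
# Hypothetical keyword lists; tune these for your own domain.
SALES_KEYWORDS = {"price", "pricing", "buy", "quote", "upgrade"}
SUPPORT_KEYWORDS = {"error", "broken", "help", "refund", "bug"}


def classify_text(text):
    """Return "sales", "support", or "unknown" using simple keyword matching."""
    words = set(text.lower().split())
    if words & SALES_KEYWORDS:
        return "sales"
    if words & SUPPORT_KEYWORDS:
        return "support"
    return "unknown"


print(classify_text("What is the pricing for the Pro plan?"))  # sales
print(classify_text("My dashboard shows an error"))            # support
print(classify_text("Hello there"))                            # unknown
```

Sales keywords are checked first, so a message that matches both sets is routed to sales. Returning `"unknown"` lets the router fall through to its `else` branch.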

***

## Best Practices

Keep graphs shallow. Every event has to hop through each node on its path, so deeply nested graphs add latency.

* **Bad:** `A → B → C → D → E` (five nodes in sequence)
* **Good:** `Router → [A, B, C, D, E]` (two nodes on any path)
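To compare the depth of two layouts before building them, one option is to compute the longest path through each. The helper below is a hypothetical sketch over plain edge lists, not an SDK feature. It counts edges and includes the hops from Root and into Sink, so its numbers are larger than the informal counts above.

```python
from collections import defaultdict
from functools import lru_cache


def longest_path(edges, start):
    """Return the number of edges on the longest path from `start` (DAG only)."""
    children = defaultdict(list)
    for parent, child in edges:
        children[parent].append(child)

    @lru_cache(maxsize=None)
    def depth(node):
        # A node with no children has depth 0.
        return max((1 + depth(c) for c in children[node]), default=0)

    return depth(start)


chain = [("root", "a"), ("a", "b"), ("b", "c"), ("c", "d"), ("d", "e"), ("e", "sink")]
fan_out = [("root", "router")] + [
    edge for n in "abcde" for edge in (("router", n), (n, "sink"))
]

chain_depth = longest_path(chain, "root")
fan_out_depth = longest_path(fan_out, "root")
print(chain_depth)    # 6
print(fan_out_depth)  # 3
```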

Give nodes descriptive names. You will thank yourself when reading logs.

```python
# Good
LoggerNode(name="input_logger")

# Bad
LoggerNode(name="log1")
```

If a node sends to more than three children, a dedicated Router node that decides where each event goes is usually better than broadcasting to all of them.

Graphs can get complex. Sketching the flow on paper (or Excalidraw) before coding saves a lot of headaches.