---
title: State Management
description: Track and manage data across conversation turns.
---

Voice agents need to remember things: user names, order IDs, preferences, and what was said earlier in the conversation. State management handles this memory.

## Types of State

| Type             | Scope           | Persists After Call | Example                     |
| ---------------- | --------------- | ------------------- | --------------------------- |
| Turn state       | Single response | No                  | Current tool results        |
| Session state    | Entire call     | No                  | Customer ID, collected data |
| Persistent state | Across calls    | Yes                 | User preferences, history   |

## Session State (Instance Variables)

Store data on `self` to remember it across turns within a call.

```python
from typing import Optional


class DataCollectionAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="data-agent")

        # Session state: lives for the duration of one call
        self.customer_name = None
        self.email = None
        self.phone = None
        self.order_id = None

    @function_tool()
    def save_customer_info(
        self,
        name: Optional[str] = None,
        email: Optional[str] = None,
        phone: Optional[str] = None,
    ) -> dict:
        """Save customer information."""
        # Only overwrite fields that were actually provided
        if name:
            self.customer_name = name
        if email:
            self.email = email
        if phone:
            self.phone = phone

        return {"saved": True, "collected": self._get_collected()}

    def _get_collected(self) -> dict:
        return {
            "name": self.customer_name,
            "email": self.email,
            "phone": self.phone,
        }
```

## Initial Variables

Access caller-provided data via `init_event.session_context.initial_variables` in `start()`.

```python
async def start(self, init_event, task_manager):
    await super().start(init_event, task_manager)

    ctx = init_event.session_context

    # Access initial variables
    self.customer_id = ctx.initial_variables.get("customer_id")
    self.account_tier = ctx.initial_variables.get("tier")
    self.language = ctx.initial_variables.get("language", "en")

    # Use them to customize behavior
    if self.account_tier == "premium":
        self.context.add_message({
            "role": "system",
            "content": "This is a premium customer. Prioritize their requests."
        })
```

Initial variables are passed when making outbound calls or via the WebSocket connection.

## Context History

The conversation context tracks all messages:

```python
class MyAgent(OutputAgentNode):
    async def generate_response(self):
        # Access full message history
        all_messages = self.context.messages

        # Get the last user message
        user_messages = [m for m in all_messages if m["role"] == "user"]
        last_user = user_messages[-1]["content"] if user_messages else ""

        # Count turns
        user_turns = len(user_messages)
```

## Extracting Information

Use tools to extract and store structured data:

```python
class IntakeAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="intake-agent")
        self.collected = {}

        self.tool_registry = ToolRegistry()
        self.tool_registry.discover(self)
        self.tool_schemas = self.tool_registry.get_schemas()

    @function_tool()
    def record_information(
        self,
        field: str,
        value: str
    ) -> dict:
        """
        Record a piece of information from the conversation.

        Args:
            field: The type of information (name, email, phone, etc.)
            value: The value to record
        """
        self.collected[field] = value
        return {
            "recorded": f"{field}: {value}",
            "remaining": self._get_remaining_fields()
        }

    def _get_remaining_fields(self):
        required = ["name", "email", "reason"]
        return [f for f in required if f not in self.collected]
```

## Turn Counting

Track conversation progress:

```python
from datetime import datetime


class ProgressAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="progress-agent")
        self.turn_count = 0
        self.started_at = None

    async def start(self, init_event, task_manager):
        await super().start(init_event, task_manager)
        self.started_at = datetime.now()

    async def generate_response(self):
        self.turn_count += 1

        # Different behavior based on turn count
        if self.turn_count == 1:
            yield "Welcome! This is our first exchange."
        elif self.turn_count > 10:
            yield "We've been chatting for a while. "
            yield "Is there anything else I can help with?"
```

## Shared State Across Nodes

For multi-node graphs, pass shared state between nodes in the event metadata:

```python
class RouterNode(Node):
    async def process_event(self, event):
        # Store routing decision in event metadata
        event.metadata["routed_to"] = "sales"
        await self.send_event(event)


class SalesAgent(OutputAgentNode):
    async def process_event(self, event):
        # Read state from event
        if event.metadata.get("routed_to") == "sales":
            await super().process_event(event)
```

## Persistent State

For data that survives across calls, use an external store:

```python
import json

import redis


class PersistentAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="persistent-agent")
        self.redis = redis.Redis()
        self.phone = None
        self.user_data = {}

    async def start(self, init_event, task_manager):
        await super().start(init_event, task_manager)

        # Keep the lookup key on the instance so stop() can reuse it
        self.phone = init_event.session_context.initial_variables.get("phone")

        # Load previous state, if any
        stored = self.redis.get(f"user:{self.phone}")
        self.user_data = json.loads(stored) if stored else {}

    async def stop(self):
        # Save state before session ends
        self.redis.set(f"user:{self.phone}", json.dumps(self.user_data))
        await super().stop()
```

## State Validation

Validate collected data before proceeding:

```python
class ValidationAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="validation-agent")
        self.data = {}

    @function_tool()
    def submit_form(self) -> dict:
        """Submit the collected information."""
        errors = []

        if not self.data.get("name"):
            errors.append("Name is required")

        if not self.data.get("email"):
            errors.append("Email is required")
        elif "@" not in self.data["email"]:
            errors.append("Email format is invalid")

        if errors:
            return {"success": False, "errors": errors}

        return {"success": True, "data": self.data}
```

***

## Tips

Instance variables are the simplest and most reliable approach to session state. Each session gets its own agent instance.

Session context is available in the `init_event`. Store what you need as instance variables.

Build tools that check that required fields are present before taking action.
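
The tips about per-instance state and required-field checks can be illustrated without any framework code. The snippet below is a minimal sketch in plain Python, not part of the agent SDK: `SessionState`, `REQUIRED_FIELDS`, `record`, `missing`, and `submit` are hypothetical names chosen for illustration. It assumes one state object is created per call and refuses to act until every required field has a non-empty value.

```python
from dataclasses import dataclass, field

# Hypothetical list of fields that must be collected before acting.
REQUIRED_FIELDS = ("name", "email", "reason")


@dataclass
class SessionState:
    """Per-call state; each instance owns its own dictionary."""

    # default_factory gives every instance a fresh dict rather than a shared one.
    collected: dict = field(default_factory=dict)

    def record(self, key: str, value: str) -> None:
        self.collected[key] = value

    def missing(self) -> list:
        # A field counts as missing if it is absent or empty.
        return [f for f in REQUIRED_FIELDS if not self.collected.get(f)]

    def submit(self) -> dict:
        missing = self.missing()
        if missing:
            return {"success": False, "missing": missing}
        return {"success": True, "data": dict(self.collected)}


# Two sessions never see each other's data.
call_a = SessionState()
call_b = SessionState()
call_a.record("name", "Ada")

print(call_a.submit())   # {'success': False, 'missing': ['email', 'reason']}
print(call_b.missing())  # ['name', 'email', 'reason']
```

Returning the list of missing fields, rather than a bare failure flag, gives the model something concrete to ask the caller for on the next turn.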