*** title: Interruption Handling description: Handle user barge-ins gracefully during agent responses. --------------------------------------------------------------------- Users interrupt. They change their minds mid-sentence. They correct themselves. Your agent must handle these interruptions smoothly. ## Default Behavior `OutputAgentNode` handles interruptions automatically. When a user speaks while the agent is talking: 1. Audio playback stops immediately 2. The current response generation is cancelled 3. The system processes the new user input 4. The agent responds to the interruption ```python class MyAgent(OutputAgentNode): def __init__(self): super().__init__( name="my-agent", is_interruptible=True # Default ) ``` ## Disabling Interruptions For critical information that users must hear completely, disable interruption handling: ```python class PaymentAgent(OutputAgentNode): def __init__(self): super().__init__( name="payment-agent", is_interruptible=False # User must hear the full message ) ``` ### When to Use `is_interruptible=False` | Use Case | Reason | | ------------------------- | --------------------------------------------------------- | | **Payment confirmations** | User must hear the full amount and confirmation number | | **Legal disclaimers** | Regulatory requirement to deliver complete information | | **Safety information** | Critical instructions must not be cut off | | **One-time codes** | OTPs and verification codes must be heard fully | | **Short responses** | Responses under 2 seconds don't benefit from interruption | Default to `is_interruptible=True` for natural conversation. Only disable for critical moments where incomplete information could cause problems. ## Custom Interrupt Handling Override `_handle_interrupt()` for custom behavior: ```python class MyAgent(OutputAgentNode): def __init__(self): super().__init__(name="my-agent") self.was_interrupted = False self.pending_message = "" async def _handle_interrupt(self): """Called when the user interrupts.""" self.was_interrupted = True # Clear any pending state self.pending_message = "" # Call parent handler await super()._handle_interrupt() async def generate_response(self): if self.was_interrupted: yield "Sorry, I was saying something. What did you need?" self.was_interrupted = False return # Normal response generation async for chunk in self._generate_normal(): yield chunk ``` ## Detecting Interrupt Events Listen for interrupt events at the session level: ```python async def setup(session: AgentSession): agent = MyAgent() session.add_node(agent) await session.start() @session.on_event("on_event_received") async def handle_event(session, event): if isinstance(event, SDKSystemControlInterruptEvent): # User interrupted logger.info("User barge-in detected") await session.wait_until_complete() ``` ## State During Interruption Track what was happening when the interruption occurred: ```python class StatefulAgent(OutputAgentNode): def __init__(self): super().__init__(name="stateful-agent") self.current_task = None self.task_progress = 0 async def _handle_interrupt(self): # Save state before handling if self.current_task: logger.info( f"Interrupted during {self.current_task} " f"at {self.task_progress}% progress" ) await super()._handle_interrupt() async def generate_response(self): self.current_task = "explaining_policy" self.task_progress = 0 yield "Let me explain our return policy. " self.task_progress = 25 yield "You have 30 days to return items. " self.task_progress = 50 yield "Items must be unused and in original packaging. " self.task_progress = 75 yield "Refunds are processed within 5 business days." self.task_progress = 100 self.current_task = None ``` ## Resuming After Interruption Sometimes you want to resume the previous topic: ```python class ResumableAgent(OutputAgentNode): def __init__(self): super().__init__(name="resumable-agent") self.last_topic = None self.interrupted_mid_response = False async def _handle_interrupt(self): self.interrupted_mid_response = True await super()._handle_interrupt() async def generate_response(self): # Get last user message user_msgs = [m for m in self.context.messages if m["role"] == "user"] user_message = user_msgs[-1]["content"] if user_msgs else "" # Check if user wants to continue previous topic if self.interrupted_mid_response and "continue" in user_message.lower(): yield f"Sure, back to {self.last_topic}. " # Resume previous topic self.interrupted_mid_response = False return self.interrupted_mid_response = False # Normal response generation... ``` ## Tools and Interruptions Tool calls in progress when an interruption occurs are typically allowed to complete: ```python class ToolAgent(OutputAgentNode): async def generate_response(self): # LLM calls a tool response = await self.llm.chat( messages=self.context.messages, tools=self.tool_schemas, stream=True ) tool_calls = [] async for chunk in response: if chunk.content: yield chunk.content if chunk.tool_calls: tool_calls.extend(chunk.tool_calls) # Tool execution happens after streaming completes # If interrupted mid-stream, tools may still execute on next turn if tool_calls: results = await self.tool_registry.execute( tool_calls=tool_calls, parallel=True ) self.context.add_messages([ { "role": "assistant", "content": "", "tool_calls": [ {"id": tc.id, "type": "function", "function": {"name": tc.name, "arguments": str(tc.arguments)}} for tc in tool_calls ], }, *[ {"role": "tool", "tool_call_id": tc.id, "content": result.content} for tc, result in zip(tool_calls, results) ], ]) ``` *** ## Tips Shorter responses have less to interrupt. Aim for 2-3 sentences max. Track where you are so users can say "continue" to resume after interrupting. When interrupted, briefly acknowledge before switching topics: "Sure, what did you need?" ***