---
title: Interruption Handling
description: Handle user barge-ins gracefully during agent responses.
---
Users interrupt. They change their minds mid-sentence. They correct themselves. Your agent must handle these interruptions smoothly.
## Default Behavior
`OutputAgentNode` handles interruptions automatically. When a user speaks while the agent is talking:
1. Audio playback stops immediately
2. The current response generation is cancelled
3. The system processes the new user input
4. The agent responds to the interruption
```python
class MyAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(
            name="my-agent",
            is_interruptible=True,  # Default
        )
```
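Conceptually, the default behavior resembles cancelling an in-flight playback task as soon as speech is detected. A minimal, SDK-independent sketch of steps 1-3 (the `barge_in` event and `speak` coroutine here are illustrative, not framework API):

```python
import asyncio

spoken = []  # chunks that actually reached the user

async def speak(chunks, barge_in: asyncio.Event) -> bool:
    """Simulated playback: stop the moment a barge-in is signalled."""
    for chunk in chunks:
        if barge_in.is_set():
            return False  # response was cut off
        spoken.append(chunk)
        await asyncio.sleep(0)  # yield control, as real playback would
    return True

async def main() -> bool:
    barge_in = asyncio.Event()
    task = asyncio.create_task(
        speak(["Hello, ", "let me explain ", "our policy."], barge_in)
    )
    await asyncio.sleep(0)  # one chunk plays...
    barge_in.set()          # ...then the user starts talking
    return await task

completed = asyncio.run(main())
```

After the event is set, only the first chunk was delivered and `completed` is `False`; the framework then feeds the new user input to the agent (step 4).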
## Disabling Interruptions
For critical information that users must hear completely, disable interruption handling:
```python
class PaymentAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(
            name="payment-agent",
            is_interruptible=False,  # User must hear the full message
        )
```
### When to Use `is_interruptible=False`
| Use Case | Reason |
| ------------------------- | --------------------------------------------------------- |
| **Payment confirmations** | User must hear the full amount and confirmation number |
| **Legal disclaimers** | Regulatory requirement to deliver complete information |
| **Safety information** | Critical instructions must not be cut off |
| **One-time codes** | OTPs and verification codes must be heard fully |
| **Short responses** | Responses under 2 seconds don't benefit from interruption |
Default to `is_interruptible=True` for natural conversation. Only disable for critical moments where incomplete information could cause problems.
## Custom Interrupt Handling
Override `_handle_interrupt()` for custom behavior:
```python
class MyAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="my-agent")
        self.was_interrupted = False
        self.pending_message = ""

    async def _handle_interrupt(self):
        """Called when the user interrupts."""
        self.was_interrupted = True
        # Clear any pending state
        self.pending_message = ""
        # Call parent handler
        await super()._handle_interrupt()

    async def generate_response(self):
        if self.was_interrupted:
            yield "Sorry, I was saying something. What did you need?"
            self.was_interrupted = False
            return
        # Normal response generation
        async for chunk in self._generate_normal():
            yield chunk
```
## Detecting Interrupt Events
Listen for interrupt events at the session level:
```python
async def setup(session: AgentSession):
    agent = MyAgent()
    session.add_node(agent)
    await session.start()

    @session.on_event("on_event_received")
    async def handle_event(session, event):
        if isinstance(event, SDKSystemControlInterruptEvent):
            # User interrupted
            logger.info("User barge-in detected")

    await session.wait_until_complete()
```
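One practical use of this hook is telemetry: counting recent barge-ins so the agent can adapt, for example by shortening its replies. A small self-contained sketch that the handler above could drive (the `InterruptStats` class is illustrative, not part of the SDK):

```python
import time
from dataclasses import dataclass, field

@dataclass
class InterruptStats:
    """Tracks barge-in timestamps so the agent can adapt its style."""
    timestamps: list = field(default_factory=list)

    def record(self) -> None:
        self.timestamps.append(time.monotonic())

    def recent_count(self, window_s: float = 60.0) -> int:
        """How many interruptions happened in the last window_s seconds."""
        cutoff = time.monotonic() - window_s
        return sum(1 for t in self.timestamps if t >= cutoff)

stats = InterruptStats()
stats.record()  # call from the interrupt event handler
stats.record()
recent = stats.recent_count()
```

If `recent_count()` climbs within a turn or two, that is a signal the agent's responses are too long for this caller.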
## State During Interruption
Track what was happening when the interruption occurred:
```python
class StatefulAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="stateful-agent")
        self.current_task = None
        self.task_progress = 0

    async def _handle_interrupt(self):
        # Save state before handling
        if self.current_task:
            logger.info(
                f"Interrupted during {self.current_task} "
                f"at {self.task_progress}% progress"
            )
        await super()._handle_interrupt()

    async def generate_response(self):
        self.current_task = "explaining_policy"
        self.task_progress = 0
        yield "Let me explain our return policy. "
        self.task_progress = 25
        yield "You have 30 days to return items. "
        self.task_progress = 50
        yield "Items must be unused and in original packaging. "
        self.task_progress = 75
        yield "Refunds are processed within 5 business days."
        self.task_progress = 100
        self.current_task = None
```
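The saved percentage can also drive a resume. A self-contained sketch that maps a saved `task_progress` value back to the unspoken chunks (the helper and chunk list are illustrative, not SDK API):

```python
POLICY_CHUNKS = [
    "Let me explain our return policy. ",
    "You have 30 days to return items. ",
    "Items must be unused and in original packaging. ",
    "Refunds are processed within 5 business days.",
]

def remaining_chunks(progress_pct: int, chunks=POLICY_CHUNKS) -> list:
    """Map a saved percentage back to the chunks not yet spoken."""
    done = int(progress_pct / 100 * len(chunks))
    return chunks[done:]

# Interrupted at 50%: the first two chunks were already spoken.
rest = remaining_chunks(50)
```

On the next turn, the agent can yield `rest` instead of restarting the explanation from the top.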
## Resuming After Interruption
Sometimes you want to resume the previous topic:
```python
class ResumableAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="resumable-agent")
        self.last_topic = None
        self.interrupted_mid_response = False

    async def _handle_interrupt(self):
        self.interrupted_mid_response = True
        await super()._handle_interrupt()

    async def generate_response(self):
        # Get the last user message
        user_msgs = [m for m in self.context.messages if m["role"] == "user"]
        user_message = user_msgs[-1]["content"] if user_msgs else ""

        # Check whether the user wants to continue the previous topic
        if self.interrupted_mid_response and "continue" in user_message.lower():
            yield f"Sure, back to {self.last_topic}. "
            # Resume the previous topic here
            self.interrupted_mid_response = False
            return

        self.interrupted_mid_response = False
        # Normal response generation: remember to set self.last_topic as you go...
```
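The bare `"continue" in ...` check above is easy to fool. Matching a handful of resume phrases is slightly more robust; an illustrative helper (not part of the SDK):

```python
# Phrases that signal the user wants the interrupted topic back.
RESUME_PHRASES = ("continue", "go on", "keep going", "as you were saying")

def wants_resume(user_message: str) -> bool:
    """True if the user is asking to pick up the interrupted topic."""
    text = user_message.lower().strip()
    return any(phrase in text for phrase in RESUME_PHRASES)

wants_resume("Please continue")       # True
wants_resume("What's your address?")  # False
```

For higher accuracy, intent classification via the LLM itself is an option, at the cost of an extra model call.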
## Tools and Interruptions
Tool calls in progress when an interruption occurs are typically allowed to complete:
```python
class ToolAgent(OutputAgentNode):
    async def generate_response(self):
        # LLM calls a tool
        response = await self.llm.chat(
            messages=self.context.messages,
            tools=self.tool_schemas,
            stream=True,
        )

        tool_calls = []
        async for chunk in response:
            if chunk.content:
                yield chunk.content
            if chunk.tool_calls:
                tool_calls.extend(chunk.tool_calls)

        # Tool execution happens after streaming completes.
        # If interrupted mid-stream, tools may still execute on the next turn.
        if tool_calls:
            results = await self.tool_registry.execute(
                tool_calls=tool_calls, parallel=True
            )
            self.context.add_messages([
                {
                    "role": "assistant",
                    "content": "",
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.name,
                                "arguments": str(tc.arguments),
                            },
                        }
                        for tc in tool_calls
                    ],
                },
                *[
                    {"role": "tool", "tool_call_id": tc.id, "content": result.content}
                    for tc, result in zip(tool_calls, results)
                ],
            ])
```
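If stale tool calls after a barge-in are undesirable, one option is to gate execution on an interrupt flag set from `_handle_interrupt()`. A simplified, SDK-independent sketch (the class and method names are illustrative):

```python
class InterruptAwareToolRunner:
    """Drops queued tool calls if the user interrupted mid-stream."""

    def __init__(self):
        self.interrupted = False

    def on_interrupt(self) -> None:
        # Call this from the agent's interrupt handler.
        self.interrupted = True

    def take_pending(self, tool_calls: list) -> list:
        """Return the calls to execute, or [] if this turn was interrupted."""
        if self.interrupted:
            self.interrupted = False  # reset for the next turn
            return []
        return list(tool_calls)

runner = InterruptAwareToolRunner()
runner.on_interrupt()
dropped = runner.take_pending(["lookup_order"])  # [] (call is discarded)
kept = runner.take_pending(["lookup_order"])     # executes on the next turn
```

Whether to drop or complete in-flight calls depends on the tool: idempotent lookups are safe to finish, while side-effecting actions (payments, cancellations) may warrant the guard above.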
***

## Tips

- Shorter responses leave less to interrupt. Aim for 2-3 sentences per turn.
- Track your place in a response so users can say "continue" to resume after interrupting.
- When interrupted, briefly acknowledge before switching topics: "Sure, what did you need?"

***