Interruption Handling


Users interrupt. They change their minds mid-sentence. They correct themselves. Your agent must handle these interruptions smoothly.

Default Behavior

OutputAgentNode handles interruptions automatically. When a user speaks while the agent is talking:

  1. Audio playback stops immediately
  2. The current response generation is cancelled
  3. The system processes the new user input
  4. The agent responds to the interruption

```python
class MyAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(
            name="my-agent",
            is_interruptible=True  # Default
        )
```

Disabling Interruptions

For critical information that users must hear completely, disable interruption handling:

```python
class PaymentAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(
            name="payment-agent",
            is_interruptible=False  # User must hear the full message
        )
```

When to Use is_interruptible=False

| Use Case | Reason |
| --- | --- |
| Payment confirmations | User must hear the full amount and confirmation number |
| Legal disclaimers | Regulatory requirement to deliver complete information |
| Safety information | Critical instructions must not be cut off |
| One-time codes | OTPs and verification codes must be heard fully |
| Short responses | Responses under 2 seconds don't benefit from interruption |

Default to is_interruptible=True for natural conversation. Only disable for critical moments where incomplete information could cause problems.
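
The table's guidance can be condensed into a helper that picks the setting per response. This is only a sketch: the category names and the two-second duration cutoff are illustrative, not SDK constants:

```python
# Hypothetical response categories mapped to interruptibility.
# Category names are illustrative, not part of the SDK.
CRITICAL_CATEGORIES = {
    "payment_confirmation",
    "legal_disclaimer",
    "safety_information",
    "one_time_code",
}

def should_be_interruptible(category: str, duration_seconds: float = 5.0) -> bool:
    """Return True unless the response is critical or too short to interrupt."""
    if category in CRITICAL_CATEGORIES:
        return False
    if duration_seconds < 2.0:
        # Very short responses finish before an interruption helps
        return False
    return True
```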

Custom Interrupt Handling

Override _handle_interrupt() for custom behavior:

```python
class MyAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="my-agent")
        self.was_interrupted = False
        self.pending_message = ""

    async def _handle_interrupt(self):
        """Called when the user interrupts."""
        self.was_interrupted = True

        # Clear any pending state
        self.pending_message = ""

        # Call parent handler
        await super()._handle_interrupt()

    async def generate_response(self):
        if self.was_interrupted:
            yield "Sorry, I was saying something. What did you need?"
            self.was_interrupted = False
            return

        # Normal response generation
        async for chunk in self._generate_normal():
            yield chunk
```

Detecting Interrupt Events

Listen for interrupt events at the session level:

```python
async def setup(session: AgentSession):
    agent = MyAgent()
    session.add_node(agent)
    await session.start()

    @session.on_event("on_event_received")
    async def handle_event(session, event):
        if isinstance(event, SDKSystemControlInterruptEvent):
            # User interrupted
            logger.info("User barge-in detected")

    await session.wait_until_complete()
```
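
Interrupt events can also feed lightweight telemetry. A minimal sketch in plain Python, independent of the SDK; the threshold of three barge-ins per session is an illustrative default, not a framework recommendation:

```python
class InterruptCounter:
    """Count user barge-ins in a session, e.g. to flag overly long responses."""

    def __init__(self, threshold: int = 3):
        self.count = 0
        self.threshold = threshold  # Illustrative cutoff; tune per use case

    def record(self) -> None:
        self.count += 1

    @property
    def too_frequent(self) -> bool:
        # Frequent interruptions suggest responses should be shorter
        return self.count > self.threshold
```

Calling `record()` from the event handler turns barge-ins into a signal you can log or act on.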

State During Interruption

Track what was happening when the interruption occurred:

```python
class StatefulAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="stateful-agent")
        self.current_task = None
        self.task_progress = 0

    async def _handle_interrupt(self):
        # Save state before handling
        if self.current_task:
            logger.info(
                f"Interrupted during {self.current_task} "
                f"at {self.task_progress}% progress"
            )

        await super()._handle_interrupt()

    async def generate_response(self):
        self.current_task = "explaining_policy"
        self.task_progress = 0

        yield "Let me explain our return policy. "
        self.task_progress = 25

        yield "You have 30 days to return items. "
        self.task_progress = 50

        yield "Items must be unused and in original packaging. "
        self.task_progress = 75

        yield "Refunds are processed within 5 business days."
        self.task_progress = 100
        self.current_task = None
```
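
The recorded progress can also drive what to replay after the interruption. A minimal sketch, assuming the response is split into equally weighted chunks; the helper name is ours, not part of the SDK:

```python
def remaining_chunks(chunks: list[str], progress_percent: int) -> list[str]:
    """Return the chunks not yet delivered when the interrupt occurred.

    Assumes each chunk contributes equally to the progress percentage.
    """
    delivered = int(progress_percent / 100 * len(chunks))
    return chunks[delivered:]
```

For example, interrupted at 50% through a four-chunk explanation, the last two chunks remain to be replayed if the user asks to continue.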

Resuming After Interruption

Sometimes you want to resume the previous topic:

```python
class ResumableAgent(OutputAgentNode):
    def __init__(self):
        super().__init__(name="resumable-agent")
        self.last_topic = None
        self.interrupted_mid_response = False

    async def _handle_interrupt(self):
        self.interrupted_mid_response = True
        await super()._handle_interrupt()

    async def generate_response(self):
        # Get last user message
        user_msgs = [m for m in self.context.messages if m["role"] == "user"]
        user_message = user_msgs[-1]["content"] if user_msgs else ""

        # Check if user wants to continue the previous topic
        # (guard on last_topic so we never offer to resume nothing)
        if (
            self.interrupted_mid_response
            and self.last_topic
            and "continue" in user_message.lower()
        ):
            yield f"Sure, back to {self.last_topic}. "
            # Resume previous topic
            self.interrupted_mid_response = False
            return

        self.interrupted_mid_response = False
        # Normal response generation; set self.last_topic as topics come up...
```
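
The bare `"continue"` substring check is easy to broaden. A small heuristic helper; the phrase list is illustrative and should be tuned for your users:

```python
# Illustrative resume phrases; extend for your domain and locale
RESUME_PHRASES = ("continue", "go on", "keep going", "where were you", "as you were saying")

def wants_resume(user_message: str) -> bool:
    """Heuristic check for whether the user is asking to pick up the interrupted topic."""
    text = user_message.lower()
    return any(phrase in text for phrase in RESUME_PHRASES)
```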

Tools and Interruptions

Tool calls in progress when an interruption occurs are typically allowed to complete:

```python
import json

class ToolAgent(OutputAgentNode):
    async def generate_response(self):
        # LLM calls a tool
        response = await self.llm.chat(
            messages=self.context.messages,
            tools=self.tool_schemas,
            stream=True
        )

        tool_calls = []

        async for chunk in response:
            if chunk.content:
                yield chunk.content
            if chunk.tool_calls:
                tool_calls.extend(chunk.tool_calls)

        # Tool execution happens after streaming completes.
        # If interrupted mid-stream, tools may still execute on the next turn.
        if tool_calls:
            results = await self.tool_registry.execute(
                tool_calls=tool_calls, parallel=True
            )

            self.context.add_messages([
                {
                    "role": "assistant",
                    "content": "",
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.name,
                                # Arguments must be a JSON string, not a Python repr
                                "arguments": json.dumps(tc.arguments),
                            },
                        }
                        for tc in tool_calls
                    ],
                },
                *[
                    {"role": "tool", "tool_call_id": tc.id, "content": result.content}
                    for tc, result in zip(tool_calls, results)
                ],
            ])
```

Tips

Shorter responses have less to interrupt. Aim for 2-3 sentences max.

Track where you are so users can say “continue” to resume after interrupting.

When interrupted, briefly acknowledge before switching topics: “Sure, what did you need?”