Logging & Observability

Good logging helps you understand what your agent is doing. In production, observability is essential for debugging issues and improving performance.

Basic Logging with Loguru

The Atoms SDK uses Loguru for logging. Set it up in your agent:

```python
from loguru import logger


class MyAgent(OutputAgentNode):
    async def generate_response(self):
        logger.info("Generating response")

        try:
            response = await self.llm.chat(
                messages=self.context.messages,
                stream=True,
            )

            async for chunk in response:
                if chunk.content:
                    yield chunk.content

        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            yield "I'm having trouble right now. Please try again."
```

Log Levels

Use appropriate log levels:

| Level | Use for | Example |
| --- | --- | --- |
| DEBUG | Detailed debugging info | Message contents, event data |
| INFO | Normal operations | Session start, tool calls |
| WARNING | Unexpected but handled | Fallback used, retry needed |
| ERROR | Failures | API errors, exceptions |
```python
logger.debug(f"Received event: {event}")
logger.info(f"Session started: {session.id}")
logger.warning("Using fallback LLM, primary failed")
logger.error(f"Tool execution failed: {error}")
```

Structured Logging

Add context to your logs. Loguru attaches any extra keyword arguments to the log record's extra dict, where structured sinks can pick them up:

```python
logger.info(
    "Tool executed",
    tool_name="get_order",
    duration_ms=145,
    success=True,
    session_id=self.session_id,
)
```

Logging Events

Track all events flowing through your agent:

```python
class LoggingAgent(OutputAgentNode):
    async def process_event(self, event):
        logger.debug(
            "Processing event",
            event_type=event.type,
            session_id=self.session_id,
        )

        await super().process_event(event)
```

Logging Tool Calls

Track tool usage:

```python
import time


async def generate_response(self):
    # ... LLM call ...

    if tool_calls:
        for call in tool_calls:
            logger.info(
                "Executing tool: {tool_name}",
                tool_name=call.name,
                arguments=call.args,
            )

        start = time.time()
        results = await self.tool_registry.execute(tool_calls, parallel=True)
        duration = time.time() - start

        logger.info(
            "Tools executed in {duration_seconds:.2f}s",
            tool_count=len(tool_calls),
            duration_seconds=duration,
        )
```

Session Logging

Log session lifecycle:

```python
async def setup(session: AgentSession):
    logger.info(f"Session started: {session.id}")

    agent = MyAgent()
    session.add_node(agent)
    await session.start()

    try:
        await session.wait_until_complete()
    finally:
        logger.info(f"Session ended: {session.id}")
```

Log Output Configuration

Configure log output format:

```python
import sys

from loguru import logger

# Remove the default handler
logger.remove()

# Add a custom console handler
logger.add(
    sys.stderr,
    format="{time:HH:mm:ss} | {level} | {message}",
    level="INFO",
)

# Add a rotating file handler for debugging
logger.add(
    "logs/agent_{time}.log",
    rotation="100 MB",
    retention="7 days",
    level="DEBUG",
)
```

Performance Tracking

Measure response times:

```python
import time


class TimedAgent(OutputAgentNode):
    async def generate_response(self):
        start = time.time()
        chunk_count = 0

        async for chunk in self._generate():
            chunk_count += 1
            yield chunk

        duration = time.time() - start
        logger.info(
            "Response generated",
            duration_seconds=duration,
            chunk_count=chunk_count,
            avg_chunk_time=duration / chunk_count if chunk_count else 0,
        )
```

Error Tracking

Capture and log errors with context:

```python
async def generate_response(self):
    try:
        async for chunk in self._generate():
            yield chunk

    except Exception as e:
        # logger.exception() records the full traceback
        logger.exception(
            "Response generation failed",
            error_type=type(e).__name__,
            session_id=self.session_id,
            message_count=len(self.context.messages),
        )
        yield "I encountered an error. Let me try again."
```

Metrics Collection

Track key metrics for dashboards:

```python
from prometheus_client import Counter, Histogram

# Define metrics
tool_calls = Counter("agent_tool_calls_total", "Total tool calls", ["tool_name"])
response_time = Histogram("agent_response_seconds", "Response generation time")


class MetricsAgent(OutputAgentNode):
    async def generate_response(self):
        with response_time.time():
            # ... response generation ...
            pass

    @function_tool()
    def get_order(self, order_id: str) -> dict:
        tool_calls.labels(tool_name="get_order").inc()
        # ... implementation ...
```
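To make these metrics scrapable, `prometheus_client` ships an HTTP endpoint (`start_http_server`). The sketch below uses a private `CollectorRegistry` and `get_sample_value` so the counter's value can be checked directly, which is also handy in tests:

```python
from prometheus_client import CollectorRegistry, Counter, start_http_server

# A private registry keeps this example isolated from the global default
registry = CollectorRegistry()
tool_calls = Counter(
    "agent_tool_calls_total", "Total tool calls", ["tool_name"],
    registry=registry,
)

tool_calls.labels(tool_name="get_order").inc()
tool_calls.labels(tool_name="get_order").inc()

# get_sample_value reads a single sample back out of the registry
value = registry.get_sample_value(
    "agent_tool_calls_total", {"tool_name": "get_order"}
)
print(value)  # 2.0

# In production, expose the default registry for Prometheus to scrape:
# start_http_server(9100)
```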

Debug Mode

Add a debug mode for development:

```python
import os


DEBUG = os.getenv("DEBUG", "false").lower() == "true"


class MyAgent(OutputAgentNode):
    async def generate_response(self):
        if DEBUG:
            logger.debug(f"Context: {self.context.messages}")

        # ... normal generation ...
```

Enable with:

```bash
DEBUG=true python agent.py
```

Next Steps