---
title: Logging & Observability
description: Monitor your agents in development and production.
---

Good logging helps you understand what your agent is doing. In production, observability is essential for debugging issues and improving performance.

## Basic Logging with Loguru

The Atoms SDK uses Loguru for logging. Set it up in your agent:

```python
from loguru import logger

class MyAgent(OutputAgentNode):
    async def generate_response(self):
        logger.info("Generating response")
        try:
            response = await self.llm.chat(
                messages=self.context.messages,
                stream=True
            )
            async for chunk in response:
                if chunk.content:
                    yield chunk.content
        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            yield "I'm having trouble right now. Please try again."
```

## Log Levels

Use the appropriate level for each message:

| Level   | Use For                 | Example                       |
| ------- | ----------------------- | ----------------------------- |
| DEBUG   | Detailed debugging info | Message contents, event data  |
| INFO    | Normal operations       | Session start, tool calls     |
| WARNING | Unexpected but handled  | Fallback used, retry needed   |
| ERROR   | Failures                | API errors, exceptions        |

```python
logger.debug(f"Received event: {event}")
logger.info(f"Session started: {session.id}")
logger.warning("Using fallback LLM, primary failed")
logger.error(f"Tool execution failed: {error}")
```

## Structured Logging

Add context to your logs. Unlike the standard library, Loguru treats keyword arguments to log calls as message-formatting arguments, so structured context is attached with `logger.bind()`, which stores the fields in the record's `extra` dict:

```python
logger.bind(
    tool_name="get_order",
    duration_ms=145,
    success=True,
    session_id=self.session_id
).info("Tool executed")
```

Bound fields are not shown by the default output format; include `{extra}` in your handler's format string, or pass `serialize=True` to `logger.add()` to emit records as JSON.
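When the same fields apply to everything logged within a scope, rebinding at every call site gets repetitive. Loguru's `logger.contextualize()` context manager attaches fields to every record emitted inside the `with` block instead. A minimal sketch; the `run_session` coroutine is illustrative, not part of the Atoms SDK:

```python
from loguru import logger

async def run_session(session_id: str):
    # Every record logged inside this block carries session_id in its
    # extra dict, with no need to bind at each call site.
    with logger.contextualize(session_id=session_id):
        logger.info("Session started")
        # ... run the agent ...
        logger.info("Session ended")
```

Because `contextualize()` is backed by a context variable, concurrent sessions in the same process each see their own value.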
## Logging Events

Track all events flowing through your agent:

```python
class LoggingAgent(OutputAgentNode):
    async def process_event(self, event):
        logger.bind(
            event_type=event.type,
            session_id=self.session_id
        ).debug("Processing event")
        await super().process_event(event)
```

## Logging Tool Calls

Track which tools run and how long they take:

```python
import time

async def generate_response(self):
    # ... LLM call ...
    if tool_calls:
        for call in tool_calls:
            logger.bind(
                tool_name=call.name,
                arguments=call.args
            ).info(f"Executing tool: {call.name}")

        start = time.time()
        results = await self.tool_registry.execute(tool_calls, parallel=True)
        duration = time.time() - start

        logger.bind(
            tool_count=len(tool_calls),
            duration_seconds=duration
        ).info(f"Tools executed in {duration:.2f}s")
```

## Session Logging

Log the session lifecycle:

```python
async def setup(session: AgentSession):
    logger.info(f"Session started: {session.id}")

    agent = MyAgent()
    session.add_node(agent)
    await session.start()

    try:
        await session.wait_until_complete()
    finally:
        logger.info(f"Session ended: {session.id}")
```

## Log Output Configuration

Configure where logs go and how they are formatted:

```python
import sys
from loguru import logger

# Remove the default handler
logger.remove()

# Add a custom console handler
logger.add(
    sys.stderr,
    format="{time:HH:mm:ss} | {level} | {message}",
    level="INFO"
)

# Add a rotating file handler for debugging
logger.add(
    "logs/agent_{time}.log",
    rotation="100 MB",
    retention="7 days",
    level="DEBUG"
)
```

## Performance Tracking

Measure response times:

```python
import time

class TimedAgent(OutputAgentNode):
    async def generate_response(self):
        start = time.time()
        chunk_count = 0

        async for chunk in self._generate():
            chunk_count += 1
            yield chunk

        duration = time.time() - start
        logger.bind(
            duration_seconds=duration,
            chunk_count=chunk_count,
            avg_chunk_time=duration / chunk_count if chunk_count else 0
        ).info("Response generated")
```

## Error Tracking

Capture and log errors with context. `logger.exception()` logs at ERROR level and appends the current traceback automatically:

```python
async def generate_response(self):
    try:
        async for chunk in self._generate():
            yield chunk
    except Exception as e:
        logger.bind(
            error_type=type(e).__name__,
            session_id=self.session_id,
            message_count=len(self.context.messages)
        ).exception("Response generation failed")
        yield "I encountered an error. Let me try again."
```

## Metrics Collection

Track key metrics for dashboards:

```python
from prometheus_client import Counter, Histogram

# Define metrics once at module level
tool_call_counter = Counter(
    "agent_tool_calls_total", "Total tool calls", ["tool_name"]
)
response_time = Histogram(
    "agent_response_seconds", "Response generation time"
)

class MetricsAgent(OutputAgentNode):
    async def generate_response(self):
        with response_time.time():
            # ... response generation ...
            pass

    @function_tool()
    def get_order(self, order_id: str) -> dict:
        tool_call_counter.labels(tool_name="get_order").inc()
        # ... implementation ...
```

## Debug Mode

Add a debug mode for development:

```python
import os

DEBUG = os.getenv("DEBUG", "false").lower() == "true"

class MyAgent(OutputAgentNode):
    async def generate_response(self):
        if DEBUG:
            logger.debug(f"Context: {self.context.messages}")
        # ... normal generation ...
```

Enable it with:

```bash
DEBUG=true python agent.py
```

## Next Steps

Troubleshoot common problems, or study complete working examples. A minimal sketch that ties this page's patterns together follows below.
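As a starting point, here is a sketch combining the configuration, structured-logging, and error-tracking patterns above. It assumes the `OutputAgentNode` interface and the `self._generate()` helper used in the earlier snippets; adapt the names to your agent:

```python
import sys
import time

from loguru import logger

# Human-readable console output at INFO; JSON file output at DEBUG
# for later analysis (serialize=True includes bound extra fields).
logger.remove()
logger.add(sys.stderr, format="{time:HH:mm:ss} | {level} | {message}", level="INFO")
logger.add("logs/agent_{time}.log", serialize=True, rotation="100 MB", level="DEBUG")


class ObservableAgent(OutputAgentNode):
    async def generate_response(self):
        # One bound logger per response keeps the session_id on every record.
        log = logger.bind(session_id=self.session_id)
        start = time.time()
        try:
            async for chunk in self._generate():
                yield chunk
            log.bind(duration_seconds=time.time() - start).info("Response generated")
        except Exception:
            log.exception("Response generation failed")
            yield "I encountered an error. Let me try again."
```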