Building a working agent prototype is only the beginning. Deploying agents in production requires careful attention to safety, reliability, cost, observability, and governance. This chapter covers the practices that separate toy demos from production-ready systems.
Agents amplify both the capabilities and risks of LLMs. A chatbot that gives a wrong answer is annoying; an agent that takes a wrong action can cause real damage — sending incorrect emails, deleting files, making unauthorized purchases, or leaking private data.
Screen user inputs before they reach the agent:
# See code/production.py for the full implementation
async def input_guardrails(llm, user_input):
    """Run the three input safety checks concurrently.

    Returns a (passed, message) tuple: False plus the first failing
    check's message, or True with "All checks passed".
    """
    # The checks are independent LLM calls, so overlap them.
    results = await asyncio.gather(
        check_prompt_injection(llm, user_input),
        check_content_policy(llm, user_input),
        check_scope(llm, user_input),
    )
    failure_messages = (
        "Potential prompt injection detected",
        "Content violates usage policy",
        "Request is outside agent's capabilities",
    )
    for passed, message in zip(results, failure_messages):
        if not passed:
            return False, message
    return True, "All checks passed"
Verify agent outputs before they reach the user or external systems:
# See code/production.py for the full implementation
def output_guardrails(output, action_type):
    """Validate agent outputs before execution.

    Returns a (approved, reason) tuple.
    """
    # PII must never leave the agent boundary.
    if contains_pii(output):
        return False, "Output contains personally identifiable information"

    # Irreversible or externally visible actions need explicit approval.
    destructive_actions = ("delete", "overwrite", "send")
    if action_type in destructive_actions:
        return False, f"Destructive action '{action_type}' requires approval"

    # Keep spend under the configured ceiling for outbound API calls.
    if action_type == "api_call" and estimated_cost(output) > MAX_COST:
        return False, "Action exceeds cost threshold"

    return True, "Output approved"
High-risk tools should operate in sandboxed environments:
# Principle: Least privilege for all tools
tool_permissions = dict(
    read_file={"allowed_paths": ["/data/", "/config/"]},
    write_file={"allowed_paths": ["/output/"], "max_size_kb": 1024},
    web_request={"allowed_domains": ["api.example.com"]},
    execute_code={"sandbox": "docker", "timeout": 30, "no_network": True},
)
For high-stakes operations, agents should pause and request human approval:
# See code/production.py for the full implementation
class HumanApproval:
    """Gate that requires human approval for sensitive actions."""

    # Safe, reversible operations — never interrupt the user for these.
    ALWAYS_APPROVE = ["read_file", "search", "calculate"]
    # High-stakes, externally visible operations — always confirm first.
    ALWAYS_ASK = ["send_email", "delete_file", "make_purchase"]

    @classmethod
    def check(cls, action):
        """Return True if the action may proceed, prompting a human when required."""
        tool = action.tool_name
        if tool in cls.ALWAYS_APPROVE:
            return True
        if tool not in cls.ALWAYS_ASK:
            # Default policy: the action's own read-only flag decides.
            return action.is_read_only
        # Interactive confirmation for the high-stakes list.
        print(f"\n⚠️ Agent wants to: {action.description}")
        print(f" Tool: {action.tool_name}")
        print(f" Args: {action.arguments}")
        answer = input(" Approve? (y/n): ")
        return answer.lower() == "y"
Log every agent action for debugging and auditing:
# See code/production.py for the full implementation
import json
import logging
from datetime import datetime, timezone
class AgentLogger:
    """Structured logger for agent actions: one JSON object per log line."""

    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.logger = logging.getLogger(f"agent.{agent_id}")
        # Set by the caller at the start of each run so log lines can be correlated.
        self.run_id = None

    def log_action(self, action_type, details, result=None):
        """Emit one structured record for an agent action.

        `details` must be JSON-serializable; `result` is stringified and
        truncated to 200 characters.
        """
        self.logger.info(json.dumps({
            "agent_id": self.agent_id,
            "run_id": self.run_id,
            "action": action_type,
            "details": details,
            # `is not None` (not truthiness) so real-but-falsy results
            # like 0, "", or False are still recorded.
            "result_summary": str(result)[:200] if result is not None else None,
            # Timezone-aware UTC timestamp; naive datetime.now() is ambiguous.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }))

    def log_tool_call(self, tool_name, arguments, result, latency_ms):
        """Log a tool invocation with latency and a crude success flag."""
        self.log_action("tool_call", {
            "tool": tool_name,
            "arguments": arguments,
            "latency_ms": latency_ms,
            # Heuristic only: a legitimate result containing "error" is misflagged.
            "success": "error" not in str(result).lower()
        }, result)
| Metric | Why It Matters |
|---|---|
| Task completion rate | Are agents actually solving problems? |
| Tokens per task | Cost efficiency |
| Tool call count per task | Efficiency of reasoning |
| Error rate by tool | Which tools need improvement |
| Latency (P50, P95, P99) | User experience |
| Human escalation rate | How often the agent needs human help |
| Cost per task | Budget management |
End-to-end tracing lets you follow the entire agent execution path:
# See code/production.py for the full implementation
class AgentTrace:
    """Collects timing spans for a single agent task execution."""

    def __init__(self, task):
        self.task = task
        self.trace_id = str(uuid.uuid4())
        self.spans = []

    def start_span(self, name, metadata=None):
        """Open a span and return its id (the index into self.spans)."""
        self.spans.append({
            "name": name,
            "start": time.time(),
            "metadata": metadata or {},
        })
        return len(self.spans) - 1

    def end_span(self, span_id, result=None):
        """Close a span, recording its end time, duration (ms), and result."""
        span = self.spans[span_id]
        span["end"] = time.time()
        span["duration_ms"] = (span["end"] - span["start"]) * 1000
        span["result"] = result
Agents can be expensive. Multiple LLM calls per task, each consuming thousands of tokens, add up quickly.
# See code/production.py for the full implementation
class CostController:
    """Enforces per-task spend and token budgets for an agent run."""

    def __init__(self, max_cost_per_task=1.0, max_tokens_per_task=100000):
        self.max_cost = max_cost_per_task
        self.max_tokens = max_tokens_per_task
        self.current_cost = 0.0
        self.current_tokens = 0

    def check_budget(self, estimated_tokens):
        """Raise if the next call would push the task over either budget.

        Raises:
            BudgetExceededError: projected dollar cost exceeds max_cost.
            TokenLimitError: projected token total exceeds max_tokens.
        """
        projected_cost = self.current_cost + self._estimate_cost(estimated_tokens)
        projected_tokens = self.current_tokens + estimated_tokens
        if projected_cost > self.max_cost:
            raise BudgetExceededError(
                f"Task would exceed budget: "
                f"${projected_cost:.2f} > ${self.max_cost:.2f}"
            )
        if projected_tokens > self.max_tokens:
            raise TokenLimitError(
                f"Task would exceed token limit: "
                f"{projected_tokens} > {self.max_tokens}"
            )

    def record_usage(self, input_tokens, output_tokens):
        """Accumulate actual token usage and its estimated cost."""
        used = input_tokens + output_tokens
        self.current_tokens += used
        self.current_cost += self._estimate_cost(used)
Use cheaper models for simpler subtasks:
def select_model(subtask_complexity):
    """Route to appropriate model based on task complexity.

    Any complexity other than "simple" or "medium" falls through to
    the most capable (and most expensive) model.
    """
    routing = {
        "simple": "claude-haiku",   # $0.25 / MTok input
        "medium": "claude-sonnet",  # $3 / MTok input
    }
    return routing.get(subtask_complexity, "claude-opus")  # $15 / MTok input
# See code/production.py for the full implementation
async def retry_with_backoff(fn, max_retries=3, base_delay=1.0):
    """Call the async callable `fn`, retrying transient failures.

    Retries up to `max_retries` attempts, sleeping base_delay * 2**attempt
    between attempts (exponential backoff for all transient errors, as the
    docstring promises — the original only backed off on rate limits).

    Returns:
        Whatever `fn()` returns on the first successful attempt.

    Raises:
        RateLimitError | TimeoutError | ConnectionError: the last failure.
            Bug fix: previously the final RateLimitError was swallowed,
            slept pointlessly, and a generic MaxRetriesExceeded was raised
            instead of the real error.
        MaxRetriesExceeded: defensive fallback; unreachable in practice.
    """
    for attempt in range(max_retries):
        try:
            return await fn()
        except (RateLimitError, TimeoutError, ConnectionError):
            if attempt == max_retries - 1:
                raise  # out of attempts — propagate the real error
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise MaxRetriesExceeded()
For long-running agents, save state periodically so execution can resume after failures:
class CheckpointManager:
    """Persists agent state to disk as JSON so a run can resume after a crash."""

    def __init__(self, storage_path):
        self.storage_path = storage_path

    def save(self, agent_state):
        """Serialize the agent state and overwrite the checkpoint file."""
        serialized = agent_state.to_dict()
        with open(self.storage_path, "w") as checkpoint:
            json.dump(serialized, checkpoint)

    def restore(self):
        """Load the last checkpoint, or return None when none exists."""
        if not os.path.exists(self.storage_path):
            return None
        with open(self.storage_path) as checkpoint:
            return AgentState.from_dict(json.load(checkpoint))
Maintain a complete record of what the agent did and why:
Before deploying an agent to production:
Navigation: