Section 5: Context Management & Reliability
Production Claude applications live and die by how well they manage context. A prototype works because the conversation is short and the task is simple. A production system must handle conversations that span hundreds of messages, coordinate multiple agents, recover from failures gracefully, and do all of this without burning through your budget. This section covers the engineering patterns that make that possible.
1. Context Window Mechanics
What Tokens Actually Are
A token is the fundamental unit of text that a language model reads and produces. Tokens are not words and not characters — they are subword pieces created by a tokenizer algorithm (typically Byte Pair Encoding). The word "unbelievable" might be three tokens: "un", "believ", "able". A single space character before a word is usually merged into that word's token. Code tends to tokenize less efficiently than English prose: a line like const result = await fetch(url); might be 8–10 tokens, while a plain English sentence of similar length might be 6–7.
Why does this matter? Because the context window is measured in tokens, and every token you spend on input is a token you cannot spend on output. Claude's context windows range from 200K tokens (standard models) to larger extended contexts. That sounds enormous — until you see how fast it fills up in a production agent.
How the Context Window Fills
Every API call to Claude packs the following into the context window, in this order:
- System prompt — Your instructions, persona, tool definitions, and any injected context. This is always present and always processed first. It has the strongest influence on behaviour because it frames everything that follows.
- Conversation history — Every prior user message and assistant response in the conversation. Each back-and-forth turn adds hundreds to thousands of tokens.
- Tool calls and tool results — When Claude calls a tool (reads a file, runs a search, queries a database), both the tool call and its result are inserted into the conversation. This is where context consumption can explode.
- The current user message — The latest input you are responding to.
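To make the accounting concrete, here is a minimal sketch that totals the four components above and reports the headroom left for output. The `ContextBudget` name and the token counts are illustrative assumptions, not measurements; the 200K window is the figure quoted earlier in this section.

```python
from dataclasses import dataclass


@dataclass
class ContextBudget:
    """Hypothetical helper: tallies what one request puts into the window."""
    window: int = 200_000      # context window size in tokens
    system_prompt: int = 0
    history: int = 0
    tool_traffic: int = 0      # tool calls plus tool results
    current_message: int = 0

    def used(self) -> int:
        """Total input tokens across all four components."""
        return (self.system_prompt + self.history
                + self.tool_traffic + self.current_message)

    def headroom(self) -> int:
        """Tokens left for output; a negative value means the request does not fit."""
        return self.window - self.used()


# Illustrative numbers only: tool results dominate, as they often do in agents.
budget = ContextBudget(system_prompt=4_000, history=60_000,
                       tool_traffic=110_000, current_message=500)
print(budget.used(), budget.headroom())
```

Tracking the components separately, rather than one running total, shows which of them to compress first when headroom runs short.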
Attention Priority
Not all tokens are created equal in terms of influence on the model's output. There is a well-documented primacy and recency effect:
- System prompt (highest influence) — Instructions here are treated as foundational. They persist across the entire conversation.
- Recent messages (high influence) — The last 2–3 turns have the strongest impact on what the model does next.
- Middle of conversation (lower influence) — Messages in the middle of a long conversation can be effectively "forgotten" even though they are technically in context. This is the "lost in the middle" phenomenon.
- Early conversation messages (moderate influence) — The first few user messages retain some influence due to primacy bias.
This means that if you need the model to remember a critical instruction from 50 turns ago, you cannot rely on it being in the conversation history. You must either repeat it in the system prompt, reintroduce it in a recent message, or use an external memory mechanism.
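One way to reintroduce an instruction in a recent message is sketched below. The function name, the `[REMINDER]` prefix, and the assumption that every message's `content` is a plain string are all illustrative choices, not part of any API.

```python
def with_pinned_reminder(messages, pinned_instruction, tail_size=3):
    """Return a copy of messages with pinned_instruction restated in the
    most recent user message, unless it already appears in the last
    tail_size messages. The input list is not modified."""
    recent_text = " ".join(m["content"] for m in messages[-tail_size:])
    if pinned_instruction in recent_text:
        return list(messages)  # Already fresh in context; nothing to add

    result = [dict(m) for m in messages]  # Copy each message before editing
    for msg in reversed(result):
        if msg["role"] == "user":
            msg["content"] = f"[REMINDER] {pinned_instruction}\n\n{msg['content']}"
            break
    return result
```

Restating the instruction costs a few tokens per request, which is usually cheaper than a mistake caused by a forgotten constraint.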
2. Strategies for Large Contexts
When your conversations or documents exceed what fits comfortably in context, you need a strategy. Here are four proven approaches, each with trade-offs.
Strategy A: Summarisation
Compress older messages into a concise summary. Replace the original messages with the summary so the context window reclaims that space. The risk is information loss — the summary might omit a detail that turns out to be important later.
def summarise_old_messages(messages, max_history=10):
    """Keep the last max_history messages verbatim.

    Everything older is summarised into a single message that is
    prepended to the recent history."""
    if len(messages) <= max_history:
        return messages  # Nothing to compress

    old_messages = messages[:-max_history]
    recent_messages = messages[-max_history:]

    # Ask Claude to produce a summary of the old conversation
    summary_prompt = (
        "Summarise the following conversation history. "
        "Preserve: key decisions, file paths mentioned, "
        "user preferences stated, and any unresolved questions.\n\n"
    )
    for msg in old_messages:
        summary_prompt += f"[{msg['role']}]: {msg['content']}\n"
    summary = call_claude(summary_prompt, max_tokens=500)

    # Replace old messages with a single summary message
    summary_message = {
        "role": "user",
        "content": f"[CONVERSATION SUMMARY]\n{summary}"
    }
    return [summary_message] + recent_messages
Strategy B: Selective Inclusion
Instead of including all history, only include messages that are relevant to the current query. This requires some way of scoring relevance — typically embeddings-based similarity search over your conversation history.
def select_relevant_history(messages, current_query, max_tokens=4000):
    """Score each historical message for relevance to the
    current query and include only the top-scoring ones."""
    query_embedding = embed(current_query)  # Embed the query once, not once per message

    scored = []
    for index, msg in enumerate(messages):
        similarity = compute_embedding_similarity(
            query_embedding, embed(msg["content"])
        )
        scored.append((similarity, index))

    # Sort by relevance, highest first
    scored.sort(key=lambda pair: pair[0], reverse=True)

    selected_indices = []
    token_count = 0
    for _, index in scored:
        msg_tokens = count_tokens(messages[index]["content"])
        if token_count + msg_tokens > max_tokens:
            break
        selected_indices.append(index)
        token_count += msg_tokens

    # Restore the original order to maintain chronology. Tracking indices
    # avoids messages.index(), which returns the first equal message and
    # therefore misplaces duplicates.
    return [messages[i] for i in sorted(selected_indices)]
Strategy C: Chunking with Overlap
When processing a large document (a 50-page report, a massive codebase), split it into chunks that overlap at the boundaries. The overlap ensures that no concept is split across a chunk boundary without context on both sides.
def chunk_document(text, chunk_size=3000, overlap=500):
    """Split a document into overlapping chunks measured in tokens."""
    if not 0 <= overlap < chunk_size:
        # overlap >= chunk_size would never advance and loop forever
        raise ValueError("overlap must be non-negative and smaller than chunk_size")

    tokens = tokenize(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunks.append({
            "text": detokenize(tokens[start:end]),
            "start_token": start,
            "end_token": end,
            "chunk_index": len(chunks)
        })
        if end == len(tokens):
            break  # Final chunk reached; stepping again would only repeat the overlap
        # Advance by chunk_size minus overlap
        start += chunk_size - overlap
    return chunks

# Process each chunk independently, then merge results
results = []
for chunk in chunk_document(large_report):
    analysis = call_claude(
        f"Analyse this section:\n\n{chunk['text']}",
        max_tokens=1000
    )
    results.append(analysis)

final_synthesis = call_claude(
    "Synthesise these partial analyses into a single report:\n\n"
    + "\n---\n".join(results)
)
Strategy D: Hierarchical Summarisation
When even summaries are too long, summarise the summaries. This creates a tree structure: leaf nodes are raw text, intermediate nodes are summaries of sections, and the root is a summary of summaries. Ideal for book-length content or months of conversation logs.
def hierarchical_summarise(texts, max_group_size=5, max_tokens=500):
    """Recursively summarise groups of texts until a single
    summary remains."""
    if not texts:
        raise ValueError("texts must contain at least one item")
    if max_group_size < 2:
        # Groups of one never reduce the number of texts, so recursion would not end
        raise ValueError("max_group_size must be at least 2")
    if len(texts) == 1:
        return texts[0]

    # Group texts into batches
    groups = []
    for i in range(0, len(texts), max_group_size):
        groups.append(texts[i:i + max_group_size])

    # Summarise each group
    summaries = []
    for group in groups:
        combined = "\n---\n".join(group)
        summary = call_claude(
            f"Summarise the key points from these texts:\n\n{combined}",
            max_tokens=max_tokens
        )
        summaries.append(summary)

    # Recurse until we have a single summary
    return hierarchical_summarise(summaries, max_group_size, max_tokens)
3. The Working Memory Pattern
Human experts use notepads, whiteboards, and scratch paper when solving complex problems. They write down intermediate results, cross things off, and refer back to notes. The Working Memory pattern gives an agent the same capability: a scratchpad file that persists outside the context window.
The idea is simple. Give the agent a tool that writes to a file and another that reads from it. When the agent finishes analysing one chunk of work, it writes a summary of findings to the scratchpad. When it starts the next chunk, it reads the scratchpad to recall what it already knows. The context window only ever holds the current scratchpad contents and the current task — not the entire history of how it arrived at those notes.
import json
from datetime import datetime
from pathlib import Path

SCRATCHPAD_PATH = Path("/tmp/agent_scratchpad.json")

def init_scratchpad(task_description):
    """Initialise a fresh scratchpad for a new task."""
    scratchpad = {
        "task": task_description,
        "findings": [],
        "decisions": [],
        "open_questions": [],
        "files_examined": [],
        "status": "in_progress"
    }
    SCRATCHPAD_PATH.write_text(json.dumps(scratchpad, indent=2))
    return scratchpad

def read_scratchpad():
    """Read current scratchpad state, or None if no scratchpad exists."""
    if not SCRATCHPAD_PATH.exists():
        return None
    return json.loads(SCRATCHPAD_PATH.read_text())

def append_finding(finding):
    """Record a finding from the current analysis step."""
    pad = read_scratchpad()
    pad["findings"].append({
        "text": finding,
        "timestamp": datetime.now().isoformat()
    })
    SCRATCHPAD_PATH.write_text(json.dumps(pad, indent=2))

def record_decision(decision, reasoning):
    """Record a decision and why it was made."""
    pad = read_scratchpad()
    pad["decisions"].append({
        "decision": decision,
        "reasoning": reasoning
    })
    SCRATCHPAD_PATH.write_text(json.dumps(pad, indent=2))

# --- Agent loop using working memory ---
def agent_with_scratchpad(task, files_to_analyse):
    """An agent that uses a scratchpad to process many files
    without overflowing context."""
    init_scratchpad(task)
    for file_path in files_to_analyse:
        # Read current memory state (compact)
        memory = read_scratchpad()
        memory_summary = (
            f"Findings so far: {len(memory['findings'])} items. "
            f"Decisions made: {len(memory['decisions'])}. "
            f"Files examined: {len(memory['files_examined'])}."
        )
        # Read the next file
        file_content = read_file(file_path)
        # Ask Claude to analyse with memory context
        analysis = call_claude(
            system=f"You are analysing files for: {task}\n"
                   f"Current memory: {memory_summary}\n"
                   f"Key findings: {json.dumps(memory['findings'][-5:])}",
            user=f"Analyse this file:\n{file_path}\n\n{file_content}"
        )
        # Update scratchpad with results
        append_finding(f"[{file_path}]: {analysis}")
        pad = read_scratchpad()
        pad["files_examined"].append(file_path)
        SCRATCHPAD_PATH.write_text(json.dumps(pad, indent=2))

    # Final synthesis using only scratchpad contents
    final_memory = read_scratchpad()
    return call_claude(
        f"Based on all findings, produce a final report:\n"
        f"{json.dumps(final_memory, indent=2)}"
    )
4. Cross-Session Context
A single agent session ends when the conversation concludes. But many real workflows span multiple sessions: a code review agent that runs daily, a support agent that handles a ticket over several days, or a research agent that picks up where it left off. The challenge is passing state between independent sessions without losing critical information.
What to Preserve vs What to Discard
- Preserve: Decisions made and their rationale. Files created, modified, or deleted. User preferences expressed. Unfinished tasks and their current state. Error patterns encountered. Key facts discovered.
- Discard: Intermediate reasoning chains. Failed approaches that were abandoned. Verbose tool outputs that have been summarised. Exploratory queries that led nowhere. Token-heavy conversation turns that carried no new information.
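The preserve/discard split can be applied mechanically if session events are tagged as they occur. The sketch below assumes a hypothetical event format, a dict with `kind` and `text` keys, and a set of kind names invented here to mirror the two lists above.

```python
# Hypothetical kind names mirroring the "Preserve" list above
PRESERVE_KINDS = {
    "decision", "file_change", "user_preference",
    "open_task", "error_pattern", "key_fact",
}


def build_handoff_state(events):
    """Group the events worth preserving by kind; drop everything else
    (intermediate reasoning, abandoned approaches, verbose tool output)."""
    state = {kind: [] for kind in sorted(PRESERVE_KINDS)}
    for event in events:
        if event["kind"] in PRESERVE_KINDS:
            state[event["kind"]].append(event["text"])
    return state
```

Filtering on an allow-list rather than a deny-list means an unrecognised event kind is dropped by default, which keeps the handoff small.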
The Context Handoff Pattern
def end_session_handoff(conversation_messages, scratchpad):
    """Generate a handoff document at end of session."""
    handoff = call_claude(
        system="You are creating a handoff document for the next "
               "agent session. Be concise but complete.",
        user=f"""Conversation had {len(conversation_messages)} turns.
Scratchpad state:
{json.dumps(scratchpad, indent=2)}
Create a handoff document with these sections:
1. TASK STATUS: What was the goal? Is it complete?
2. DECISIONS MADE: List every decision and its rationale.
3. FILES CHANGED: Exact paths and what was changed.
4. OPEN ITEMS: What still needs to be done?
5. CONTEXT FOR NEXT SESSION: Anything the next agent must know.
Omit: intermediate reasoning, failed attempts, verbose outputs."""
    )
    Path("/tmp/session_handoff.md").write_text(handoff)
    return handoff

def start_session_with_handoff():
    """Load handoff document into the new session's system prompt."""
    handoff_path = Path("/tmp/session_handoff.md")
    if handoff_path.exists():
        handoff = handoff_path.read_text()
        system_prompt = (
            "You are continuing a task from a previous session. "
            "Here is the handoff document:\n\n"
            f"{handoff}\n\n"
            "Resume work from where the previous session left off."
        )
    else:
        system_prompt = "You are starting a new task."
    return system_prompt
5. Escalation Patterns
A production agent must know its limits. Escalation is the process of recognising when the agent should stop acting autonomously and hand off to a human (or a more capable system). There are four primary triggers for escalation.
Escalation Triggers
- Confidence threshold: The agent assesses its own confidence in the proposed action. If confidence falls below a configured threshold (e.g., 0.7), it escalates rather than proceeding with a likely-wrong answer.
- Complexity trigger: The task has exceeded the allowed step budget. If the agent has taken 15 steps and the budget is 10, something has gone wrong — escalate before burning more tokens.
- Safety trigger: The proposed action could cause irreversible harm. Deleting a production database, sending an email to all customers, deploying to production — these require human approval regardless of confidence.
- Cost trigger: The conversation has consumed more tokens than the budget allows. Rather than silently failing when the context window fills, gracefully escalate with a summary of work done so far.
Decision Flowchart
- 1. Agent proposes an action.
- 2. Is the action on the "always escalate" list (safety-critical)? → YES: Escalate immediately.
- 3. Has the step budget been exceeded? → YES: Summarise progress, escalate.
- 4. Has the token budget been exceeded? → YES: Summarise progress, escalate.
- 5. Is the agent's self-assessed confidence above threshold? → NO: Escalate with explanation of uncertainty.
- 6. All checks pass → Execute the action.
from dataclasses import dataclass

@dataclass
class EscalationResult:
    """Details handed to the human when the agent escalates."""
    reason: str
    message: str
    summary: dict
    urgency: str

class EscalationFramework:
    def __init__(self, config):
        self.confidence_threshold = config.get("confidence_threshold", 0.7)
        self.max_steps = config.get("max_steps", 20)
        self.max_tokens = config.get("max_tokens", 150_000)
        self.safety_critical_actions = config.get("safety_critical", [
            "delete_database", "send_mass_email", "deploy_production",
            "modify_billing", "revoke_access"
        ])
        self.current_step = 0
        self.tokens_used = 0

    def check_escalation(self, proposed_action, confidence, tokens_this_turn):
        """Returns a (should_escalate, result) tuple. result is an
        EscalationResult when escalating, otherwise None."""
        self.current_step += 1
        self.tokens_used += tokens_this_turn

        # Safety check first -- non-negotiable
        if proposed_action in self.safety_critical_actions:
            return True, EscalationResult(
                reason="safety",
                message=f"Action '{proposed_action}' requires human approval.",
                summary=self._generate_summary(),
                urgency="high"
            )

        # Step budget
        if self.current_step > self.max_steps:
            return True, EscalationResult(
                reason="complexity",
                message=f"Exceeded step budget ({self.current_step}/{self.max_steps}).",
                summary=self._generate_summary(),
                urgency="medium"
            )

        # Token budget
        if self.tokens_used > self.max_tokens:
            return True, EscalationResult(
                reason="cost",
                message=f"Token budget exceeded ({self.tokens_used}/{self.max_tokens}).",
                summary=self._generate_summary(),
                urgency="medium"
            )

        # Confidence check
        if confidence < self.confidence_threshold:
            return True, EscalationResult(
                reason="confidence",
                message=f"Confidence {confidence:.2f} below threshold "
                        f"{self.confidence_threshold}.",
                summary=self._generate_summary(),
                urgency="low"
            )

        return False, None

    def _generate_summary(self):
        """Produce a summary of all work done so far for the human."""
        return {
            "steps_completed": self.current_step,
            "tokens_consumed": self.tokens_used,
            "scratchpad": read_scratchpad()
        }
6. Distributed Error Handling
When you have multiple agents working together — an orchestrator dispatching subtasks to specialist agents — failure handling becomes significantly more complex than a single try/except block. You must reason about partial failure, cascading failure, idempotency, and compensation.
Partial Failure
Suppose an orchestrator dispatches four subtasks. Three succeed. One fails. What should happen? This depends on the relationship between the subtasks:
- Independent subtasks: Return the three successful results and report the failure. Example: analysing four separate documents — three analyses are still useful even if one failed.
- All-or-nothing subtasks: Roll back the three successful results because the overall task cannot be considered complete. Example: migrating a database schema across four tables that must all be consistent.
- Best-effort subtasks: Return whatever succeeded and mark the task as partially complete. Example: sending notifications to four channels — three out of four is better than zero.
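The three relationships above can be written as an explicit policy that the orchestrator applies once all subtasks have reported. This is a minimal sketch: the `FailurePolicy` enum, the result format, and the status labels are assumptions made for illustration.

```python
from enum import Enum


class FailurePolicy(Enum):
    INDEPENDENT = "independent"
    ALL_OR_NOTHING = "all_or_nothing"
    BEST_EFFORT = "best_effort"


def resolve_partial_failure(results, policy):
    """Decide what the orchestrator returns after some subtasks fail.

    results maps subtask name -> (succeeded, payload), where payload is
    the output on success or the error text on failure.
    """
    succeeded = {name: payload for name, (ok, payload) in results.items() if ok}
    failed = {name: payload for name, (ok, payload) in results.items() if not ok}

    if not failed:
        return {"status": "complete", "results": succeeded,
                "failures": {}, "roll_back": []}

    if policy is FailurePolicy.ALL_OR_NOTHING:
        # Successful work must be undone; nothing is returned to the caller.
        return {"status": "failed", "results": {}, "failures": failed,
                "roll_back": sorted(succeeded)}

    # INDEPENDENT and BEST_EFFORT both keep the successes; they differ only
    # in how the overall task is labelled.
    status = ("partial" if policy is FailurePolicy.BEST_EFFORT
              else "complete_with_failures")
    return {"status": status, "results": succeeded,
            "failures": failed, "roll_back": []}
```

Choosing the policy when the subtasks are dispatched, rather than when one fails, keeps the failure path free of ad hoc decisions.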
Cascading Failure
When Agent B depends on Agent A's output, and Agent A fails, Agent B will also fail — but with a confusing error because it received malformed or missing input. The solution is dependency-aware dispatching: before launching Agent B, verify that Agent A's output is valid.
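A minimal sketch of dependency-aware dispatching follows. It assumes each stage is supplied as a hypothetical `(name, run, validate)` triple, where `run` receives the previous stage's output and `validate` returns a boolean.

```python
def run_pipeline(stages):
    """Run stages in order, validating each output before the next
    stage is launched. Stops at the first failure and names its source."""
    output = None
    for name, run, validate in stages:
        try:
            output = run(output)
        except Exception as exc:
            return {"ok": False, "failed_stage": name, "error": f"raised: {exc}"}
        if not validate(output):
            # Stop here so downstream agents never see malformed input
            return {"ok": False, "failed_stage": name,
                    "error": "output failed validation"}
    return {"ok": True, "output": output}
```

The benefit is in the error report: the failure is attributed to Agent A's invalid output rather than to whatever exception Agent B would have raised while trying to consume it.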
Idempotent Operations
An operation is idempotent if running it twice produces the same result as running it once. This is critical for retry logic: if a tool call times out and you retry it, you must be sure the retry does not cause double-execution of side effects (charging a credit card twice, creating duplicate records). Design every tool to be idempotent by default.
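A common way to make a side-effecting tool safe to retry is an idempotency key: the caller supplies a key that identifies the logical operation, and a repeat call with the same key returns the stored result instead of running the operation again. The sketch below keeps results in memory purely for illustration; the `IdempotentExecutor` name is hypothetical, and a real system would need a durable store and protection against concurrent calls.

```python
class IdempotentExecutor:
    """Runs an operation at most once per idempotency key."""

    def __init__(self):
        self._results = {}  # key -> result of the first successful run

    def run(self, key, operation):
        """Execute operation() unless this key has already succeeded.

        If operation() raises, nothing is stored, so a later retry with
        the same key will execute it again.
        """
        if key in self._results:
            return self._results[key]
        result = operation()
        self._results[key] = result
        return result
```

The key should be derived from the request itself (for example an order ID plus the step name), not generated per attempt, or every retry would look like a new operation.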
Compensation Transactions
When a later step fails and you need to undo an earlier step's side effects, you execute a compensation transaction. This is borrowed from the Saga pattern in distributed systems.
Real-World Example: Order Pipeline
Consider an e-commerce pipeline: Payment → Shipping → Notification. The shipping step fails because the item is out of stock. The payment has already been charged. We need to compensate.
from dataclasses import dataclass
from typing import Callable

@dataclass
class SagaStep:
    """One pipeline step and the action that undoes it."""
    name: str
    execute: Callable[[], object]
    compensate: Callable[[], object]

class SagaOrchestrator:
    """Orchestrates a multi-step pipeline with compensation on failure."""
    def __init__(self):
        self.completed_steps = []

    def execute_pipeline(self, order):
        # Reset so a reused orchestrator never compensates a previous order's steps
        self.completed_steps = []
        steps = [
            SagaStep(
                name="payment",
                execute=lambda: charge_payment(order.amount, order.payment_id),
                compensate=lambda: refund_payment(order.payment_id)
            ),
            SagaStep(
                name="shipping",
                execute=lambda: create_shipment(order.items, order.address),
                # Assumes create_shipment records the shipment ID on the order
                compensate=lambda: cancel_shipment(order.shipment_id)
            ),
            SagaStep(
                name="notification",
                execute=lambda: send_confirmation(order.customer_email),
                compensate=lambda: send_cancellation_notice(order.customer_email)
            ),
        ]
        for step in steps:
            try:
                result = step.execute()
                self.completed_steps.append(step)
                log(f"Step '{step.name}' succeeded: {result}")
            except Exception as e:
                log(f"Step '{step.name}' FAILED: {e}")
                compensated = self._compensate_all()
                raise PipelineFailure(
                    failed_step=step.name,
                    error=str(e),
                    compensated_steps=compensated
                ) from e
        return PipelineSuccess(steps=[s.name for s in self.completed_steps])

    def _compensate_all(self):
        """Roll back completed steps in reverse order.

        Returns the names of the steps whose compensation succeeded."""
        compensated = []
        for step in reversed(self.completed_steps):
            try:
                step.compensate()
                compensated.append(step.name)
                log(f"Compensated step '{step.name}'")
            except Exception as comp_error:
                # Compensation failure is critical -- alert immediately
                alert_ops_team(
                    f"COMPENSATION FAILED for step '{step.name}': {comp_error}"
                )
        return compensated
7. Confidence Calibration
A critical capability for production agents is knowing what they do not know. Confidence calibration means giving the agent a structured way to express how certain it is, and then routing decisions based on that certainty level.
Implementing Confidence Scoring
The simplest approach is to ask the model to include a confidence score in its structured output. This is not the same as a model's internal log-probabilities — it is a self-assessed score based on the model's reasoning about the quality of its own answer.
# System prompt for confidence-aware responses
SYSTEM_PROMPT = """You are a support agent. For every response, output JSON:
{
"answer": "your answer text",
"confidence": 0.0 to 1.0,
"confidence_reasoning": "why you chose this score",
"alternative_interpretations": ["other ways the question could be read"]
}
Confidence guide:
- 0.9-1.0: You have seen this exact scenario, answer is certain.
- 0.7-0.9: High confidence, minor ambiguity possible.
- 0.5-0.7: Moderate confidence, multiple valid answers exist.
- 0.3-0.5: Low confidence, speculating based on limited info.
- 0.0-0.3: Very uncertain, essentially guessing."""
def route_by_confidence(response, current_query):
    """Route the agent's parsed JSON response based on confidence level."""
    confidence = response["confidence"]
    if confidence >= 0.85:
        # Auto-execute: send directly to the user
        return send_to_user(response["answer"])
    elif confidence >= 0.6:
        # Review queue: flag for human review before sending
        return enqueue_for_review(
            response["answer"],
            response["confidence_reasoning"],
            response["alternative_interpretations"]
        )
    else:
        # Human required: route to a human agent
        return escalate_to_human(
            original_query=current_query,
            agent_analysis=response,
            reason="Low confidence score"
        )
Why Probabilities Are Not Confidence
Language models produce probability distributions over the next token. These probabilities reflect the model's prediction of what text is likely to follow, not how correct that text is. A model can produce a factually wrong answer with very high token probability because the wrong answer is a common pattern in its training data. Conversely, a correct but unusual answer might have low token probability. Self-assessed confidence, while imperfect, captures something closer to "how sure am I that this is right" rather than "how predictable is this text."
To improve calibration over time, log every confidence score alongside human-verified correctness. Analyse the correlation: when the agent says 0.9, is it actually right 90% of the time? Adjust your routing thresholds based on real data, not intuition.
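The calibration check described above can be computed from the logged pairs with a simple bucketed table. This is a sketch under assumptions: the log is taken to be a list of `(confidence, was_correct)` pairs, and ten equal-width buckets are an arbitrary choice.

```python
def calibration_table(records, n_buckets=10):
    """Group (confidence, was_correct) pairs into equal-width confidence
    buckets and report the observed accuracy in each bucket."""
    buckets = {}
    for confidence, was_correct in records:
        # A confidence of exactly 1.0 belongs to the top bucket
        index = min(int(confidence * n_buckets), n_buckets - 1)
        buckets.setdefault(index, []).append((confidence, was_correct))

    rows = []
    for index in sorted(buckets):
        items = buckets[index]
        rows.append({
            "bucket_index": index,
            "count": len(items),
            "mean_confidence": sum(c for c, _ in items) / len(items),
            "accuracy": sum(1 for _, ok in items if ok) / len(items),
        })
    return rows
```

A well-calibrated agent shows `accuracy` close to `mean_confidence` in every row. A bucket where accuracy falls well below mean confidence is a signal to raise the routing threshold that covers it.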
8. Prompt Caching
Prompt caching is an API-level optimisation that avoids reprocessing the same prefix of tokens on every request. If your system prompt is 4,000 tokens and you send 100 requests with the same system prompt, without caching the API processes those 4,000 tokens 100 times. With caching, it processes them once and reuses the cached computation for the remaining 99 requests.
How It Works
- You mark certain content blocks in your messages with cache_control: {"type": "ephemeral"}.
- The API caches the processed representation of all tokens up to and including that block.
- On subsequent requests, if the prefix up to the cache breakpoint is identical, the cached version is used.
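- Tokens read from the cache are billed at a significant discount (typically 90% cheaper) compared to uncached tokens. Writing a prefix to the cache carries a surcharge on the request that creates it, so caching pays off only when the prefix is actually reused.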
- The cache has a time-to-live (TTL), typically 5 minutes, refreshed each time it is used.
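The savings for the 4,000-token, 100-request example can be worked through in a few lines. The read multiplier of 0.10 follows from the 90% discount mentioned above; the write multiplier of 1.25 is an assumption used here for illustration, so check current pricing before relying on it. The sketch also assumes every request arrives before the cache expires.

```python
def prefix_cost_units(prefix_tokens, requests, use_cache,
                      read_multiplier=0.10, write_multiplier=1.25):
    """Cost of a shared prefix, in units of one uncached input token.

    Multipliers are illustrative assumptions, not quoted prices.
    """
    if requests == 0:
        return 0.0
    if not use_cache:
        return float(prefix_tokens * requests)
    first_request = prefix_tokens * write_multiplier          # cache write
    later_requests = prefix_tokens * read_multiplier * (requests - 1)  # cache reads
    return first_request + later_requests


without_cache = prefix_cost_units(4_000, 100, use_cache=False)
with_cache = prefix_cost_units(4_000, 100, use_cache=True)
print(f"without: {without_cache:.0f}, with: {with_cache:.0f}")
```

Under these assumptions the cached prefix costs roughly a ninth of the uncached one. With only a single request, caching costs more than not caching because of the write surcharge.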
When to Use Prompt Caching
- Repeated system prompts: Every agent in your system that uses the same system prompt benefits.
- Shared few-shot examples: If you include 10 example input/output pairs in every request, cache them.
- Large reference documents: If every request includes the same product catalogue or policy document, cache it.
- Multi-turn conversations: Cache the conversation history prefix that does not change between turns.
import anthropic

client = anthropic.Anthropic()

# The system prompt and few-shot examples are cached
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "You are a customer support agent for Acme Corp...",
        },
        {
            "type": "text",
            "text": LARGE_FEW_SHOT_EXAMPLES,  # 3,000 tokens of examples
            "cache_control": {"type": "ephemeral"}  # Cache breakpoint
        }
    ],
    messages=[
        {"role": "user", "content": "How do I reset my password?"}
    ]
)

# Check cache performance in the response
usage = response.usage
print(f"Cache read tokens: {usage.cache_read_input_tokens}")
print(f"Cache creation tokens: {usage.cache_creation_input_tokens}")
print(f"Uncached input tokens: {usage.input_tokens}")
Limitations
Caching only works for exact prefix matches. If you change even one token in the cached prefix, the cache is invalidated. This means dynamic content (user names, timestamps) should come after the cache breakpoint, not before it. There is also a minimum cacheable length — very short prefixes may not be eligible for caching. Plan your message structure so that static content comes first and dynamic content comes last.
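The static-first layout can be enforced by building every request through one function. The sketch below only assembles the request arguments and does not call the API; the function name and the way the customer name is folded into the user message are illustrative choices.

```python
def build_request(static_instructions, reference_text, user_name, question):
    """Assemble request arguments with all static content before the
    cache breakpoint and all per-user content after it."""
    return {
        "system": [
            {"type": "text", "text": static_instructions},
            {
                "type": "text",
                "text": reference_text,
                "cache_control": {"type": "ephemeral"},  # Cache breakpoint
            },
        ],
        # Dynamic content lives only in the messages, after the breakpoint
        "messages": [
            {"role": "user", "content": f"(Customer: {user_name})\n{question}"}
        ],
    }
```

Because the user name never enters the `system` blocks, two requests from different customers share a byte-identical prefix and can hit the same cache entry.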
9. Cost Management
An uncontrolled agent can burn through API credits astonishingly fast. A single agentic loop that reads files, reasons about them, and calls tools might consume 500K tokens in a 15-minute session. At production scale with hundreds of concurrent users, costs can spiral. Cost management is not an afterthought — it is a core architectural concern.
Token Budgets
from dataclasses import dataclass
from typing import Optional

@dataclass
class BudgetResult:
    """Outcome of a budget check."""
    allowed: bool
    remaining: Optional[int]
    reason: Optional[str] = None

class TokenBudget:
    """Enforce per-conversation and per-user token budgets."""
    def __init__(self, max_per_conversation=200_000, max_per_user_daily=1_000_000):
        self.max_per_conversation = max_per_conversation
        self.max_per_user_daily = max_per_user_daily
        self.conversation_usage = 0
        self.user_daily_usage = {}

    def check_budget(self, user_id, estimated_tokens):
        """Check if the next API call is within budget."""
        # Conversation-level check
        if self.conversation_usage + estimated_tokens > self.max_per_conversation:
            return BudgetResult(
                allowed=False,
                reason="conversation_limit",
                remaining=self.max_per_conversation - self.conversation_usage
            )
        # User daily check
        daily = self.user_daily_usage.get(user_id, 0)
        if daily + estimated_tokens > self.max_per_user_daily:
            return BudgetResult(
                allowed=False,
                reason="daily_user_limit",
                remaining=self.max_per_user_daily - daily
            )
        return BudgetResult(allowed=True, remaining=None)

    def record_usage(self, user_id, input_tokens, output_tokens):
        """Add the tokens from a completed call to both running totals."""
        total = input_tokens + output_tokens
        self.conversation_usage += total
        self.user_daily_usage[user_id] = (
            self.user_daily_usage.get(user_id, 0) + total
        )
Graceful Degradation
When budget is running low, do not simply stop responding. Degrade gracefully:
- Tier 1 (full budget): Use the most capable model (e.g., Claude Opus). Allow extended tool use. Full agentic loops.
- Tier 2 (75% consumed): Switch to a mid-tier model (e.g., Claude Sonnet). Limit tool calls to 5 per turn. Reduce max output tokens.
- Tier 3 (90% consumed): Switch to the fastest model (e.g., Claude Haiku). No tool use. Direct answers only. Warn the user that reduced capability is in effect.
- Tier 4 (budget exhausted): Return a polite message explaining the budget is exceeded. Provide a summary of work completed. Offer escalation to a human.
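The tiers above map naturally onto a lookup keyed by the fraction of budget consumed. In this sketch the setting names (`model_class`, `tools_allowed`, `max_tool_calls`) are hypothetical, and model classes are named generically rather than by model ID.

```python
def select_tier(tokens_used, budget):
    """Return the degradation tier and its settings for the current usage.

    Integer arithmetic avoids floating-point surprises at the 75% and
    90% boundaries.
    """
    if tokens_used >= budget:
        return {"tier": 4, "model_class": None,
                "tools_allowed": False, "max_tool_calls": 0}
    if tokens_used * 100 >= budget * 90:
        return {"tier": 3, "model_class": "fastest",
                "tools_allowed": False, "max_tool_calls": 0}
    if tokens_used * 100 >= budget * 75:
        return {"tier": 2, "model_class": "mid_tier",
                "tools_allowed": True, "max_tool_calls": 5}
    # max_tool_calls of None means no per-turn limit
    return {"tier": 1, "model_class": "most_capable",
            "tools_allowed": True, "max_tool_calls": None}
```

Calling this before every turn, rather than once per conversation, lets a long session step down through the tiers as it consumes budget.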
Max Turns per Agent
Every agentic loop should have a hard limit on the number of turns (tool-call cycles) it can execute. Without this, a confused agent can loop indefinitely, burning tokens while making no progress. A typical limit is 10–25 turns. When the limit is reached, the agent must produce a final answer with whatever it has, not simply stop.
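A hard turn limit with a forced final answer might look like the following sketch. The `step` and `finalise` callables are hypothetical stand-ins: `step(turn)` performs one tool-call cycle and returns `(done, result)`, and `finalise(partials)` builds the best available answer from whatever has been gathered.

```python
def run_agent_loop(step, finalise, max_turns=15):
    """Run step() until it reports completion or max_turns is reached.

    On reaching the limit, the loop does not simply stop: it asks
    finalise() for an answer based on the partial results so far.
    """
    partials = []
    for turn in range(1, max_turns + 1):
        done, result = step(turn)
        partials.append(result)
        if done:
            return {"answer": result, "turns": turn, "hit_limit": False}
    return {"answer": finalise(partials), "turns": max_turns, "hit_limit": True}
```

Returning `hit_limit` alongside the answer lets the caller log or escalate truncated runs instead of treating them as ordinary completions.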
10. Monitoring & Observability
You cannot improve what you cannot measure. In production, you need visibility into every agent decision, every tool call, every failure, and every cost. This is not logging — it is structured observability that enables debugging, performance analysis, and quality improvement.
Structured Logging
Every agent interaction should produce a structured log entry, not a plain text message. Structured logs can be queried, aggregated, and visualised.
import structlog
import time

logger = structlog.get_logger()

def log_agent_turn(turn_number, messages, response, tools_called, usage, latency_ms):
    """Log a complete agent turn with all relevant metadata.

    latency_ms is measured by the caller, for example with
    time.perf_counter() around the API call; the SDK response
    object carries no latency field."""
    logger.info(
        "agent_turn",
        turn=turn_number,
        message_count=len(messages),
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        cache_read_tokens=getattr(usage, "cache_read_input_tokens", 0),
        tools_called=[t["name"] for t in tools_called],
        tool_count=len(tools_called),
        model=response.model,
        stop_reason=response.stop_reason,
        latency_ms=latency_ms,
        confidence=extract_confidence(response),
        user_id=get_current_user_id(),
        session_id=get_session_id(),
    )
Distributed Tracing
In a multi-agent system, a single user request might flow through an orchestrator, three specialist agents, and five tool calls. A trace ID ties all of these together so you can reconstruct the full execution path after the fact.
import time
import uuid

class AgentTracer:
    """Trace a request through multiple agents and tool calls."""
    def __init__(self):
        self.trace_id = str(uuid.uuid4())
        self.spans = []

    def start_span(self, name, parent_span_id=None):
        span = {
            "span_id": str(uuid.uuid4()),
            "trace_id": self.trace_id,
            "parent_span_id": parent_span_id,
            "name": name,
            "start_time": time.time(),
            "end_time": None,
            "metadata": {}
        }
        self.spans.append(span)
        return span

    def end_span(self, span, metadata=None):
        span["end_time"] = time.time()
        span["duration_ms"] = (span["end_time"] - span["start_time"]) * 1000
        if metadata:
            span["metadata"].update(metadata)

    def get_trace_summary(self):
        """Produce a human-readable trace summary."""
        # Sum root spans only: a parent's duration already includes its
        # children, so summing every span would double-count nested work.
        total_duration = sum(
            s.get("duration_ms", 0)
            for s in self.spans
            if s["parent_span_id"] is None
        )
        return {
            "trace_id": self.trace_id,
            "total_spans": len(self.spans),
            "total_duration_ms": total_duration,
            "spans": self.spans
        }

# Usage in a multi-agent pipeline
tracer = AgentTracer()

# Orchestrator span
orch_span = tracer.start_span("orchestrator")

# Sub-agent spans
research_span = tracer.start_span("research_agent", orch_span["span_id"])
result = run_research_agent(query)
tracer.end_span(research_span, {"tokens_used": result.tokens})

writing_span = tracer.start_span("writing_agent", orch_span["span_id"])
draft = run_writing_agent(result.findings)
tracer.end_span(writing_span, {"tokens_used": draft.tokens})

tracer.end_span(orch_span, {"total_tokens": result.tokens + draft.tokens})
print(tracer.get_trace_summary())
Automated Quality Evaluation
Use a separate "evaluator" model (or the same model with a different prompt) to score the quality of agent outputs. This creates a feedback loop: the agent produces an answer, the evaluator scores it, and you track scores over time to detect quality regressions.
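DEFAULT_CRITERIA = {
    "relevance": "Does it answer the actual question?",
    "accuracy": "Are the facts correct?",
    "completeness": "Does it cover all aspects?",
    "conciseness": "Is it appropriately brief?",
    "safety": "Does it avoid harmful content?",
}

def evaluate_response(query, response, criteria=None):
    """Use Claude as an automated evaluator.

    criteria maps each criterion name to the question it asks;
    DEFAULT_CRITERIA is used when none is given."""
    criteria = criteria or DEFAULT_CRITERIA
    criteria_lines = "\n".join(
        f"- {name}: {question}" for name, question in criteria.items()
    )
    score_keys = ", ".join(f'"{name}": N' for name in criteria)
    eval_prompt = f"""Score the following agent response on a 1-5 scale
for each criterion. Return JSON only.
User query: {query}
Agent response: {response}
Criteria:
{criteria_lines}
Return: {{{score_keys}, "overall": N,
"explanation": "brief rationale"}}"""
    # Substitute whichever small, fast model you have access to
    evaluation = call_claude(eval_prompt, model="claude-3-5-haiku-20241022")
    try:
        scores = json.loads(evaluation)
    except json.JSONDecodeError:
        # The evaluator may wrap its JSON in prose; treat that as a failed evaluation
        logger.warning("quality_eval_unparseable", query=query, raw=evaluation)
        return None

    # Log and alert
    logger.info("quality_eval", query=query, scores=scores)
    if scores["overall"] < 3:
        alert("Low quality response detected", scores=scores, query=query)
    return scores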
Anomaly Detection and Alerting
Track key metrics over rolling windows and alert when they deviate from the baseline:
- Error rate: Percentage of agent turns that result in errors. A spike from 2% to 15% signals a systemic problem (API outage, broken tool, bad prompt update).
- Average confidence: A sudden drop in mean confidence across all conversations may indicate that a prompt change introduced ambiguity or that the model is encountering unfamiliar queries.
- Token consumption per turn: A sudden increase suggests the agent is reading larger files, making more tool calls, or stuck in a loop.
- Latency per turn: Increased latency can indicate API throttling, network issues, or excessively long prompts.
- Escalation rate: A rising escalation rate means either the agent is becoming less capable (bad) or it is correctly identifying harder queries (possibly fine).
from collections import deque

class MetricsMonitor:
    """Track rolling metrics and alert on anomalies."""
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.error_history = deque(maxlen=window_size)
        self.confidence_history = deque(maxlen=window_size)
        self.token_history = deque(maxlen=window_size)
        # Reserved for learned baselines; the checks below use fixed thresholds
        self.baselines = {}

    def record(self, is_error, confidence, tokens_used):
        self.error_history.append(1 if is_error else 0)
        self.confidence_history.append(confidence)
        self.token_history.append(tokens_used)
        self._check_anomalies()

    def _check_anomalies(self):
        if len(self.error_history) < self.window_size:
            return  # Not enough data yet

        error_rate = sum(self.error_history) / len(self.error_history)
        avg_confidence = sum(self.confidence_history) / len(self.confidence_history)
        avg_tokens = sum(self.token_history) / len(self.token_history)

        # Once the window is full these checks run on every record, so
        # alert() should deduplicate or rate-limit repeated alerts.
        if error_rate > 0.10:
            alert(f"High error rate: {error_rate:.1%}")
        if avg_confidence < 0.5:
            alert(f"Low avg confidence: {avg_confidence:.2f}")
        if avg_tokens > 50_000:
            alert(f"High token usage: {avg_tokens:.0f} avg per turn")
Putting It All Together
Context management and reliability are not independent concerns — they interlock. Poor context management leads to confused agents, which leads to low confidence scores, which triggers escalation, which costs human time. Uncontrolled costs lead to budget exhaustion, which causes abrupt failures. Missing observability means you discover problems only when users complain.
The production-ready agent system combines all of these patterns: it manages its context window with summarisation and working memory, hands off state between sessions cleanly, knows when to escalate, handles distributed failures with compensation, monitors its own performance, and stays within budget. Each of these patterns is individually simple. The engineering challenge is composing them into a coherent whole — and that is what separates a demo from a product.