Context Degradation Patterns: Technical Reference
This document provides technical details on diagnosing and measuring context degradation.
Attention Distribution Analysis
U-Shaped Curve Measurement
Models typically attend most strongly to the beginning and end of the context, producing a U-shaped attention curve. Measure how attention is distributed across context positions:

def measure_attention_distribution(model, context_tokens, query):
    """
    Measure how attention varies across context positions.
    Returns a distribution showing attention weight by position.
    """
    attention_by_position = []
    for position in range(len(context_tokens)):
        # Measure the model's attention to this position
        attention = get_attention_weights(model, context_tokens, query, position)
        is_beginning = position < len(context_tokens) * 0.1
        is_end = position > len(context_tokens) * 0.9
        attention_by_position.append({
            "position": position,
            "attention": attention,
            "is_beginning": is_beginning,
            "is_end": is_end,
            "is_middle": not (is_beginning or is_end)
        })
    # Classify positions into favored vs. degraded regions
    for item in attention_by_position:
        if item["is_beginning"] or item["is_end"]:
            item["region"] = "attention_favored"
        else:
            item["region"] = "attention_degraded"
    return attention_by_position
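
The helper get_attention_weights is assumed rather than defined here. Below is a minimal sketch of one possible implementation, assuming a Hugging Face-style model that accepts output_attentions=True and that context_tokens and query are already lists of token ids; a real implementation would run the model once and index into the cached attention maps instead of calling this per position.

import torch

def get_attention_weights(model, context_tokens, query, position):
    """Hypothetical helper: mean attention paid to `position` by the final
    query token, averaged over all layers and heads."""
    input_ids = torch.tensor([context_tokens + query])
    with torch.no_grad():
        outputs = model(input_ids, output_attentions=True)
    # outputs.attentions is a per-layer tuple of (batch, heads, seq, seq) tensors
    per_layer = [layer[0, :, -1, position].mean() for layer in outputs.attentions]
    return float(torch.stack(per_layer).mean())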

Lost-in-Middle Detection

Detect when critical information falls in degraded attention regions:

def detect_lost_in_middle(critical_positions, attention_distribution):
    """
    Check if critical information is in attention-favored positions.

    Args:
        critical_positions: List of positions containing critical info
        attention_distribution: Output from measure_attention_distribution

    Returns:
        Dictionary with detection results and recommendations
    """
    results = {
        "at_risk": [],
        "safe": [],
        "recommendations": []
    }
    for pos in critical_positions:
        region = attention_distribution[pos]["region"]
        if region == "attention_degraded":
            results["at_risk"].append(pos)
        else:
            results["safe"].append(pos)
    # Generate recommendations
    if results["at_risk"]:
        results["recommendations"].extend([
            "Move critical information to attention-favored positions",
            "Use explicit markers to highlight critical information",
            "Consider splitting context to reduce middle section"
        ])
    return results
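
A hypothetical usage example tying the two functions together (the model, tokens, and positions are illustrative):

distribution = measure_attention_distribution(model, context_tokens, query)
report = detect_lost_in_middle(critical_positions=[842, 1530],
                               attention_distribution=distribution)
if report["at_risk"]:
    print("Critical positions in degraded regions:", report["at_risk"])
    for recommendation in report["recommendations"]:
        print("-", recommendation)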

Context Poisoning Detection

Hallucination Tracking

Track potential hallucinations across conversation turns:

class HallucinationTracker:
    def __init__(self):
        self.claims = []
        self.verifications = []

    def add_claims(self, text):
        """Extract claims from text for later verification."""
        claims = extract_claims(text)
        self.claims.extend([{"text": c, "verified": None} for c in claims])

    def verify_claims(self, ground_truth):
        """Verify claims against ground truth."""
        for claim in self.claims:
            if claim["verified"] is None:
                claim["verified"] = check_claim(claim["text"], ground_truth)

    def get_poisoning_indicators(self):
        """
        Return indicators of potential context poisoning.
        A high ratio of still-unverified claims, or any claim verified as
        false, suggests poisoning risk.
        """
        unverified = sum(1 for c in self.claims if c["verified"] is None)
        verified_false = sum(1 for c in self.claims if c["verified"] is False)
        return {
            "unverified_count": unverified,
            "false_count": verified_false,
            "poisoning_risk": verified_false > 0 or unverified > len(self.claims) * 0.3
        }
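
A hypothetical usage pattern, assuming extract_claims and check_claim are supplied elsewhere (for example by an NLI model or a retrieval-based fact check), and that assistant_turns and reference_documents exist in the calling code:

tracker = HallucinationTracker()
for turn in assistant_turns:          # one assistant response per turn
    tracker.add_claims(turn)
tracker.verify_claims(ground_truth=reference_documents)

indicators = tracker.get_poisoning_indicators()
if indicators["poisoning_risk"]:
    # e.g. quarantine recent turns or re-ground the conversation against sources
    print("Poisoning risk detected:", indicators)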

Error Propagation Analysis

Track how errors flow through context:

def analyze_error_propagation(context, error_points):
    """
    Analyze how errors at specific points affect downstream context.
    Returns a map of error spread and an impact assessment.
    """
    impact_map = {}
    for error_point in error_points:
        # Find all downstream references to the erroneous content
        downstream_refs = find_references(context, after=error_point)
        for ref in downstream_refs:
            if ref not in impact_map:
                impact_map[ref] = []
            impact_map[ref].append({
                "source": error_point,
                "type": classify_error_type(context[error_point])
            })
    # Assess severity: elements that depend on several erroneous sources
    high_impact_areas = [k for k, v in impact_map.items() if len(v) > 3]
    return {
        "impact_map": impact_map,
        "high_impact_areas": high_impact_areas,
        "requires_intervention": len(high_impact_areas) > 0
    }
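
For example, if elements 12 and 18 of the context were later found to contain incorrect figures, the analysis might be run as follows (find_references and classify_error_type are assumed helpers):

report = analyze_error_propagation(context, error_points=[12, 18])
if report["requires_intervention"]:
    for ref in report["high_impact_areas"]:
        sources = report["impact_map"][ref]
        print(f"Element {ref} builds on {len(sources)} erroneous sources")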

Distraction Metrics

Relevance Scoring

Score the relevance of each context element to the current task:

def score_context_relevance(context_elements, task_description):
    """
    Score each context element for relevance to current task.
    Returns scores and identifies high-distraction elements.
    """
    task_embedding = embed(task_description)
    scored_elements = []
    for i, element in enumerate(context_elements):
        element_embedding = embed(element)
        relevance = cosine_similarity(task_embedding, element_embedding)
        scored_elements.append({
            "index": i,
            "content_preview": element[:100],
            "relevance_score": relevance
        })
    # Sort by relevance
    scored_elements.sort(key=lambda x: x["relevance_score"], reverse=True)
    # Identify potential distractors
    threshold = calculate_relevance_threshold(scored_elements)
    distractors = [e for e in scored_elements if e["relevance_score"] < threshold]
    return {
        "scored_elements": scored_elements,
        "distractors": distractors,
        "recommendation": f"Consider removing {len(distractors)} low-relevance elements"
    }
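
The thresholding rule is left open above. A minimal sketch of one possible calculate_relevance_threshold, which flags elements scoring more than one standard deviation below the mean (any robust statistic would work equally well):

import statistics

def calculate_relevance_threshold(scored_elements):
    """Hypothetical default: mean relevance minus one standard deviation."""
    scores = [e["relevance_score"] for e in scored_elements]
    if len(scores) < 2:
        return 0.0
    return statistics.mean(scores) - statistics.stdev(scores)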

Degradation Monitoring System

Context Health Dashboard

Implement continuous monitoring of context health:

class ContextHealthMonitor:
    def __init__(self, model, context_window_limit):
        self.model = model
        self.limit = context_window_limit
        self.metrics = []

    def assess_health(self, context, task):
        """
        Assess overall context health for current task.
        Returns composite score and component metrics.
        """
        metrics = {
            "token_count": len(context),
            "utilization_ratio": len(context) / self.limit,
            "attention_distribution": measure_attention_distribution(self.model, context, task),
            "relevance_scores": score_context_relevance(context, task),
            "age_tokens": count_recent_tokens(context)
        }
        # Calculate composite health score
        health_score = self._calculate_composite(metrics)
        result = {
            "health_score": health_score,
            "metrics": metrics,
            "status": self._interpret_score(health_score),
            "recommendations": self._generate_recommendations(metrics)
        }
        self.metrics.append(result)
        return result

    def _calculate_composite(self, metrics):
        """Calculate composite health score from components."""
        # Weighted combination of metrics
        utilization_penalty = min(metrics["utilization_ratio"] * 0.5, 0.3)
        attention_penalty = self._calculate_attention_penalty(metrics["attention_distribution"])
        relevance_penalty = self._calculate_relevance_penalty(metrics["relevance_scores"])
        base_score = 1.0
        score = base_score - utilization_penalty - attention_penalty - relevance_penalty
        return max(0, score)

    def _interpret_score(self, score):
        """Interpret health score and return status."""
        if score > 0.8:
            return "healthy"
        elif score > 0.6:
            return "warning"
        elif score > 0.4:
            return "degraded"
        else:
            return "critical"

Alert Thresholds

Configure appropriate alert thresholds:

CONTEXT_ALERTS = {
    "utilization_warning": 0.7,       # 70% of context limit
    "utilization_critical": 0.9,      # 90% of context limit
    "attention_degraded_ratio": 0.3,  # 30% of positions in the degraded middle region
    "relevance_threshold": 0.3,       # Below 30% relevance
    "consecutive_warnings": 3         # Three consecutive warnings trigger an alert
}
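
A hypothetical check that evaluates a monitor result against these thresholds; it covers the utilization and consecutive-warning keys (the remaining keys would be wired in the same way) and assumes the monitor's stored self.metrics list is passed in as history:

def check_alerts(result, history, alerts=CONTEXT_ALERTS):
    """Hypothetical: return the names of thresholds breached by this assessment."""
    triggered = []
    utilization = result["metrics"]["utilization_ratio"]
    if utilization >= alerts["utilization_critical"]:
        triggered.append("utilization_critical")
    elif utilization >= alerts["utilization_warning"]:
        triggered.append("utilization_warning")
    recent = history[-alerts["consecutive_warnings"]:]
    if (len(recent) == alerts["consecutive_warnings"]
            and all(r["status"] != "healthy" for r in recent)):
        triggered.append("consecutive_warnings")
    return triggered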

Recovery Procedures

Context Truncation Strategy

When context has degraded beyond the point where lighter interventions help, truncate strategically:

def truncate_context_for_recovery(context, preserved_elements, target_size):
    """
    Truncate context while preserving critical elements.

    Strategy:
    1. Preserve system prompt and tool definitions
    2. Preserve recent conversation turns
    3. Preserve critical retrieved documents
    4. Summarize older content if needed
    5. Truncate from middle if still over target
    """
    truncated = []
    # Category 1: Critical system elements (preserve always)
    system_elements = extract_system_elements(context)
    truncated.extend(system_elements)
    # Category 2: Recent conversation (preserve more)
    recent_turns = extract_recent_turns(context, num_turns=10)
    truncated.extend(recent_turns)
    # Category 3: Critical documents (preserve key ones)
    critical_docs = extract_critical_documents(context, preserved_elements)
    truncated.extend(critical_docs)
    # Check size and summarize if needed
    while len(truncated) > target_size:
        # Summarize oldest category 3 elements
        summarized = summarize_oldest(truncated, category="documents")
        if len(summarized) >= len(truncated):
            # Summarization can no longer shrink the context; stop to avoid looping forever
            break
        truncated = summarized
    # If still too large, truncate the oldest conversation turns
    if len(truncated) > target_size:
        truncated = truncate_oldest_turns(truncated, keep_recent=5)
    return truncated
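
A hypothetical recovery call, targeting roughly half of a 128k-token window (the preserved element names are illustrative):

recovered = truncate_context_for_recovery(
    context,
    preserved_elements=["design_spec", "latest_error_report"],
    target_size=64_000,
)
# The result is best-effort: it may still exceed the target if the preserved
# system elements, recent turns, and critical documents cannot be compressed further.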