Coordination
Script from multi-agent-patterns
Source Code
metadata = {
"id": "code:multi.agentpatterns.coordination",
"name": "Coordination",
"description": "Script from multi-agent-patterns",
"language": "python",
"packages": [],
"args": []
}
"""
Multi-Agent Coordination
This module provides utilities for implementing multi-agent coordination patterns.
"""
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import time
import uuid
class MessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
HANDOVER = "handover"
FEEDBACK = "feedback"
ALERT = "alert"
@dataclass
class AgentMessage:
"""Message exchanged between agents."""
sender: str
receiver: str
message_type: MessageType
content: Dict[str, Any]
timestamp: float = field(default_factory=time.time)
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
requires_response: bool = False
priority: int = 0 # 0 = normal, higher = more urgent
class AgentCommunication:
"""Communication channel for multi-agent systems."""
def __init__(self):
self.inbox: Dict[str, List[AgentMessage]] = {}
self.outbox: List[AgentMessage] = []
self.message_history: List[AgentMessage] = []
def send(self, message: AgentMessage):
"""Send a message to an agent."""
if message.receiver not in self.inbox:
self.inbox[message.receiver] = []
self.inbox[message.receiver].append(message)
self.outbox.append(message)
self.message_history.append(message)
def receive(self, agent_id: str) -> List[AgentMessage]:
"""Receive all messages for an agent."""
messages = self.inbox.get(agent_id, [])
self.inbox[agent_id] = [] # Clear inbox after receiving
return messages
def broadcast(self, sender: str, message_type: MessageType,
content: Dict[str, Any], receivers: List[str]):
"""Broadcast message to multiple agents."""
for receiver in receivers:
self.send(AgentMessage(
sender=sender,
receiver=receiver,
message_type=message_type,
content=content
))
# Supervisor Pattern Implementation
class SupervisorAgent:
"""
Central supervisor agent that coordinates worker agents.
"""
def __init__(self, name: str, communication: AgentCommunication):
self.name = name
self.communication = communication
self.workers: Dict[str, Dict] = {}
self.task_queue: List[Dict] = []
self.completed_tasks: List[Dict] = []
self.current_state: Dict = {}
def register_worker(self, worker_id: str, capabilities: List[str]):
"""Register a worker agent with the supervisor."""
self.workers[worker_id] = {
"capabilities": capabilities,
"status": "available",
"current_task": None,
"metrics": {"tasks_completed": 0, "avg_response_time": 0}
}
def decompose_task(self, task: Dict) -> List[Dict]:
"""
Decompose a task into subtasks.
In production, this would use task analysis and planning.
"""
subtasks = []
# Simple decomposition based on task type
task_type = task.get("type", "general")
if task_type == "research":
subtasks = [
{"type": "search", "description": "Gather information"},
{"type": "analyze", "description": "Analyze findings"},
{"type": "synthesize", "description": "Synthesize results"}
]
elif task_type == "create":
subtasks = [
{"type": "plan", "description": "Create plan"},
{"type": "draft", "description": "Draft content"},
{"type": "review", "description": "Review and refine"}
]
else:
subtasks = [
{"type": "execute", "description": task.get("description", "Execute task")}
]
# Add parent task info
for subtask in subtasks:
subtask["parent_task"] = task.get("id")
subtask["priority"] = task.get("priority", 0)
return subtasks
def assign_task(self, subtask: Dict, worker_id: str):
"""Assign a subtask to a worker agent."""
if worker_id not in self.workers:
raise ValueError(f"Unknown worker: {worker_id}")
self.workers[worker_id]["status"] = "busy"
self.workers[worker_id]["current_task"] = subtask["id"]
self.send(AgentMessage(
sender=self.name,
receiver=worker_id,
message_type=MessageType.REQUEST,
content={
"action": "execute_task",
"task": subtask
},
requires_response=True,
priority=subtask.get("priority", 0)
))
def select_worker(self, subtask: Dict) -> str:
"""Select the best worker for a subtask."""
required_capability = subtask.get("type", "general")
# Find available workers with required capability
candidates = [
wid for wid, info in self.workers.items()
if info["status"] == "available"
and required_capability in info["capabilities"]
]
if not candidates:
# Fall back to any available worker
candidates = [
wid for wid, info in self.workers.items()
if info["status"] == "available"
]
if not candidates:
raise ValueError("No available workers")
# Select based on metrics (fewest tasks completed = most available)
return min(candidates, key=lambda w: self.workers[w]["metrics"]["tasks_completed"])
def aggregate_results(self, subtask_results: List[Dict]) -> Dict:
"""Aggregate results from subtasks."""
aggregated = {
"results": subtask_results,
"summary": "",
"quality_score": 0.0
}
# Generate summary from results
summaries = [r.get("summary", "") for r in subtask_results if r.get("success")]
aggregated["summary"] = " | ".join(summaries)
# Calculate quality score
successful = sum(1 for r in subtask_results if r.get("success", False))
aggregated["quality_score"] = successful / len(subtask_results) if subtask_results else 0
return aggregated
def run_workflow(self, task: Dict) -> Dict:
"""Execute a complete workflow with supervision."""
# Decompose task
subtasks = self.decompose_task(task)
# Assign subtasks
results = []
for subtask in subtasks:
worker = self.select_worker(subtask)
self.assign_task(subtask, worker)
# Wait for result
messages = self.receive(self.name)
for msg in messages:
if msg.message_type == MessageType.RESPONSE:
results.append(msg.content)
# Aggregate results
final_result = self.aggregate_results(results)
return {
"task": task,
"subtask_results": results,
"final_result": final_result,
"success": final_result["quality_score"] >= 0.8
}
def send(self, message: AgentMessage):
"""Send message through communication channel."""
self.communication.send(message)
# Handoff Protocol
class HandoffProtocol:
"""
Protocol for agent-to-agent handoffs.
"""
def __init__(self, communication: AgentCommunication):
self.communication = communication
def create_handoff(self, from_agent: str, to_agent: str,
context: Dict, reason: str) -> AgentMessage:
"""Create a handoff message."""
return AgentMessage(
sender=from_agent,
receiver=to_agent,
message_type=MessageType.HANDOVER,
content={
"handoff_reason": reason,
"transferred_context": context,
"handoff_timestamp": time.time()
},
priority=1
)
def accept_handoff(self, agent_id: str) -> Optional[AgentMessage]:
"""Accept pending handoff for an agent."""
messages = self.communication.receive(agent_id)
for msg in messages:
if msg.message_type == MessageType.HANDOVER:
return msg
return None
def transfer_with_state(self, from_agent: str, to_agent: str,
state: Dict, task: Dict) -> bool:
"""
Transfer task state from one agent to another.
Returns success status.
"""
handoff = self.create_handoff(
from_agent=from_agent,
to_agent=to_agent,
context={
"task_state": state,
"task_details": task,
"progress": state.get("progress", 0)
},
reason="task_transfer"
)
self.communication.send(handoff)
# Wait for acknowledgment
time.sleep(0.1) # In production, use async with timeout
ack = self.communication.receive(from_agent)
return any(
m.message_type == MessageType.RESPONSE and
m.content.get("status") == "handoff_received"
for m in ack
)
# Consensus Mechanism
class ConsensusManager:
"""
Manager for multi-agent consensus building.
"""
def __init__(self):
self.votes: Dict[str, List[Dict]] = {}
self.debates: Dict[str, List[Dict]] = {}
def initiate_vote(self, topic_id: str, agents: List[str],
options: List[str]):
"""Initiate a voting round on a topic."""
self.votes[topic_id] = []
# Request votes from agents
for agent in agents:
vote_request = {
"agent": agent,
"topic": topic_id,
"options": options,
"status": "pending"
}
self.votes[topic_id].append(vote_request)
def submit_vote(self, topic_id: str, agent_id: str,
selection: str, confidence: float):
"""Submit a vote for a topic."""
if topic_id not in self.votes:
raise ValueError(f"Unknown topic: {topic_id}")
vote_record = {
"agent": agent_id,
"selection": selection,
"confidence": confidence,
"timestamp": time.time()
}
for vote in self.votes[topic_id]:
if vote["agent"] == agent_id:
vote["status"] = "cast"
vote["selection"] = selection
vote["confidence"] = confidence
break
def calculate_weighted_consensus(self, topic_id: str) -> Dict:
"""
votes.
Weight = confidence * expertise_factor
Calculate weighted consensus from """
if topic_id not in self.votes:
raise ValueError(f"Unknown topic: {topic_id}")
votes = [v for v in self.votes[topic_id] if v.get("status") == "cast"]
if not votes:
return {"status": "no_votes", "result": None}
# Group by selection
selections: Dict[str, List[Dict]] = {}
for vote in votes:
selection = vote["selection"]
if selection not in selections:
selections[selection] = []
selections[selection].append(vote)
# Calculate weighted score for each selection
results = {}
for selection, selection_votes in selections.items():
weighted_sum = sum(v["confidence"] for v in selection_votes)
avg_confidence = weighted_sum / len(selection_votes) if selection_votes else 0
results[selection] = {
"weighted_score": weighted_sum,
"avg_confidence": avg_confidence,
"vote_count": len(selection_votes)
}
# Select winner
winner = max(results.keys(), key=lambda s: results[s]["weighted_score"])
return {
"status": "complete",
"result": winner,
"details": results,
"consensus_strength": results[winner]["weighted_score"] / len(votes) if votes else 0
}
# Failure Handling
class AgentFailureHandler:
"""
Handler for agent failures in multi-agent systems.
"""
def __init__(self, communication: AgentCommunication,
max_retries: int = 3):
self.communication = communication
self.max_retries = max_retries
self.failure_counts: Dict[str, int] = {}
self.circuit_breakers: Dict[str, float] = {} # agent -> unlock time
def handle_failure(self, agent_id: str, task_id: str,
error: str) -> Dict:
"""
Handle a failure from an agent.
Returns action to take.
"""
# Increment failure count
self.failure_counts[agent_id] = self.failure_counts.get(agent_id, 0) + 1
# Check if circuit breaker should activate
if self.failure_counts[agent_id] >= self.max_retries:
self._activate_circuit_breaker(agent_id)
return {
"action": "reroute",
"reason": "circuit_breaker_activated",
"alternative": self._find_alternative_agent(agent_id)
}
return {
"action": "retry",
"reason": error,
"retry_count": self.failure_counts[agent_id],
"delay": min(2 ** self.failure_counts[agent_id], 60) # Exponential backoff
}
def _activate_circuit_breaker(self, agent_id: str):
"""Temporarily disable an agent."""
self.circuit_breakers[agent_id] = time.time() + 60 # 1 minute cooldown
def _find_alternative_agent(self, failed_agent: str) -> str:
"""Find an alternative agent to handle the task."""
# In production, this would check agent capabilities and availability
return "default_backup_agent"
def is_available(self, agent_id: str) -> bool:
"""Check if an agent is available (circuit breaker not active)."""
if agent_id in self.circuit_breakers:
if time.time() < self.circuit_breakers[agent_id]:
return False
# Reset after cooldown
del self.circuit_breakers[agent_id]
self.failure_counts[agent_id] = 0
return True
def record_success(self, agent_id: str):
"""Record a successful task completion."""
self.failure_counts[agent_id] = 0