# Memory Systems: Technical Reference
This document provides implementation details for memory system components.
## Vector Store Implementation
### Basic Vector Store
```python
import numpy as np
from typing import Any, Dict, List, Optional


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two 1-D vectors."""
    denom = float(np.linalg.norm(a) * np.linalg.norm(b))
    return float(np.dot(a, b)) / denom if denom else 0.0


class VectorStore:
    def __init__(self, dimension: int = 768):
        self.dimension = dimension
        self.vectors: List[np.ndarray] = []
        self.metadata: List[Dict[str, Any]] = []
        self.texts: List[str] = []

    def add(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> int:
        """Add a document to the store and return its index."""
        embedding = self._embed(text)
        self.vectors.append(embedding)
        self.metadata.append(metadata or {})
        self.texts.append(text)
        return len(self.vectors) - 1

    def search(self, query: str, limit: int = 5,
               filters: Optional[Dict[str, Any]] = None) -> List[Dict]:
        """Search for similar documents, optionally restricted by metadata filters."""
        query_embedding = self._embed(query)
        scores = []
        for i, vec in enumerate(self.vectors):
            # Skip documents that fail the metadata filters instead of
            # overloading the score with a sentinel value
            if filters and not self._matches_filters(self.metadata[i], filters):
                continue
            scores.append((i, cosine_similarity(query_embedding, vec)))
        # Sort by score, descending
        scores.sort(key=lambda x: x[1], reverse=True)
        # Return the top `limit` positive-similarity matches
        results = []
        for idx, score in scores[:limit]:
            if score > 0:
                results.append({
                    "index": idx,
                    "score": score,
                    "text": self._get_text(idx),
                    "metadata": self.metadata[idx],
                })
        return results

    def _embed(self, text: str) -> np.ndarray:
        """Generate an embedding for text.

        Placeholder: in production, call an actual embedding model here.
        """
        return np.random.randn(self.dimension)

    def _matches_filters(self, metadata: Dict, filters: Dict) -> bool:
        """Check whether metadata satisfies every filter (a list value means "any of")."""
        for key, value in filters.items():
            if key not in metadata:
                return False
            if isinstance(value, list):
                if metadata[key] not in value:
                    return False
            elif metadata[key] != value:
                return False
        return True

    def _get_text(self, index: int) -> str:
        """Retrieve the original text for an index."""
        return self.texts[index] if index < len(self.texts) else ""
```
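A quick usage sketch. Note that `_embed` above returns random vectors, so the scores here are illustrative; plug in a real embedding model for meaningful rankings:

```python
store = VectorStore(dimension=768)
store.add("Alice joined the platform team in March.", {"entity": "alice"})
store.add("Bob prefers async code reviews.", {"entity": "bob"})

# Restrict results to documents whose metadata matches the filter
hits = store.search("Who is on the platform team?", limit=3,
                    filters={"entity": "alice"})
for hit in hits:
    print(f"{hit['score']:+.3f}  {hit['text']}")
```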
### Metadata-Enhanced Vector Store

```python
class MetadataVectorStore(VectorStore):
    def __init__(self, dimension: int = 768):
        super().__init__(dimension)
        self.entity_index: Dict[str, List[int]] = {}  # entity -> [indices]
        self.time_index: Dict[str, List[int]] = {}    # time-range key -> [indices]

    def add(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> int:
        """Add a document and maintain the entity and time indexes."""
        metadata = metadata or {}  # Guard: metadata may be None
        index = super().add(text, metadata)
        # Index by entity
        if "entity" in metadata:
            self.entity_index.setdefault(metadata["entity"], []).append(index)
        # Index by validity window
        if "valid_from" in metadata:
            time_key = self._time_range_key(
                metadata.get("valid_from"),
                metadata.get("valid_until"),
            )
            self.time_index.setdefault(time_key, []).append(index)
        return index

    def search_by_entity(self, query: str, entity: str, limit: int = 5) -> List[Dict]:
        """Search only among documents indexed under a specific entity."""
        indices = self.entity_index.get(entity, [])
        # Score and rank just the entity's documents
        query_embedding = self._embed(query)
        scored = []
        for i in indices:
            score = cosine_similarity(query_embedding, self.vectors[i])
            scored.append((i, score, self.metadata[i]))
        scored.sort(key=lambda x: x[1], reverse=True)
        return [{
            "index": idx,
            "score": score,
            "text": self._get_text(idx),
            "metadata": meta,
        } for idx, score, meta in scored[:limit]]

    @staticmethod
    def _time_range_key(start: Any, end: Any) -> str:
        """Build a time-range key; accepts datetime objects or ISO strings."""
        def fmt(value: Any) -> str:
            return value.isoformat() if hasattr(value, "isoformat") else str(value)
        return f"{fmt(start)}::{fmt(end) if end is not None else 'infinity'}"
```
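Entity-scoped search only scores documents already indexed under that entity, which keeps retrieval cheap as the store grows. The documents below are made up for illustration:

```python
meta_store = MetadataVectorStore()
meta_store.add("Alice was promoted to staff engineer.",
               {"entity": "alice", "valid_from": "2024-01-15"})
meta_store.add("Alice transferred to the infra org.",
               {"entity": "alice", "valid_from": "2024-06-01"})
meta_store.add("Bob joined the security team.", {"entity": "bob"})

# Only the two "alice" documents are scored; Bob's is never touched
results = meta_store.search_by_entity("What is Alice's current role?", "alice")
```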
## Knowledge Graph Implementation

### Property Graph Storage
```python
import re
import uuid
from typing import Dict, List, Optional


class PropertyGraph:
    def __init__(self):
        self.nodes: Dict[str, Dict] = {}  # node id -> {"label", "properties"}
        self.edges: List[Dict] = []       # list of edge dicts
        self.indexes = {
            "node_label": {},  # label -> [node_ids]
            "edge_type": {},   # type -> [edge_ids]
        }

    def create_node(self, label: str, properties: Optional[Dict] = None) -> str:
        """Create a node with a label and properties; return its id."""
        node_id = str(uuid.uuid4())
        self.nodes[node_id] = {
            "label": label,
            "properties": properties or {},
        }
        # Index by label
        self.indexes["node_label"].setdefault(label, []).append(node_id)
        return node_id

    def create_relationship(self, source_id: str, rel_type: str,
                            target_id: str, properties: Optional[Dict] = None) -> str:
        """Create a directed relationship between nodes; return the edge id."""
        edge_id = str(uuid.uuid4())
        self.edges.append({
            "id": edge_id,
            "source": source_id,
            "target": target_id,
            "type": rel_type,
            "properties": properties or {},
        })
        # Index by relationship type
        self.indexes["edge_type"].setdefault(rel_type, []).append(edge_id)
        return edge_id

    def query(self, cypher_like: str, params: Optional[Dict] = None) -> List[Dict]:
        """Simple query matching.

        Supports patterns like:
            MATCH (e:Person)-[r:WORKS_AT]->(o:Company) WHERE e.id = $id RETURN r

        In production, use an actual graph database; this is a simplified
        pattern matcher for demonstration.
        """
        if not cypher_like.startswith("MATCH"):
            return []
        pattern = self._parse_pattern(cypher_like)
        return self._match_pattern(pattern, params or {})

    def _parse_pattern(self, query: str) -> Dict:
        """Parse a simplified MATCH pattern (demonstration-grade regex parser)."""
        node = r"\(\w*(?::(\w+))?\)"
        rel = r"\[\w*(?::(\w+))?\]"
        match = re.search(rf"MATCH\s+{node}\s*-\s*{rel}\s*->\s*{node}", query)
        where = re.search(r"WHERE\s+(.+?)(?:\s+RETURN\b|$)", query)
        return {
            "source_label": match.group(1) if match else None,
            "rel_type": match.group(2) if match else None,
            "target_label": match.group(3) if match else None,
            "where": where.group(1) if where else None,
        }

    def _match_pattern(self, pattern: Dict, params: Dict) -> List[Dict]:
        """Match the parsed pattern against every edge in the graph."""
        results = []
        for edge in self.edges:
            # Match relationship type
            if pattern["rel_type"] and edge["type"] != pattern["rel_type"]:
                continue
            source = {"id": edge["source"], **self.nodes.get(edge["source"], {})}
            target = {"id": edge["target"], **self.nodes.get(edge["target"], {})}
            # Match node labels
            if pattern["source_label"] and source.get("label") != pattern["source_label"]:
                continue
            if pattern["target_label"] and target.get("label") != pattern["target_label"]:
                continue
            # Match the WHERE clause, if any
            if pattern["where"] and not self._match_where(
                    pattern["where"], edge, source, target, params):
                continue
            results.append({
                "source": source,
                "relationship": edge,
                "target": target,
            })
        return results

    def _match_where(self, where: str, edge: Dict, source: Dict,
                     target: Dict, params: Dict) -> bool:
        """Evaluate a single `alias.field = $param` equality (demonstration only)."""
        cond = re.match(r"(\w+)\.(\w+)\s*=\s*\$(\w+)", where)
        if not cond:
            return True  # Unsupported clause: do not filter
        alias, field, param = cond.groups()
        scope = {"e": source, "r": edge, "o": target}.get(alias, {})
        value = scope.get(field, scope.get("properties", {}).get(field))
        return value == params.get(param)
```
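An end-to-end sketch of the pattern matcher. The labels, relationship type, and `$id` parameter are illustrative:

```python
graph = PropertyGraph()
alice = graph.create_node("Person", {"name": "Alice"})
acme = graph.create_node("Company", {"name": "Acme"})
graph.create_relationship(alice, "WORKS_AT", acme, {"confidence": 0.9})

# Pattern match with a parameterized WHERE clause
rows = graph.query(
    "MATCH (e:Person)-[r:WORKS_AT]->(o:Company) WHERE e.id = $id RETURN r",
    params={"id": alice},
)
assert rows[0]["target"]["properties"]["name"] == "Acme"
```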
### Temporal Knowledge Graph

```python
from datetime import datetime
from typing import Dict, List, Optional


class TemporalKnowledgeGraph(PropertyGraph):
    def __init__(self):
        super().__init__()
        self.temporal_index: Dict[str, List[str]] = {}  # time-range key -> [edge_ids]

    def create_temporal_relationship(
        self,
        source_id: str,
        rel_type: str,
        target_id: str,
        valid_from: datetime,
        valid_until: Optional[datetime] = None,
        properties: Optional[Dict] = None,
    ) -> str:
        """Create a relationship with a temporal validity window."""
        edge_id = super().create_relationship(
            source_id, rel_type, target_id, properties
        )
        # Index temporally
        time_key = self._time_range_key(valid_from, valid_until)
        self.temporal_index.setdefault(time_key, []).append(edge_id)
        # Store the validity window on the edge itself
        edge = self._get_edge(edge_id)
        edge["valid_from"] = valid_from.isoformat()
        edge["valid_until"] = valid_until.isoformat() if valid_until else None
        return edge_id

    def query_at_time(self, query: str, query_time: datetime) -> List[Dict]:
        """Query the graph state as of a specific time."""
        # Keep only edges whose validity window contains query_time;
        # edges without a window are treated as valid since the epoch
        valid_edges = []
        for edge in self.edges:
            valid_from = datetime.fromisoformat(edge.get("valid_from", "1970-01-01"))
            valid_until = edge.get("valid_until")
            if valid_from <= query_time:
                if valid_until is None or datetime.fromisoformat(valid_until) > query_time:
                    valid_edges.append(edge)
        # Match the surviving edges against the pattern's relationship type
        pattern = self._parse_pattern(query)
        results = []
        for edge in valid_edges:
            if pattern["rel_type"] and edge["type"] != pattern["rel_type"]:
                continue
            results.append({
                "source": self.nodes.get(edge["source"], {}),
                "relationship": edge,
                "target": self.nodes.get(edge["target"], {}),
            })
        return results

    def _get_edge(self, edge_id: str) -> Dict:
        """Look up an edge dict by id."""
        return next(e for e in self.edges if e["id"] == edge_id)

    def _time_range_key(self, start: datetime, end: Optional[datetime]) -> str:
        """Create a time-range key for indexing."""
        start_str = start.isoformat()
        end_str = end.isoformat() if end else "infinity"
        return f"{start_str}::{end_str}"
```
## Memory Consolidation

```python
class MemoryConsolidator:
    def __init__(self, graph: PropertyGraph, vector_store: VectorStore):
        self.graph = graph
        self.vector_store = vector_store
        self.consolidation_threshold = 1000  # memories stored before consolidation triggers

    def should_consolidate(self) -> bool:
        """Check whether consolidation should trigger."""
        total_memories = len(self.graph.nodes) + len(self.graph.edges)
        return total_memories > self.consolidation_threshold

    def consolidate(self):
        """Run the full consolidation process."""
        # Step 1: Identify potentially duplicate facts
        duplicates = self.find_duplicates()
        # Step 2: Merge each group of related facts
        for group in duplicates:
            self.merge_fact_group(group)
        # Step 3: Update validity periods
        self.update_validity_periods()
        # Step 4: Rebuild indexes to drop references to removed edges
        self.rebuild_indexes()

    def find_duplicates(self) -> List[List[Dict]]:
        """Find groups of potentially duplicate facts, keyed by subject and predicate."""
        groups: Dict[tuple, List[Dict]] = {}
        for edge in self.graph.edges:
            key = (edge["source"], edge["type"])
            groups.setdefault(key, []).append(edge)
        # Only groups with more than one edge are merge candidates
        return [edges for edges in groups.values() if len(edges) > 1]

    def merge_fact_group(self, edges: List[Dict]):
        """Merge a group of duplicate edges, keeping the highest-confidence one."""
        if len(edges) <= 1:
            return
        keeper = max(edges, key=lambda e: e.get("properties", {}).get("confidence", 0))
        for edge in edges:
            if edge["id"] != keeper["id"]:
                self.merge_properties(keeper, edge)
                self.graph.edges.remove(edge)

    def merge_properties(self, target: Dict, source: Dict):
        """Merge properties from source into target without overwriting."""
        for key, value in source.get("properties", {}).items():
            if key not in target["properties"]:
                target["properties"][key] = value
            elif isinstance(value, list) and isinstance(target["properties"][key], list):
                target["properties"][key].extend(value)

    def update_validity_periods(self):
        """Close validity windows on superseded facts.

        Placeholder: the policy depends on the temporal model in use.
        """

    def rebuild_indexes(self):
        """Rebuild the edge-type index after edges have been removed."""
        self.graph.indexes["edge_type"] = {}
        for edge in self.graph.edges:
            self.graph.indexes["edge_type"].setdefault(edge["type"], []).append(edge["id"])
```
## Memory-Context Integration

```python
import re
from typing import Dict, List


class MemoryContextIntegrator:
    def __init__(self, memory_system, context_limit: int = 100_000):
        self.memory_system = memory_system
        self.context_limit = context_limit  # budget in (approximate) tokens

    def build_context(self, task: str, current_context: str = "") -> str:
        """Build a context string that includes relevant memories."""
        # Extract entity mentions from the task
        entities = self._extract_entities(task)
        # Retrieve memories for each entity
        memories = []
        for entity in entities:
            memories.extend(self.memory_system.retrieve_entity(entity))
        # Format the memories and append them to the current context
        memory_section = self._format_memories(memories)
        combined = current_context + "\n\n" + memory_section
        # Truncate if the combined context exceeds the budget
        if self._token_count(combined) > self.context_limit:
            combined = self._truncate_context(combined, self.context_limit)
        return combined

    def _extract_entities(self, task: str) -> List[str]:
        """Extract entity mentions written with the [[entity_name]] convention.

        Placeholder: in production, use NER or a dedicated entity extractor.
        """
        return re.findall(r"\[\[([^\]]+)\]\]", task)

    def _format_memories(self, memories: List[Dict]) -> str:
        """Format memories as a markdown section for context injection."""
        sections = ["## Relevant Memories"]
        for memory in memories:
            formatted = f"- {memory.get('content', '')}"
            if "source" in memory:
                formatted += f" (Source: {memory['source']})"
            if "timestamp" in memory:
                formatted += f" [Time: {memory['timestamp']}]"
            sections.append(formatted)
        return "\n".join(sections)

    def _token_count(self, text: str) -> int:
        """Estimate token count (roughly four characters per token)."""
        return len(text) // 4

    def _truncate_context(self, context: str, limit: int) -> str:
        """Truncate to roughly `limit` tokens, treating each word as one token."""
        words = context.split()
        return " ".join(words[:limit])
```