Pipeline Template
A staged pipeline template for LLM batch processing
Source Code
metadata = {
"id": "code:project.development.pipelinetemplate",
"name": "Pipeline Template",
"description": "Script from project-development",
"language": "python",
"packages": [],
"args": []
}
"""
LLM Batch Processing Pipeline Template
A template demonstrating the staged pipeline architecture for LLM batch processing.
Customize the acquire, prepare, process, parse, and render functions for your use case.
Usage:
python pipeline_template.py acquire --batch-id 2025-01-15
python pipeline_template.py prepare --batch-id 2025-01-15
python pipeline_template.py process --batch-id 2025-01-15 --workers 10
python pipeline_template.py parse --batch-id 2025-01-15
python pipeline_template.py render --batch-id 2025-01-15
python pipeline_template.py all --batch-id 2025-01-15
python pipeline_template.py clean --batch-id 2025-01-15 --clean-stage process
python pipeline_template.py estimate --batch-id 2025-01-15
"""
import argparse
import json
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field, asdict
from datetime import date
from pathlib import Path
# -----------------------------------------------------------------------------
# Configuration - Customize for your use case
# -----------------------------------------------------------------------------
DATA_DIR = Path("data")
OUTPUT_DIR = Path("output")
# Prompt template with structured output requirements
PROMPT_TEMPLATE = """Analyze the following content and provide your response in exactly this format.
## Summary
[2-3 sentence summary of the content]
## Key Points
- [Point 1]
- [Point 2]
- [Point 3]
## Score
Rating: [1-10]
Confidence: [low/medium/high]
## Reasoning
[Explanation of your analysis]
Follow this format exactly because I will be parsing it programmatically.
---
# Content to Analyze
Title: {title}
{content}
"""
# -----------------------------------------------------------------------------
# Data Structures
# -----------------------------------------------------------------------------
@dataclass
class Item:
"""Represents a single item to process."""
id: str
title: str
content: str
metadata: dict = field(default_factory=dict)
@dataclass
class ParsedResult:
"""Structured result from LLM response parsing."""
summary: str = ""
key_points: list[str] = field(default_factory=list)
score: int | None = None
confidence: str = ""
reasoning: str = ""
parse_errors: list[str] = field(default_factory=list)
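# Items and results are persisted as JSON via dataclasses.asdict(), so any
# fields you add to these dataclasses flow through to raw.json / parsed.json
# without further changes.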
# -----------------------------------------------------------------------------
# Path Utilities
# -----------------------------------------------------------------------------
def get_batch_dir(batch_id: str) -> Path:
"""Get the data directory for a batch."""
return DATA_DIR / batch_id
def get_item_dir(batch_id: str, item_id: str) -> Path:
"""Get the directory for a specific item."""
return get_batch_dir(batch_id) / item_id
def get_output_dir(batch_id: str) -> Path:
"""Get the output directory for a batch."""
return OUTPUT_DIR / batch_id
# -----------------------------------------------------------------------------
# Stage: Acquire
# -----------------------------------------------------------------------------
def stage_acquire(batch_id: str, limit: int | None = None):
"""
Stage 1: Acquire raw data.
Customize this function to fetch data from your sources:
- APIs
- Databases
- File systems
- Web scraping
Output: {batch_dir}/{item_id}/raw.json
"""
batch_dir = get_batch_dir(batch_id)
batch_dir.mkdir(parents=True, exist_ok=True)
# CUSTOMIZE: Replace with your data acquisition logic
items = fetch_items_from_source(limit)
for item in items:
item_dir = get_item_dir(batch_id, item.id)
item_dir.mkdir(exist_ok=True)
raw_file = item_dir / "raw.json"
if not raw_file.exists():
with open(raw_file, "w") as f:
json.dump(asdict(item), f, indent=2)
print(f"Acquired: {item.id}")
else:
print(f"Cached: {item.id}")
print(f"\nAcquire complete. {len(items)} items in {batch_dir}")
def fetch_items_from_source(limit: int | None = None) -> list[Item]:
"""
CUSTOMIZE: Implement your data fetching logic here.
This example creates dummy data for demonstration.
Replace with actual API calls, database queries, etc.
"""
# Example: Generate sample items
items = []
for i in range(limit or 10):
items.append(Item(
id=f"item-{i:04d}",
title=f"Sample Item {i}",
content=f"This is sample content for item {i}. " * 10,
metadata={"source": "example", "index": i},
))
return items
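# A real acquire might read from a file export instead. A minimal sketch,
# assuming a JSONL file whose records match the Item fields (the
# "items.jsonl" path is hypothetical):
#
#   def fetch_items_from_jsonl(path: str = "items.jsonl") -> list[Item]:
#       with open(path) as f:
#           return [Item(**json.loads(line)) for line in f]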
# -----------------------------------------------------------------------------
# Stage: Prepare
# -----------------------------------------------------------------------------
def stage_prepare(batch_id: str):
"""
Stage 2: Generate prompts from raw data.
Output: {batch_dir}/{item_id}/prompt.md
"""
batch_dir = get_batch_dir(batch_id)
for item_dir in sorted(batch_dir.iterdir()):
if not item_dir.is_dir():
continue
raw_file = item_dir / "raw.json"
prompt_file = item_dir / "prompt.md"
if not raw_file.exists():
continue
if prompt_file.exists():
continue
with open(raw_file) as f:
item_data = json.load(f)
prompt = generate_prompt(item_data)
with open(prompt_file, "w") as f:
f.write(prompt)
print(f"Prepared: {item_dir.name}")
print(f"\nPrepare complete.")
def generate_prompt(item_data: dict) -> str:
"""Generate prompt from item data using template."""
return PROMPT_TEMPLATE.format(
title=item_data.get("title", "Untitled"),
content=item_data.get("content", ""),
)
# -----------------------------------------------------------------------------
# Stage: Process
# -----------------------------------------------------------------------------
def stage_process(batch_id: str, model: str = "gpt-5.2", max_workers: int = 5):
"""
Stage 3: Execute LLM calls.
This is the expensive, non-deterministic stage.
Output: {batch_dir}/{item_id}/response.md
"""
batch_dir = get_batch_dir(batch_id)
# Collect items needing processing
to_process = []
for item_dir in sorted(batch_dir.iterdir()):
if not item_dir.is_dir():
continue
prompt_file = item_dir / "prompt.md"
response_file = item_dir / "response.md"
if prompt_file.exists() and not response_file.exists():
to_process.append((item_dir, prompt_file.read_text()))
if not to_process:
print("No items to process.")
return
print(f"Processing {len(to_process)} items with {max_workers} workers...")
def process_one(args):
item_dir, prompt = args
response_file = item_dir / "response.md"
try:
# CUSTOMIZE: Replace with your LLM API call
response = call_llm(prompt, model)
with open(response_file, "w") as f:
f.write(response)
return (item_dir.name, len(response), None)
except Exception as e:
return (item_dir.name, 0, str(e))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(process_one, item): item for item in to_process}
for future in as_completed(futures):
item_id, chars, error = future.result()
if error:
print(f" {item_id}: Error - {error}")
else:
print(f" {item_id}: Done ({chars} chars)")
print(f"\nProcess complete.")
def call_llm(prompt: str, model: str) -> str:
"""
CUSTOMIZE: Implement your LLM API call here.
This example returns a mock response for demonstration.
Replace with actual OpenAI, Anthropic, etc. API calls.
"""
# Example mock response - replace with actual API call
#
# from openai import OpenAI
# client = OpenAI()
# response = client.chat.completions.create(
# model=model,
# messages=[{"role": "user", "content": prompt}],
# )
# return response.choices[0].message.content
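    #
    # Or, with the Anthropic SDK (a sketch; the max_tokens value is an
    # assumption to tune for your expected response length):
    #
    # import anthropic
    # client = anthropic.Anthropic()
    # message = client.messages.create(
    #     model=model,
    #     max_tokens=1024,
    #     messages=[{"role": "user", "content": prompt}],
    # )
    # return message.content[0].text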
# Simulate API delay
time.sleep(0.1)
# Return mock structured response
return """## Summary
This is a sample summary of the analyzed content.
## Key Points
- First key observation from the content
- Second important finding
- Third notable aspect
## Score
Rating: 7
Confidence: medium
## Reasoning
The content demonstrates several characteristics that merit this rating.
The analysis considered multiple factors including relevance and clarity.
"""
# -----------------------------------------------------------------------------
# Stage: Parse
# -----------------------------------------------------------------------------
def stage_parse(batch_id: str):
"""
Stage 4: Extract structured data from LLM responses.
Output: {batch_dir}/{item_id}/parsed.json
"""
batch_dir = get_batch_dir(batch_id)
all_results = []
for item_dir in sorted(batch_dir.iterdir()):
if not item_dir.is_dir():
continue
response_file = item_dir / "response.md"
parsed_file = item_dir / "parsed.json"
if not response_file.exists():
continue
response = response_file.read_text()
result = parse_response(response)
with open(parsed_file, "w") as f:
json.dump(asdict(result), f, indent=2)
all_results.append({
"id": item_dir.name,
**asdict(result),
})
error_count = len(result.parse_errors)
print(f"Parsed: {item_dir.name} (score={result.score}, errors={error_count})")
# Save aggregated results
agg_file = batch_dir / "all_results.json"
with open(agg_file, "w") as f:
json.dump(all_results, f, indent=2)
print(f"\nParse complete. Results saved to {agg_file}")
def parse_response(text: str) -> ParsedResult:
"""Parse structured LLM response with graceful error handling."""
result = ParsedResult()
# Extract summary
try:
result.summary = extract_section(text, "Summary") or ""
except Exception as e:
result.parse_errors.append(f"Summary: {e}")
# Extract key points
try:
result.key_points = extract_list_items(text, "Key Points")
except Exception as e:
result.parse_errors.append(f"Key Points: {e}")
# Extract score
try:
result.score = extract_score(text, "Rating", 1, 10)
except Exception as e:
result.parse_errors.append(f"Score: {e}")
# Extract confidence
try:
result.confidence = extract_field(text, "Confidence") or ""
except Exception as e:
result.parse_errors.append(f"Confidence: {e}")
# Extract reasoning
try:
result.reasoning = extract_section(text, "Reasoning") or ""
except Exception as e:
result.parse_errors.append(f"Reasoning: {e}")
return result
def extract_section(text: str, section_name: str) -> str | None:
"""Extract content between section headers."""
pattern = rf'(?:^|\n)(?:#+ *)?{re.escape(section_name)}[:\s]*\n(.*?)(?=\n#|\Z)'
match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
return match.group(1).strip() if match else None
def extract_field(text: str, field_name: str) -> str | None:
"""Extract value after field label."""
pattern = rf'(?:\*\*)?{re.escape(field_name)}(?:\*\*)?[\s:\-]+([^\n]+)'
match = re.search(pattern, text, re.IGNORECASE)
return match.group(1).strip() if match else None
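# extract_field tolerates optional markdown bold around the label, e.g. both
# "Confidence: high" and "**Confidence**: high" yield "high".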
def extract_list_items(text: str, section_name: str) -> list[str]:
"""Extract bullet points from a section."""
section = extract_section(text, section_name)
if not section:
return []
items = re.findall(r'^[\-\*]\s*(.+)$', section, re.MULTILINE)
return [item.strip() for item in items]
def extract_score(text: str, field_name: str, min_val: int, max_val: int) -> int | None:
"""Extract and validate numeric score."""
raw = extract_field(text, field_name)
if not raw:
return None
match = re.search(r'\d+', raw)
if not match:
return None
score = int(match.group())
return max(min_val, min(max_val, score))
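# Worked example (a response following PROMPT_TEMPLATE's shape):
#
#   parse_response("## Summary\nFine.\n## Score\nRating: 8\nConfidence: high\n")
#
# yields ParsedResult(summary="Fine.", score=8, confidence="high", ...) with
# the absent sections left at their defaults and no parse_errors recorded.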
# -----------------------------------------------------------------------------
# Stage: Render
# -----------------------------------------------------------------------------
def stage_render(batch_id: str):
"""
Stage 5: Generate final outputs.
Output: {output_dir}/index.html
"""
batch_dir = get_batch_dir(batch_id)
output_dir = get_output_dir(batch_id)
output_dir.mkdir(parents=True, exist_ok=True)
# Load aggregated results
results_file = batch_dir / "all_results.json"
if not results_file.exists():
print("No results to render. Run parse stage first.")
return
with open(results_file) as f:
results = json.load(f)
# CUSTOMIZE: Replace with your rendering logic
html = render_html(results, batch_id)
output_file = output_dir / "index.html"
with open(output_file, "w") as f:
f.write(html)
print(f"Rendered: {output_file}")
def render_html(results: list[dict], batch_id: str) -> str:
"""Generate HTML output from results."""
import html as html_lib
rows = ""
for r in results:
rows += f"""
<tr>
<td>{html_lib.escape(r.get('id', ''))}</td>
<td>{html_lib.escape(r.get('summary', '')[:100])}...</td>
<td>{r.get('score', 'N/A')}</td>
<td>{html_lib.escape(r.get('confidence', ''))}</td>
</tr>"""
return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Results - {batch_id}</title>
<style>
body {{ font-family: system-ui, sans-serif; max-width: 1000px; margin: 0 auto; padding: 20px; }}
table {{ width: 100%; border-collapse: collapse; }}
th, td {{ text-align: left; padding: 10px; border-bottom: 1px solid #ddd; }}
th {{ background: #f5f5f5; }}
</style>
</head>
<body>
<h1>Results: {batch_id}</h1>
<p>{len(results)} items processed</p>
<table>
<tr>
<th>ID</th>
<th>Summary</th>
<th>Score</th>
<th>Confidence</th>
</tr>
{rows}
</table>
</body>
</html>"""
# -----------------------------------------------------------------------------
# Clean Stage
# -----------------------------------------------------------------------------
def stage_clean(batch_id: str, from_stage: str | None = None):
"""Remove outputs to enable re-processing."""
batch_dir = get_batch_dir(batch_id)
if not batch_dir.exists():
print(f"No data directory for {batch_id}")
return
stage_outputs = {
"acquire": ["raw.json"],
"prepare": ["prompt.md"],
"process": ["response.md"],
"parse": ["parsed.json"],
}
stage_order = ["acquire", "prepare", "process", "parse", "render"]
if from_stage:
start_idx = stage_order.index(from_stage)
stages_to_clean = stage_order[start_idx:]
else:
stages_to_clean = stage_order
files_to_delete = set()
for s in stages_to_clean:
files_to_delete.update(stage_outputs.get(s, []))
deleted_count = 0
for item_dir in batch_dir.iterdir():
if not item_dir.is_dir():
continue
for filename in files_to_delete:
filepath = item_dir / filename
if filepath.exists():
filepath.unlink()
deleted_count += 1
# Clean aggregated results
if "parse" in stages_to_clean:
agg_file = batch_dir / "all_results.json"
if agg_file.exists():
agg_file.unlink()
deleted_count += 1
print(f"Cleaned {deleted_count} files from stage '{from_stage or 'all'}' onwards")
# -----------------------------------------------------------------------------
# Cost Estimation
# -----------------------------------------------------------------------------
def stage_estimate(batch_id: str):
"""Estimate processing costs before running."""
batch_dir = get_batch_dir(batch_id)
if not batch_dir.exists():
print(f"No data directory for {batch_id}. Run acquire first.")
return
# Count items and estimate tokens
item_count = 0
total_prompt_chars = 0
for item_dir in batch_dir.iterdir():
if not item_dir.is_dir():
continue
prompt_file = item_dir / "prompt.md"
if prompt_file.exists():
total_prompt_chars += len(prompt_file.read_text())
item_count += 1
if item_count == 0:
print("No prompts found. Run prepare first.")
return
# Rough token estimation (1 token ≈ 4 chars)
est_input_tokens = total_prompt_chars / 4
est_output_tokens = item_count * 500 # Assume 500 tokens per response
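    # For tighter input estimates, a real tokenizer can replace the
    # 4-chars-per-token heuristic. A sketch, assuming the tiktoken package
    # (prompt_text here stands for one prompt's contents):
    #   import tiktoken
    #   enc = tiktoken.get_encoding("cl100k_base")
    #   token_count = len(enc.encode(prompt_text))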
# Example pricing (GPT-5.2)
input_price = 10.0 / 1_000_000 # $10 per MTok
output_price = 30.0 / 1_000_000 # $30 per MTok
est_cost = (est_input_tokens * input_price) + (est_output_tokens * output_price)
print(f"Cost Estimate for {batch_id}")
print(f" Items: {item_count}")
print(f" Estimated input tokens: {int(est_input_tokens):,}")
print(f" Estimated output tokens: {int(est_output_tokens):,}")
print(f" Estimated cost: ${est_cost:.2f}")
print(f"\nNote: Actual costs may vary. Add 20-30% buffer for retries.")
# -----------------------------------------------------------------------------
# CLI
# -----------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="LLM Batch Processing Pipeline",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument(
"stage",
choices=["acquire", "prepare", "process", "parse", "render", "all", "clean", "estimate"],
help="Pipeline stage to run",
)
parser.add_argument(
"--batch-id",
default=None,
help="Batch identifier (default: today's date)",
)
parser.add_argument(
"--limit",
type=int,
default=None,
help="Limit number of items (for testing)",
)
parser.add_argument(
"--workers",
type=int,
default=5,
help="Number of parallel workers for processing",
)
parser.add_argument(
"--model",
default="gpt-5.2",
help="Model to use for processing",
)
parser.add_argument(
"--clean-stage",
choices=["acquire", "prepare", "process", "parse"],
help="For clean: only clean this stage and downstream",
)
args = parser.parse_args()
batch_id = args.batch_id or date.today().isoformat()
print(f"Batch ID: {batch_id}\n")
if args.stage == "clean":
stage_clean(batch_id, args.clean_stage)
elif args.stage == "estimate":
stage_estimate(batch_id)
elif args.stage == "all":
stage_acquire(batch_id, args.limit)
stage_prepare(batch_id)
stage_process(batch_id, args.model, args.workers)
stage_parse(batch_id)
stage_render(batch_id)
    elif args.stage == "acquire":
        stage_acquire(batch_id, args.limit)
    elif args.stage == "prepare":
        stage_prepare(batch_id)
    elif args.stage == "process":
        stage_process(batch_id, args.model, args.workers)
    elif args.stage == "parse":
        stage_parse(batch_id)
    elif args.stage == "render":
        stage_render(batch_id)
if __name__ == "__main__":
main()