PROMPT AND CONTEXT ENGINEERING

Context Engineering: Mastering the 200K Token Era

Context window management for 200K token large language models

With Claude 3.5 Sonnet supporting 200K tokens and Gemini 1.5 Pro reaching 2M, context engineering has become as important as prompt engineering. Proper context management is the difference between accurate, nuanced responses and truncated, incomplete ones.

Understanding Token Limits

Model               Context Window   Practical Limit
GPT-4 Turbo         128K tokens      ~100K usable
Claude 3.5 Sonnet   200K tokens      ~180K usable
Claude 3 Opus       200K tokens      ~180K usable
Gemini 1.5 Pro      2M tokens        ~1.8M usable
Llama 3.1           128K tokens      ~100K usable

Always reserve 10-20% of the context window for response generation and a safety margin; the "practical limit" column above reflects that reserve.
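
A tiny helper turns that rule into a concrete input budget. This is a minimal sketch; the reserve ratio is a heuristic you should tune for your workload.

def usable_budget(context_window: int, reserve_ratio: float = 0.15) -> int:
    """Input-token budget after reserving a slice of the window for the response."""
    return int(context_window * (1 - reserve_ratio))

# Claude 3.5 Sonnet: 200K window -> 170K input tokens with a 15% reserve
print(usable_budget(200_000, reserve_ratio=0.15))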

Token Counting Basics

import anthropic

def count_tokens(text: str, model: str = "claude-3-5-sonnet-20241022"):
    client = anthropic.Anthropic()

    # Use token counting API
    response = client.messages.count_tokens(
        model=model,
        messages=[{"role": "user", "content": text}]
    )

    return response.input_tokens

# Example
doc = "Your large document here..."
tokens = count_tokens(doc)
print(f"Document uses {tokens:,} tokens")

# Rule of thumb: ~1,300 tokens per 1,000 English words (1 token ≈ 0.75 words)
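
If calling the counting API for every candidate chunk is too slow or costly, a rough offline estimate is usually good enough for budgeting. This is a sketch; the ~4-characters-per-token ratio is an approximation for English prose, not an exact tokenizer.

def estimate_tokens(text: str) -> int:
    """Rough offline estimate: roughly 4 characters per token for English text."""
    return len(text) // 4

# Budget with the estimate, then confirm with count_tokens() before sending
if estimate_tokens(doc) < 150_000:
    exact = count_tokens(doc)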

Hierarchical Context Structure

Organize context in priority layers:

class ContextBuilder:
    def __init__(self, max_tokens: int = 180000):
        self.max_tokens = max_tokens
        self.sections = {
            "critical": [],      # Always include (instructions, examples)
            "high": [],          # Include if space permits
            "medium": [],        # Include if ample space
            "low": []           # Include only if plenty of space
        }

    def add(self, content: str, priority: str = "medium"):
        self.sections[priority].append(content)

    def build(self) -> str:
        parts = []
        remaining_tokens = self.max_tokens

        # Add sections in priority order
        for priority in ["critical", "high", "medium", "low"]:
            for item in self.sections[priority]:
                item_tokens = count_tokens(item)

                if item_tokens <= remaining_tokens:
                    parts.append(item)
                    remaining_tokens -= item_tokens
                elif priority == "critical":
                    # Critical items must fit, truncate if needed
                    parts.append(truncate_to_tokens(item, remaining_tokens))
                    remaining_tokens = 0
                    break

        return "nn".join(parts)

# Usage
ctx = ContextBuilder(max_tokens=180000)
ctx.add("System instructions...", priority="critical")
ctx.add("Few-shot examples...", priority="critical")
ctx.add(retrieved_docs, priority="high")
ctx.add(historical_context, priority="medium")

prompt = ctx.build()
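
ContextBuilder relies on a truncate_to_tokens helper that is not defined above. One possible sketch binary-searches for the longest prefix that still fits; note it spends one count_tokens call per iteration, so cache or estimate if that matters.

def truncate_to_tokens(text: str, max_tokens: int) -> str:
    """Return the longest prefix of text that fits within max_tokens."""
    if max_tokens <= 0:
        return ""
    if count_tokens(text) <= max_tokens:
        return text

    lo, hi = 0, len(text)
    while lo < hi:
        mid = (lo + hi + 1) // 2  # bias upward so the search terminates
        if count_tokens(text[:mid]) <= max_tokens:
            lo = mid
        else:
            hi = mid - 1
    return text[:lo]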

Intelligent Chunking Strategies

1. Semantic Chunking: Split at natural boundaries

from langchain.text_splitter import RecursiveCharacterTextSplitter

def semantic_chunk(text: str, chunk_size: int = 2000):
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=200,  # overlap preserves context
        separators=[
            "\n\n\n",  # section breaks
            "\n\n",    # paragraph breaks
            "\n",      # line breaks
            ". ",      # sentences
            " "        # words
        ],
        length_function=count_tokens  # token-accurate lengths; each call hits the counting API
    )

    return splitter.split_text(text)

2. Sliding Window: For sequential processing

def sliding_window(text: str, window_size: int = 10000, stride: int = 8000):
    """Process long text with overlapping windows"""
    chunks = []
    start = 0

    while start < len(text):
        end = start + window_size
        chunk = text[start:end]

        if chunk.strip():  # skip empty or whitespace-only windows
            chunks.append({
                "content": chunk,
                "position": start,
                "is_final": end >= len(text)
            })

        start += stride

    return chunks

# Process each window
results = []
for chunk in sliding_window(long_document):
    result = process_chunk(chunk["content"],
                          is_final=chunk["is_final"],
                          previous_summary=results[-1] if results else None)
    results.append(result)
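
process_chunk is left undefined in the loop above. A possible sketch carries the previous window's summary forward so the model keeps continuity across windows; llm.generate is the same hypothetical interface used elsewhere in this article.

def process_chunk(content: str, is_final: bool = False, previous_summary: str = None) -> str:
    """Summarize one window, threading in the summary of the previous window."""
    carry = f"Summary of the text so far:\n{previous_summary}\n\n" if previous_summary else ""
    task = "Write the final, consolidated summary." if is_final else "Summarize this section."
    prompt = f"{carry}{task}\n\n{content}\n\nSummary:"
    return llm.generate(prompt, max_tokens=500)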

Map-Reduce for Long Documents

def map_reduce_summarize(document: str, chunk_size: int = 50000):
    # Map phase: summarize each chunk
    chunks = semantic_chunk(document, chunk_size)
    summaries = []

    for i, chunk in enumerate(chunks):
        prompt = f"""Summarize this section ({i+1}/{len(chunks)}):

{chunk}

Summary:"""

        summary = llm.generate(prompt, max_tokens=500)
        summaries.append(summary)

    # Reduce phase: combine summaries
    combined = "nn".join(summaries)

    if count_tokens(combined) > 20000:
        # Recursively reduce if still too large
        return map_reduce_summarize(combined, chunk_size=20000)

    final_prompt = f"""Synthesize these section summaries into a comprehensive overview:

{combined}

Comprehensive summary:"""

    return llm.generate(final_prompt, max_tokens=2000)

Context Compression

Use a smaller, cheaper model to compress context before feeding it to the larger model:

def compress_context(text: str, target_ratio: float = 0.3):
    """Compress text to target_ratio of original length"""

    # Use Haiku for cheap compression
    prompt = f"""Extract and preserve only the most important information from this text.
Target length: {int(len(text) * target_ratio)} characters.

Text:
{text}

Compressed (key information only):"""

    compressed = llm.generate(
        prompt,
        model="claude-3-haiku-20240307",
        max_tokens=int(count_tokens(text) * target_ratio)  # note: real models cap output tokens; compress chunk-by-chunk if the target exceeds that cap
    )

    return compressed

# Example usage
large_context = retrieve_all_docs(query)  # 150K tokens
compressed = compress_context(large_context, target_ratio=0.4)  # 60K tokens
# Now add additional context in remaining space

Reranking and Filtering

Retrieve more than you need, then filter to best items:

from sentence_transformers import CrossEncoder

def rerank_and_fit(query: str, documents: list, max_tokens: int = 50000):
    # Rerank by relevance
    reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

    pairs = [[query, doc["content"]] for doc in documents]
    scores = reranker.predict(pairs)

    # Sort by relevance
    ranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)

    # Add documents until token budget exhausted
    selected = []
    used_tokens = 0

    for doc, score in ranked:
        doc_tokens = count_tokens(doc["content"])

        if used_tokens + doc_tokens <= max_tokens:
            selected.append(doc)
            used_tokens += doc_tokens
        else:
            break

    return selected
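
A typical pattern is to over-retrieve and let the reranker decide what actually enters the prompt. Here retrieved_docs is a hypothetical list of {"content": ...} dicts from your retriever.

# Over-retrieve (e.g. top 50 hits), then keep only what fits the 50K budget
docs_in_prompt = rerank_and_fit(query, retrieved_docs, max_tokens=50000)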

Attention Patterns: Beginning, End, and the Lost Middle

Models attend most reliably to information at the start and end of the context window and tend to lose detail buried in the middle. Structure the prompt accordingly:

def optimize_attention(query: str, documents: list, examples: list):
    """Place critical info at start and end"""

    context_parts = [
        "=== INSTRUCTIONS ===",
        "Your task is to answer based on the provided documents.",
        "",
        "=== EXAMPLES ===",
        format_examples(examples),  # Keep examples near instructions
        "",
        "=== RETRIEVED DOCUMENTS ===",
        format_documents(documents),
        "",
        "=== QUERY ===",
        query,  # Query at the end (most recent context)
        "",
        "Answer based on the documents above:"
    ]

    return "n".join(context_parts)

Prompt Caching (Claude 3.5+)

Cache expensive context across requests:

client = anthropic.Anthropic()  # anthropic is imported in the token-counting example above

def cached_query(static_context: str, user_query: str):
    """Cache static_context to save tokens and cost"""

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1000,
        system=[
            {
                "type": "text",
                "text": "You are a helpful assistant.",
            },
            {
                "type": "text",
                "text": static_context,  # Large static context
                "cache_control": {"type": "ephemeral"}  # Cache this
            }
        ],
        messages=[
            {"role": "user", "content": user_query}
        ]
    )

    # First call: full cost
    # Subsequent calls within 5 min: 90% cheaper for cached portion

    return response.content[0].text

# Use case: RAG with stable knowledge base
knowledge_base = "nn".join(all_company_docs)  # 100K tokens
answer1 = cached_query(knowledge_base, "What is our return policy?")  # Full cost
answer2 = cached_query(knowledge_base, "What are shipping options?")  # 90% cheaper

Multi-Turn Context Management

class ConversationManager:
    def __init__(self, max_tokens: int = 180000):
        self.max_tokens = max_tokens
        self.history = []
        self.static_context = ""

    def add_turn(self, user_msg: str, assistant_msg: str):
        self.history.append({
            "role": "user",
            "content": user_msg
        })
        self.history.append({
            "role": "assistant",
            "content": assistant_msg
        })

        self._prune_if_needed()

    def _prune_if_needed(self):
        """Keep recent turns, summarize old"""
        total = sum(count_tokens(m["content"]) for m in self.history)
        total += count_tokens(self.static_context)

        if total > self.max_tokens * 0.8:  # 80% threshold
            # Summarize oldest 50% of conversation
            midpoint = len(self.history) // 2
            old_history = self.history[:midpoint]

            summary = summarize_conversation(old_history)

            # Replace old history with summary
            self.history = [
                {"role": "user", "content": f"Previous conversation summary:n{summary}"}
            ] + self.history[midpoint:]

    def get_messages(self):
        # Note: static_context is budgeted in _prune_if_needed but should be
        # sent separately (e.g. as a cached system prompt)
        return self.history

# Usage
conv = ConversationManager()
conv.static_context = load_rag_context()

for turn in range(20):
    user_input = get_user_input()
    response = llm.generate(conv.get_messages() + [{"role": "user", "content": user_input}])
    conv.add_turn(user_input, response)
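
summarize_conversation is not shown above; a minimal sketch using the same hypothetical llm.generate interface:

def summarize_conversation(turns: list) -> str:
    """Condense old turns into a short summary that preserves facts and decisions."""
    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in turns)
    prompt = f"""Summarize this conversation, preserving key facts, decisions, and open questions:

{transcript}

Summary:"""
    return llm.generate(prompt, max_tokens=500)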

Measuring Context Utilization

def analyze_context_usage(response_obj):
    """Understand which parts of context were actually used"""

    input_tokens = response_obj.usage.input_tokens
    output_tokens = response_obj.usage.output_tokens

    print(f"Context size: {input_tokens:,} tokens")
    print(f"Response size: {output_tokens:,} tokens")
    print(f"Utilization: {(output_tokens/input_tokens)*100:.2f}%")

    # The Anthropic API reports cache hits when prompt caching is used
    if hasattr(response_obj.usage, 'cache_read_input_tokens'):
        cached = response_obj.usage.cache_read_input_tokens
        print(f"Cached tokens: {cached:,} ({(cached/input_tokens)*100:.1f}%)")

# Monitor over time to optimize context size

Best Practices Summary

  1. Always count tokens before sending to API
  2. Structure context hierarchically by importance
  3. Place query/instructions near the end for better attention
  4. Use caching for static/repeated context
  5. Compress aggressively when nearing limits
  6. Monitor utilization to optimize context size
  7. Test with real data to find optimal chunk sizes