
Interceptors

Interceptors are composable middleware that hook into agent execution at specific phases. Use them for cross-cutting concerns such as cost control, security, context management, and tool gating.

Overview

from cogent import Agent
from cogent.interceptors import BudgetGuard, PIIShield

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        BudgetGuard(model_calls=10, tool_calls=50),
        PIIShield(patterns=["email", "ssn"], action="mask"),
    ],
)

Interceptors run sequentially in the order listed. If any interceptor stops execution, later interceptors are skipped.
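The ordering rule can be pictured with a small stand-in. The sketch below does not import cogent; `Result`, `run_chain`, and the two toy interceptors are hypothetical names invented for illustration, and the library's real dispatch logic may differ.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Result:
    """Stand-in for an interceptor outcome (hypothetical, not cogent's class)."""
    stopped: bool = False
    message: str = ""


def run_chain(interceptors: list[Callable[[list[str]], Result]]) -> tuple[Result, list[str]]:
    """Call interceptors in list order and return at the first one that stops."""
    trace: list[str] = []
    for interceptor in interceptors:
        result = interceptor(trace)
        if result.stopped:
            return result, trace
    return Result(), trace


def budget(trace: list[str]) -> Result:
    # Toy interceptor that always halts the run.
    trace.append("budget")
    return Result(stopped=True, message="Budget exceeded.")


def pii(trace: list[str]) -> Result:
    # Toy interceptor that always lets the run continue.
    trace.append("pii")
    return Result()


result, trace = run_chain([budget, pii])
print(trace)  # ['budget']
```

Because `budget` stops the chain, `pii` is never called. Swapping the list order would let `pii` run first.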


Execution Phases

Interceptors hook into seven phases of the agent loop:

agent.run(task)
    ├─ PRE_RUN ── prompt adapters, initial tool filtering
    │  ┌─── iteration loop ───────────────────────┐
    │  │                                           │
    │  ├─ PRE_THINK ── budget check, tool filter,  │
    │  │                model switch, context compress│
    │  │                                           │
    │  ├─ LLM call                                 │
    │  │                                           │
    │  ├─ POST_THINK ── response observation       │
    │  │                                           │
    │  ├─ PRE_ACT ── tool gating, rate limiting    │
    │  │                                           │
    │  ├─ Tool execution                           │
    │  │                                           │
    │  ├─ POST_ACT ── result observation/mutation  │
    │  │                                           │
    │  └──────────────────────────────────────────┘
    ├─ POST_RUN ── final observation
    └─ ON_ERROR ── model failover on LLM failures

from cogent.interceptors import Phase

Phase.PRE_RUN      # Before agent.run() starts
Phase.PRE_THINK    # Before each LLM call
Phase.POST_THINK   # After LLM responds
Phase.PRE_ACT      # Before each tool execution
Phase.POST_ACT     # After each tool returns
Phase.POST_RUN     # After agent.run() completes
Phase.ON_ERROR     # When an LLM call fails
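To make the diagram concrete, here is a rough sketch of the order in which phases would fire during an error-free run. It assumes one PRE_ACT/POST_ACT pair per tool call, as the comments above suggest. The local `Phase` enum and `phase_sequence` are illustrative stand-ins, not part of cogent.

```python
from enum import Enum, auto


class Phase(Enum):
    """Local stand-in that mirrors the phase names listed above."""
    PRE_RUN = auto()
    PRE_THINK = auto()
    POST_THINK = auto()
    PRE_ACT = auto()
    POST_ACT = auto()
    POST_RUN = auto()
    ON_ERROR = auto()


def phase_sequence(tool_calls_per_iteration: list[int]) -> list[Phase]:
    """Phases visited in a run without errors.

    Each list entry is the number of tool calls made in that iteration.
    """
    sequence = [Phase.PRE_RUN]
    for tool_calls in tool_calls_per_iteration:
        sequence += [Phase.PRE_THINK, Phase.POST_THINK]
        for _ in range(tool_calls):
            sequence += [Phase.PRE_ACT, Phase.POST_ACT]
    sequence.append(Phase.POST_RUN)
    return sequence


# Two iterations: the first calls one tool, the second answers directly.
order = [phase.name for phase in phase_sequence([1, 0])]
print(order)
```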

InterceptResult

Every interceptor phase handler returns an InterceptResult:

from cogent.interceptors import InterceptResult

# Continue unchanged
return InterceptResult.ok()

# Stop execution with a response
return InterceptResult.stop("Budget exceeded.")

# Skip the current tool call but continue the loop
return InterceptResult.skip()

# Modify messages before LLM call (PRE_THINK)
return InterceptResult.modify_messages(new_messages)

# Modify tool arguments before execution (PRE_ACT)
return InterceptResult.modify_args({"query": "sanitized"})

# Filter available tools (PRE_THINK)
return InterceptResult.modify_tools(filtered_tools)

# Switch the model for this call (PRE_THINK / ON_ERROR)
return InterceptResult.use_model(fallback_model)

# Rewrite the system prompt (PRE_RUN / PRE_THINK)
return InterceptResult.modify_prompt("New system prompt")

# Modify tool output before it enters message history (POST_ACT)
return InterceptResult.modify_tool_result("sanitized result")

Built-in Interceptors

BudgetGuard

Limit LLM and tool calls per run:

from cogent.interceptors import BudgetGuard

guard = BudgetGuard(
    model_calls=10,             # Max LLM calls (0 = unlimited)
    tool_calls=50,              # Max tool calls (0 = unlimited)
    exit_behavior="stop",       # "stop" or "error"
    warning_threshold=0.8,      # Warn at 80% usage
)

agent = Agent(name="assistant", model="gpt4", interceptors=[guard])

# After run, check usage
print(f"Model: {guard.current_model_calls}, Tool: {guard.current_tool_calls}")
print(f"Remaining: {guard.model_budget_remaining}")
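The counting behind a budget limit can be sketched without the library. `CallBudget` below is a hypothetical toy class, not BudgetGuard's implementation. It assumes a limit of 0 means unlimited (as in the option comments above) and that the warning fires once usage reaches `limit * warning_threshold`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CallBudget:
    """Toy call counter (hypothetical); limit=0 means unlimited."""
    limit: int
    warning_threshold: float = 0.8
    used: int = 0

    def record(self) -> str:
        """Count one call and report 'ok', 'warn', or 'exceeded'."""
        if self.limit and self.used >= self.limit:
            return "exceeded"  # the call is refused and not counted
        self.used += 1
        if self.limit and self.used >= self.limit * self.warning_threshold:
            return "warn"
        return "ok"

    @property
    def remaining(self) -> Optional[int]:
        """Calls left, or None when the budget is unlimited."""
        return None if self.limit == 0 else self.limit - self.used


budget = CallBudget(limit=10)
states = [budget.record() for _ in range(11)]
print(states[7], states[10], budget.remaining)  # warn exceeded 0
```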

PIIShield

Detect and handle PII in messages and tool results:

from cogent.interceptors import PIIShield

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        PIIShield(
            patterns=["email", "phone_us", "ssn", "credit_card"],
            action="mask",  # or PIIAction.MASK, "block", "warn", "log"
        ),
    ],
)

# Input: "Contact john@email.com, SSN 123-45-6789"
# Masked: "Contact [EMAIL_REDACTED], SSN [SSN_REDACTED]"

Patterns: "email", "phone_us", "ssn", "credit_card", "ip_address", "date_of_birth", "passport", "api_key", or "all".
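As an illustration of the "mask" behaviour, the sketch below reproduces the input and output from the example above with two deliberately simplified regular expressions. The patterns and the `mask_pii` helper are assumptions made for this sketch. They are not PIIShield's actual detectors, which are likely stricter.

```python
import re

# Simplified patterns for illustration only; production detectors need more care.
PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def mask_pii(text: str, patterns: list[str]) -> str:
    """Replace every match of each named pattern with a [TYPE_REDACTED] placeholder."""
    for name in patterns:
        text = PATTERNS[name].sub(f"[{name.upper()}_REDACTED]", text)
    return text


masked = mask_pii("Contact john@email.com, SSN 123-45-6789", ["email", "ssn"])
print(masked)  # Contact [EMAIL_REDACTED], SSN [SSN_REDACTED]
```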

Actions:

| Action | Behavior |
| --- | --- |
| "mask" | Replace with [TYPE_REDACTED] placeholder |
| "block" | Stop execution immediately |
| "warn" | Log warning, continue unchanged |
| "log" | Track detection, no modification |

ContentFilter

Block messages containing forbidden words or patterns:

from cogent.interceptors import ContentFilter

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        ContentFilter(
            blocked_words=["password", "secret"],
            blocked_patterns=[r"\b\d{4}-\d{4}\b"],  # regex
            action="block",         # "block" or "mask"
            case_sensitive=False,    # default: case-insensitive
        ),
    ],
)
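The matching step can be approximated in a few lines. The helper below is a hypothetical sketch. It assumes blocked words are matched as whole words and that `case_sensitive=False` maps to `re.IGNORECASE`, which may not be how ContentFilter behaves internally.

```python
import re


def find_violations(
    text: str,
    blocked_words: list[str],
    blocked_patterns: list[str],
    case_sensitive: bool = False,
) -> list[str]:
    """Return the blocked words and regex patterns that occur in text."""
    flags = 0 if case_sensitive else re.IGNORECASE
    hits = []
    for word in blocked_words:
        # re.escape keeps words containing regex metacharacters literal.
        if re.search(rf"\b{re.escape(word)}\b", text, flags):
            hits.append(word)
    for pattern in blocked_patterns:
        if re.search(pattern, text, flags):
            hits.append(pattern)
    return hits


hits = find_violations(
    "My PASSWORD is 1234-5678",
    blocked_words=["password", "secret"],
    blocked_patterns=[r"\b\d{4}-\d{4}\b"],
)
print(hits)
```

A filter with action "block" would stop on any non-empty result, while "mask" would replace the matches instead.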

TokenLimiter

Hard stop when token count exceeds a limit:

from cogent.interceptors import TokenLimiter

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        TokenLimiter(
            max_tokens=8000,
            message="Context too large. Please start a new conversation.",
        ),
    ],
)

ContextCompressor

Summarise older messages when approaching token limits:

from cogent.interceptors import ContextCompressor

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        ContextCompressor(
            threshold_tokens=8000,  # Trigger compression above this
            keep_recent=4,          # Preserve last N messages
        ),
    ],
)

ContextCompressor vs ACC: ContextCompressor is a reactive overflow valve — it summarises old messages when the token count exceeds a threshold. ACC is a proactive memory system that extracts structured artifacts (entities, constraints, actions) every turn into bounded state, preventing drift over long conversations. For most multi-turn use cases, ACC is the right default. Add ContextCompressor only as an extra safety layer for very large context windows. See ACC docs for a detailed comparison.
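The reactive behaviour described here can be sketched as follows. This is not ContextCompressor's code. `estimate_tokens` uses a crude four-characters-per-token guess, and the `summarize` callback stands in for whatever summarisation the library performs (presumably an LLM call).

```python
from typing import Callable


def estimate_tokens(messages: list[dict]) -> int:
    """Crude estimate: roughly 4 characters per token (an assumption for this sketch)."""
    return sum(len(m["content"]) for m in messages) // 4


def compress(
    messages: list[dict],
    threshold_tokens: int,
    keep_recent: int,
    summarize: Callable[[list[dict]], str],
) -> list[dict]:
    """Replace older messages with one summary once the estimate exceeds the threshold."""
    if estimate_tokens(messages) <= threshold_tokens or len(messages) <= keep_recent:
        return messages
    split = len(messages) - keep_recent
    older, recent = messages[:split], messages[split:]
    summary = {"role": "system", "content": summarize(older)}
    return [summary, *recent]


history = [{"role": "user", "content": "x" * 40} for _ in range(6)]
compact = compress(history, threshold_tokens=50, keep_recent=4,
                   summarize=lambda old: f"{len(old)} earlier messages")
print(len(compact), compact[0]["content"])  # 5 2 earlier messages
```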


Tool Control

ToolGate

Abstract base — subclass and override filter() to control which tools the model sees:

from cogent.interceptors import ToolGate
from cogent.interceptors.base import InterceptContext

class SafeGate(ToolGate):
    async def filter(self, tools: list, ctx: InterceptContext) -> list:
        allowed = {"search", "read_file"}
        return [t for t in tools if t.name in allowed]

agent = Agent(
    name="assistant",
    model="gpt4",
    tools=[search, write_file, delete_file],
    interceptors=[SafeGate()],
)

PermissionGate

Allowlist of tool names. Override allowed_tools() for dynamic gating:

from cogent.interceptors import PermissionGate

# Static allowlist
agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[PermissionGate(tools=["search", "read_file"])],
)

# Dynamic: all tools for admins, read-only for guests
class RoleGate(PermissionGate):
    def allowed_tools(self, ctx):
        if ctx.run_context and ctx.run_context.role == "admin":
            return ["*"]
        return ["search", "read_file"]

ConversationGate

Unlock tools as the conversation progresses (by message count):

from cogent.interceptors import ConversationGate

agent = Agent(
    name="assistant",
    model="gpt4",
    tools=[search, execute_order, admin_panel],
    interceptors=[
        ConversationGate(stages={
            0: ["search"],                    # Start: search only
            4: ["search", "execute_order"],    # After 4 messages
            8: ["search", "execute_order", "admin_panel"],
        }),
    ],
)
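One plausible reading of the `stages` mapping is "use the tool list of the highest threshold the message count has reached". The helper below sketches that lookup. `tools_for` is a hypothetical name, and whether a threshold is met at or strictly after the given count is an assumption.

```python
def tools_for(stages: dict[int, list[str]], message_count: int) -> list[str]:
    """Return the tool list of the highest stage whose threshold has been reached."""
    reached = [threshold for threshold in stages if threshold <= message_count]
    return stages[max(reached)] if reached else []


stages = {
    0: ["search"],
    4: ["search", "execute_order"],
    8: ["search", "execute_order", "admin_panel"],
}
print(tools_for(stages, 5))  # ['search', 'execute_order']
```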

Rate Limiting

RateLimiter

Sliding-window rate limiting for tool calls:

from cogent.interceptors import RateLimiter

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        RateLimiter(
            calls_per_window=10,    # Max calls in window
            window_seconds=60.0,    # Window size
            action="wait",          # "wait" (sleep) or "block" (stop)
            per_tool=False,         # True = separate limit per tool
        ),
    ],
)

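Safe for concurrent use: RateLimiter uses asyncio.Lock internally, so one instance can be shared across concurrent runs on the same event loop. Note that asyncio.Lock is not thread-safe, so this guarantee covers concurrent coroutines rather than OS threads.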
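For readers unfamiliar with sliding-window limiting, here is a minimal synchronous sketch of the bookkeeping. `SlidingWindow` is a hypothetical class, not RateLimiter itself. It omits the locking and the "wait" action, and takes an injectable clock so the behaviour can be checked deterministically.

```python
import time
from collections import deque
from typing import Callable


class SlidingWindow:
    """Allow at most calls_per_window calls within any window_seconds span."""

    def __init__(self, calls_per_window: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.calls_per_window = calls_per_window
        self.window_seconds = window_seconds
        self.clock = clock
        self._stamps: deque = deque()  # timestamps of calls still inside the window

    def try_acquire(self) -> bool:
        """Record a call and return True, or return False if the window is full."""
        now = self.clock()
        # Drop timestamps that have aged out of the window.
        while self._stamps and now - self._stamps[0] >= self.window_seconds:
            self._stamps.popleft()
        if len(self._stamps) >= self.calls_per_window:
            return False
        self._stamps.append(now)
        return True


fake_now = [0.0]
window = SlidingWindow(calls_per_window=2, window_seconds=60.0, clock=lambda: fake_now[0])
first_three = [window.try_acquire() for _ in range(3)]
fake_now[0] = 60.0  # both earlier calls have now left the window
after_wait = window.try_acquire()
print(first_three, after_wait)  # [True, True, False] True
```

With action "block" a refused call would stop the run. With "wait" the limiter would instead sleep until the oldest timestamp expires.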

ThrottleInterceptor

Minimum delay between consecutive tool calls:

from cogent.interceptors import ThrottleInterceptor

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        ThrottleInterceptor(min_delay=0.5, per_tool=True),
    ],
)

Resilience

Failover

Automatic model switching when the primary model fails:

from cogent.interceptors import Failover

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        Failover(
            fallbacks=["gpt-5.4-mini", "claude-sonnet-4-20250514"],
            on=["rate_limit", "timeout", "error"],  # triggers
            max_retries_per_model=2,
        ),
    ],
)

Failover handles ON_ERROR: when an LLM call fails, it switches to the next fallback model and retries. Its pre_think handler then keeps subsequent iterations on the switched model.

Triggers: "rate_limit", "timeout", "error", "context_length".
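The retry-then-switch flow can be sketched independently of the library. `call_with_failover` below is hypothetical. It treats `max_retries_per_model` as the number of attempts per model and retries on any exception, whereas the real interceptor filters by trigger type.

```python
from typing import Callable


def call_with_failover(call: Callable[[str], str], models: list[str],
                       max_retries_per_model: int = 2) -> tuple[str, str]:
    """Try each model in order, attempting each up to max_retries_per_model times."""
    last_error = None
    for model in models:
        for _ in range(max_retries_per_model):
            try:
                return model, call(model)
            except Exception as exc:  # a real implementation would check the trigger type
                last_error = exc
    raise RuntimeError("all models failed") from last_error


attempts: list[str] = []


def flaky(model: str) -> str:
    # Toy LLM call: the primary model always times out.
    attempts.append(model)
    if model == "primary":
        raise TimeoutError("timeout")
    return f"answer from {model}"


used_model, answer = call_with_failover(flaky, ["primary", "backup"])
print(attempts, used_model)  # ['primary', 'primary', 'backup'] backup
```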

CircuitBreaker

Prevent repeated calls to failing tools:

from cogent.interceptors import CircuitBreaker

agent = Agent(
    name="assistant",
    model="gpt4",
    tools=[search, database_query],
    interceptors=[
        CircuitBreaker(
            failure_threshold=5,    # Failures before opening
            reset_timeout=30.0,     # Seconds before half-open test
            tools=["database_query"],  # Protect specific tools (None = all)
        ),
    ],
)

Uses the structured tool_error field on InterceptContext to detect failures.
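The closed, open, and half-open states implied by `failure_threshold` and `reset_timeout` follow the usual circuit-breaker pattern. The sketch below is a generic, hypothetical version of that pattern with an injectable clock. It is not CircuitBreaker's source and ignores per-tool tracking.

```python
import time
from typing import Callable, Optional


class Breaker:
    """Generic circuit breaker: closed -> open -> half_open -> closed."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half_open"  # one trial call is allowed through
        return "open"

    def allow(self) -> bool:
        """Calls are rejected only while the breaker is open."""
        return self.state != "open"

    def record(self, ok: bool) -> None:
        """Report the outcome of a call that was allowed through."""
        if ok:
            self.failures = 0
            self.opened_at = None
            return
        self.failures += 1
        # Open on reaching the threshold, or re-open after a failed trial call.
        if self.failures >= self.failure_threshold or self.opened_at is not None:
            self.opened_at = self.clock()


fake_now = [0.0]
breaker = Breaker(failure_threshold=2, reset_timeout=30.0, clock=lambda: fake_now[0])
breaker.record(False)
breaker.record(False)
state_after_failures = breaker.state
fake_now[0] = 30.0
state_after_timeout = breaker.state
print(state_after_failures, state_after_timeout)  # open half_open
```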

Per-Tool Retry

For per-tool retry with backoff, use ResilienceConfig.tool_overrides instead of an interceptor — it integrates with exhaustion escalation and fallback_model:

from cogent import Agent
from cogent.agent.resilience import ResilienceConfig

agent = Agent(
    name="assistant",
    model="gpt4",
    tools=[search, database_query],
    resilience=ResilienceConfig(
        max_retries=3,
        strategy="exponential_jitter",
        tool_overrides={
            "search": {"max_retries": 5, "base_delay": 0.5},
            "database_query": {"max_retries": 1, "timeout_seconds": 30},
        },
        fallback_model="gpt-5.4",
    ),
)

See Resilience docs for details.
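For intuition about what a jittered exponential strategy computes, the sketch below shows one common variant ("full jitter"). It is an assumption that "exponential_jitter" behaves similarly. `backoff_delay`, `max_delay`, and the `rng` parameter are hypothetical names, not ResilienceConfig options.

```python
import random
from typing import Callable


def backoff_delay(attempt: int, base_delay: float = 0.5, max_delay: float = 30.0,
                  rng: Callable[[], float] = random.random) -> float:
    """Full-jitter backoff: a random delay in [0, min(max_delay, base_delay * 2**attempt)]."""
    cap = min(max_delay, base_delay * (2 ** attempt))
    return rng() * cap


# Upper bounds for the first four retries with base_delay=0.5 (rng pinned to 1.0).
bounds = [backoff_delay(n, rng=lambda: 1.0) for n in range(4)]
print(bounds)  # [0.5, 1.0, 2.0, 4.0]
```

The randomisation spreads retries from many concurrent callers so they do not hit a recovering service at the same instant.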


Prompt Adapters

ContextPrompt

Inject RunContext values into the system prompt via template placeholders:

from cogent.interceptors import ContextPrompt

agent = Agent(
    name="assistant",
    model="gpt4",
    instructions="Greet the user by name.",
    interceptors=[
        ContextPrompt(template="User: {user_name}\nRole: {role}"),
    ],
)

result = await agent.run(
    "Hello!",
    context={"user_name": "Alice", "role": "engineer"},
)

ConversationPrompt

Add stage-based instructions as the conversation grows:

from cogent.interceptors import ConversationPrompt

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        ConversationPrompt(stages={
            0: "Start with a friendly greeting.",
            4: "Offer deeper explanations.",
            8: "Wrap up and ask if there's anything else.",
        }),
    ],
)

LambdaPrompt

Rewrite the system prompt with a plain function:

from cogent.interceptors import LambdaPrompt
from datetime import datetime, UTC

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        LambdaPrompt(
            adapter_fn=lambda prompt, ctx: f"{prompt}\n[Time: {datetime.now(UTC)}]"
        ),
    ],
)

Auditing

For logging agent activity (LLM requests/responses, tool calls/results, run lifecycle), use the Observer system instead of interceptors:

from cogent import Agent
from cogent.observability import Observer

# Built-in console logging
agent = Agent(name="assistant", model="gpt4", verbosity="detailed")

# Capture events for inspection
observer = Observer(capture=["llm.*", "tool.*"])
agent = Agent(name="assistant", model="gpt4", observer=observer)
await agent.run("Do something")
print(observer.history())

Custom Interceptors

Override only the phase methods you need. Unimplemented phases pass through automatically.

from cogent.interceptors import Interceptor, InterceptContext, InterceptResult

class LoggingInterceptor(Interceptor):
    async def pre_think(self, ctx: InterceptContext) -> InterceptResult:
        print(f"LLM call #{ctx.model_calls + 1}, {len(ctx.messages)} messages")
        return InterceptResult.ok()

    async def post_act(self, ctx: InterceptContext) -> InterceptResult:
        status = "error" if ctx.tool_error else "ok"
        print(f"Tool {ctx.tool_name}: {status}")
        return InterceptResult.ok()

InterceptContext

Available fields at each phase:

@dataclass
class InterceptContext:
    agent: Agent                        # Current agent
    phase: Phase                        # Current phase
    task: str                           # Original task/prompt
    messages: list[dict]                # Current message history
    state: dict                         # Mutable shared state across all interceptors
    run_context: RunContext | None       # User-provided context from agent.run()
    model_calls: int                    # LLM calls so far
    tool_calls: int                     # Tool calls so far
    tools: list | None                  # Available tools (for ToolGate)
    is_subagent: bool                   # True if current tool is a subagent

    # Phase-specific
    tool_name: str | None               # PRE_ACT / POST_ACT
    tool_args: dict | None              # PRE_ACT / POST_ACT
    tool_result: object | None          # POST_ACT only
    tool_error: Exception | None        # POST_ACT only (structured error signal)
    model_response: Any                 # POST_THINK only
    error: Exception | None             # ON_ERROR only

Combining Interceptors

Interceptors run in order. Later interceptors see modifications from earlier ones:

from cogent.interceptors import PIIShield, ContentFilter, BudgetGuard

agent = Agent(
    name="assistant",
    model="gpt4",
    interceptors=[
        PIIShield(patterns=["email", "ssn"], action="mask"),
        ContentFilter(blocked_words=["password"]),
        BudgetGuard(model_calls=10, tool_calls=50),
    ],
)
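The effect of ordering can be shown with plain functions. In this hypothetical sketch each step receives the previous step's output, which is why placing a masking step first means later steps never see the raw value. `apply_in_order` and the two toy steps are illustrative only.

```python
from typing import Callable


def apply_in_order(text: str, steps: list[Callable[[str], str]]) -> tuple[str, list[str]]:
    """Feed each step the previous step's output and record what each step received."""
    received: list[str] = []
    for step in steps:
        received.append(text)
        text = step(text)
    return text, received


def mask_email(text: str) -> str:
    # Toy masker: replaces one hard-coded address.
    return text.replace("alice@example.com", "[EMAIL_REDACTED]")


def shout(text: str) -> str:
    # Toy second step that only transforms what it is given.
    return text.upper()


final, received = apply_in_order("mail alice@example.com", [mask_email, shout])
print(received[1])  # mail [EMAIL_REDACTED]
```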

API Reference

Core Classes

| Class | Description |
| --- | --- |
| Interceptor | Base class — override phase methods (pre_think, post_act, etc.) |
| InterceptContext | Context passed to each phase handler |
| InterceptResult | Return type — factories: ok(), stop(), skip(), modify_*() |
| Phase | Enum: PRE_RUN, PRE_THINK, POST_THINK, PRE_ACT, POST_ACT, POST_RUN, ON_ERROR |
| StopExecution | Exception to halt execution immediately |

Built-in Interceptors

| Category | Interceptors |
| --- | --- |
| Budget | BudgetGuard |
| Security | PIIShield, ContentFilter |
| Context | TokenLimiter, ContextCompressor |
| Gates | ToolGate, PermissionGate, ConversationGate |
| Rate Limit | RateLimiter, ThrottleInterceptor |
| Resilience | Failover, CircuitBreaker |
| Prompts | ContextPrompt, ConversationPrompt, LambdaPrompt |
| HITL | HITLInterceptor |