Interceptors
Interceptors are composable middleware that hook into agent execution at specific phases. Use them for cross-cutting concerns such as cost control, security, context management, and tool gating.
Overview
from cogent import Agent
from cogent.interceptors import BudgetGuard, PIIShield
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
BudgetGuard(model_calls=10, tool_calls=50),
PIIShield(patterns=["email", "ssn"], action="mask"),
],
)
Interceptors run sequentially in the order listed. If any interceptor stops execution, later interceptors are skipped.
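The ordering rule can be pictured with a small, self-contained sketch. It does not use cogent: `Result`, `run_chain`, and the two handler functions are hypothetical stand-ins, and the real dispatch logic may differ.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class Result:
    """Hypothetical stand-in for an interceptor result (not cogent's InterceptResult)."""
    stop: bool = False
    response: Optional[str] = None


Handler = Callable[[List[str]], Result]


def run_chain(handlers: List[Handler]) -> Tuple[Result, List[str]]:
    """Call handlers in list order; the first one that stops ends the chain."""
    called: List[str] = []
    for handler in handlers:
        result = handler(called)
        if result.stop:
            return result, called  # later handlers are never invoked
    return Result(), called


def pii(called: List[str]) -> Result:
    called.append("pii")
    return Result()


def budget(called: List[str]) -> Result:
    called.append("budget")
    return Result(stop=True, response="Budget exceeded.")


result, called = run_chain([pii, budget, pii])
print(called)  # ['pii', 'budget']
```

The third handler never runs because the second one stopped the chain, which is why cheap or critical checks are usually worth listing first.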
Execution Phases
Interceptors hook into seven phases of the agent loop:
agent.run(task)
│
├─ PRE_RUN ── prompt adapters, initial tool filtering
│
│  ┌─── iteration loop ────────────────────────────┐
│  │                                               │
│  ├─ PRE_THINK ── budget check, tool filter,      │
│  │               model switch, context compress  │
│  │                                               │
│  ├─ LLM call                                     │
│  │                                               │
│  ├─ POST_THINK ── response observation           │
│  │                                               │
│  ├─ PRE_ACT ── tool gating, rate limiting        │
│  │                                               │
│  ├─ Tool execution                               │
│  │                                               │
│  ├─ POST_ACT ── result observation/mutation      │
│  │                                               │
│  └───────────────────────────────────────────────┘
│
├─ POST_RUN ── final observation
│
└─ ON_ERROR ── model failover on LLM failures
from cogent.interceptors import Phase
Phase.PRE_RUN # Before agent.run() starts
Phase.PRE_THINK # Before each LLM call
Phase.POST_THINK # After LLM responds
Phase.PRE_ACT # Before each tool execution
Phase.POST_ACT # After each tool returns
Phase.POST_RUN # After agent.run() completes
Phase.ON_ERROR # When an LLM call fails
InterceptResult
Every interceptor phase handler returns an InterceptResult:
from cogent.interceptors import InterceptResult
# Continue unchanged
return InterceptResult.ok()
# Stop execution with a response
return InterceptResult.stop("Budget exceeded.")
# Skip the current tool call but continue the loop
return InterceptResult.skip()
# Modify messages before LLM call (PRE_THINK)
return InterceptResult.modify_messages(new_messages)
# Modify tool arguments before execution (PRE_ACT)
return InterceptResult.modify_args({"query": "sanitized"})
# Filter available tools (PRE_THINK)
return InterceptResult.modify_tools(filtered_tools)
# Switch the model for this call (PRE_THINK / ON_ERROR)
return InterceptResult.use_model(fallback_model)
# Rewrite the system prompt (PRE_RUN / PRE_THINK)
return InterceptResult.modify_prompt("New system prompt")
# Modify tool output before it enters message history (POST_ACT)
return InterceptResult.modify_tool_result("sanitized result")
Built-in Interceptors
BudgetGuard
Limit LLM and tool calls per run:
from cogent.interceptors import BudgetGuard
guard = BudgetGuard(
model_calls=10, # Max LLM calls (0 = unlimited)
tool_calls=50, # Max tool calls (0 = unlimited)
exit_behavior="stop", # "stop" or "error"
warning_threshold=0.8, # Warn at 80% usage
)
agent = Agent(name="assistant", model="gpt4", interceptors=[guard])
# After run, check usage
print(f"Model: {guard.current_model_calls}, Tool: {guard.current_tool_calls}")
print(f"Remaining: {guard.model_budget_remaining}")
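As a rough illustration of how a call limit and a warning threshold interact, the sketch below counts calls against a limit. `CallBudget` is a hypothetical class written for this example, not cogent's implementation; in particular, treating the warning as "usage at or above the threshold fraction" is an assumption.

```python
from dataclasses import dataclass


@dataclass
class CallBudget:
    """Hypothetical counter; not cogent's BudgetGuard."""
    limit: int                      # 0 means unlimited, as in the options above
    warning_threshold: float = 0.8
    used: int = 0

    def record(self) -> str:
        """Count one call and report 'ok', 'warn', or 'exceeded'."""
        if self.limit and self.used >= self.limit:
            return "exceeded"       # the call is refused and not counted
        self.used += 1
        if self.limit and self.used >= self.limit * self.warning_threshold:
            return "warn"
        return "ok"

    @property
    def remaining(self):
        """Calls left, or None when the budget is unlimited."""
        return None if self.limit == 0 else self.limit - self.used


budget = CallBudget(limit=10)
statuses = [budget.record() for _ in range(11)]
print(statuses.count("ok"), statuses.count("warn"), statuses[-1])  # 7 3 exceeded
```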
PIIShield
Detect and handle PII in messages and tool results:
from cogent.interceptors import PIIShield
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
PIIShield(
patterns=["email", "phone_us", "ssn", "credit_card"],
action="mask", # or PIIAction.MASK, "block", "warn", "log"
),
],
)
# Input: "Contact john@email.com, SSN 123-45-6789"
# Masked: "Contact [EMAIL_REDACTED], SSN [SSN_REDACTED]"
Patterns: "email", "phone_us", "ssn", "credit_card", "ip_address", "date_of_birth", "passport", "api_key", or "all".
Actions:
| Action | Behavior |
|---|---|
| "mask" | Replace with [TYPE_REDACTED] placeholder |
| "block" | Stop execution immediately |
| "warn" | Log warning, continue unchanged |
| "log" | Track detection, no modification |
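To make the "mask" behaviour concrete, here is a minimal regex-based sketch that reproduces the placeholder format shown above. The `PATTERNS` table and `mask_pii` function are hypothetical; the regular expressions are deliberately simple and are not the patterns cogent ships.

```python
import re

# Illustrative regexes only; real PII detection needs far more careful patterns.
PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def mask_pii(text: str, patterns: list) -> str:
    """Replace each match with a [TYPE_REDACTED] placeholder."""
    for name in patterns:
        text = PATTERNS[name].sub(f"[{name.upper()}_REDACTED]", text)
    return text


masked = mask_pii("Contact john@email.com, SSN 123-45-6789", ["email", "ssn"])
print(masked)  # Contact [EMAIL_REDACTED], SSN [SSN_REDACTED]
```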
ContentFilter
Block messages containing forbidden words or patterns:
from cogent.interceptors import ContentFilter
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
ContentFilter(
blocked_words=["password", "secret"],
blocked_patterns=[r"\b\d{4}-\d{4}\b"], # regex
action="block", # "block" or "mask"
case_sensitive=False, # default: case-insensitive
),
],
)
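The matching step behind such a filter might look like the sketch below. `find_violations` is a hypothetical helper, and matching blocked words on word boundaries is an assumption; the library may match substrings instead.

```python
import re


def find_violations(text, blocked_words, blocked_patterns, case_sensitive=False):
    """Return the blocked words and regex patterns that occur in text."""
    flags = 0 if case_sensitive else re.IGNORECASE
    hits = []
    for word in blocked_words:
        # re.escape keeps literal words from being read as regex syntax
        if re.search(rf"\b{re.escape(word)}\b", text, flags):
            hits.append(word)
    for pattern in blocked_patterns:
        if re.search(pattern, text, flags):
            hits.append(pattern)
    return hits


hits = find_violations(
    "My PASSWORD is 1234-5678",
    blocked_words=["password", "secret"],
    blocked_patterns=[r"\b\d{4}-\d{4}\b"],
)
print(hits)
```

A "block" action would stop the run when `hits` is non-empty, while a "mask" action would substitute the matched spans instead.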
TokenLimiter
Hard stop when token count exceeds a limit:
from cogent.interceptors import TokenLimiter
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
TokenLimiter(
max_tokens=8000,
message="Context too large. Please start a new conversation.",
),
],
)
ContextCompressor
Summarise older messages when approaching token limits:
from cogent.interceptors import ContextCompressor
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
ContextCompressor(
threshold_tokens=8000, # Trigger compression above this
keep_recent=4, # Preserve last N messages
),
],
)
ContextCompressor vs ACC: ContextCompressor is a reactive overflow valve — it summarises old messages when the token count exceeds a threshold. ACC is a proactive memory system that extracts structured artifacts (entities, constraints, actions) every turn into bounded state, preventing drift over long conversations. For most multi-turn use cases, ACC is the right default. Add ContextCompressor only as an extra safety layer for very large context windows. See ACC docs for a detailed comparison.
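The reactive behaviour described above can be sketched as a pure function. Everything here is hypothetical: `compress`, the whitespace token counter, and the placeholder summariser stand in for whatever tokenizer and summarisation call the library actually uses.

```python
def compress(messages, threshold_tokens, keep_recent, count_tokens, summarise):
    """Summarise older messages once the total token count exceeds the threshold."""
    total = sum(count_tokens(m["content"]) for m in messages)
    if total <= threshold_tokens or len(messages) <= keep_recent:
        return messages  # under the limit, or nothing old enough to fold away
    split = len(messages) - keep_recent
    older, recent = messages[:split], messages[split:]
    summary = {"role": "system", "content": summarise(older)}
    return [summary] + recent


def count_words(text):
    """Crude stand-in for a tokenizer: one token per whitespace-separated word."""
    return len(text.split())


def fake_summary(older):
    """Stand-in for an LLM summarisation call."""
    return f"Summary of {len(older)} earlier messages"


history = [{"role": "user", "content": f"message number {i}"} for i in range(6)]
compressed = compress(history, threshold_tokens=10, keep_recent=4,
                      count_tokens=count_words, summarise=fake_summary)
print(len(history), "->", len(compressed))  # 6 -> 5
```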
Tool Control
ToolGate
Abstract base — subclass and override filter() to control which tools the model sees:
from cogent.interceptors import ToolGate
from cogent.interceptors.base import InterceptContext
class SafeGate(ToolGate):
    async def filter(self, tools: list, ctx: InterceptContext) -> list:
        # Expose only the read-only tools to the model
        allowed = {"search", "read_file"}
        return [t for t in tools if t.name in allowed]
agent = Agent(
name="assistant",
model="gpt4",
tools=[search, write_file, delete_file],
interceptors=[SafeGate()],
)
PermissionGate
Allowlist of tool names. Override allowed_tools() for dynamic gating:
from cogent.interceptors import PermissionGate
# Static allowlist
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[PermissionGate(tools=["search", "read_file"])],
)
# Dynamic: all tools for admins, read-only for guests
class RoleGate(PermissionGate):
    def allowed_tools(self, ctx):
        if ctx.run_context and ctx.run_context.role == "admin":
            return ["*"]
        return ["search", "read_file"]
ConversationGate
Unlock tools as the conversation progresses (by message count):
from cogent.interceptors import ConversationGate
agent = Agent(
name="assistant",
model="gpt4",
tools=[search, execute_order, admin_panel],
interceptors=[
ConversationGate(stages={
0: ["search"], # Start: search only
4: ["search", "execute_order"], # After 4 messages
8: ["search", "execute_order", "admin_panel"],
}),
],
)
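One way to resolve the active stage is to pick the highest threshold that the current message count has reached. The helper below is a hypothetical sketch of that lookup; whether a stage unlocks at exactly N messages or only after N is an assumption here.

```python
from bisect import bisect_right


def tools_for(stages: dict, message_count: int) -> list:
    """Return the tool list of the highest stage whose threshold has been reached."""
    thresholds = sorted(stages)
    index = bisect_right(thresholds, message_count) - 1
    return stages[thresholds[index]] if index >= 0 else []


stages = {
    0: ["search"],
    4: ["search", "execute_order"],
    8: ["search", "execute_order", "admin_panel"],
}
print(tools_for(stages, 3))  # ['search']
print(tools_for(stages, 4))  # ['search', 'execute_order']
```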
Rate Limiting
RateLimiter
Sliding-window rate limiting for tool calls:
from cogent.interceptors import RateLimiter
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
RateLimiter(
calls_per_window=10, # Max calls in window
window_seconds=60.0, # Window size
action="wait", # "wait" (sleep) or "block" (stop)
per_tool=False, # True = separate limit per tool
),
],
)
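Concurrency-safe within an event loop: it uses asyncio.Lock internally, so one instance can be shared across concurrent runs on the same loop. Note that asyncio.Lock coordinates coroutines, not OS threads.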
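A sliding-window limiter guarded by `asyncio.Lock` can be sketched as follows. `SlidingWindow` is a hypothetical class, not cogent's `RateLimiter`; it shows only the "block" style decision, and the injectable `clock` parameter exists purely so the example runs without real waiting.

```python
import asyncio
import time
from collections import deque


class SlidingWindow:
    """Hypothetical limiter; not cogent's RateLimiter."""

    def __init__(self, calls_per_window, window_seconds, clock=time.monotonic):
        self.calls_per_window = calls_per_window
        self.window_seconds = window_seconds
        self._clock = clock
        self._stamps = deque()          # timestamps of accepted calls
        self._lock = asyncio.Lock()     # serialises coroutines, not threads

    async def try_acquire(self) -> bool:
        async with self._lock:
            now = self._clock()
            # Drop timestamps that have slid out of the window
            while self._stamps and now - self._stamps[0] >= self.window_seconds:
                self._stamps.popleft()
            if len(self._stamps) >= self.calls_per_window:
                return False
            self._stamps.append(now)
            return True


async def demo():
    fake_now = [0.0]
    limiter = SlidingWindow(2, 60.0, clock=lambda: fake_now[0])
    results = [await limiter.try_acquire() for _ in range(3)]
    fake_now[0] = 61.0                  # both earlier calls have expired
    results.append(await limiter.try_acquire())
    return results


results = asyncio.run(demo())
print(results)  # [True, True, False, True]
```

A "wait" style limiter would sleep until the oldest timestamp expires instead of returning `False`.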
ThrottleInterceptor
Minimum delay between consecutive tool calls:
from cogent.interceptors import ThrottleInterceptor
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
ThrottleInterceptor(min_delay=0.5, per_tool=True),
],
)
Resilience
Failover
Automatic model switching when the primary model fails:
from cogent.interceptors import Failover
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
Failover(
fallbacks=["gpt-5.4-mini", "claude-sonnet-4-20250514"],
on=["rate_limit", "timeout", "error"], # triggers
max_retries_per_model=2,
),
],
)
Failover handles ON_ERROR when an LLM call fails: it switches to the next fallback model and retries. Its pre_think handler then keeps subsequent iterations on the switched model.
Triggers: "rate_limit", "timeout", "error", "context_length".
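The failover sequence can be illustrated with a synchronous sketch. `ModelError`, `call_with_failover`, and the model names are hypothetical, and counting `max_attempts_per_model` as total attempts per model (rather than retries after a first attempt) is an assumption.

```python
class ModelError(Exception):
    """Hypothetical error type carrying a trigger name such as 'rate_limit'."""

    def __init__(self, kind: str) -> None:
        super().__init__(kind)
        self.kind = kind


def call_with_failover(call, models, on, max_attempts_per_model=2):
    """Try each model in order; move on once its attempts are used up."""
    if not models:
        raise ValueError("at least one model is required")
    last_error = None
    for model in models:
        for _ in range(max_attempts_per_model):
            try:
                return model, call(model)
            except ModelError as exc:
                if exc.kind not in on:
                    raise  # not a configured trigger: surface the error
                last_error = exc
    raise last_error


attempts = []


def flaky_call(model: str) -> str:
    """Stand-in LLM call: the primary model is always rate limited."""
    attempts.append(model)
    if model == "primary":
        raise ModelError("rate_limit")
    return f"answer from {model}"


used_model, answer = call_with_failover(
    flaky_call, ["primary", "fallback-a", "fallback-b"], on={"rate_limit", "timeout"}
)
print(used_model, attempts)  # fallback-a ['primary', 'primary', 'fallback-a']
```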
CircuitBreaker
Prevent repeated calls to failing tools:
from cogent.interceptors import CircuitBreaker
agent = Agent(
name="assistant",
model="gpt4",
tools=[search, database_query],
interceptors=[
CircuitBreaker(
failure_threshold=5, # Failures before opening
reset_timeout=30.0, # Seconds before half-open test
tools=["database_query"], # Protect specific tools (None = all)
),
],
)
Uses the structured tool_error field on InterceptContext to detect failures.
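The closed, open, and half-open states implied by `failure_threshold` and `reset_timeout` follow the usual circuit-breaker pattern, sketched below. `Breaker` is a hypothetical class for illustration; cogent's internal state handling may differ.

```python
import time


class Breaker:
    """Hypothetical circuit breaker; not cogent's CircuitBreaker."""

    def __init__(self, failure_threshold=5, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self.failures = 0
        self.opened_at = None

    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self._clock() - self.opened_at >= self.reset_timeout:
            return "half_open"          # allow one trial call through
        return "open"

    def allow(self) -> bool:
        return self.state() != "open"

    def record(self, ok: bool) -> None:
        if ok:
            self.failures = 0
            self.opened_at = None       # success closes the circuit
        else:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self._clock()  # (re)open and restart the timer


now = [0.0]
breaker = Breaker(failure_threshold=3, reset_timeout=30.0, clock=lambda: now[0])
for _ in range(3):
    breaker.record(ok=False)
states = [breaker.state()]              # open after three failures
now[0] = 30.0
states.append(breaker.state())          # half_open once the timeout elapses
breaker.record(ok=False)
states.append(breaker.state())          # a failed trial reopens the circuit
now[0] = 60.0
breaker.record(ok=True)
states.append(breaker.state())          # a successful trial closes it
print(states)  # ['open', 'half_open', 'open', 'closed']
```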
Per-Tool Retry
For per-tool retry with backoff, use ResilienceConfig.tool_overrides instead of an interceptor — it integrates with exhaustion escalation and fallback_model:
from cogent import Agent
from cogent.agent.resilience import ResilienceConfig
agent = Agent(
name="assistant",
model="gpt4",
tools=[search, database_query],
resilience=ResilienceConfig(
max_retries=3,
strategy="exponential_jitter",
tool_overrides={
"search": {"max_retries": 5, "base_delay": 0.5},
"database_query": {"max_retries": 1, "timeout_seconds": 30},
},
fallback_model="gpt-5.4",
),
)
See Resilience docs for details.
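For intuition about what an exponential backoff with jitter produces, the sketch below uses the common "full jitter" variant: each delay is drawn uniformly between zero and a capped, doubling ceiling. This is an assumption for illustration; the exact formula behind the `"exponential_jitter"` strategy is not specified here, and `backoff_delays` and `max_delay` are hypothetical names.

```python
import random


def backoff_delays(max_retries, base_delay=0.5, max_delay=30.0, rng=None):
    """Return one randomised delay per retry, with a doubling upper bound."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        ceiling = min(max_delay, base_delay * 2 ** attempt)
        delays.append(rng.uniform(0.0, ceiling))
    return delays


delays = backoff_delays(5, base_delay=0.5, rng=random.Random(0))
ceilings = [min(30.0, 0.5 * 2 ** i) for i in range(5)]
print(ceilings)  # [0.5, 1.0, 2.0, 4.0, 8.0]
```

Randomising the delay spreads out retries from concurrent runs so they do not all hit a recovering service at the same moment.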
Prompt Adapters
ContextPrompt
Inject RunContext values into the system prompt via template placeholders:
from cogent.interceptors import ContextPrompt
agent = Agent(
name="assistant",
model="gpt4",
instructions="Greet the user by name.",
interceptors=[
ContextPrompt(template="User: {user_name}\nRole: {role}"),
],
)
result = await agent.run(
"Hello!",
context={"user_name": "Alice", "role": "engineer"},
)
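The placeholder substitution itself behaves like Python's `str.format`. The sketch below is a hypothetical illustration; in particular, appending the rendered template after the agent's instructions is an assumption about how the two pieces are combined.

```python
def render_prompt(instructions: str, template: str, context: dict) -> str:
    """Fill template placeholders from context and append to the instructions."""
    rendered = template.format(**context)  # raises KeyError on a missing key
    return f"{instructions}\n\n{rendered}"


prompt = render_prompt(
    "Greet the user by name.",
    "User: {user_name}\nRole: {role}",
    {"user_name": "Alice", "role": "engineer"},
)
print(prompt)
```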
ConversationPrompt
Add stage-based instructions as the conversation grows:
from cogent.interceptors import ConversationPrompt
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
ConversationPrompt(stages={
0: "Start with a friendly greeting.",
4: "Offer deeper explanations.",
8: "Wrap up and ask if there's anything else.",
}),
],
)
LambdaPrompt
Rewrite the system prompt with a plain function:
from cogent.interceptors import LambdaPrompt
from datetime import datetime, UTC
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
LambdaPrompt(
adapter_fn=lambda prompt, ctx: f"{prompt}\n[Time: {datetime.now(UTC)}]"
),
],
)
Auditing
For logging agent activity (LLM requests/responses, tool calls/results, run lifecycle), use the Observer system instead of interceptors:
from cogent import Agent
from cogent.observability import Observer
# Built-in console logging
agent = Agent(name="assistant", model="gpt4", verbosity="detailed")
# Capture events for inspection
observer = Observer(capture=["llm.*", "tool.*"])
agent = Agent(name="assistant", model="gpt4", observer=observer)
await agent.run("Do something")
print(observer.history())
Custom Interceptors
Override only the phase methods you need. Unimplemented phases pass through automatically.
from cogent.interceptors import Interceptor, InterceptContext, InterceptResult
class LoggingInterceptor(Interceptor):
    async def pre_think(self, ctx: InterceptContext) -> InterceptResult:
        print(f"LLM call #{ctx.model_calls + 1}, {len(ctx.messages)} messages")
        return InterceptResult.ok()

    async def post_act(self, ctx: InterceptContext) -> InterceptResult:
        status = "error" if ctx.tool_error else "ok"
        print(f"Tool {ctx.tool_name}: {status}")
        return InterceptResult.ok()
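Pass-through for unimplemented phases is typically achieved by giving the base class a default handler per phase. The sketch below shows that pattern with hypothetical stand-ins (`DemoPhase`, `DemoInterceptor`, `CountThinks`); it does not reproduce cogent's actual base class.

```python
import asyncio
from enum import Enum


class DemoPhase(Enum):
    """Hypothetical two-phase enum; values name the handler methods."""
    PRE_THINK = "pre_think"
    POST_ACT = "post_act"


class DemoInterceptor:
    """Base class whose default handlers simply pass through."""

    async def pre_think(self, ctx: dict) -> str:
        return "ok"

    async def post_act(self, ctx: dict) -> str:
        return "ok"

    async def handle(self, phase: DemoPhase, ctx: dict) -> str:
        # Dispatch to the method named after the phase
        return await getattr(self, phase.value)(ctx)


class CountThinks(DemoInterceptor):
    """Overrides only pre_think; post_act falls back to the default."""

    def __init__(self) -> None:
        self.thinks = 0

    async def pre_think(self, ctx: dict) -> str:
        self.thinks += 1
        return "ok"


async def demo():
    counter = CountThinks()
    outcomes = [
        await counter.handle(DemoPhase.PRE_THINK, {}),
        await counter.handle(DemoPhase.POST_ACT, {}),
    ]
    return counter.thinks, outcomes


thinks, outcomes = asyncio.run(demo())
print(thinks, outcomes)  # 1 ['ok', 'ok']
```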
InterceptContext
Available fields at each phase:
@dataclass
class InterceptContext:
    agent: Agent  # Current agent
    phase: Phase  # Current phase
    task: str  # Original task/prompt
    messages: list[dict]  # Current message history
    state: dict  # Mutable shared state across all interceptors
    run_context: RunContext | None  # User-provided context from agent.run()
    model_calls: int  # LLM calls so far
    tool_calls: int  # Tool calls so far
    tools: list | None  # Available tools (for ToolGate)
    is_subagent: bool  # True if current tool is a subagent

    # Phase-specific
    tool_name: str | None  # PRE_ACT / POST_ACT
    tool_args: dict | None  # PRE_ACT / POST_ACT
    tool_result: object | None  # POST_ACT only
    tool_error: Exception | None  # POST_ACT only (structured error signal)
    model_response: Any  # POST_THINK only
    error: Exception | None  # ON_ERROR only
Combining Interceptors
Interceptors run in order. Later interceptors see modifications from earlier ones:
from cogent.interceptors import PIIShield, ContentFilter, BudgetGuard
agent = Agent(
name="assistant",
model="gpt4",
interceptors=[
PIIShield(patterns=["email", "ssn"], action="mask"),
ContentFilter(blocked_words=["password"]),
BudgetGuard(model_calls=10, tool_calls=50),
],
)
API Reference
Core Classes
| Class | Description |
|---|---|
| Interceptor | Base class; override phase methods (pre_think, post_act, etc.) |
| InterceptContext | Context passed to each phase handler |
| InterceptResult | Return type; factories: ok(), stop(), skip(), use_model(), modify_*() |
| Phase | Enum: PRE_RUN, PRE_THINK, POST_THINK, PRE_ACT, POST_ACT, POST_RUN, ON_ERROR |
| StopExecution | Exception to halt execution immediately |
Built-in Interceptors
| Category | Interceptors |
|---|---|
| Budget | BudgetGuard |
| Security | PIIShield, ContentFilter |
| Context | TokenLimiter, ContextCompressor |
| Gates | ToolGate, PermissionGate, ConversationGate |
| Rate Limit | RateLimiter, ThrottleInterceptor |
| Resilience | Failover, CircuitBreaker |
| Prompts | ContextPrompt, ConversationPrompt, LambdaPrompt |
| HITL | HITLInterceptor |