Agent Features¶
This page covers resilience, human-in-the-loop, reasoning, structured output, task tracking, streaming, spawning, subagents, and observability.
See Agent Overview for the core Agent class, roles, and configuration.
Resilience¶
Built-in fault tolerance with retries, circuit breakers, and fallbacks:
from cogent import Agent
from cogent.agent import ResilienceConfig, RetryPolicy

agent = Agent(
    name="Worker",
    model=model,
    resilience=ResilienceConfig(
        retry_policy=RetryPolicy(
            max_retries=3,
            base_delay=1.0,
            strategy="exponential",
        ),
    ),
)
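To make the strategy and base_delay settings concrete, here is a minimal standalone sketch of how retry delays could be derived from them. `backoff_delay` is a hypothetical helper, not part of cogent, and the formulas (plain doubling, no jitter or cap) are assumptions; the library's actual schedule may differ.

```python
def backoff_delay(attempt: int, base_delay: float = 1.0, strategy: str = "exponential") -> float:
    """Return the wait in seconds before retry number `attempt` (1-based).

    Assumed formulas, for illustration only:
      exponential: base_delay * 2 ** (attempt - 1)
      linear:      base_delay * attempt
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    if strategy == "exponential":
        return base_delay * 2 ** (attempt - 1)
    if strategy == "linear":
        return base_delay * attempt
    raise ValueError(f"unknown strategy: {strategy!r}")


# Delays for max_retries=3, base_delay=1.0
exponential = [backoff_delay(n, 1.0, "exponential") for n in range(1, 4)]
linear = [backoff_delay(n, 1.0, "linear") for n in range(1, 4)]
print(exponential)  # [1.0, 2.0, 4.0]
print(linear)       # [1.0, 2.0, 3.0]
```

Exponential backoff spreads retries out quickly, which tends to suit rate-limited providers; linear backoff keeps total wait time lower when failures are brief.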
Resilience Components¶
- RetryPolicy: Configure retry behavior with exponential/linear backoff
- CircuitBreaker: Prevent cascading failures
- FallbackRegistry: Define fallback behaviors for failures
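The circuit-breaker idea in the list above can be sketched in a few lines. This is a generic illustration of the pattern, not cogent's CircuitBreaker: the class name `SimpleBreaker`, its parameters, and the injected clock are all hypothetical.

```python
import time


class SimpleBreaker:
    """Generic circuit breaker: open after N consecutive failures, retry after a timeout."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock          # injectable so the sketch can be tested without sleeping
        self._failures = 0
        self._opened_at = None       # None means the circuit is closed

    def allow(self) -> bool:
        """Return True if a call may proceed."""
        if self._opened_at is None:
            return True
        # Once the timeout has elapsed, let a trial call through (half-open state)
        return self._clock() - self._opened_at >= self.reset_timeout

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


# Drive the breaker with a fake clock
now = [0.0]
breaker = SimpleBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])
breaker.record_failure()
after_one = breaker.allow()     # still closed
breaker.record_failure()
after_two = breaker.allow()     # open: calls are rejected
now[0] = 10.0
after_timeout = breaker.allow() # half-open: one trial call allowed
print(after_one, after_two, after_timeout)  # True False True
```

Rejecting calls while the circuit is open is what prevents cascading failures: a struggling downstream service is not hit with further requests until the timeout has passed.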
Structured Output Self-Correction¶
When returns= is used, failed validation attempts are retried using conversation-based feedback. Rather than blindly re-sending the original prompt, the agent appends a correction turn that shows the model exactly what it produced and why it failed:
Human: <task + schema instruction> ← attempt 1
AI: {"type": "array", ...} ← bad output (model echoed schema)
Human: ⚠️ Your previous response failed validation.
Validation error: Expected list, got dict
Your response was:
```
{"type": "array", ...}
```
Please respond again with ONLY valid JSON that matches the required schema.
AI: ["python", "async", "fastapi"] ← self-corrected
This mirrors the Reflexion pattern — the model has full context of what it did wrong and can self-correct without needing a new conversation. Non-retryable errors (auth failures, exhausted rate-limit retries) propagate immediately and do not consume structured output retry budget.
The number of attempts is controlled by max_structured_output_retries on AgentConfig (default: 3).
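The correction loop described above can be sketched without any LLM. Everything here is illustrative: `run_with_correction`, `validate_tags`, and `fake_model` are hypothetical names, the message format is simplified, and cogent's real implementation is not shown in this page.

```python
import json


def validate_tags(raw: str) -> list[str]:
    """Parse `raw` as JSON and require a list of strings."""
    data = json.loads(raw)
    if not isinstance(data, list):
        raise ValueError(f"Expected list, got {type(data).__name__}")
    if not all(isinstance(item, str) for item in data):
        raise ValueError("Expected every item to be a string")
    return data


def run_with_correction(model, task: str, max_attempts: int = 3) -> list[str]:
    """Retry by appending a correction turn instead of re-sending the task."""
    messages = [{"role": "human", "content": task}]
    for _ in range(max_attempts):
        raw = model(messages)
        try:
            return validate_tags(raw)
        except ValueError as err:  # json.JSONDecodeError is a ValueError subclass
            # Show the model what it produced and why it failed
            messages.append({"role": "ai", "content": raw})
            messages.append({
                "role": "human",
                "content": (
                    "Your previous response failed validation.\n"
                    f"Validation error: {err}\n"
                    f"Your response was:\n{raw}\n"
                    "Please respond again with ONLY valid JSON that matches the required schema."
                ),
            })
    raise RuntimeError(f"no valid output after {max_attempts} attempts")


def fake_model(messages: list[dict]) -> str:
    """Stand-in for an LLM: echoes the schema first, then corrects itself."""
    if len(messages) == 1:
        return '{"type": "array"}'
    return '["python", "async", "fastapi"]'


tags = run_with_correction(fake_model, "List three tags as a JSON array of strings.")
print(tags)  # ['python', 'async', 'fastapi']
```

Because the failed output and the validation error stay in the conversation, the second attempt is conditioned on the mistake rather than starting from scratch.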
Human-in-the-Loop (HITL)¶
Enable human oversight for sensitive operations:
agent = Agent(
name="Executor",
model=model,
tools=[delete_file, send_email],
interrupt_on={
"tools": ["delete_file", "send_email"], # Require approval
},
)
try:
    result = await agent.run("Delete temp files")
except InterruptedException as e:
    # Human reviews pending action
    decision = HumanDecision(approved=True)
    result = await agent.resume(e.state, decision)
Reasoning¶
Enable extended thinking for complex problems with AI-controlled reasoning rounds.
@effort Shorthand¶
The simplest way to control model-native reasoning — append an effort level or token budget to the model string:
from cogent import Agent
agent = Agent(name="Analyst", model="o3-mini@high")
agent = Agent(name="Analyst", model="claude@high")
agent = Agent(name="Analyst", model="gemini-2.5-pro@16k")
See Reasoning — @effort Shorthand for the full provider mapping.
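As a rough illustration of how a `model@suffix` string might be split into a model name and a reasoning setting, consider the sketch below. `split_model_spec` is a hypothetical function; the set of named levels, the option keys, and the reading of `16k` as 16,000 tokens are assumptions, and the provider mapping referenced above is the authoritative source.

```python
import re

EFFORT_LEVELS = {"low", "medium", "high"}  # assumed set of named levels


def split_model_spec(spec: str) -> tuple[str, dict]:
    """Split 'model@suffix' into (model, reasoning options)."""
    model, sep, suffix = spec.partition("@")
    if not sep:
        return model, {}                      # no shorthand used
    suffix = suffix.lower()
    if suffix in EFFORT_LEVELS:
        return model, {"effort": suffix}
    match = re.fullmatch(r"(\d+)(k?)", suffix)
    if match:
        # Assumption: a trailing "k" multiplies by 1000
        tokens = int(match.group(1)) * (1000 if match.group(2) else 1)
        return model, {"budget_tokens": tokens}
    raise ValueError(f"unrecognized reasoning suffix: {suffix!r}")


print(split_model_spec("o3-mini@high"))        # ('o3-mini', {'effort': 'high'})
print(split_model_spec("gemini-2.5-pro@16k"))  # ('gemini-2.5-pro', {'budget_tokens': 16000})
print(split_model_spec("gpt-5.4"))             # ('gpt-5.4', {})
```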
Basic Usage¶
from cogent import Agent
from cogent.agent.reasoning import ReasoningConfig
# Simple: Enable with defaults
agent = Agent(
name="Analyst",
model=model,
reasoning=True, # Default config
)
result = await agent.run("Analyze this complex problem...")
Custom Configuration¶
# Full control with ReasoningConfig
agent = Agent(
name="DeepThinker",
model=model,
reasoning=ReasoningConfig(
max_thinking_rounds=15, # AI decides when ready (up to 15)
style=ReasoningStyle.CRITICAL, # Critical reasoning style
show_thinking=True, # Include thoughts in output
),
)
Per-Call Overrides¶
Enable or customize reasoning for specific calls:
# Agent without reasoning by default
agent = Agent(name="Helper", model=model, reasoning=False)
# Simple task - no reasoning
result = await agent.run("What time is it?")
# Complex task - enable reasoning
result = await agent.run(
"Analyze this codebase architecture",
reasoning=True, # Enable for this call
)
# Very complex - custom config
result = await agent.run(
"Debug this complex issue",
reasoning=ReasoningConfig(
max_thinking_rounds=10,
style=ReasoningStyle.ANALYTICAL,
),
)
Reasoning Styles¶
- ANALYTICAL: Step-by-step logical breakdown (default)
- EXPLORATORY: Consider multiple approaches
- CRITICAL: Question assumptions, find flaws
- CREATIVE: Generate novel solutions
AI-Controlled Rounds¶
The AI signals that reasoning is complete by emitting <ready>true</ready> tags. max_thinking_rounds is a safety limit, not a fixed count:
ReasoningConfig.standard() # max 10 rounds (safety net)
ReasoningConfig.deep() # max 15 rounds (complex problems)
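The difference between a safety limit and a fixed count is easiest to see in a loop. The sketch below is a standalone illustration with hypothetical names (`think`, `fake_round`); it only assumes the `<ready>true</ready>` signal described above and says nothing about how cogent structures its rounds internally.

```python
import re

READY = re.compile(r"<ready>\s*true\s*</ready>", re.IGNORECASE)


def think(generate_round, max_thinking_rounds: int = 10) -> list[str]:
    """Collect thoughts until the model signals readiness or the cap is hit."""
    thoughts = []
    for round_no in range(1, max_thinking_rounds + 1):
        thought = generate_round(round_no)
        thoughts.append(thought)
        if READY.search(thought):
            break  # the model decided it is done; remaining rounds are skipped
    return thoughts


def fake_round(round_no: int) -> str:
    """Stand-in for a model call: becomes ready on the third round."""
    if round_no < 3:
        return f"Round {round_no}: still exploring."
    return f"Round {round_no}: conclusion reached. <ready>true</ready>"


thoughts = think(fake_round, max_thinking_rounds=10)
capped = think(lambda n: "never ready", max_thinking_rounds=4)
print(len(thoughts))  # 3
print(len(capped))    # 4
```

A model that finishes early uses only the rounds it needs, while one that never signals readiness is stopped at the cap.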
Structured Output¶
Enforce response schemas with returns= on agent.run():
from pydantic import BaseModel, Field

class ContactInfo(BaseModel):
    name: str = Field(description="Full name")
    email: str = Field(description="Email address")
    phone: str | None = Field(None, description="Phone number")

agent = Agent(name="Extractor", model=model)
result = await agent.run("Extract: John Doe, john@acme.com", returns=ContactInfo)
print(result.content.data)  # ContactInfo(name="John Doe", ...)
Supports Pydantic models, dataclasses, TypedDict, JSON Schema dicts, bare primitives (str, int, bool, float), Literal, collections (list[T], set[T], tuple[T, ...]), Union, Enum, None, and dict.
See Structured Output for the full reference — schema types, field guidance, few-shot examples, ResponseSchema configuration, and provider support.
TaskBoard¶
Human-like task tracking for complex workflows:
agent = Agent(
name="Researcher",
model=model,
tools=[search, summarize],
taskboard=True, # Adds task management tools
)
result = await agent.run("Research Python async patterns")
print(agent.taskboard.summary())
Streaming¶
Real-time token-by-token streaming from agent executions.
Streaming enables agents to yield output progressively as tokens are generated, rather than waiting for complete responses. This provides:
- Real-time feedback during long-running agent operations
- Better UX with progressive output display
- Lower perceived latency by showing immediate progress
- Cancellation support for in-flight operations
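Cancellation of an in-flight stream can be illustrated with standard asyncio task cancellation. This sketch uses a stand-in async generator (`fake_stream`) instead of a real agent, and it does not claim to show how cogent cleans up provider connections on cancel.

```python
import asyncio


async def fake_stream(n_tokens: int, delay: float = 0.01):
    """Stand-in for a token stream: yields one token every `delay` seconds."""
    for i in range(n_tokens):
        await asyncio.sleep(delay)
        yield f"tok{i} "


async def consume(received: list[str]) -> None:
    async for token in fake_stream(1000):
        received.append(token)


async def main() -> tuple[int, bool]:
    received: list[str] = []
    task = asyncio.create_task(consume(received))
    await asyncio.sleep(0.1)   # let some tokens arrive
    task.cancel()              # e.g. the user pressed "stop"
    try:
        await task
    except asyncio.CancelledError:
        pass                   # expected: the consumer was cancelled mid-stream
    return len(received), task.cancelled()


count, cancelled = asyncio.run(main())
print(count < 1000, cancelled)  # True True
```

Wrapping the `async for` loop in a task gives the caller a handle to cancel, and tokens received before cancellation remain available.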
Quick Start¶
from cogent import Agent
agent = Agent(
name="writer",
model="gpt-5.4",
stream=True, # Enable streaming
)
# Stream tokens as they arrive
async for chunk in agent.run_stream("Write a poem about AI"):
    print(chunk.content, end="", flush=True)
Enabling Streaming¶
# Option 1: Set on agent creation
agent = Agent(
name="writer",
model="gpt-5.4",
stream=True,
)
result = await agent.run("Write about AI") # Still works normally
async for chunk in agent.run_stream("Write a poem"):  # Stream tokens
    print(chunk.content, end="", flush=True)

# Option 2: Stream on demand (any agent)
agent = Agent(name="writer", model="gpt-5.4")
async for chunk in agent.run_stream("Write a poem"):
    print(chunk.content, end="", flush=True)
StreamChunk¶
Each streaming chunk contains:
@dataclass
class StreamChunk:
    content: str               # Token content
    delta: str                 # Incremental text (same as content)
    is_final: bool             # Last chunk?
    finish_reason: str | None  # Why stopped (stop, length, tool_calls)
    metadata: dict | None      # Token usage, model info
run_stream() vs run()¶
| Feature | run() | run_stream() |
|---|---|---|
| Returns | Response | AsyncIterator[StreamChunk] |
| Output | Complete final output | Progressive tokens |
| Latency | Wait for completion | Immediate feedback |
| Use Case | Batch processing | Interactive UX |
Usage Patterns¶
Collecting streamed output:
full_response = []
async for chunk in agent.run_stream("Write a story"):
    full_response.append(chunk.content)
    print(chunk.content, end="", flush=True)
final_text = "".join(full_response)
Progress tracking:
async for chunk in agent.run_stream("Long analysis task"):
    print(chunk.content, end="", flush=True)
    if chunk.is_final:
        print("\n✅ Complete!")
Error handling:
try:
    async for chunk in agent.run_stream("Query"):
        print(chunk.content, end="", flush=True)
        if chunk.finish_reason == "length":
            print("\n⚠️ Response truncated (max tokens reached)")
except Exception as e:
    print(f"\n❌ Streaming error: {e}")
Streaming with Tools¶
When an agent calls tools during streaming, the stream may pause while tools execute:
from cogent import Agent, tool

@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

agent = Agent(name="researcher", model="gpt-5.4", tools=[search])
async for chunk in agent.run_stream("Search for AI news and summarize"):
    if chunk.content:
        print(chunk.content, end="", flush=True)
    if chunk.finish_reason == "tool_calls":
        print("\n[Tool calling...]")
Web UI Integration¶
FastAPI streaming endpoint:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(message: str):
    async def generate():
        async for chunk in agent.run_stream(message):
            yield chunk.content
    return StreamingResponse(generate(), media_type="text/plain")
Server-Sent Events (SSE):
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.post("/chat/sse")
async def chat_sse(message: str):
    async def event_stream():
        async for chunk in agent.run_stream(message):
            yield {"event": "token", "data": chunk.content}
        # Signal the end of the stream to the client
        yield {"event": "done", "data": ""}
    return EventSourceResponse(event_stream())
Spawning¶
Dynamic agent creation at runtime:
from cogent.agent import SpawningConfig, AgentSpec
agent = Agent(
name="Coordinator",
model=model,
spawning=SpawningConfig(
allowed_specs=[
AgentSpec(name="researcher", tools=["search"]),
AgentSpec(name="writer", tools=["write"]),
],
),
)
# Agent can spawn sub-agents during execution
result = await agent.run("Research and write about AI")
Subagents (Native Delegation)¶
New in v0.x.x: Native subagent support with full metadata preservation.
Delegate tasks to specialist agents while preserving Response metadata (tokens, duration, delegation chain):
from cogent import Agent
# Create specialist agents
data_analyst = Agent(
name="data_analyst",
model="gpt-5.4-mini",
instructions="Analyze data and provide statistical insights.",
)
market_researcher = Agent(
name="market_researcher",
model="gpt-5.4-mini",
instructions="Research market trends and competitive landscape.",
)
# Create coordinator with subagents
coordinator = Agent(
name="coordinator",
model="gpt-5.4-mini",
instructions="""Coordinate research tasks:
- Use data_analyst for numerical analysis
- Use market_researcher for market trends
Synthesize their findings.""",
# Simply pass the agents - uses their names automatically
subagents=[data_analyst, market_researcher],
)
# Coordinator delegates automatically
response = await coordinator.run("Analyze Q4 2025 e-commerce growth")
# Full metadata preserved
print(f"Total tokens: {response.metadata.tokens.total_tokens}") # Includes all subagents
print(f"Subagent calls: {len(response.subagent_responses)}")
for sub_resp in response.subagent_responses:
    print(f"  {sub_resp.metadata.agent}: {sub_resp.metadata.tokens.total_tokens} tokens")
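To show what "includes all subagents" means for the token total, here is a small standalone model of a delegation tree. The `Usage` class and its fields are hypothetical and unrelated to cogent's Response type; the token figures are made-up inputs chosen only to make the arithmetic visible.

```python
from dataclasses import dataclass, field


@dataclass
class Usage:
    """Hypothetical per-agent usage record with nested subagent calls."""
    agent: str
    own_tokens: int
    children: list["Usage"] = field(default_factory=list)

    def total_tokens(self) -> int:
        """Own tokens plus everything spent by subagents, recursively."""
        return self.own_tokens + sum(child.total_tokens() for child in self.children)

    def chain(self, prefix: tuple = ()):
        """Yield every delegation path from the root down."""
        path = (*prefix, self.agent)
        yield path
        for child in self.children:
            yield from child.chain(path)


tree = Usage(
    "coordinator",
    400,
    [Usage("data_analyst", 250), Usage("market_researcher", 350)],
)
total = tree.total_tokens()
paths = [" -> ".join(path) for path in tree.chain()]
print(total)  # 1000
print(paths)  # ['coordinator', 'coordinator -> data_analyst', 'coordinator -> market_researcher']
```

Summing recursively means a nested delegation (a subagent that itself delegates) is counted once at every level above it.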
Structured Output from Subagents¶
Use returns= on a subagent to declare the schema it produces. The coordinator's LLM receives clean JSON instead of a plain string, enabling it to reason over the structured result:
from pydantic import BaseModel
from typing import Literal

from cogent import Agent

class ReviewScore(BaseModel):
    score: int
    verdict: Literal["approved", "needs_revision"]
    feedback: str

# Example writer subagent; its instructions are illustrative
writer = Agent(
    name="writer",
    model="gpt-5.4-mini",
    instructions="Write short marketing copy.",
)
reviewer = Agent(
    name="reviewer",
    model="gpt-5.4-mini",
    returns=ReviewScore,  # Declares output schema when used as a subagent
    instructions="Review content and score it 1-10.",
)
editor = Agent(
    name="editor",
    model="gpt-5.4-mini",
    subagents=[writer, reviewer],
)
# reviewer.run() is called with returns=ReviewScore automatically;
# editor's LLM sees {"score": 8, "verdict": "approved", "feedback": "..."} ✅
result = await editor.run("Write and review a product tweet")
Key Benefits:
- ✅ Accurate token counting (coordinator + all subagents, including reasoning tokens)
- ✅ Full delegation chain tracking
- ✅ Context propagates automatically
- ✅ Observable with [subagent-call], [subagent-result] events
- ✅ Zero LLM behavior changes (uses existing tool calling)
- ✅ Subagents can declare their output schema via returns=
Example:
# List syntax (recommended) - uses agent names as tool names
coordinator = Agent(
name="coordinator",
model="gpt-5.4",
subagents=[specialist],
)
# Dict syntax - override tool names
coordinator = Agent(
name="coordinator",
model="gpt-5.4",
subagents={"custom_name": specialist},
)
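The two syntaxes can be thought of as two ways of producing the same name-to-agent mapping. The sketch below is an assumption about that idea, not cogent's code: `FakeAgent` and `normalize_subagents` are hypothetical, and the duplicate-name check is an illustrative design choice.

```python
from dataclasses import dataclass


@dataclass
class FakeAgent:
    """Stand-in for an agent; only the name matters here."""
    name: str


def normalize_subagents(subagents) -> dict[str, FakeAgent]:
    """Return a tool-name -> agent mapping for either accepted syntax."""
    if isinstance(subagents, dict):
        return dict(subagents)            # dict syntax: keys are the tool names
    mapping: dict[str, FakeAgent] = {}
    for agent in subagents:               # list syntax: fall back to agent.name
        if agent.name in mapping:
            raise ValueError(f"duplicate subagent name: {agent.name!r}")
        mapping[agent.name] = agent
    return mapping


specialist = FakeAgent(name="specialist")
from_list = list(normalize_subagents([specialist]))
from_dict = list(normalize_subagents({"custom_name": specialist}))
print(from_list)  # ['specialist']
print(from_dict)  # ['custom_name']
```

The dict form is mainly useful when two specialists would otherwise share a name, or when a shorter tool name reads better in the coordinator's instructions.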
See docs/subagents.md for complete documentation.
Observability¶
Built-in observability for standalone usage:
from cogent import Agent
from cogent.observability import ObservabilityLevel
# Boolean shorthand
agent = Agent(name="Worker", model=model, verbosity=True) # Progress level
# String levels
agent = Agent(name="Worker", model=model, verbosity="debug")
# Enum (explicit)
agent = Agent(name="Worker", model=model, verbosity=ObservabilityLevel.DEBUG)
# Integer (0-5)
agent = Agent(name="Worker", model=model, verbosity=4) # DEBUG
# Advanced: Full control with observer
from cogent.observability import Observer
observer = Observer(level="debug")
agent = Agent(name="Worker", model=model, observer=observer)
Verbosity levels:
| Level | Int | String | Description |
|---|---|---|---|
| OFF | 0 | "off" | No output |
| RESULT | 1 | "result", "minimal" | Only final results |
| PROGRESS | 2 | "progress", "normal" | Key milestones (default for True) |
| DETAILED | 3 | "detailed", "verbose" | Tool calls, timing |
| DEBUG | 4 | "debug" | Everything including internal events |
| TRACE | 5 | "trace" | Maximum detail + execution graph |
Priority: observer parameter takes precedence over verbosity.
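The accepted verbosity forms can be reduced to a single level with a small resolver. This is a standalone sketch built from the table above: `Level` and `resolve_verbosity` are hypothetical names rather than cogent's ObservabilityLevel, and mapping `False` to OFF is an assumption.

```python
from enum import IntEnum


class Level(IntEnum):
    OFF = 0
    RESULT = 1
    PROGRESS = 2
    DETAILED = 3
    DEBUG = 4
    TRACE = 5


# Alternate string names from the table
ALIASES = {"minimal": Level.RESULT, "normal": Level.PROGRESS, "verbose": Level.DETAILED}


def resolve_verbosity(value) -> Level:
    """Map a bool, int, string, or Level to a Level."""
    if isinstance(value, Level):
        return value
    if isinstance(value, bool):  # must come before int: bool is an int subclass
        return Level.PROGRESS if value else Level.OFF  # False -> OFF is assumed
    if isinstance(value, int):
        return Level(value)      # raises ValueError outside 0-5
    if isinstance(value, str):
        key = value.strip().lower()
        if key in ALIASES:
            return ALIASES[key]
        return Level[key.upper()]  # raises KeyError for unknown names
    raise TypeError(f"unsupported verbosity value: {value!r}")


print(resolve_verbosity(True).name)       # PROGRESS
print(resolve_verbosity("verbose").name)  # DETAILED
print(resolve_verbosity(4).name)          # DEBUG
```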