Agent Features

Resilience, human-in-the-loop, reasoning, structured output, streaming, spawning, subagents, and observability.

See Agent Overview for the core Agent class, roles, and configuration.

Resilience

Built-in fault tolerance with retries, circuit breakers, and fallbacks:

from cogent import Agent
from cogent.agent import ResilienceConfig, RetryPolicy

agent = Agent(
    name="Worker",
    model=model,
    resilience=ResilienceConfig(
        retry_policy=RetryPolicy(
            max_retries=3,
            base_delay=1.0,
            strategy="exponential",
        ),
    ),
)
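
To make the retry parameters concrete, here is a minimal standalone sketch of how a delay schedule could be derived from max_retries, base_delay, and strategy. It does not use cogent: the helper name backoff_delays and the exact formulas (doubling per retry for "exponential", adding base_delay per retry for "linear") are assumptions chosen for illustration, and the library's real schedule may add jitter or a maximum delay.

```python
def backoff_delays(max_retries: int, base_delay: float, strategy: str = "exponential") -> list[float]:
    """Return the wait in seconds before each retry attempt.

    Hypothetical helper: these formulas are a common convention,
    not cogent's confirmed implementation.
    """
    if strategy == "exponential":
        # First retry waits base_delay, then the wait doubles each time.
        return [base_delay * (2 ** attempt) for attempt in range(max_retries)]
    if strategy == "linear":
        # First retry waits base_delay, then the wait grows by base_delay each time.
        return [base_delay * (attempt + 1) for attempt in range(max_retries)]
    raise ValueError(f"unknown strategy: {strategy!r}")
```

Under these assumptions, the configuration above (max_retries=3, base_delay=1.0, exponential) would wait 1, 2, and 4 seconds before the three retries.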

Resilience Components

  • RetryPolicy: Configure retry behavior with exponential/linear backoff
  • CircuitBreaker: Prevent cascading failures
  • FallbackRegistry: Define fallback behaviors for failures
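
For readers unfamiliar with the circuit breaker idea, the following standalone sketch shows the general pattern: after a number of consecutive failures the breaker "opens" and rejects calls until a cool-down has passed. The class name SimpleCircuitBreaker and its parameters (failure_threshold, reset_timeout, clock) are hypothetical and are not cogent's CircuitBreaker API.

```python
import time


class SimpleCircuitBreaker:
    """Illustrative circuit breaker; not cogent's CircuitBreaker class."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock          # injectable so the behavior can be tested without sleeping
        self._failures = 0
        self._opened_at = None       # None means the circuit is closed

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        # Once the cool-down has elapsed, let a trial call through ("half-open").
        return self._clock() - self._opened_at >= self.reset_timeout

    def record_success(self) -> None:
        # A success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            # Open (or re-open) the circuit and restart the cool-down.
            self._opened_at = self._clock()
```

A caller would check allow_request() before each call and report the outcome with record_success() or record_failure(); failing fast while the circuit is open is what keeps one broken dependency from stalling everything behind it.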

Structured Output Self-Correction

When returns= is used, failed validation attempts are retried using conversation-based feedback. Rather than blindly re-sending the original prompt, the agent appends a correction turn that shows the model exactly what it produced and why it failed:

Human: <task + schema instruction>     ← attempt 1
AI:    {"type": "array", ...}          ← bad output (model echoed schema)
Human: ⚠️ Your previous response failed validation.
       Validation error: Expected list, got dict
       Your response was:
       ```
       {"type": "array", ...}
       ```
       Please respond again with ONLY valid JSON that matches the required schema.
AI:    ["python", "async", "fastapi"]  ← self-corrected

This mirrors the Reflexion pattern: the model sees exactly what it produced and why it failed, so it can self-correct within the same conversation. Non-retryable errors (auth failures, exhausted rate-limit retries) propagate immediately and do not consume the structured output retry budget.

The number of attempts is controlled by max_structured_output_retries on AgentConfig (default: 3).
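
The correction loop described above can be sketched without any library code. In this illustration call_model is a stand-in callable for the LLM, validate_string_list is a hypothetical validator for a list-of-strings schema, and max_attempts plays the role that max_structured_output_retries plays in the text; none of these names come from cogent.

```python
import json
from typing import Callable

Message = dict[str, str]


def validate_string_list(raw: str) -> list[str]:
    """Parse raw model output and require a JSON list of strings."""
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, list):
        raise ValueError(f"Expected list, got {type(data).__name__}")
    if not all(isinstance(item, str) for item in data):
        raise ValueError("Expected every item to be a string")
    return data


def run_with_self_correction(
    call_model: Callable[[list[Message]], str],
    task: str,
    max_attempts: int = 3,
) -> list[str]:
    """Retry validation failures by appending a correction turn to the conversation."""
    messages: list[Message] = [{"role": "human", "content": task}]
    last_error = None
    for _ in range(max_attempts):
        raw = call_model(messages)
        try:
            return validate_string_list(raw)
        except ValueError as exc:
            last_error = exc
            # Keep the bad output in the history, then explain why it failed.
            messages.append({"role": "ai", "content": raw})
            messages.append({"role": "human", "content": (
                "Your previous response failed validation.\n"
                f"Validation error: {exc}\n"
                f"Your response was:\n{raw}\n"
                "Please respond again with ONLY valid JSON that matches the required schema."
            )})
    raise ValueError(f"No valid output after {max_attempts} attempts") from last_error
```

Only validation errors are caught here; any other exception raised by call_model leaves the loop immediately, which matches the statement that non-retryable errors do not consume the retry budget.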

Human-in-the-Loop (HITL)

Enable human oversight for sensitive operations:

agent = Agent(
    name="Executor",
    model=model,
    tools=[delete_file, send_email],
    interrupt_on={
        "tools": ["delete_file", "send_email"],  # Require approval
    },
)

try:
    result = await agent.run("Delete temp files")
except InterruptedException as e:
    # Human reviews pending action
    decision = HumanDecision(approved=True)
    result = await agent.resume(e.state, decision)

Reasoning

Enable extended thinking for complex problems with AI-controlled reasoning rounds.

@effort Shorthand

The simplest way to control model-native reasoning — append an effort level or token budget to the model string:

from cogent import Agent

agent = Agent(name="Analyst", model="o3-mini@high")
agent = Agent(name="Analyst", model="claude@high")
agent = Agent(name="Analyst", model="gemini-2.5-pro@16k")

See Reasoning — @effort Shorthand for the full provider mapping.

Basic Usage

from cogent import Agent
from cogent.agent.reasoning import ReasoningConfig

# Simple: Enable with defaults
agent = Agent(
    name="Analyst",
    model=model,
    reasoning=True,  # Default config
)

result = await agent.run("Analyze this complex problem...")

Custom Configuration

# Full control with ReasoningConfig
agent = Agent(
    name="DeepThinker",
    model=model,
    reasoning=ReasoningConfig(
        max_thinking_rounds=15,         # AI decides when ready (up to 15)
        style=ReasoningStyle.CRITICAL,  # Critical reasoning style
        show_thinking=True,             # Include thoughts in output
    ),
)

Per-Call Overrides

Enable or customize reasoning for specific calls:

# Agent without reasoning by default
agent = Agent(name="Helper", model=model, reasoning=False)

# Simple task - no reasoning
result = await agent.run("What time is it?")

# Complex task - enable reasoning
result = await agent.run(
    "Analyze this codebase architecture",
    reasoning=True,  # Enable for this call
)

# Very complex - custom config
result = await agent.run(
    "Debug this complex issue",
    reasoning=ReasoningConfig(
        max_thinking_rounds=10,
        style=ReasoningStyle.ANALYTICAL,
    ),
)

Reasoning Styles

  • ANALYTICAL: Step-by-step logical breakdown (default)
  • EXPLORATORY: Consider multiple approaches
  • CRITICAL: Question assumptions, find flaws
  • CREATIVE: Generate novel solutions

AI-Controlled Rounds

The AI signals that reasoning is complete by emitting a <ready>true</ready> tag. max_thinking_rounds is a safety limit, not a fixed count:

ReasoningConfig.standard()  # max 10 rounds (safety net)
ReasoningConfig.deep()      # max 15 rounds (complex problems)
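
The difference between a safety limit and a fixed count is easiest to see in a loop. The sketch below is a standalone illustration: think is a stand-in callable that returns one round of model output, and think_until_ready and READY_TAG are hypothetical names, not part of cogent.

```python
import re
from typing import Callable

# Matches the completion signal described above, tolerating whitespace and case.
READY_TAG = re.compile(r"<ready>\s*true\s*</ready>", re.IGNORECASE)


def think_until_ready(think: Callable[[int], str], max_thinking_rounds: int = 10) -> list[str]:
    """Collect thinking rounds until the model signals readiness or the cap is hit."""
    thoughts: list[str] = []
    for round_number in range(1, max_thinking_rounds + 1):
        thought = think(round_number)
        thoughts.append(thought)
        if READY_TAG.search(thought):
            break  # the model decided it is done; stop early
    return thoughts
```

A model that signals readiness in round 3 uses three rounds even if the cap is 15; the cap only matters when the signal never arrives.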

Structured Output

Enforce response schemas with returns= on agent.run():

from pydantic import BaseModel, Field

class ContactInfo(BaseModel):
    name: str = Field(description="Full name")
    email: str = Field(description="Email address")
    phone: str | None = Field(None, description="Phone number")

agent = Agent(name="Extractor", model=model)
result = await agent.run("Extract: John Doe, john@acme.com", returns=ContactInfo)
print(result.content.data)  # ContactInfo(name="John Doe", ...)

Supports Pydantic models, dataclasses, TypedDict, JSON Schema dicts, bare primitives (str, int, bool, float), Literal, collections (list[T], set[T], tuple[T, ...]), Union, Enum, None, and dict.

See Structured Output for the full reference — schema types, field guidance, few-shot examples, ResponseSchema configuration, and provider support.

TaskBoard

Human-like task tracking for complex workflows:

agent = Agent(
    name="Researcher",
    model=model,
    tools=[search, summarize],
    taskboard=True,  # Adds task management tools
)

result = await agent.run("Research Python async patterns")
print(agent.taskboard.summary())

Streaming

Real-time token-by-token streaming from agent executions.

Streaming enables agents to yield output progressively as tokens are generated, rather than waiting for complete responses. This provides:

  • Real-time feedback during long-running agent operations
  • Better UX with progressive output display
  • Lower perceived latency by showing immediate progress
  • Cancellation support for in-flight operations
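
The cancellation point can be illustrated with plain asyncio. This sketch consumes a stand-in async generator (slow_tokens, not a cogent API) and abandons it after a timeout; whether agent.run_stream() cleans up provider connections in the same way is not stated here, so treat this as the general pattern only.

```python
import asyncio


async def slow_tokens():
    """Stand-in for a token stream: yields one token every 10 ms."""
    for i in range(1000):
        await asyncio.sleep(0.01)
        yield f"t{i} "


async def consume(received: list[str]) -> None:
    async for token in slow_tokens():
        received.append(token)


async def stream_with_timeout(timeout: float) -> list[str]:
    """Return whatever tokens arrived before the timeout cancelled the stream."""
    received: list[str] = []
    try:
        # wait_for cancels the consumer task when the timeout expires.
        await asyncio.wait_for(consume(received), timeout=timeout)
    except asyncio.TimeoutError:
        pass  # partial output is still available in `received`
    return received
```

Calling asyncio.run(stream_with_timeout(0.1)) returns only the tokens produced in the first tenth of a second; the remaining work is never done.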

Quick Start

from cogent import Agent

agent = Agent(
    name="writer",
    model="gpt-5.4",
    stream=True,  # Enable streaming
)

# Stream tokens as they arrive
async for chunk in agent.run_stream("Write a poem about AI"):
    print(chunk.content, end="", flush=True)

Enabling Streaming

# Option 1: Set on agent creation
agent = Agent(
    name="writer",
    model="gpt-5.4",
    stream=True,
)

result = await agent.run("Write about AI")  # Still works normally
async for chunk in agent.run_stream("Write a poem"):  # Stream tokens
    print(chunk.content, end="", flush=True)

# Option 2: Stream on demand (any agent)
agent = Agent(name="writer", model="gpt-5.4")

async for chunk in agent.run_stream("Write a poem"):
    print(chunk.content, end="", flush=True)

StreamChunk

Each streaming chunk contains:

from dataclasses import dataclass

@dataclass
class StreamChunk:
    content: str               # Token content
    delta: str                 # Incremental text (same as content)
    is_final: bool             # Last chunk?
    finish_reason: str | None  # Why stopped (stop, length, tool_calls)
    metadata: dict | None      # Token usage, model info

run_stream() vs run()

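| Feature  | run()                 | run_stream()               |
|----------|-----------------------|----------------------------|
| Returns  | Response              | AsyncIterator[StreamChunk] |
| Output   | Complete final output | Progressive tokens         |
| Latency  | Wait for completion   | Immediate feedback         |
| Use Case | Batch processing      | Interactive UX             |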

Usage Patterns

Collecting streamed output:

full_response = []

async for chunk in agent.run_stream("Write a story"):
    full_response.append(chunk.content)
    print(chunk.content, end="", flush=True)

final_text = "".join(full_response)

Progress tracking:

async for chunk in agent.run_stream("Long analysis task"):
    print(chunk.content, end="", flush=True)

    if chunk.is_final:
        print("\n✅ Complete!")

Error handling:

try:
    async for chunk in agent.run_stream("Query"):
        print(chunk.content, end="", flush=True)

        if chunk.finish_reason == "length":
            print("\n⚠️ Response truncated (max tokens reached)")
except Exception as e:
    print(f"\n❌ Streaming error: {e}")

Streaming with Tools

When an agent calls tools during streaming, the stream may pause while tools execute:

from cogent import Agent, tool

@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

agent = Agent(name="researcher", model="gpt-5.4", tools=[search])

async for chunk in agent.run_stream("Search for AI news and summarize"):
    if chunk.content:
        print(chunk.content, end="", flush=True)

    if chunk.finish_reason == "tool_calls":
        print("\n[Tool calling...]")

Web UI Integration

FastAPI streaming endpoint:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(message: str):
    async def generate():
        async for chunk in agent.run_stream(message):
            yield chunk.content

    return StreamingResponse(generate(), media_type="text/plain")

Server-Sent Events (SSE):

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.post("/chat/sse")
async def chat_sse(message: str):
    async def event_stream():
        async for chunk in agent.run_stream(message):
            yield {"event": "token", "data": chunk.content}
        yield {"event": "done", "data": ""}

    return EventSourceResponse(event_stream())

Spawning

Dynamic agent creation at runtime:

from cogent.agent import SpawningConfig, AgentSpec

agent = Agent(
    name="Coordinator",
    model=model,
    spawning=SpawningConfig(
        allowed_specs=[
            AgentSpec(name="researcher", tools=["search"]),
            AgentSpec(name="writer", tools=["write"]),
        ],
    ),
)

# Agent can spawn sub-agents during execution
result = await agent.run("Research and write about AI")

Subagents (Native Delegation)

New in v0.x.x: Native subagent support with full metadata preservation.

Delegate tasks to specialist agents while preserving Response metadata (tokens, duration, delegation chain):

from cogent import Agent

# Create specialist agents
data_analyst = Agent(
    name="data_analyst",
    model="gpt-5.4-mini",
    instructions="Analyze data and provide statistical insights.",
)

market_researcher = Agent(
    name="market_researcher",
    model="gpt-5.4-mini",
    instructions="Research market trends and competitive landscape.",
)

# Create coordinator with subagents
coordinator = Agent(
    name="coordinator",
    model="gpt-5.4-mini",
    instructions="""Coordinate research tasks:
- Use data_analyst for numerical analysis
- Use market_researcher for market trends
Synthesize their findings.""",
    # Simply pass the agents - uses their names automatically
    subagents=[data_analyst, market_researcher],
)

# Coordinator delegates automatically
response = await coordinator.run("Analyze Q4 2025 e-commerce growth")

# Full metadata preserved
print(f"Total tokens: {response.metadata.tokens.total_tokens}")  # Includes all subagents
print(f"Subagent calls: {len(response.subagent_responses)}")
for sub_resp in response.subagent_responses:
    print(f"  {sub_resp.metadata.agent}: {sub_resp.metadata.tokens.total_tokens} tokens")
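
The idea that the coordinator's total includes every subagent can be pictured as a small tree walk. This sketch uses a hypothetical FakeResponse dataclass rather than cogent's Response, and the field name own_tokens is invented for illustration; in cogent the aggregated figure is already available as response.metadata.tokens.total_tokens.

```python
from dataclasses import dataclass, field


@dataclass
class FakeResponse:
    """Stand-in for a response that may contain nested subagent responses."""
    agent: str
    own_tokens: int
    subagent_responses: list["FakeResponse"] = field(default_factory=list)

    def total_tokens(self) -> int:
        # Own usage plus everything spent by delegated agents, at any depth.
        return self.own_tokens + sum(r.total_tokens() for r in self.subagent_responses)


def delegation_chains(resp: FakeResponse, prefix: tuple[str, ...] = ()) -> list[str]:
    """List every delegation path, e.g. 'coordinator -> data_analyst'."""
    path = prefix + (resp.agent,)
    chains = [" -> ".join(path)]
    for sub in resp.subagent_responses:
        chains.extend(delegation_chains(sub, path))
    return chains
```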

Structured Output from Subagents

Use returns= on a subagent to declare the schema it produces. The coordinator's LLM receives clean JSON instead of a plain string, enabling it to reason over the structured result:

from pydantic import BaseModel
from typing import Literal

class ReviewScore(BaseModel):
    score: int
    verdict: Literal["approved", "needs_revision"]
    feedback: str

reviewer = Agent(
    name="reviewer",
    model="gpt-5.4-mini",
    returns=ReviewScore,  # Declares output schema when used as a subagent
    instructions="Review content and score it 1-10.",
)

editor = Agent(
    name="editor",
    model="gpt-5.4-mini",
    subagents=[writer, reviewer],  # writer is another Agent, defined elsewhere
)

# reviewer.run() is called with returns=ReviewScore automatically;
# editor's LLM sees {"score": 8, "verdict": "approved", "feedback": "..."}  ✅
result = await editor.run("Write and review a product tweet")

Key Benefits:

  • ✅ Accurate token counting (coordinator + all subagents, including reasoning tokens)
  • ✅ Full delegation chain tracking
  • ✅ Context propagates automatically
  • ✅ Observable with [subagent-call], [subagent-result] events
  • ✅ Zero LLM behavior changes (uses existing tool calling)
  • ✅ Subagents can declare their output schema via returns=

Example:

# List syntax (recommended) - uses agent names as tool names
coordinator = Agent(
    name="coordinator",
    model="gpt-5.4",
    subagents=[specialist],
)

# Dict syntax - override tool names
coordinator = Agent(
    name="coordinator",
    model="gpt-5.4",
    subagents={"custom_name": specialist},
)
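
The two syntaxes amount to the same thing once the tool names are settled. The sketch below shows one way such a normalization could work; NamedAgent and normalize_subagents are hypothetical names, and the duplicate-name check is an assumption rather than documented cogent behavior.

```python
from dataclasses import dataclass


@dataclass
class NamedAgent:
    """Minimal stand-in for an agent; only the name matters here."""
    name: str


def normalize_subagents(subagents) -> dict[str, NamedAgent]:
    """Turn either syntax into a {tool_name: agent} mapping."""
    if isinstance(subagents, dict):
        # Dict syntax: the keys are the tool names, used as given.
        return dict(subagents)
    mapping: dict[str, NamedAgent] = {}
    for agent in subagents:
        # List syntax: each agent's own name becomes its tool name.
        if agent.name in mapping:
            raise ValueError(f"duplicate subagent name: {agent.name!r}")
        mapping[agent.name] = agent
    return mapping
```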

See docs/subagents.md for complete documentation.

Observability

Built-in observability for standalone usage:

from cogent import Agent
from cogent.observability import ObservabilityLevel

# Boolean shorthand
agent = Agent(name="Worker", model=model, verbosity=True)  # Progress level

# String levels
agent = Agent(name="Worker", model=model, verbosity="debug")

# Enum (explicit)
agent = Agent(name="Worker", model=model, verbosity=ObservabilityLevel.DEBUG)

# Integer (0-5)
agent = Agent(name="Worker", model=model, verbosity=4)  # DEBUG

# Advanced: Full control with observer
from cogent.observability import Observer

observer = Observer(level="debug")
agent = Agent(name="Worker", model=model, observer=observer)

Verbosity levels:

| Level    | Int | String                | Description                          |
|----------|-----|-----------------------|--------------------------------------|
| OFF      | 0   | "off"                 | No output                            |
| RESULT   | 1   | "result", "minimal"   | Only final results                   |
| PROGRESS | 2   | "progress", "normal"  | Key milestones (default for True)    |
| DETAILED | 3   | "detailed", "verbose" | Tool calls, timing                   |
| DEBUG    | 4   | "debug"               | Everything including internal events |
| TRACE    | 5   | "trace"               | Maximum detail + execution graph     |

Priority: observer parameter takes precedence over verbosity.
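
The accepted verbosity forms can be summarized as a single lookup. The following standalone sketch mirrors the levels, integers, and string aliases listed above; the names Level, ALIASES, and resolve_verbosity are hypothetical, and mapping False to OFF is an assumption, since only True is documented here.

```python
from enum import IntEnum


class Level(IntEnum):
    """Local mirror of the verbosity levels; not cogent's ObservabilityLevel."""
    OFF = 0
    RESULT = 1
    PROGRESS = 2
    DETAILED = 3
    DEBUG = 4
    TRACE = 5


ALIASES = {
    "off": Level.OFF,
    "result": Level.RESULT, "minimal": Level.RESULT,
    "progress": Level.PROGRESS, "normal": Level.PROGRESS,
    "detailed": Level.DETAILED, "verbose": Level.DETAILED,
    "debug": Level.DEBUG,
    "trace": Level.TRACE,
}


def resolve_verbosity(value) -> Level:
    """Map a bool, int, string, or Level to a Level."""
    # bool is a subclass of int, so it has to be checked first.
    if isinstance(value, bool):
        return Level.PROGRESS if value else Level.OFF  # False -> OFF is an assumption
    if isinstance(value, int):
        return Level(value)  # also covers Level members; raises ValueError outside 0-5
    if isinstance(value, str):
        return ALIASES[value.lower()]
    raise TypeError(f"unsupported verbosity value: {value!r}")
```

Checking bool before int is the one subtle step: without it, verbosity=True would resolve to level 1 (RESULT) instead of PROGRESS.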