Observability¶
The cogent.observability module provides real-time visibility into agent execution.
Quick Start¶
Pass True directly to observer= for default output, or an Observer instance for fine-grained control:
agent = Agent(
    name="Assistant",
    model="gpt-5.4-mini",
    tools=[my_tool],
    observer=True,  # agent / tool / subagent lifecycle
)
result = await agent.run("Do something useful")
Use Observer directly when you need subscriptions, custom sinks, event capture, or extra detail:
from cogent.observability import Observer
# Also show LLM request/response lines and token counts
observer = Observer(llm_calls=True)
# Also prepend timestamps to every line
observer = Observer(llm_calls=True, timestamps=True)
agent = Agent(name="Assistant", model="gpt-5.4-mini", observer=observer)
Output Flags¶
Core flags¶
| Flag | Default | What it adds |
|---|---|---|
| (observer active) | — | Agent lifecycle · tool calls and results · subagent calls · retrieval start/complete · streaming start/end |
| llm_calls=True | False | [request] and [response] lines: model name, message count, token breakdown (in/out/reasoning) |
| memory_events=True | False | Memory lifecycle events: conversation load/save, ACC context/update, semantic cache hit/miss |
| retrieval_events=False | None | Override retrieval event visibility. None (default) inherits from the resolved policy (visible at PROGRESS+). Set False to suppress. |
| timestamps=True | False | Timestamp prefix on every output line |
| trace_ids=True | False | Correlation IDs on event lines |
Backward-compatible level shortcuts¶
String shortcuts passed to Agent(observer=...) or Observer(level=...) are still accepted:
| String | Equivalent |
|---|---|
| "progress" or True | Observer() |
| "debug" or "trace" | Observer(llm_calls=True, memory_events=True) |
| "off" or False | disabled |
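The shortcut table can be read as a small lookup. The sketch below is standalone Python that mirrors the table. `resolve_shortcut` and the dictionary it returns are hypothetical names chosen for illustration, not part of cogent.

```python
def resolve_shortcut(value):
    """Map a level shortcut to the flags listed in the shortcut table.

    Hypothetical helper for illustration only. Returns None when the
    observer is disabled, otherwise a dict of flag values.
    """
    if value is True or value == "progress":
        return {"llm_calls": False, "memory_events": False}
    if value in ("debug", "trace"):
        return {"llm_calls": True, "memory_events": True}
    if value is False or value == "off":
        return None
    raise ValueError(f"unknown shortcut: {value!r}")


progress_flags = resolve_shortcut("progress")
debug_flags = resolve_shortcut("debug")
```

Unknown values raise here. How cogent itself treats an unrecognized string is not stated in this page.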
Example output at "progress"¶
[Assistant] [user-input]
Do some math.
[Assistant] [tool-call] a1b2c3d4
calculate(x=6, y=7)
[Assistant] [tool-result] a1b2c3d4 (42ms)
calculate 42
[Assistant] [agent-completed] (2.0s) • 330 tokens
The answer is 42.
Example output at "debug"¶
[2026-04-01 12:00:00.123] [Assistant] [user-input] 8552e158
Do some math.
[2026-04-01 12:00:00.124] [Assistant] [request] gpt-5.4-mini (1 msgs) • 1 tools
[2026-04-01 12:00:00.200] [Assistant] [tool-decision]
calculate
I should call the calculate tool with x=6, y=7.
[2026-04-01 12:00:00.201] [Assistant] [tool-call] a1b2c3d4
calculate(x=6, y=7)
[2026-04-01 12:00:00.350] [Assistant] [tool-result] a1b2c3d4 (149ms)
calculate 42
[2026-04-01 12:00:00.351] [Assistant] [response] (228ms) • 330 tokens
[2026-04-01 12:00:00.352] [Assistant] [agent-completed] (228ms) • 330 tokens
The answer is 42.
MCP and A2A source labels¶
The console prefixes tool and subagent names with a protocol scheme and appends the origin so the source is visible at a glance:
[coordinator] [subagent-call] c6d1b2e9
a2a://analyst@localhost:10099('what is 18% of 250?')
[coordinator] [subagent-result] c6d1b2e9 (3.1s)
a2a://analyst@localhost:10099 '18% of 250 is 45.'
[researcher] [tool-call] 7f1a2b3c
mcp://web_search@search(query='Python async best practices')
[researcher] [tool-result] 7f1a2b3c (120ms)
mcp://web_search@search 'Use asyncio.gather for concurrent tasks...'
- a2a://name@host:port is a subagent backed by a remote Agent(url=...) endpoint
- mcp://name@server is a tool sourced from an MCP server (the server's name= from MCP.stdio(name=...))
- No prefix or suffix means a local subagent or a built-in / toolkit tool
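If you post-process console output, the label format can be split back into its parts. This is a minimal sketch that assumes labels follow the `scheme://name@origin` shape shown in the examples. `parse_source_label` is a hypothetical helper, not a cogent function.

```python
def parse_source_label(label):
    """Split a console source label into (scheme, name, origin).

    Hypothetical helper: a label without a scheme is treated as local.
    """
    scheme, sep, rest = label.partition("://")
    if not sep:
        return ("local", label, None)
    name, _, origin = rest.partition("@")
    return (scheme, name, origin or None)


remote_agent = parse_source_label("a2a://analyst@localhost:10099")
mcp_tool = parse_source_label("mcp://web_search@search")
local_tool = parse_source_label("calculate")
```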
Data forwarding visibility¶
When a Message carries DataParts (structured JSON from A2A or user code), each data part is shown on a separate indented line:
[coordinator] [user-input]
Ask the analyst: what was Q3 revenue?
data= {"quarter": "Q3", "revenue": 148.0, "currency": "EUR"}
[coordinator] [subagent-call] 4f0782b4
analyst('what was Q3 revenue?', data={"quarter": "Q3", "revenue": 148.0, "currency": "EUR"})
When an A2A subagent returns DataPart or FilePart attachments in its response envelope, the result line shows a +N data, N files (names) indicator:
[coordinator] [subagent-result] ccc12345 (1.2s)
a2a://analyst@remote:9000 'See the attached report' +2 data, 1 file (report.pdf)
Post-run Event Inspection¶
Every result carries the events emitted during the run:
result = await agent.run("Do something")
# All events
result.events
# Filter by type — supports glob
errors = result.events_of("tool.error")
llm_reqs = result.events_of("llm.*")
This does not require capture configuration — events are always stored on the result.
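To make the glob filtering concrete, here is a standalone sketch using Python's `fnmatch` module. It models events as plain dicts and assumes shell-style matching. cogent's own matching rules may differ in edge cases, and this local `events_of` function is only an illustration.

```python
from fnmatch import fnmatchcase


def events_of(events, pattern):
    """Return the events whose type matches a shell-style glob pattern."""
    return [e for e in events if fnmatchcase(e["type"], pattern)]


# Stand-in events; real ones come from result.events.
events = [
    {"type": "llm.request"},
    {"type": "tool.called"},
    {"type": "llm.response"},
    {"type": "tool.error"},
]
llm_events = events_of(events, "llm.*")
tool_errors = events_of(events, "tool.error")
```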
Observer API¶
Subscribing to Events¶
from cogent.observability import Observer
observer = Observer()
# Subscribe to a specific type
observer.on("tool.called", lambda e: print(f"tool: {e.data['tool_name']}"))
# Subscribe to a glob pattern
observer.on("tool.*", lambda e: print(f"{e.type}: {e.data}"))
# Subscribe to all events
observer.on_all(lambda e: print(e.type))
# Unsubscribe: on() returns a callable that removes the subscription
unsub = observer.on("agent.*", handler)  # handler is any callable taking one event
unsub()
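Handlers do not have to be lambdas. The sketch below shows a named handler that tallies tool calls by reading the `tool_name` payload field used in the subscription examples. The stand-in events are built with `SimpleNamespace` so the block runs without cogent. With a real observer the handler would be registered through `observer.on(...)`.

```python
from collections import Counter
from types import SimpleNamespace

tool_calls = Counter()


def count_tool_call(event):
    """Handler: tally calls per tool from the tool_name payload field."""
    tool_calls[event.data["tool_name"]] += 1


# With a real observer this would be registered as:
#     observer.on("tool.called", count_tool_call)
# Stand-in events are fed in directly so the sketch runs on its own.
for name in ["search", "calculate", "search"]:
    count_tool_call(SimpleNamespace(type="tool.called", data={"tool_name": name}))

busiest = tool_calls.most_common(1)
```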
Event Capture and History¶
observer.history() only returns events that matched a capture= pattern at
construction time. Use it when you want a filtered post-run log separate from
result.events.
observer = Observer(
    llm_calls=True,
    capture=["tool.result", "agent.*"],
)
# agent must have been created with observer=observer
await agent.run("Do something")
for event in observer.history("tool.*"):
    print(event.type, event.data["tool_name"])
observer.clear_history()
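The interaction between capture patterns and the pattern passed to history() is easy to misread, so here is a standalone model of it. `CaptureLog` is a hypothetical stand-in written for this sketch, and it assumes shell-style glob matching. It is not how cogent is implemented.

```python
from fnmatch import fnmatchcase


class CaptureLog:
    """Hypothetical stand-in for the capture/history behaviour."""

    def __init__(self, capture):
        self._patterns = list(capture)
        self._events = []

    def record(self, event_type):
        # Keep the event only if it matches a capture pattern.
        if any(fnmatchcase(event_type, p) for p in self._patterns):
            self._events.append(event_type)

    def history(self, pattern="*"):
        # Filter the captured events, never the full event stream.
        return [t for t in self._events if fnmatchcase(t, pattern)]


log = CaptureLog(capture=["tool.result", "agent.*"])
for event_type in ["agent.invoked", "tool.called", "tool.result", "agent.responded"]:
    log.record(event_type)

tool_history = log.history("tool.*")
```

In this model `tool_history` contains only `tool.result`: `tool.called` matched the query pattern but was never captured, which is the case the first paragraph of this section warns about.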
Summary¶
Dynamic Configuration¶
observer.enabled = False  # pause output
# llm_calls is not yet a live setter; re-create the Observer to change it
observer = Observer(llm_calls=True)
Emitting Custom Events¶
Use your own namespace (e.g. my_app.*) rather than Cogent's built-in names.
Nested Subagent Traces¶
When a coordinator delegates to child agents, their events are muted in the output by default — agent name tags, event labels, and timing all render dim to keep the primary agent's flow readable.
To display a full inline trace of each delegated run nested under its subagent branch, opt in with subagent_trace="nested":
observer = Observer(level="progress", subagent_trace="nested")
coordinator = Agent(name="Coordinator", model="gpt-5.4-mini", observer=observer)
This adds a faint [subagent-trace] section under each [subagent-call] branch showing the child agent's full tool and reasoning steps. The default is "off".
Sharing an Observer Across Agents¶
One observer can track multiple agents. Output is tagged with the agent name.
observer = Observer()
researcher = Agent(name="Researcher", model=..., observer=observer)
writer = Agent(name="Writer", model=..., observer=observer)
await researcher.run("Research AI trends")
await writer.run("Write summary")
print(observer.summary())
[Researcher] [user-input]
Research AI trends
[Researcher] [tool-call] abc123de
search(query='AI trends')
[Researcher] [tool-result] abc123de (38ms)
search 'Latest trends in AI...'
[Researcher] [agent-completed] (2.1s) • 250 tokens
Here are the key AI trends...
[Writer] [user-input]
Write summary
[Writer] [agent-completed] (1.5s) • 180 tokens
Here is a summary...
Event Reference¶
Event Shape¶
All built-in events are immutable Event records:
| Field | Meaning |
|---|---|
| type | String name such as tool.called |
| data | Payload dictionary |
| timestamp | UTC timestamp |
| source | Emitting agent or component |
| correlation_id | Optional correlation ID |
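For readers unfamiliar with immutable records, the sketch below models the fields from the table as a frozen dataclass. `EventSketch`, its defaults, and its field types are assumptions made for illustration. The real Event class may be defined differently.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class EventSketch:
    """Illustrative record with the fields from the table; not cogent's class."""

    type: str
    data: dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    source: str = ""
    correlation_id: Optional[str] = None


event = EventSketch(type="tool.called", data={"tool_name": "calculate"}, source="Assistant")

try:
    event.type = "tool.result"  # attribute assignment is rejected on a frozen record
    mutated = True
except FrozenInstanceError:
    mutated = False
```

In this sketch only attribute assignment is blocked. The `data` dict itself is still a mutable object, so handlers should treat it as read-only.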
Memory Events¶
Memory events are hidden by default and enabled with Observer(memory_events=True) or level="debug".
When enabled, they appear as indented, dimmed lines nested under the agent action that triggered them:
[Agent] [user-input] abc123
What is my name?
[conversation-loaded] my-thread(messages=3)
[acc-context] 271 chars • 2 items (0 constraints, 1 entities, 0 actions, 1 context)
[Agent] [tool-decision]
recall(key='name')
OK (0ms) 'Alice'
[acc-updated] compressed 1 tool call
[conversation-saved] my-thread(messages=5)
[Agent] [agent-completed] (1.6s) • 2247 tokens
Your name is Alice.
| Event | Meaning |
|---|---|
| memory.conversation.loaded | Conversation history loaded for a thread |
| memory.conversation.saved | Conversation state persisted after execution |
| memory.acc.context_formatted | ACC built bounded memory context for the prompt |
| memory.acc.updated | ACC state compressed from the latest turn |
| memory.cache.hit | Semantic cache served a cached tool result |
| memory.cache.miss | Semantic cache miss; the tool will execute fresh |
| memory.cache.write | Fresh tool result written to semantic cache |
Low-level store operations (memory.write, memory.read, memory.delete) are suppressed in console output because the tool events (remember(…) / recall(…) / forget(…)) already surface them.
Retrieval Events¶
Retrieval events are visible by default at PROGRESS level. Suppress them with Observer(retrieval_events=False).
When a retriever has an observer set, it emits lifecycle events rendered by the RetrievalFormatter:
[retrieval-start]
dense(query='what is quantum computing?', k=4)
[retrieval-complete] dense 3 results • scores=[0.95, 0.80, 0.70] • 42ms
For summary indexing:
[retrieval-indexing] 5 documents
[retrieval-summarized] 1/5 (350ms)
[retrieval-summarized] 2/5 (280ms)
...
[retrieval-indexed] 5 summaries from 5 docs (2.0s)
| Event | Meaning |
|---|---|
| retrieval.start | Retrieval query initiated |
| retrieval.complete | Results returned with count, scores, and duration |
| retrieval.error | Retrieval failed |
| retrieval.summary_index.start | Summary index build started |
| retrieval.summary_index.document_summarized | One document summarized during indexing |
| retrieval.summary_index.complete | Summary index build completed |
| retrieval.summary_index.error | Summary index build failed |
Fine-grained control via policy:
from cogent.observability import Observer
from cogent.observability.core.config import ObservabilityPolicy, RetrievalTelemetry
# Show retrieval lifecycle but hide summary indexing detail
policy = ObservabilityPolicy(
    retrieval=RetrievalTelemetry(enabled=True, summary_index=False),
)
observer = Observer(policy=policy)
Event records also carry an event_id field, the unique event ID.
Built-in Event Types¶
| Event | Level | Description |
|---|---|---|
| agent.invoked | PROGRESS | Agent execution started |
| agent.thinking | PROGRESS | Thinking step / loop iteration |
| agent.responded | PROGRESS | Final response produced |
| agent.error | PROGRESS | Agent or validation failure |
| tool.called | PROGRESS | Tool invocation started |
| tool.result | PROGRESS | Tool completed successfully |
| tool.retry | PROGRESS | Tool call failed; framework will retry (one event per failed attempt) |
| tool.error | PROGRESS | All retries exhausted; error returned to caller |
| tool.escalated | PROGRESS | All retries exhausted; error handed to LLM for recovery |
| tool.suspended | PROGRESS | Tool raised Suspend; run will finish with response.suspended=True |
| tool.waiting | PROGRESS | Auto-resolve polling started (check + timeout on Suspend) |
| subagent.called | PROGRESS | Subagent delegation started |
| subagent.result | PROGRESS | Subagent completed |
| subagent.context | PROGRESS | Delegation context summary injected |
| subagent.error | PROGRESS | Subagent failed |
| stream.start | PROGRESS | Streaming started |
| stream.end | PROGRESS | Streaming completed |
| stream.error | PROGRESS | Streaming failed |
| output.generated | PROGRESS | Structured output produced |
| retrieval.start | PROGRESS | Retrieval query initiated |
| retrieval.complete | PROGRESS | Retrieval results returned |
| retrieval.error | PROGRESS | Retrieval failed |
| retrieval.summary_index.start | PROGRESS | Summary index build started |
| retrieval.summary_index.document_summarized | PROGRESS | One document summarized |
| retrieval.summary_index.complete | PROGRESS | Summary index build completed |
| retrieval.summary_index.error | PROGRESS | Summary index build failed |
| llm.request | DEBUG | Request sent to the model |
| llm.response | DEBUG | Model response metadata |
| llm.thinking | DEBUG | Extended reasoning/thinking tokens |
| agent.reasoning | DEBUG | Explicit reasoning phase |
| agent.acting | DEBUG | Tool execution phase |
Common Payload Fields¶
| Family | Fields |
|---|---|
| agent.* | agent_name, agent_id, run_id, step_id, iteration, duration_ms, input_data |
| tool.* | tool_name, call_id, args, result, error, attempts, tool_source |
| tool.retry extra | attempt (1-based retry number), max_retries, error_type, retry_delay |
| subagent.* | subagent_name, call_id, run_id, subagent_run_id, subagent_type ("local" or "a2a"), subagent_url (A2A only), task, data_parts, file_parts, file_names |
| subagent.context | subagent_name, summary_chars, history_turns, summarizer |
| llm.* | agent_name, model, iteration, token counts, thinking content |
| retrieval.* | retriever, query, k, results_count, top_scores, duration_ms, error |
| stream.* | agent_name, token or preview fields |
Run Lineage¶
Built-in events carry lineage fields for reconstructing a run:
| Field | Meaning |
|---|---|
| run_id | Stable ID for one agent invocation |
| parent_run_id | Parent invocation ID for nested/delegated runs |
| step_id | Step within a run (step-1, reasoning-2) |
| tool_call_id | Per-invocation ID on tool.* and subagent.* events |
observer = Observer(llm_calls=True, capture=["agent.*", "subagent.*"])
agent = Agent(name="Assistant", model="gpt-5.4-mini", observer=observer)
await agent.run("Check the shipping quote")
for event in observer.history("subagent.*"):
    print(event.type, event.data["run_id"], event.data["tool_call_id"])
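One use of the lineage fields is rebuilding the delegation tree after a run. The sketch below works on plain payload dicts and assumes only the `run_id` and `parent_run_id` fields from the table. `build_run_tree` and the example IDs are made up for illustration.

```python
from collections import defaultdict


def build_run_tree(payloads):
    """Group run IDs under their parent using run_id / parent_run_id."""
    children = defaultdict(list)
    seen = set()
    for data in payloads:
        run_id = data["run_id"]
        if run_id in seen:
            continue  # one entry per run, however many events it emitted
        seen.add(run_id)
        children[data.get("parent_run_id")].append(run_id)
    return dict(children)


# Stand-in payloads; in practice these would be event.data dictionaries.
payloads = [
    {"run_id": "run-1"},
    {"run_id": "run-1"},
    {"run_id": "run-2", "parent_run_id": "run-1"},
    {"run_id": "run-3", "parent_run_id": "run-1"},
]
tree = build_run_tree(payloads)
```

Top-level runs end up under the `None` key, and each delegated run is listed under the run that spawned it.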
Sinks¶
By default the observer writes to stderr. Add custom sinks to route events elsewhere:
from cogent.observability import Observer, ConsoleSink, FileSink, CallbackSink
import sys
observer = Observer(level="progress")
observer.add_sink(ConsoleSink(stream=sys.stdout)) # redirect to stdout
observer.add_sink(FileSink("agent.log")) # write to file
observer.add_sink(CallbackSink(  # custom handler
    # send_to_datadog is a placeholder for your own function
    lambda event, formatted: send_to_datadog(formatted)
))
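A callback can be any callable that accepts the `(event, formatted)` pair used by the lambda in the sinks example. The sketch below buffers JSON lines in memory. It assumes the event exposes a `.type` attribute (as in the Event Shape table) and that `formatted` is a string. `JsonLineBuffer` is a hypothetical class, and the event is a stand-in so the block runs without cogent.

```python
import json
from types import SimpleNamespace


class JsonLineBuffer:
    """Callable with the (event, formatted) signature of a sink callback."""

    def __init__(self):
        self.lines = []

    def __call__(self, event, formatted):
        # Keep both the machine-readable type and the rendered console text.
        self.lines.append(json.dumps({"type": event.type, "text": formatted}))


buffer = JsonLineBuffer()
# With cogent this would be wired up as: observer.add_sink(CallbackSink(buffer))
buffer(SimpleNamespace(type="tool.called"), "[Bot] [tool-call] calculate(x=6, y=7)")
first_record = json.loads(buffer.lines[0])
```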
Layout Presets¶
Control output density with the layout parameter:
observer = Observer(layout="compact") # single-line events, minimal detail
observer = Observer(layout="standard") # default multi-line formatting
observer = Observer(layout="verbose") # expanded, no truncation
| Layout | Behavior |
|---|---|
| compact | Single-line events. Suppresses multi-line bodies, reasoning previews, and response previews. Ideal for busy runs or CI. |
| standard | Default. Multi-line when content is present (task body, error detail, response preview). |
| verbose | Same as standard, but disables all truncation. Every field is shown in full. |
Compact example:
[Bot] [user-input] Do some math.
[Bot] [tool-call] a1b2c3d4 calculate(x=6, y=7)
[Bot] [tool-result] a1b2c3d4 (42ms) calculate 42
[Bot] [agent-completed] (2.0s) • 330 tokens
Themes¶
Themes control the color palette applied to semantic roles (agent names, labels, errors, etc.).
Built-in themes¶
observer = Observer(theme="default") # dim agent, bold tool, colored labels
observer = Observer(theme="monochrome") # bold/dim only, no hue — safe for CI/pipes
observer = Observer(theme="subtle") # blue agent, cooler palette
observer = Observer(theme="minimal") # everything dimmed — background agents
observer = Observer(theme="obs-crisp") # high-contrast for scanning dense output
observer = Observer(theme="vivid") # 256-color palette — degrades on basic terminals
Custom themes¶
Build a custom Theme using Style objects:
from cogent.observability.core.config import Theme
from cogent.observability.formatters.style import Color, Style
my_theme = Theme(
    agent=Style(fg=Color.BLUE, bold=True),
    tool=Style(fg=Color.CYAN),
    error=Style(fg=Color.RED, bold=True),
    success=Style(fg=Color.GREEN),
)
observer = Observer(theme=my_theme)
Extended colors¶
Style supports 256-color and true-color values that auto-degrade based on terminal capability:
Style(fg=Color.from_256(141)) # 256-color palette index
Style(fg=Color.from_rgb(100, 200, 255)) # true-color → auto-fallback to 256
Degradation is automatic: true-color terminals get full RGB, 256-color terminals get the nearest palette index, and 8-color terminals skip the extended color (modifiers like bold/dim still apply).
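To show what "nearest palette index" means, the sketch below maps an RGB value onto the standard xterm 256-color palette (the 6x6x6 color cube at indices 16-231 and the grayscale ramp at 232-255) by squared distance. This is a common approach and an assumption here. The page does not state which algorithm cogent uses, and `nearest_256` is a hypothetical helper.

```python
CUBE_LEVELS = (0, 95, 135, 175, 215, 255)  # xterm 6x6x6 color cube steps


def _palette():
    """Yield (index, (r, g, b)) for xterm palette indices 16-255."""
    for r in range(6):
        for g in range(6):
            for b in range(6):
                index = 16 + 36 * r + 6 * g + b
                yield index, (CUBE_LEVELS[r], CUBE_LEVELS[g], CUBE_LEVELS[b])
    for i in range(24):
        level = 8 + 10 * i  # grayscale ramp, indices 232-255
        yield 232 + i, (level, level, level)


def nearest_256(r, g, b):
    """Return the palette index closest to (r, g, b) by squared distance."""

    def distance(entry):
        _, (pr, pg, pb) = entry
        return (pr - r) ** 2 + (pg - g) ** 2 + (pb - b) ** 2

    return min(_palette(), key=distance)[0]


fallback_index = nearest_256(100, 200, 255)
```

Indices 0-15 are skipped because terminals commonly let users remap them, so their actual colors are not predictable.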
Style reference¶
| Parameter | Type | Effect |
|---|---|---|
| fg | Color, Color.from_256(n), Color.from_rgb(r,g,b) | Foreground color |
| bold | bool | Bold text |
| dim | bool | Dimmed text |
| italic | bool | Italic text |
Compose styles with style.with_dim() to add dim without replacing other attributes.