Agents
Reference agent examples
Strands Agents SDK
A FastAPI wrapper around Strands Agents using the OTEL exporter drain pattern.
The fastest implementation path uses `pipelines.odyssey`, which ships `proxy_tool`, `dump_tools_schema`, and `forward_agent_result_events`.
Register the agent
Use external_http mode. Dump the tool schema:
uv run python <<'EOF'
import json

from app import _build_agent

# Placeholder proxy URL and token: the agent is built only so its tool
# registry can be read; no tool is invoked here.
stub_agent = _build_agent("http://stub", "stub")

schema = []
for registered in stub_agent.tool_registry.registry.values():
    spec = registered.tool_spec
    schema.append(
        {
            "name": spec.name,
            "description": spec.description,
            "parameters": spec.input_schema,
        }
    )

print(json.dumps(schema, indent=2))
EOF
requirements.txt
fastapi>=0.115
uvicorn[standard]>=0.32
httpx>=0.27
strands-agents>=0.4
opentelemetry-sdk>=1.27
app.py
import asyncio
import json
from collections.abc import Sequence
from typing import Any

import httpx
from fastapi import FastAPI, Header, HTTPException, Request
from opentelemetry import trace
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import (
    SimpleSpanProcessor,
    SpanExporter,
    SpanExportResult,
)
from strands import Agent, tool
# FastAPI application object; the `/dispatch` route is registered on it.
app = FastAPI()
class _MemorySpanExporter(SpanExporter):
    """Span exporter that buffers every exported span in an in-process list.

    The buffer is exposed as ``spans`` so a caller can read back the spans
    produced by a run and clear the list between runs.
    """

    def __init__(self) -> None:
        # Exported spans, in the order they were handed to ``export``.
        self.spans: list[ReadableSpan] = []

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Append ``spans`` to the buffer and report success."""
        self.spans.extend(spans)
        # Return the SDK's result enum rather than a bare ``0`` so the
        # exporter honours the ``SpanExporter.export`` contract.
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        """Drop all buffered spans."""
        self.spans.clear()
# One process-wide exporter/provider pair. The provider is installed as the
# global OpenTelemetry tracer provider, so spans are buffered in
# `_exporter.spans` for later draining.
_exporter = _MemorySpanExporter()
_provider = TracerProvider()
# The span processor forwards spans from the provider to the in-memory exporter.
_provider.add_span_processor(SimpleSpanProcessor(_exporter))
trace.set_tracer_provider(_provider)
def _proxy_call(proxy_url: str, run_token: str, tool_name: str, args: dict) -> Any:
    """Invoke a proxied tool over HTTP and return its ``"response"`` field.

    POSTs ``args`` as the JSON body to ``<proxy_url>/tools/<tool_name>`` with
    ``run_token`` as a bearer token. A non-success status raises through
    ``raise_for_status``; a reply without a ``"response"`` key raises
    ``KeyError``.
    """
    endpoint = f"{proxy_url.rstrip('/')}/tools/{tool_name}"
    auth_headers = {"Authorization": f"Bearer {run_token}"}
    with httpx.Client(timeout=120.0) as client:
        reply = client.post(endpoint, json=args, headers=auth_headers)
        reply.raise_for_status()
        body = reply.json()
    return body["response"]
def _build_agent(proxy_url: str, run_token: str) -> Agent:
    """Build the ``orders-triage`` agent with its two proxied order tools.

    Both tools close over ``proxy_url`` and ``run_token`` and forward their
    single ``order_id`` argument through ``_proxy_call``.
    """

    def _forward(tool_name: str, order_id: str) -> dict:
        # Shared plumbing: both tools send the same one-field payload.
        return _proxy_call(proxy_url, run_token, tool_name, {"order_id": order_id})

    @tool
    def get_order(order_id: str) -> dict:
        """Look up an order by id."""
        return _forward("get_order", order_id)

    @tool
    def refund_order(order_id: str) -> dict:
        """Refund an order."""
        return _forward("refund_order", order_id)

    triage_prompt = "You triage refund requests. Look up orders before deciding."
    return Agent(
        name="orders-triage",
        model="claude-sonnet-4-5",
        tools=[get_order, refund_order],
        system_prompt=triage_prompt,
    )
def _drain_trace(spans: list[ReadableSpan]) -> tuple[list[dict], dict]:
messages: list[dict] = []
metadata = {"total_input_tokens": 0, "total_output_tokens": 0}
for span in sorted(spans, key=lambda s: s.start_time):
attrs = dict(span.attributes or {})
kind = attrs.get("strands.span.kind") or span.name
if kind == "agent.llm":
messages.append({
"role": "assistant",
"content": attrs.get("strands.completion.text"),
"thinking": json.loads(attrs["strands.thinking.json"]) if "strands.thinking.json" in attrs else None,
"tool_calls": json.loads(attrs["strands.tool_calls.json"]) if "strands.tool_calls.json" in attrs else None,
})
metadata["total_input_tokens"] += int(attrs.get("strands.usage.input_tokens", 0))
metadata["total_output_tokens"] += int(attrs.get("strands.usage.output_tokens", 0))
elif kind == "agent.tool":
messages.append({
"role": "tool",
"tool_call_id": attrs.get("strands.tool.call_id"),
"content": attrs.get("strands.tool.result_json"),
})
return messages, metadata
# Serialize the model run + span drain: the in-memory exporter is
# process-global, so two runs draining `_exporter.spans` at once would
# interleave. The lock scopes one run's spans at a time. For real
# concurrency, give each run its own exporter/provider (see the OTEL
# callout below) instead of widening this lock.
# `dispatch` holds this lock for the whole `_run_loop` call, so requests
# are processed one at a time.
_run_lock = asyncio.Lock()
def _run_loop(proxy_url: str, run_token: str, user_instruction: str) -> dict:
    """Run the agent once, synchronously, and package the transcript.

    Called from ``dispatch`` via ``asyncio.to_thread``. Returns a dict with
    ``final_response``, ``messages`` (the user turn followed by the messages
    drained from the recorded spans) and ``metadata``.
    """
    # Start from an empty buffer so the drain below sees only this run's spans.
    _exporter.spans.clear()

    triage_agent = _build_agent(proxy_url, run_token)
    outcome = triage_agent(user_instruction)

    traced_messages, run_metadata = _drain_trace(_exporter.spans)
    run_metadata["model"] = "claude-sonnet-4-5"

    # Prefer the result's `message` attribute; fall back to its string form
    # when that attribute is missing or falsy.
    answer = getattr(outcome, "message", None)
    if not answer:
        answer = str(outcome)

    user_turn = {"role": "user", "content": user_instruction}
    return {
        "final_response": answer,
        "messages": [user_turn, *traced_messages],
        "metadata": run_metadata,
    }
@app.post("/dispatch")
async def dispatch(
    request: Request,
    x_pipelines_run_token: str | None = Header(default=None),
    x_pipelines_odyssey_proxy_url: str | None = Header(default=None),
):
    """Handle one dispatch request by running the agent and returning its transcript.

    The proxy URL is taken from the body's ``odyssey_proxy_url`` field, falling
    back to the ``X-Pipelines-Odyssey-Proxy-Url`` header; the run token comes
    from the ``X-Pipelines-Run-Token`` header. The instruction is read from
    ``input.user_instruction`` and defaults to an empty string.

    Raises:
        HTTPException: 400 when the body is not a JSON object, or when the
            proxy URL or run token is missing.
    """
    # Reject malformed bodies with a 400 instead of an unhandled 500.
    try:
        payload = await request.json()
    except ValueError as exc:
        raise HTTPException(400, "request body must be valid JSON") from exc
    if not isinstance(payload, dict):
        raise HTTPException(400, "request body must be a JSON object")

    proxy_url = payload.get("odyssey_proxy_url") or x_pipelines_odyssey_proxy_url
    run_token = x_pipelines_run_token
    if not proxy_url or not run_token:
        raise HTTPException(400, "missing proxy URL or run token")

    task_input = payload.get("input")
    if not isinstance(task_input, dict):
        task_input = {}
    user_instruction = task_input.get("user_instruction") or ""

    # Offload Strands' blocking call to a worker thread so it never freezes
    # the event loop; the lock keeps the shared exporter's spans per-run.
    async with _run_lock:
        return await asyncio.to_thread(_run_loop, proxy_url, run_token, user_instruction)


# NOTE: Strands OTEL attribute names can change across releases. The
# strands.span.kind and strands.tool_calls.json keys shown in this example are
# illustrative. Run one dispatch with a debug exporter and adjust key mapping
# as needed.
Live trace forwarding
Use post-run flush:
from pipelines.odyssey.adapters.strands import forward_agent_result_events
forward_agent_result_events(result)
For per-turn streaming, attach safe_post_trace_event to a Strands CallbackHandler, following the same pattern as the built-in OTEL exporter. See Trace events.
Customizations
- Span attribute mapping: print span.attributes during local testing and adjust key mapping as needed.
- Per-task context: set on task current_state and read from payload["input"]["input"]. behavior_instructions remains on the platform.