Pipelines Docs is in beta — content is actively being added.
AgentsRuntime Setup

Agent SDK (Python)

The pipelines.odyssey Python package.

The recommended path for any Python agent. The SDK handles envelope parsing, inbound auth, the proxy shim, and response shaping.

Install

pip install 'pipelines-sdk[odyssey]'

Add the adapter for your framework — each adapter extra implies [odyssey], so you don't need to list it separately:

pip install 'pipelines-sdk[openai-agents]'
pip install 'pipelines-sdk[anthropic]'
pip install 'pipelines-sdk[langchain]'
pip install 'pipelines-sdk[strands]'

Scaffold a project

Generate a runnable wrapper instead of hand writing one. Every command is also available as the standalone odyssey console script.

pipelines odyssey init --framework anthropic --dir my-agent

The emitted project shape, the framework options, and the full scaffold, preflight, and tunnel loop are documented once in Local development.

Minimal example (OpenAI Agents SDK)

from agents import Agent, Runner, function_tool
from fastapi import FastAPI
from pipelines.odyssey import proxy_call, register_dispatch_route

app = FastAPI()


@function_tool
def get_order(order_id: str) -> dict:
    return proxy_call("get_order", {"order_id": order_id})


@function_tool
def refund_order(order_id: str) -> dict:
    return proxy_call("refund_order", {"order_id": order_id})


def build_agent() -> Agent:
    return Agent(
        name="orders-triage",
        instructions="Triage refund requests; look up orders before deciding.",
        tools=[get_order, refund_order],
        model="gpt-5",
    )


@register_dispatch_route(app, agent_token_env="AGENT_TOKEN")
async def run(envelope):
    result = await Runner.run(build_agent(), envelope.user_instruction)
    return result.final_output

Dump the registration JSON for the Import JSON dialog:

from pipelines.odyssey.adapters.openai_agents import dump_tools_schema_json
print(dump_tools_schema_json(build_agent()))

register_dispatch_route

The decorator validates the inbound bearer, parses the request into an Envelope, binds it on a per-request context (so module-level tools can call proxy_call(name, args)), and coerces your return value into the v1 response envelope.

You return...Sent as...
"some string"{"final_response": "some string"}
{"final_response": ..., "messages": [...], "metadata": {...}}passes through
An object with final_output / final_response / output / messagewrapped into {"final_response": "..."}

Error → HTTP mapping:

FailureStatus
Missing or wrong inbound bearer401
agent_token_env env var unset503
Missing odyssey_proxy_url or run token400
Malformed JSON body400
Empty final_response500

The envelope

@dataclass(frozen=True)
class Envelope:
    proxy_url: str
    run_token: str
    user_instruction: str
    task_input: Mapping[str, Any]
    task_id: int | str | None
    run_id: int | str | None
    agent_id: int | str | None
    latest_user_prompt: str | None
    session_id: int | str | None
    turn_id: str | None
    messages: list | None
    scenario_state: Mapping | None
    workspace: Mapping | None
    run_token_jti: str | None
    tools_schema: list[dict] | None
    declared_actor_ids: frozenset[str] | None
    raw: Mapping[str, Any]
  • proxy_url — trailing-slash-stripped; append /tools/{name}.
  • run_token — per-run bearer.
  • user_instruction — canonical current prompt from body["input"]["user_instruction"]; stable across single-shot and multi-turn dispatches.
  • task_input — the row's current_state, else {}.
  • latest_user_prompt — back-compat alias for the current multi-turn prompt.
  • session_id / turn_id — optional multi-turn identifiers for agents that explicitly key their own memory. Platform proxy correlation uses the per-run token, so these are not required for tool calls.
  • messages — optional prior conversation history in replay memory mode.
  • scenario_state — optional replay-mode world state carried forward by the platform.
  • workspace — coding-agent (workspace) run context (cwd, setup_mode, setup_instructions); None on non-workspace runs.
  • run_token_jti — non-secret correlation id, safe to log.
  • tools_schema — inlined tools schema when it fits the platform's dispatch cap; None when the shim resolves it over a separate hop.
  • declared_actor_ids — the run's declared sub-agent catalog, used by for_actor to flag declared/runtime label drift; None when no catalog is available (single-agent runs, code mode, or older platforms).
  • raw — full dispatch body.

Single-shot dispatches usually omit the multi-turn fields, so they parse as None. Agents that only need the current user prompt should continue to pass envelope.user_instruction to their runner.

Proxy Call retries

proxy_call and async_proxy_call automatically retry transient platform errors, specifically HTTP 429 and 503 responses tagged lock_contention, context_store_unavailable, or trace_sequence_contended. The retry policy uses up to four attempts with exponential backoff plus jitter (roughly 0.5 to 2.5 seconds per wait, capped at 4 seconds). As a result, tool calls may occasionally take a few additional seconds under contention. Terminal errors (401, 404, 400 or 422, and 500) raise ProxyCallError immediately, with status_code, body, and error_class attached.

# Disable auto-retry for a latency-sensitive call:
result = proxy_call("get_order", {"order_id": "4521"}, auto_retry=False)

Live trace forwarding

For long-running agents, forward intermediate events while in flight so the trace tab renders live (it polls every 3s):

from pipelines.odyssey import (
    post_trace_event,
    safe_post_trace_event,
    async_safe_post_trace_event,
)

post_trace_event("system_prompt", {"text": SYSTEM_PROMPT, "model": "gpt-5"})
safe_post_trace_event("thinking", {"text": "Considering refund eligibility..."})
await async_safe_post_trace_event("assistant_message", {"text": "Refund approved."})
HelperFailure mode
post_trace_event / async_post_trace_eventRaises TraceForwardError.
safe_post_trace_event / async_safe_post_trace_eventLogs a warning on failure. Recommended.

Reserved event_type values: assistant_message, thinking, system_prompt, custom, plus the multi-agent types handoff, subagent_message, subagent_final — post the first with the typed post_handoff / safe_post_handoff helpers. See Trace events.

Per-adapter helpers

Each adapter ships a dump_tools_schema(...) (for the Import JSON dialog) plus framework-specific glue.

OpenAI Agents SDK

from pipelines.odyssey.adapters.openai_agents import (
    dump_tools_schema_json,
    proxy_tool,
    forward_run_result_events,
)

print(dump_tools_schema_json(build_agent()))

get_order = proxy_tool("get_order", description="Look up an order.", is_async=False)

result = await Runner.run(build_agent(), envelope.user_instruction)
forward_run_result_events(result)  # post-run live-trace flush
return result.final_output

For internally multi-agent roots, build each sub-agent's tools from its own envelope.for_actor("<actor_id>") handle (pass it as actor= to pipelines_function_tool / proxy_tool) so every call — including a shared tool — is attributed deterministically; add pipelines_run_hooks for the handoff timeline, and use extract_topology / build_agent_manifest for the declared topology + tools. See Multi-agent systems.

Anthropic SDK

from functools import lru_cache

from anthropic import Anthropic
from pipelines.odyssey import register_dispatch_route
from pipelines.odyssey.adapters.anthropic import run_anthropic_loop

TOOLS = [...]  # in `tools_schema` shape — strip registration-only keys
               # (default_execution_mode, passthrough_binding, ledger_*)
               # before passing to the model; messages.create rejects them.


@lru_cache(maxsize=1)
def get_client() -> Anthropic:
    # Lazy: `Anthropic()` raises without ANTHROPIC_API_KEY, so building
    # it at import would crash the server before the ping probe can pass.
    return Anthropic()


@register_dispatch_route(app, agent_token_env="AGENT_TOKEN")
def run(envelope):  # sync def: the SDK offloads it to a worker thread
    return run_anthropic_loop(
        client=get_client(),
        tools=TOOLS,
        user_instruction=envelope.user_instruction,
        model="claude-sonnet-4-5",
        system_prompt=SYSTEM_PROMPT,
        thinking_budget_tokens=1024,
    )

run_anthropic_loop runs the full tool-use loop and returns a v1 dict. Kwargs: system_prompt, thinking_budget_tokens, max_tokens (2048), max_turns (10), forward_live_trace (True).

Declare the handler as a plain def (not async def) whenever its body is synchronous — run_anthropic_loop, Runner.run_sync, sync httpx. The SDK runs sync handlers on a worker thread so the blocking work doesn't freeze the uvicorn event loop (which would serialize every concurrent dispatch). Use async def only when the body is genuinely await-based.

Anthropic model IDs are dash-delimited: claude-sonnet-4-5, not claude-sonnet-4.5.

LangChain

from fastapi import FastAPI
from pydantic import BaseModel, Field
from pipelines.odyssey import register_dispatch_route
from pipelines.odyssey.adapters.langchain import dump_tools_schema, proxy_tool

app = FastAPI()


class GetOrderArgs(BaseModel):
    order_id: str = Field(description="The order id to look up.")


class RefundOrderArgs(BaseModel):
    order_id: str


get_order = proxy_tool("get_order", description="Look up an order.", args_schema=GetOrderArgs)
refund_order = proxy_tool("refund_order", description="Refund an order.", args_schema=RefundOrderArgs)
TOOLS = [get_order, refund_order]


@register_dispatch_route(app, agent_token_env="AGENT_TOKEN")
async def run(envelope):
    from langchain_openai import ChatOpenAI
    from langgraph.prebuilt import create_react_agent

    agent = create_react_agent(ChatOpenAI(model="gpt-5"), TOOLS)
    result = await agent.ainvoke({"messages": [("user", envelope.user_instruction)]})
    return result["messages"][-1].content

Dump the registration JSON:

print(dump_tools_schema(TOOLS))

The adapter targets langchain_core.tools, which is stable across both langchain 0.x and the langgraph-based stack. If you're on LangChain 1.x+, AgentExecutor has moved to langgraph — the adapter works with either.

LangGraph approval gates. interrupt_before is an interactive pause/resume mechanism, while a Pipelines dispatch is one HTTP request that must return a final response. If your local graph uses interrupt_before=["sensitive_tools"] for human approval, compile the hosted dispatch graph without those interrupts, or route approvals through an explicit application flow before the dispatch runs. The platform does not resume a paused LangGraph thread inside a single dispatch.

@pipelines_proxy — keep your existing tool bodies. Wraps an existing @tool function in-place: during a dispatch the body is replaced with a proxy call; outside a dispatch (local CLI, tests) the original body runs normally.

from langchain_core.tools import tool
from pipelines.odyssey.adapters.langchain import pipelines_proxy

@pipelines_proxy()
@tool
def get_order(order_id: str) -> dict:
    """Look up an order by id."""
    return db.orders.find(order_id)   # runs locally; proxied at dispatch time

The LangChain import handles Tool/StructuredTool objects. For any other framework (or plain functions), import from the top level:

from pipelines.odyssey import pipelines_proxy

Pass tool_name="registered_name" if the platform name differs from the function name.

Strands

from pipelines.odyssey.adapters.strands import (
    dump_tools_schema,
    proxy_tool,
    forward_agent_result_events,
)

get_order = proxy_tool("get_order", description="Look up an order by id.")
print(dump_tools_schema(my_strands_agent))

result = agent(envelope.user_instruction)
forward_agent_result_events(result)  # post-run live-trace flush
return str(result.message) if result.message else str(result)

Tool endpoints from local modules

Ship local Python tool bodies as a code tool endpoint so passthrough runs them verbatim in a sandbox — no drift between local execution and the platform path.

import os
from pipelines.odyssey import tool_endpoints
from research_agent import tools_network

endpoint = tool_endpoints.create_code(
    name="research-tools",
    functions=[
        tools_network.web_search,
        tools_network.search_wikipedia,
        tools_network.fetch_url,
    ],
    requirements=["httpx>=0.27", "trafilatura"],
    org_id=42,
    api_key=os.environ["PIPELINES_API_KEY"],
)
print(endpoint["id"], endpoint["name"])

The helper reads module source, derives input_schema from type hints, generates a per-tool execute(args) wrapper, and POSTs to /api/organizations/{org_id}/tool-endpoints.

For dry runs / custom flows, use the lower-level builder:

payload = tool_endpoints.build_code_endpoint_payload(
    name="research-tools",
    functions=[tools_network.web_search],
    requirements=["httpx>=0.27"],
)

Bind the agent's tool by name (no UUID required):

{
  "tools_schema": [
    {
      "name": "search_wikipedia",
      "input_schema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
      "default_execution_mode": "passthrough",
      "passthrough_binding": {
        "endpoint_name": "research-tools",
        "tool_name": "search_wikipedia"
      }
    }
  ]
}

Cold-start cost per call is ~150 ms + pip install time for requirements. If your tool bodies are decorated with @function_tool / @tool, the SDK unwraps the decorator chain automatically; keep tool bodies in a module without the framework decorator if you want to avoid that dependency in the sandbox.

Secrets via env_vars → org credentials

A code tool that calls an authenticated upstream (a search API, your own backend) needs a key at runtime. Don't bake it into functions — pass env_vars, a list of org-credential names the platform decrypts and injects as environment variables inside the sandbox:

endpoint = tool_endpoints.create_code(
    name="research-tools",
    functions=[tools_network.web_search],
    requirements=["httpx>=0.27"],
    env_vars=["TAVILY_API_KEY"],   # ← name of an org credential
    org_id=42,
    api_key=os.environ["PIPELINES_API_KEY"],
)

Your tool body then reads it as a normal env var:

def web_search(query: str) -> list[dict]:
    key = os.environ["TAVILY_API_KEY"]   # injected by the platform
    ...

Each name in env_vars must match an org credential of the same credential_type. The platform injects that credential into the sandbox under the same name, so env_vars=["TAVILY_API_KEY"] resolves the TAVILY_API_KEY org credential and exposes it as $TAVILY_API_KEY.

Create the credential before the first run. Otherwise dispatch fails with an error reporting that the credential was not found, prompting you to add it on the Credentials page. The credentials endpoint accepts your pk_live_ org API key, so the full setup (registering the agent, configuring tool endpoints, and adding credentials) can be scripted end-to-end:

curl -X PUT \
  "$PIPELINES_BASE_URL/api/organizations/42/credentials/TAVILY_API_KEY" \
  -H "Authorization: Bearer $PIPELINES_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"value": "tvly-..."}'

Managing credentials requires an org-admin principal. An API key minted by a non-admin resolves to that user and is rejected — mint the key as an org admin, or add the credential from the dashboard (Settings → Credentials → Add credential), naming it exactly as it appears in env_vars.

Credential values are decrypted only at dispatch time, never logged, and masked in run output. The same env_vars surface works for hand-authored code endpoints.

Register an External HTTP agent

create_http_agent registers the agent you host yourself — the external_http analogue of create_code_agent, so you never hand-roll the POST /api/agents body. This is exactly what the scaffolded register.py runs:

import json
import os
from pipelines.odyssey import agents

agent = agents.create_http_agent(
    name="orders-triage",
    endpoint_url="https://your-agent.example.com/dispatch",
    api_key=os.environ["PIPELINES_API_KEY"],
    auth_header_name="Authorization",
    auth_credential_value=f"Bearer {os.environ['AGENT_TOKEN']}",
    tools_schema=json.load(open("tools_schema.json")),
)
print(agent["id"])

update_http_agent(agent_id=..., ...) edits one in place (e.g. after changing tools).

Single-source the Tools Schema

Mount a GET /tools_schema route so your declared tools live in exactly one place and can't drift from what the platform was told to expect:

from pipelines.odyssey import register_tools_schema_route

register_tools_schema_route(app, TOOLS)  # GET /tools_schema -> {"tools_schema": [...]}

Then push changes from the running wrapper to the platform without a manual re-paste:

pipelines odyssey sync --from-app http://localhost:8000 --agent-id 42

pipelines odyssey doctor also reads this route to flag a schema mismatch before a run.

Shipping a multi-file code agent

For code-mode agents larger than one file (up to 50 files, 200 KB per file, 1 MB total):

from pathlib import Path
from pipelines.odyssey import agents

payload = agents.build_code_agent_payload(
    name="research-agent",
    entrypoint="run",
    entrypoint_file="main.py",
    source_dir=Path("./my_agent"),
    requirements=["httpx>=0.27", "trafilatura"],
)

agent = agents.create_code_agent(
    name="research-agent",
    entrypoint="run",
    entrypoint_file="main.py",
    source_dir=Path("./my_agent"),
    requirements=["httpx>=0.27", "trafilatura"],
    api_key="pk_live_...",
)

walk_agent_source_dir reads a .pipelinesignore (fnmatch globs, one per line) and skips the usual __pycache__/, .venv/, .git/, node_modules/, dist/, build/, .pytest_cache/ directories.

Updating an existing agent without resetting version history:

agents.update_code_agent(
    agent_id=42,
    api_key="pk_live_...",
    source_dir=Path("./my_agent"),
    entrypoint="run",
    entrypoint_file="main.py",
    requirements=["httpx>=0.27"],
)

Ledger schema helpers

build_code_agent_payload / create_code_agent / update_code_agent take an optional ledger_schema, and generate_ledger_schema drafts one from your tools (the generate → review → save flow):

draft = agents.generate_ledger_schema(api_key="pk_live_...", tools_schema=my_tools)
agents.create_code_agent(..., tools_schema=my_tools, ledger_schema=draft)

The draft is un-persisted until you save it on an agent. Pass ledger_schema=None to update_code_agent to clear it. See Ledger schema.

Custom HTTP framework

The framework-agnostic helpers compose into ~20 lines:

from pipelines.odyssey import (
    Envelope,
    EnvelopeError,
    InboundAuthError,
    build_response,
    require_pipelines_auth,
    set_current,
)

def handle_dispatch(headers, body, agent_token):
    try:
        require_pipelines_auth(headers.get("Authorization"), expected_token=agent_token)
    except InboundAuthError:
        return 401, {"detail": "missing or invalid bearer token"}

    try:
        envelope = Envelope.parse(body, headers)
    except EnvelopeError as exc:
        return 400, {"detail": str(exc)}

    with set_current(envelope):
        result = your_agent_runtime(envelope)

    return 200, build_response(result)

Custom auth header

If you registered with X-API-Key instead of Authorization:

@register_dispatch_route(
    app,
    auth_header_name="X-API-Key",
    agent_token_env="AGENT_TOKEN",
)
async def run(envelope):
    ...

Unit-testing tools

set_current stages an envelope for tests:

from pipelines.odyssey import Envelope, set_current

def test_get_order_calls_proxy(httpx_mock):
    envelope = Envelope(
        proxy_url="http://stub/proxy",
        run_token="t",
        user_instruction="",
        task_input={},
    )
    httpx_mock.add_response(
        url="http://stub/proxy/tools/get_order",
        json={"tool_name": "get_order", "response": {"id": "1"}, "source": "odyssey"},
    )
    with set_current(envelope):
        result = get_order("1")
    assert result == {"id": "1"}

Tested adapter floors

AdapterPinned floor
openai-agents0.0.7
anthropic0.39
langchain-core0.3
strands-agents0.4

Next