Python SDK
Automate Pipelines from Python with API keys, artifacts, config swaps, and experiments.
Use the Python SDK when you want Python-native control over Pipelines automation: create configs, generate payloads, launch workflow runs, wait for completion, cancel in-flight work, import artifact bundles, and pipe results into CI or notebooks.
Install And Connect
Install from PyPI:
pip install pipelines-sdk
# Agent hosting (FastAPI dispatch route, registration helpers, CLI group)
pip install 'pipelines-sdk[odyssey]'
# Agent hosting plus a framework adapter (the adapter extra implies [agents])
pip install 'pipelines-sdk[odyssey,anthropic]'Adapter extras: [anthropic], [openai-agents], [langchain], [langchain-mcp], [strands], [mcp]. Each one pulls in [agents] automatically.
Create a client:
from pipelines import PipelinesClient
client = PipelinesClient(
api_key="pk_live_...",
)The SDK defaults to production Pipelines (https://api.pipelines.tech). Point at a self-hosted deployment with PIPELINES_ENV, the CLI's --env flag, or base_url=.
Read Library Configs
Start by listing what the key can see:
prompts = client.list_prompts(page_size=10)
criteria = client.list_criteria(page_size=10)
evaluations = client.list_evaluations(page_size=10)
models = client.list_models(org_id=2, page_size=10)If models or evaluations fails with 403, the key is valid but does not have access to that resource. You can still run prompt-only demos if prompt access works.
Create Library Configs
Create a prompt:
prompt = client.create_prompt(
{
"name": "Concise Rome prompt",
"description": "Short itinerary prompt for SDK experiments",
"prompt_text": "Create a concise 5 day Rome itinerary.",
"role": "user",
"scope_type": "org",
"org_id": 2,
}
)
prompt_id = prompt["id"]Create a criterion. Criteria are type-specific: the top-level shape is always name, display_label, type, config, and output_schema, but the contents of config depend on the criterion type.
criterion = client.create_criterion(
{
"name": "Exact Match",
"display_label": "Exact Match",
"type": "programmatic",
"config": {"subtype": "exact_match"},
"output_schema": {"type": "boolean"},
}
)Common criterion recipes:
exact_match = client.create_criterion(
{
"name": "Exact Match",
"display_label": "Exact Match",
"type": "programmatic",
"config": {
"subtype": "exact_match",
"case_sensitive": False,
"strip_whitespace": True,
},
"output_schema": {"type": "boolean"},
"scope_type": "org",
"org_id": 2,
}
)
keyword_match = client.create_criterion(
{
"name": "Mentions Rome Highlights",
"display_label": "Mentions Rome Highlights",
"type": "programmatic",
"config": {
"subtype": "contains_keywords",
"keywords": ["Colosseum", "Vatican", "Trastevere"],
"mode": "any",
"case_sensitive": False,
},
"output_schema": {"type": "boolean"},
"scope_type": "org",
"org_id": 2,
}
)
regex_match = client.create_criterion(
{
"name": "Has Day Number",
"display_label": "Has Day Number",
"type": "programmatic",
"config": {
"subtype": "regex_match",
"pattern": r"Day\s+[1-5]",
"flags": ["IGNORECASE"],
},
"output_schema": {"type": "boolean"},
"scope_type": "org",
"org_id": 2,
}
)
json_validity = client.create_criterion(
{
"name": "Valid JSON Itinerary",
"display_label": "Valid JSON Itinerary",
"type": "programmatic",
"config": {
"subtype": "json_validity",
"schema": {
"type": "object",
"required": ["days"],
"properties": {"days": {"type": "array"}},
},
},
"output_schema": {"type": "boolean"},
"scope_type": "org",
"org_id": 2,
}
)LLM judge criteria use a model and a judge prompt:
llm_judge = client.create_criterion(
{
"name": "Itinerary Quality Judge",
"display_label": "Itinerary Quality",
"type": "llm_judge",
"config": {
"model": "gemini-3.5-flash",
"temperature": 0,
"prompt_template": (
"Score the itinerary from 1 to 5 for specificity, feasibility, "
"and usefulness. Return only the number."
),
},
"output_schema": {"type": "rating", "range": [1, 5]},
"scope_type": "org",
"org_id": 2,
}
)Human-rating criteria define what contributors will score manually:
human_rating = client.create_criterion(
{
"name": "Human Quality Rating",
"display_label": "Rate itinerary quality",
"type": "human_rating",
"config": {
"rubric": "1 = poor, 3 = acceptable, 5 = excellent",
},
"output_schema": {
"type": "rating",
"range": [1, 5],
"labels": {"1": "Poor", "3": "Acceptable", "5": "Excellent"},
},
"scope_type": "org",
"org_id": 2,
}
)Built-in LLM metrics use type: "llm_metric" and a supported metric subtype:
llm_metric = client.create_criterion(
{
"name": "Answer Relevancy",
"display_label": "Answer Relevancy",
"type": "llm_metric",
"config": {
"subtype": "answer_relevancy",
"threshold": 0.7,
},
"output_schema": {"type": "numeric", "range": [0, 1]},
"scope_type": "org",
"org_id": 2,
}
)If a criterion create call returns 422, the most common cause is a type/config mismatch: for example, contains_keywords requires keywords, regex_match requires pattern, and llm_judge requires prompt_template.
Create an evaluation:
evaluation = client.create_evaluation(
{
"name": "Itinerary quality",
"description": "Scores itinerary quality",
"criteria": [{"criterion_id": exact_match["id"]}, {"criterion_id": llm_judge["id"]}],
"scope_type": "org",
"org_id": 2,
}
)
evaluation_id = evaluation["id"]Tool Endpoints, Bindings, And Ground Truth
Tools are configured in three layers:
- A tool endpoint belongs to an org and points at an MCP server, HTTP/OpenAPI spec, mock tool set, or code tool set.
- A tool binding attaches one endpoint to one workflow field and chooses which tools the LLM may call.
- A tool ground truth describes which tools should or should not be called for a field, so runs can be inspected and evaluated.
Create or inspect a tool endpoint:
endpoint = client.create_tool_endpoint(
org_id=2,
payload={
"name": "Docs MCP endpoint",
"description": "Search internal docs from LLM fields",
"endpoint_type": "mcp_server",
"url": "https://tools.example.com/mcp",
"auth_type": "none",
"auth_config": {},
},
)
endpoint_id = endpoint["id"]
tools = client.list_endpoint_tools(org_id=2, endpoint_id=endpoint_id)If the endpoint is MCP or HTTP/OpenAPI and the tool list looks stale, refresh discovery:
refreshed = client.refresh_tool_endpoint_tools(org_id=2, endpoint_id=endpoint_id)
health = client.test_tool_endpoint(org_id=2, endpoint_id=endpoint_id)
versions = client.list_tool_endpoint_versions(org_id=2, endpoint_id=endpoint_id)
usage = client.get_tool_endpoint_usage(org_id=2, endpoint_id=endpoint_id)Bind tools to a workflow field. Use get_configurable_fields(...) to find node_id and field_id first.
binding = client.create_tool_binding(
workflow_id=10,
payload={
"node_id": "node-id-from-configurable-fields",
"field_id": "field-id-from-configurable-fields",
"endpoint_id": endpoint_id,
"selected_tools": ["search_docs"],
"max_tool_rounds": 3,
},
)Create tool ground truth for the same field:
ground_truth = client.create_tool_ground_truth(
workflow_id=10,
payload={
"node_id": "node-id-from-configurable-fields",
"field_id": "field-id-from-configurable-fields",
"expected_tools": [{"name": "search_docs"}],
"excluded_tools": [{"name": "delete_record"}],
"ordering_matters": False,
"source": "manual",
"confidence": 1.0,
},
)Ask Pipelines to rank available tools for a prompt:
recommendation = client.recommend_tools(
{
"workflow_id": 10,
"prompt_text": "Answer the user by searching the docs when needed.",
"available_tools": [{"name": "search_docs"}, {"name": "fetch_url"}],
}
)After a workflow run creates nodes, inspect and evaluate tool calls:
history = client.get_tool_call_history(project_id=22, workflow_id=10, node_db_id=44)
result = client.evaluate_tool_correctness(
workflow_id=10,
node_db_id=44,
payload={
"field_id": "field-id-from-configurable-fields",
"evaluation_params": ["input_parameters"],
"should_exact_match": False,
"should_consider_ordering": False,
},
)Important auth note: tool endpoint read endpoints are API-key friendly today, but some binding and ground-truth routes still use the backend's existing user/session permission path. If a tool SDK call returns 401 or 403 while prompt APIs work, the SDK wrapper is reaching the correct route but the current backend permissions do not allow that credential for that operation yet.
Artifact Bundles
An artifact bundle is portable JSON you can version in git. In normal use, keep a bundle focused on one artifact family: many prompts, many criteria, many evaluations, many tool endpoints, many tool bindings, or many tool ground truths. Mixed bundles are supported for advanced migrations, but they are harder to review and are not the recommended default.
Create a starter bundle:
prompt_bundle = client.artifact_bundle_template(kind="prompts")
criteria_bundle = client.artifact_bundle_template(kind="criteria")
evaluation_bundle = client.artifact_bundle_template(kind="evaluations")
tool_endpoint_bundle = client.artifact_bundle_template(kind="tool-endpoints")
tool_binding_bundle = client.artifact_bundle_template(kind="tool-bindings")
tool_ground_truth_bundle = client.artifact_bundle_template(kind="tool-ground-truths")Push a prompt bundle:
summary = client.push_artifact_bundle(
{
"artifact_type": "pipelines.artifact.bundle.v1",
"name": "rome-prompt-bundle",
"prompts": [
{
"name": "Concise Rome Prompt",
"prompt_text": "Create a concise 5 day Rome itinerary.",
"role": "user",
"scope_type": "org",
"org_id": 2,
},
{
"name": "Detailed Rome Prompt",
"prompt_text": "Create a detailed 5 day Rome itinerary with morning, afternoon, and evening plans.",
"role": "user",
"scope_type": "org",
"org_id": 2,
}
],
"criteria": [],
"evaluations": [],
"experiments": [],
},
idempotency_key_prefix="rome-demo",
)The response tells you what was created:
{
"artifact_type": "pipelines.artifact.bundle.v1",
"name": "rome-prompt-bundle",
"created": {
"prompts": [
{"name": "Concise Rome Prompt", "id": 12},
{"name": "Detailed Rome Prompt", "id": 13}
],
"criteria": [],
"evaluations": [],
"experiments": [],
"tool_endpoints": [],
"tool_bindings": [],
"tool_ground_truths": []
}
}To push multiple criteria at once, use a criteria bundle:
summary = client.push_artifact_bundle(
{
"artifact_type": "pipelines.artifact.bundle.v1",
"name": "quality-criteria-bundle",
"prompts": [],
"criteria": [
{
"name": "Exact Match",
"display_label": "Exact Match",
"type": "programmatic",
"config": {"subtype": "exact_match"},
"output_schema": {"type": "boolean"},
"scope_type": "org",
"org_id": 2,
},
{
"name": "Contains Required Keywords",
"display_label": "Contains Required Keywords",
"type": "programmatic",
"config": {
"subtype": "contains_keywords",
"keywords": ["Colosseum", "Vatican"],
"mode": "all",
"case_sensitive": False,
},
"output_schema": {"type": "boolean"},
"scope_type": "org",
"org_id": 2,
},
],
"evaluations": [],
"experiments": [],
}
)To push multiple evaluations at once, use an evaluation bundle and reference existing criterion IDs in each evaluation's criteria list. This keeps imports simple: a prompt bundle creates prompts, a criteria bundle creates criteria, and an evaluation bundle creates evaluations.
Tool artifacts work the same way, but include routing context:
summary = client.push_artifact_bundle(
{
"artifact_type": "pipelines.artifact.bundle.v1",
"name": "tool-endpoint-bundle",
"tool_endpoints": [
{
"org_id": 2,
"name": "Docs MCP endpoint",
"endpoint_type": "mcp_server",
"url": "https://tools.example.com/mcp",
"auth_type": "none",
"auth_config": {},
}
],
}
)For tool_bindings and tool_ground_truths, include workflow_id on each item. The SDK uses it to call the workflow-scoped API and removes it from the JSON body before sending the request.
Push a single artifact when you do not need a bundle:
prompt = client.push_prompt_artifact({"name": "Demo prompt", "prompt_text": "Hello"})
criterion = client.push_criterion_artifact({"name": "Exact Match"})
evaluation = client.push_evaluation_artifact({"name": "Quality Eval"})
endpoint = client.push_tool_endpoint_artifact({"org_id": 2, "name": "Docs MCP", "url": "https://tools.example.com/mcp"})Current limitation: artifact bundles create existing Pipelines objects. They do not persist arbitrary run-output files as first-class experiment artifacts yet.
Dataset Export
Use dataset export when you want a local copy of an imported dataset for notebooks, offline inspection, or versioned snapshots.
dataset_bytes = client.export_dataset(dataset_id=123, format="csv")
with open("dataset-export.csv", "wb") as handle:
handle.write(dataset_bytes)Supported formats are csv, json, zip, and zip-json. Choose zip or zip-json when the dataset includes file columns and you want the attached files bundled with the row data.
Dataset export applies to imported datasets. If you want to export workflow-backed task data, use workflow task export:
export_result = client.export_tasks(
project_id=22,
workflow_id=10,
payload={
"format": "csv",
"include_files": False,
},
)Workflow Discovery
Before creating tasks, check whether the workflow supports plain task creation or requires CSV/dataset seeding:
mode = client.get_creation_mode(project_id=22, workflow_id=10)
if mode["requires_csv"]:
print("Use CSV/dataset seeding or choose a simpler workflow for this demo.")
else:
print("This workflow supports direct task creation.")Find nodeId and fieldId values for config swaps:
fields = client.get_configurable_fields(project_id=22, workflow_id=10)
for field in fields["fields"]:
print(field["nodeId"], field["fieldId"], field["fieldTitle"], field["supports"])nodeId identifies the workflow node. fieldId identifies the field inside that node. These are not prompt IDs, model slugs, or evaluation IDs. They tell Pipelines where to apply the config swap.
Config Swaps
Config swaps apply only to the run you launch. They do not mutate the saved workflow.
Prompt swap:
payload = {
"count": 1,
"configOverrides": {
"prompts": [
{
"nodeId": "node-id-from-configurable-fields",
"fieldId": "field-id-from-configurable-fields",
"promptId": prompt_id,
"version": 1,
}
]
},
}
run = client.create_tasks_async(22, 10, payload, idempotency_key="prompt-swap-1")
status = client.wait_for_create_tasks(22, 10, run["job_id"], timeout=300)Model swap:
payload = {
"count": 1,
"configOverrides": {
"models": [
{
"nodeId": "node-id-from-configurable-fields",
"modelSlug": "gemini-3.5-flash",
}
]
},
}Evaluation swap:
payload = {
"count": 1,
"configOverrides": {
"evaluations": [
{
"nodeId": "review-node-id",
"fieldId": "quality-field-id",
"evaluationId": evaluation_id,
"version": 1,
}
]
},
}Experiments
An experiment is a group of copied workflows with different configs. Each variant creates its own workflow copy, applies its overrides, and creates tasks in that copied workflow.
experiment = client.create_experiment(
project_id=22,
workflow_id=10,
payload={
"count": 1,
"variants": [
{"name": "baseline-copy", "configOverrides": {}},
{
"name": "new-prompt",
"configOverrides": {
"prompts": [
{
"nodeId": "node-id-from-configurable-fields",
"fieldId": "field-id-from-configurable-fields",
"promptId": prompt_id,
"version": 1,
}
]
},
},
],
},
idempotency_key="rome-experiment-1",
)
final_status = client.wait_for_experiment(
project_id=22,
workflow_id=10,
experiment_id=experiment["experiment_id"],
timeout=300,
)The response includes copied workflow IDs:
{
"experiment_id": "exp-123",
"status": "completed",
"variants": [
{
"name": "baseline-copy",
"workflow_id": 3087,
"workflow_name": "Italy - Experiment baseline-copy",
"task_ids": [39141]
}
]
}Open copied workflows in the UI using the returned workflow_id, not the source workflow ID.
Cancel an experiment:
client.cancel_experiment(22, 10, experiment["experiment_id"])Cancellation is best effort. Completed jobs remain completed, and tasks already created before cancellation are not deleted.
Troubleshooting
The shared HTTP error taxonomy (401, 403, 409, 422, 429) is documented once in CLI troubleshooting and applies identically here, so it is not repeated. To catch those statuses as Python exceptions, see Typed Errors below.
The one SDK specific case is a wait helper timing out. That happens when the job is still queued or the run is slow on model or tool execution. Raise the timeout, or poll status directly with the experiment status helper.
Pagination
Every list_* method returns one page of results. For automation you usually want every record across every page. Each list endpoint has a matching iter_* helper that walks pages for you and yields one item at a time:
for project in client.iter_projects(page_size=50):
print(project["id"], project["name"])
for prompt in client.iter_prompts(scope_type="org", org_id=2):
print(prompt["id"])
for workflow in client.iter_project_workflows(project_id=22):
print(workflow["id"], workflow["name"])The same shape works for iter_criteria, iter_evaluations, iter_datasets, iter_tool_endpoints(org_id), iter_tool_bindings(workflow_id), iter_tool_ground_truths(workflow_id), iter_dataset_notebooks(dataset_id), iter_notifications, iter_organizations, iter_org_users(org_id), iter_users, iter_workflow_chains(project_id), iter_org_workflow_templates(org_id), plus the tasks-runtime iter_my_work, iter_my_submissions, and org / global variants.
If you need finer control, the generic client.paginate(path, params=..., page_size=...) powers all of the above:
for instr in client.paginate("/api/projects/22/instructions", page_size=100):
...It auto-stops when has_more is false, when a page is empty, or when an optional max_pages is hit.
Identity And Health
me = client.whoami() # GET /api/auth/me
print(me["email"], me["org_id"])
print(client.health()) # GET /api/health -> {"status": "ok"}
print(client.auth_status()) # Lightweight key validationAsync Client
When you're inside a FastAPI handler, agent loop, or any other async context, use AsyncPipelinesClient. It mirrors the sync surface over httpx.AsyncClient and shares the same auth, retry, error, and User-Agent behavior:
import asyncio
from pipelines import AsyncPipelinesClient
async def main():
async with AsyncPipelinesClient(api_key="pk_live_...") as client:
me = await client.whoami()
print("hi", me["email"])
# Async pagination
async for project in client.paginate("/api/projects", page_size=50):
print(project["id"])
# Async polling
result = await client.wait_for_experiment(
project_id=22, workflow_id=10, experiment_id="exp-1",
poll_interval=2.0, timeout=600.0,
)
print(result["status"])
asyncio.run(main())Every sync method with a public name has an async counterpart (await client.list_projects(...), await client.create_workflow(...), etc.). Iterators become AsyncIterators.
Streaming Responses
For endpoints that emit chunked output (e.g. server-sent events or NDJSON), use the streaming primitives instead of request:
# Yield raw bytes
for chunk in client.request_stream("GET", "/api/notifications"):
do_something_with(chunk)
# Yield decoded text lines (good for NDJSON / SSE)
for line in client.stream_lines("GET", "/api/notifications"):
print(line)The async client has matching client.request_stream(...) and client.stream_lines(...) that return AsyncIterator[bytes] and AsyncIterator[str].
Typed Errors
Every non-2xx response raises a PipelinesAPIError subclass keyed to the HTTP status. You can catch broad failures with PipelinesAPIError or specific failures with the right subclass:
from pipelines import (
PipelinesClient,
PipelinesAPIError,
AuthenticationError, # 401
ForbiddenError, # 403
NotFoundError, # 404
ConflictError, # 409
ValidationError, # 422
RateLimitError, # 429 (exposes .retry_after)
ServerError, # 5xx
)
client = PipelinesClient(api_key="pk_live_...")
try:
client.update_prompt(7, {"name": "..."})
except AuthenticationError:
refresh_credentials_and_retry()
except ForbiddenError:
raise SystemExit("Key is valid but lacks permission for this prompt.")
except NotFoundError:
print("Prompt 7 doesn't exist.")
except RateLimitError as exc:
sleep(exc.retry_after or 1.0)
except ValidationError as exc:
print("Backend rejected the payload:", exc.body)
except PipelinesAPIError as exc:
log_unexpected(exc.status_code, exc.body)Connection failures (DNS, refused, timeout) raise PipelinesConnectionError; polling helpers that exhaust their budget raise PipelinesTimeoutError.
Tasks Runtime
The SDK wraps the full human-task surface from apps/api/app/routers/tasks.py (~30 endpoints) so you can drive labeling/review flows programmatically:
# Read your queue
queue = client.get_my_work(project_id=22)
submissions = client.get_my_submissions(project_id=22)
# Drill into a node
task = client.get_task(project_id=22, workflow_id=10, task_id=123)
history = client.get_node_history(project_id=22, workflow_id=10, node_db_id=456)
subtask = client.get_subtask_node(project_id=22, workflow_id=10, node_db_id=456)
# Act on a node
client.submit_subtask(
project_id=22, workflow_id=10, node_db_id=456,
payload={"output": {"answer": "..."}},
)
client.submit_review(project_id=22, workflow_id=10, node_db_id=456, payload={"approved": True})
client.release_node(project_id=22, workflow_id=10, node_db_id=456)
client.revert_node(project_id=22, workflow_id=10, node_db_id=456, attempt_number=3)
# Admin overrides
client.admin_edit_subtask(project_id=22, workflow_id=10, node_db_id=456, payload={...})
client.admin_submit_subtask(project_id=22, workflow_id=10, node_db_id=456, payload={...})
client.admin_edit_review(project_id=22, workflow_id=10, node_db_id=456, payload={...})
client.admin_inline_edit(project_id=22, workflow_id=10, node_db_id=456, payload={...})
client.admin_submit_review(project_id=22, workflow_id=10, node_db_id=456, payload={...})Permissions follow normal RBAC — a contributor key can drive their own queue, an org admin key can drive everyone's.
Per-Node LLM Generation
Pipelines runs LLM generation on a per-node basis, not via a free-form /api/generate. The SDK exposes the matching helpers:
# Kick off / re-run LLM generation on a specific workflow node
client.generate_llm_fields(project_id=22, workflow_id=10, node_db_id=456)
client.regenerate_llm(project_id=22, workflow_id=10, node_db_id=456)
# Admin-level override
client.admin_regenerate_llm(project_id=22, workflow_id=10, node_db_id=456)
# Watch progress
client.get_llm_progress(project_id=22, workflow_id=10, node_db_id=456)
client.get_llm_error(project_id=22, workflow_id=10, node_db_id=456)
client.get_llm_generation_history(project_id=22, workflow_id=10, node_db_id=456)
# Workflow-level rollup
client.get_workflow_llm_analytics(project_id=22, workflow_id=10)Notifications
unread = client.get_unread_notification_count()
for n in client.iter_notifications():
print(n["id"], n["title"])
# Mark specific notifications read
client.mark_notifications_read(notification_ids=[101, 102])
# Or all at once
client.mark_notifications_read(all=True)Notebooks (Dataset-Scoped)
for nb in client.iter_dataset_notebooks(dataset_id=42):
print(nb["id"], nb["name"])
notebook = client.get_notebook(notebook_id=7)
client.save_notebook(notebook_id=7, payload={"cells": [...]})
preview = client.preview_notebook(notebook_id=7, payload={"limit": 100})Organizations And Users
for org in client.iter_organizations():
print(org["id"], org["name"])
org = client.get_organization(org_id=2)
for user in client.iter_org_users(org_id=2):
print(user["email"])
# Self
me = client.whoami() # /api/auth/me
some_user = client.get_user(user_id=42)Workflow Chains (Project-Scoped)
for chain in client.iter_workflow_chains(project_id=22):
print(chain["id"], chain["name"])
chain = client.get_workflow_chain(project_id=22, chain_id=7)
client.create_workflow_chain(project_id=22, payload={...})
client.delete_workflow_chain(project_id=22, chain_id=7)Dataset Evaluation Runs
job = client.run_dataset_eval(dataset_id=42, payload={...})
status = client.get_dataset_eval_status(dataset_id=42, job_id=job["job_id"])
preview = client.preview_dataset_eval(dataset_id=42, payload={...})
config = client.get_dataset_eval_config(dataset_id=42)Dataset Imports
Multipart upload + async import job:
result = client.import_dataset_from_file(
file="my_data.csv",
name="My Dataset",
description="seed for prompt experiments",
org_id=2,
)
job_id = result["job_id"]
status = client.get_dataset_import_job(job_id)
# Cancel if needed
client.cancel_dataset_import_job(job_id)
# Preview before committing
preview = client.preview_dataset_import_file(file="my_data.csv")Project Files (Presigned URL Flow)
# Step 1: ask the backend for a presigned upload URL
ticket = client.request_project_file_upload_url(
project_id=22,
payload={"filename": "image.png", "content_type": "image/png"},
)
# Step 2: PUT your bytes to ticket["upload_url"] using any HTTP client
# (the SDK doesn't do this for you because the presigned URL is signed for direct upload)
# Step 3: tell the backend you finished
client.confirm_project_file_upload(project_id=22, file_id=ticket["file_id"])
# Later: get a download URL or delete
client.get_project_file_url(project_id=22, file_id=ticket["file_id"])
client.delete_project_file(project_id=22, file_id=ticket["file_id"])Type Hints
pipelines-sdk ships a py.typed marker, so mypy / pyright respect the SDK's annotations. Common response shapes are exposed as TypedDicts in pipelines.types:
from pipelines.types import Project, Workflow, Prompt, ToolEndpoint, User
def show(p: Project) -> None:
print(p["id"], p.get("name"))The schemas are intentionally permissive (total=False) so callers can adopt them incrementally without breaking on unexpected backend fields.
Logging & Debugging
The SDK uses a standard logger at the pipelines name. Two quick ways to see what it's doing:
import logging
logging.getLogger("pipelines").setLevel(logging.DEBUG)
logging.basicConfig()…or set PIPELINES_DEBUG=1 in the environment before launching your script. The CLI's --verbose flag wires up the same debug logging.
Every outgoing request includes a User-Agent: pipelines-sdk/<version> CPython/<py> httpx/<httpx> header, which makes it easy to filter your SDK traffic in backend logs.
Reference
For the full method catalog (every list_* / iter_* / create_* / waiter / streaming helper) see the SDK reference at platform.pipelines.tech/docs. Every SDK path is validated against a real backend route by the SDK's OpenAPI contract test before each release, so the published method-to-route mapping stays in sync with the API.