Skip to main content
LakeQuery examplesFlowGatewayPricingConsole

Flow Pipeline API

The Flow Pipeline API creates and manages Flow pipelines from YAML. Use it for CI/CD and infrastructure-as-code workflows when pipeline definitions should live outside the Console UI.

Rotate exposed credentials

The example pipeline file shared during this docs update contained live PostgreSQL credentials. Rotate that database password before committing or sharing any derived examples. The examples below are sanitized.

Base URL

export CONSOLE="https://console.withobsrvr.com"
export API_KEY="your-team-api-key"

All requests use a Team API key from Console.

Authorization: Api-Key $API_KEY

Do not pass API keys in URL parameters.

Pipeline YAML format

The Pipeline API accepts Kubernetes-style Flow resources with apiVersion, kind, metadata, and spec. Use registry component IDs, such as contract_event and contract_events_postgres, rather than runtime adapter class names.

apiVersion: flow.obsrvr.com/v1
kind: Pipeline
metadata:
name: ContractEventsToPostgres
spec:
network: testnet
startLedger: "2434280"
endLedger: "2434281"
processors:
- type: contract_event
config:
network_passphrase: Test SDF Network ; September 2015
consumers:
- type: contract_events_postgres
config:
host: postgres.example.com
port: 5432
connect_timeout: 30
database: defaultdb
username: postgres
password: ${POSTGRES_PASSWORD}
sslmode: require
schema: public
table_prefix: stellar_

Use placeholders or secret injection for credentials. Do not commit live passwords.

Quick start: validate and apply

Create contract-events-to-postgres.yaml using the shape above.

Validate it:

curl -X POST \
-H "Authorization: Api-Key $API_KEY" \
-H "Content-Type: text/yaml" \
--data-binary @contract-events-to-postgres.yaml \
"$CONSOLE/api/v1/flow/pipelines/validate/"

Apply it and start processing:

curl -X POST \
-H "Authorization: Api-Key $API_KEY" \
-H "Content-Type: text/yaml" \
--data-binary @contract-events-to-postgres.yaml \
"$CONSOLE/api/v1/flow/pipelines/apply/?auto_start=true"

apply is idempotent by pipeline name. It creates the pipeline if it does not exist and updates it if it does.

Endpoints

Pipelines

MethodEndpointDescription
GET/api/v1/flow/pipelines/List team pipelines
POST/api/v1/flow/pipelines/Create a new pipeline
GET/api/v1/flow/pipelines/{uuid}/Get pipeline details
PUT/api/v1/flow/pipelines/{uuid}/Update a stopped pipeline
DELETE/api/v1/flow/pipelines/{uuid}/Delete a pipeline
POST/api/v1/flow/pipelines/{uuid}/start/Start a pipeline
POST/api/v1/flow/pipelines/{uuid}/stop/Stop a pipeline
GET/api/v1/flow/pipelines/{uuid}/config/Export pipeline YAML
POST/api/v1/flow/pipelines/apply/Idempotent create/update by name
POST/api/v1/flow/pipelines/validate/Validate config without creating

Registry

MethodEndpointDescription
GET/api/v1/flow/registry/processors/List available processors and config schemas
GET/api/v1/flow/registry/consumers/List available consumers and config schemas

Secrets

MethodEndpointDescription
POST/api/v1/secrets/Create or update a secret
GET/api/v1/secrets/List secret paths, never values
GET/api/v1/secrets/{path}/Check whether a secret exists
DELETE/api/v1/secrets/{path}/Delete a secret

Spec configuration

The API derives the ledger source adapter from the selected network and ledger range.

FieldDescription
metadata.namePipeline name. apply matches existing pipelines by name.
spec.networktestnet or mainnet.
spec.startLedgerFirst ledger to process, or latest / genesis.
spec.endLedgerOptional final ledger. Omit for continuous processing. Must be greater than startLedger when both are numeric.
spec.processorsProcessor list using registry IDs.
spec.consumersConsumer list using registry IDs.

Processor configuration

Processors transform source records into the shape consumers expect.

processors:
- type: contract_event
config:
network_passphrase: Test SDF Network ; September 2015

Use the processor registry endpoint to discover available processor types.

curl -H "Authorization: Api-Key $API_KEY" \
"$CONSOLE/api/v1/flow/registry/processors/"

Consumer configuration

Consumers write processed records to a destination.

consumers:
- type: contract_events_postgres
config:
host: postgres.example.com
port: 5432
connect_timeout: 30
database: defaultdb
username: postgres
password: ${POSTGRES_PASSWORD}
sslmode: require
schema: public
table_prefix: stellar_

Use the consumer registry endpoint to discover available consumer types.

curl -H "Authorization: Api-Key $API_KEY" \
"$CONSOLE/api/v1/flow/registry/consumers/"

Historical backfill

Set both startLedger and endLedger to process a bounded range. The pipeline completes after the final ledger.

apiVersion: flow.obsrvr.com/v1
kind: Pipeline
metadata:
name: ContractEventBackfill
spec:
network: testnet
startLedger: "2434280"
endLedger: "2435000"
processors:
- type: contract_event
config:
network_passphrase: Test SDF Network ; September 2015
consumers:
- type: contract_events_postgres
config:
host: postgres.example.com
port: 5432
connect_timeout: 30
database: defaultdb
username: postgres
password: ${POSTGRES_PASSWORD}
sslmode: require
schema: public
table_prefix: stellar_

Start, stop, and export

# Start
curl -X POST -H "Authorization: Api-Key $API_KEY" \
"$CONSOLE/api/v1/flow/pipelines/$PIPELINE_UUID/start/"

# Stop
curl -X POST -H "Authorization: Api-Key $API_KEY" \
"$CONSOLE/api/v1/flow/pipelines/$PIPELINE_UUID/stop/"

# Export YAML
curl -H "Authorization: Api-Key $API_KEY" \
"$CONSOLE/api/v1/flow/pipelines/$PIPELINE_UUID/config/"

Responses

Create/apply success:

{
"pipeline_id": "47d6e2c8-8fb8-46a5-9222-932f2f9d7ac9",
"action": "created",
"status": "running",
"warnings": [],
"errors": []
}

Validation success:

{
"valid": true,
"errors": [],
"warnings": [],
"effective_config": {
"apiVersion": "flow.obsrvr.com/v1",
"kind": "Pipeline",
"metadata": {"name": "ContractEventsToPostgres"},
"spec": {"...": "..."}
}
}

Error codes

CodeMeaning
400Invalid YAML or invalid pipeline config
402Active subscription required to create a pipeline
403API key is invalid or team does not match authenticated team
404Pipeline or secret does not exist
409Duplicate name, running pipeline update, or ambiguous apply target
500Server error

CI/CD example

name: Deploy Flow pipeline
on:
push:
paths:
- pipelines/**

jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Validate pipeline
run: |
curl -X POST \
-H "Authorization: Api-Key ${{ secrets.OBSRVR_API_KEY }}" \
-H "Content-Type: text/yaml" \
--data-binary @pipelines/contract-events-to-postgres.yaml \
"https://console.withobsrvr.com/api/v1/flow/pipelines/validate/"

- name: Apply pipeline
run: |
curl -X POST \
-H "Authorization: Api-Key ${{ secrets.OBSRVR_API_KEY }}" \
-H "Content-Type: text/yaml" \
--data-binary @pipelines/contract-events-to-postgres.yaml \
"https://console.withobsrvr.com/api/v1/flow/pipelines/apply/?auto_start=true"