
Flow consumers

This page lists the consumer IDs currently exposed by the Flow registry. Use these IDs in spec.consumers[].type when creating pipelines through the API.

export CONSOLE="https://console.withobsrvr.com"
export API_KEY="your-team-api-key"

curl -H "Authorization: Api-Key $API_KEY" \
"$CONSOLE/api/v1/flow/registry/consumers/"
Keep credentials out of source control

Consumer configs often include database passwords, connection strings, or cloud credentials. Use placeholders, CI secrets, or the Flow secrets API instead of committing live values.
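
The examples on this page write secrets as ${NAME} placeholders. This page does not say whether Flow expands them itself, so the following is only a sketch of one way to fill them from environment variables before a config is submitted. The helper name expand_placeholders is hypothetical and is not part of any Flow client.

```python
import re
from typing import Any, Mapping

# Matches ${NAME} where NAME is a conventional environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_placeholders(value: Any, env: Mapping[str, str]) -> Any:
    """Return a copy of value with ${NAME} replaced by env[NAME] in every string.

    Raises KeyError if a placeholder names a variable that is not in env,
    so a missing secret fails loudly instead of being sent as literal text.
    """
    if isinstance(value, str):
        def lookup(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
        return _PLACEHOLDER.sub(lookup, value)
    if isinstance(value, dict):
        return {key: expand_placeholders(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_placeholders(item, env) for item in value]
    return value  # numbers, booleans, and None pass through unchanged
```

Calling expand_placeholders(consumer, os.environ) on a parsed consumer entry returns a new structure and leaves the template untouched, so the template with placeholders can stay in source control.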

Consumer summary

| ID | Name | Description |
| --- | --- | --- |
| account_data_postgres | Account Data PostgreSQL | Store Stellar account data in a specialized PostgreSQL schema |
| contract_invocations_postgres | Contract Invocations PostgreSQL | Store Soroban contract invocations in a specialized PostgreSQL schema |
| contract_events_postgres | Contract Events PostgreSQL | Store Soroban contract events in a specialized PostgreSQL schema |
| soroswap_postgres | SwapService PostgreSQL | Store SwapService events in a specialized PostgreSQL schema |
| save_latest_ledger_redis | Redis Ledger Storage | Store latest ledger information in Redis |
| contract_data_postgres | Contract Data PostgreSQL | Save contract data to PostgreSQL with configurable schema |
| extracted_contract_invocations_postgres | Extracted Contract Invocations PostgreSQL | Save extracted contract invocation business data to PostgreSQL with optimized schema |
| event_payment_postgres | Event Payment PostgreSQL | Saves event payment data to PostgreSQL with accounts tracking |
| google_pubsub_v2 | Google Pub/Sub (V2) | Publishes event payments to Google Cloud Pub/Sub in V2 message format |

Consumer configuration reference

account_data_postgres — Account Data PostgreSQL

Store Stellar account data in a specialized PostgreSQL schema

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| host | string | Yes | | PostgreSQL server hostname |
| port | integer | Yes | 5432 | PostgreSQL server port |
| database | string | Yes | | PostgreSQL database name |
| username | string | Yes | | PostgreSQL username |
| password | string, sensitive | Yes | | PostgreSQL password |
| sslmode | string, enum: disable, require, verify-ca, verify-full | No | disable | PostgreSQL SSL mode |
| max_open_conns | integer (min 1) | No | 10 | Maximum number of open connections to the database |
| max_idle_conns | integer (min 1) | No | 5 | Maximum number of idle connections in the pool |

Example

consumers:
  - type: account_data_postgres
    config:
      host: postgres.example.com
      port: 5432
      database: defaultdb
      username: postgres
      password: "${POSTGRES_PASSWORD}"
      sslmode: require
      max_open_conns: 10
      max_idle_conns: 5
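
The account_data_postgres field table above can be turned into a quick local check before a pipeline is submitted. This is a minimal sketch that encodes only what the table states (required fields, the sslmode values, and the minimum of 1 for the pool sizes). The function name is hypothetical, and the registry may enforce rules that this check does not cover.

```python
from typing import Any, Dict, List

# Values copied from the account_data_postgres field table.
REQUIRED = ("host", "port", "database", "username", "password")
SSLMODES = ("disable", "require", "verify-ca", "verify-full")
DEFAULTS = {"sslmode": "disable", "max_open_conns": 10, "max_idle_conns": 5}


def _is_int(value: Any) -> bool:
    """True for integers, excluding booleans (bool is a subclass of int)."""
    return isinstance(value, int) and not isinstance(value, bool)


def check_account_data_postgres(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config fits the table."""
    problems = [f"missing required field: {name}" for name in REQUIRED if name not in config]
    merged = {**DEFAULTS, **config}  # optional fields fall back to the documented defaults
    if "port" in config and not _is_int(config["port"]):
        problems.append("port must be an integer")
    if merged["sslmode"] not in SSLMODES:
        problems.append("sslmode must be one of: " + ", ".join(SSLMODES))
    for name in ("max_open_conns", "max_idle_conns"):
        if not _is_int(merged[name]) or merged[name] < 1:
            problems.append(f"{name} must be an integer >= 1")
    return problems
```

Returning a list rather than raising on the first error lets a CI job report every problem in a config at once.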

contract_invocations_postgres — Contract Invocations PostgreSQL

Store Soroban contract invocations in a specialized PostgreSQL schema

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| host | string | Yes | | PostgreSQL server hostname |
| port | integer | Yes | 5432 | PostgreSQL server port |
| database | string | Yes | | PostgreSQL database name |
| username | string | Yes | | PostgreSQL username |
| password | string | Yes | | PostgreSQL password |
| sslmode | string, enum: disable, require, verify-ca, verify-full | Yes | disable | PostgreSQL SSL mode |
| max_open_conns | integer | Yes | 10 | Maximum number of open connections |
| max_idle_conns | integer | Yes | 5 | Maximum number of idle connections |

Example

consumers:
  - type: contract_invocations_postgres
    config:
      host: postgres.example.com
      port: 5432
      database: defaultdb
      username: postgres
      password: "${POSTGRES_PASSWORD}"
      sslmode: require
      max_open_conns: 10
      max_idle_conns: 5

contract_events_postgres — Contract Events PostgreSQL

Store Soroban contract events in a specialized PostgreSQL schema

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| host | string | Yes | | PostgreSQL server hostname |
| port | integer | Yes | | PostgreSQL server port |
| database | string | Yes | | PostgreSQL database name |
| username | string | Yes | | PostgreSQL username |
| password | string | Yes | | PostgreSQL password |
| sslmode | string, enum: disable, require, verify-ca, verify-full | Yes | disable | PostgreSQL SSL mode |
| max_open_conns | integer | Yes | 10 | Maximum number of open connections |
| max_idle_conns | integer | Yes | 5 | Maximum number of idle connections |

Example

consumers:
  - type: contract_events_postgres
    config:
      host: postgres.example.com
      port: 5432
      database: defaultdb
      username: postgres
      password: "${POSTGRES_PASSWORD}"
      sslmode: require
      max_open_conns: 10
      max_idle_conns: 5

soroswap_postgres — SwapService PostgreSQL

Store SwapService events in a specialized PostgreSQL schema

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| host | string | Yes | | PostgreSQL server hostname |
| port | integer | Yes | 5432 | PostgreSQL server port |
| database | string | Yes | | PostgreSQL database name |
| username | string | Yes | | PostgreSQL username |
| password | string | Yes | | PostgreSQL password |
| sslmode | string, enum: disable, require, verify-ca, verify-full | Yes | disable | PostgreSQL SSL mode |
| max_open_conns | integer | Yes | 10 | Maximum number of open connections |
| max_idle_conns | integer | Yes | 5 | Maximum number of idle connections |

Example

consumers:
  - type: soroswap_postgres
    config:
      host: postgres.example.com
      port: 5432
      database: defaultdb
      username: postgres
      password: "${POSTGRES_PASSWORD}"
      sslmode: require
      max_open_conns: 10
      max_idle_conns: 5

save_latest_ledger_redis — Redis Ledger Storage

Store latest ledger information in Redis

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| redis_address | string | Yes | :6379 | Redis server address (host:port) |
| redis_password | string, sensitive | No | | Redis server password |
| redis_db | integer (min 0, max 15) | No | 0 | Redis database number |
| key_prefix | string | No | stellar:ledger: | Prefix for Redis keys |
| use_tls | boolean | No | true | Enable TLS/SSL connection to Redis |

Example

consumers:
  - type: save_latest_ledger_redis
    config:
      redis_address: "redis.example.com:6379"
      redis_db: 0
      key_prefix: "stellar:ledger:"
      use_tls: true
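
To confirm that the same Redis settings work from another client, it can help to express them as a connection URL. The sketch below maps a save_latest_ledger_redis config onto the common redis:// and rediss:// (TLS) URL forms that many Redis clients accept. The function redis_url is hypothetical, and nothing here describes how the consumer itself connects.

```python
from typing import Any, Dict
from urllib.parse import quote


def redis_url(config: Dict[str, Any]) -> str:
    """Translate a save_latest_ledger_redis config into a redis:// or rediss:// URL."""
    host, sep, port = config["redis_address"].rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError("redis_address must look like host:port")
    db = config.get("redis_db", 0)  # documented default: 0
    if not 0 <= db <= 15:
        raise ValueError("redis_db must be between 0 and 15")
    # use_tls defaults to true in the field table; rediss:// is the TLS scheme.
    scheme = "rediss" if config.get("use_tls", True) else "redis"
    password = config.get("redis_password")
    # Percent-encode the password so characters like @ or / cannot break the URL.
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"{scheme}://{auth}{host}:{port}/{db}"
```

The resulting URL contains the password when one is set, so it deserves the same handling as any other credential.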

contract_data_postgres — Contract Data PostgreSQL

Save contract data to PostgreSQL with configurable schema

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| database_url | string (uri), sensitive | Yes | | PostgreSQL connection string |
| table_name | string | Yes | contract_data | PostgreSQL table name |
| create_table_if_not_exists | boolean | No | true | Automatically create table if missing |
| batch_size | integer (min 1, max 1000) | No | 100 | Number of records to batch before writing |
| max_open_conns | integer (min 1, max 100) | No | 25 | Maximum database connections |
| max_idle_conns | integer (min 1, max 50) | No | 5 | Maximum idle database connections |

Example

consumers:
  - type: contract_data_postgres
    config:
      database_url: "postgres://user:${POSTGRES_PASSWORD}@postgres.example.com:5432/defaultdb?sslmode=require"
      table_name: "contract_data"
      create_table_if_not_exists: true
      batch_size: 100
      max_open_conns: 25
      max_idle_conns: 5
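
A password placed inside a connection URI has to be percent-encoded if it contains characters such as @, : or /, otherwise the URI parses incorrectly. The following is a minimal sketch of building a database_url value with the Python standard library. The function name build_postgres_url is hypothetical, and the sslmode default of require simply mirrors the example above.

```python
from urllib.parse import quote, urlencode


def build_postgres_url(user: str, password: str, host: str, port: int,
                       database: str, sslmode: str = "require") -> str:
    """Assemble a postgres:// URI with percent-encoded credentials."""
    # safe='' forces every reserved character (including / and :) to be encoded.
    userinfo = f"{quote(user, safe='')}:{quote(password, safe='')}"
    query = urlencode({"sslmode": sslmode})
    return f"postgres://{userinfo}@{host}:{port}/{quote(database, safe='')}?{query}"
```

For example, build_postgres_url("user", "p@ss:w/rd", "postgres.example.com", 5432, "defaultdb") encodes the password as p%40ss%3Aw%2Frd, and the same approach applies to any other field that takes a PostgreSQL connection string.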

extracted_contract_invocations_postgres — Extracted Contract Invocations PostgreSQL

Save extracted contract invocation business data to PostgreSQL with optimized schema

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| host | string | Yes | | PostgreSQL server hostname |
| port | integer | Yes | 5432 | PostgreSQL server port |
| database | string | Yes | | PostgreSQL database name |
| username | string | Yes | | PostgreSQL username |
| password | string, sensitive | Yes | | PostgreSQL password |
| sslmode | string, enum: disable, require, verify-ca, verify-full | No | disable | PostgreSQL SSL mode |
| max_open_conns | integer (min 1) | No | 10 | Maximum number of open connections to the database |
| max_idle_conns | integer (min 1) | No | 5 | Maximum number of idle connections in the pool |
| connect_timeout | integer (min 1) | No | 30 | Connection timeout in seconds |

Example

consumers:
  - type: extracted_contract_invocations_postgres
    config:
      host: postgres.example.com
      port: 5432
      database: defaultdb
      username: postgres
      password: "${POSTGRES_PASSWORD}"
      sslmode: require
      max_open_conns: 10
      max_idle_conns: 5
      connect_timeout: 30

event_payment_postgres — Event Payment PostgreSQL

Saves event payment data to PostgreSQL with accounts tracking

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connectionString | string (uri), sensitive | Yes | | PostgreSQL connection string (e.g., postgres://user:pass@host:5432/db) |
| batchSize | integer (min 1, max 1000) | No | 1 | Number of records to batch before committing |

Example

consumers:
  - type: event_payment_postgres
    config:
      connectionString: "postgres://user:${POSTGRES_PASSWORD}@postgres.example.com:5432/defaultdb?sslmode=require"
      batchSize: 1

google_pubsub_v2 — Google Pub/Sub (V2)

Publishes event payments to Google Cloud Pub/Sub in V2 message format

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| project_id | string | Yes | | Google Cloud project ID |
| topic_id | string | Yes | | Pub/Sub topic name |
| chain_identifier | string, enum: StellarMainnet, StellarTestnet | Yes | StellarTestnet | Stellar network identifier |
| credentials_json | string (textarea), sensitive | Yes | | Google Cloud service account JSON key (paste the entire JSON content) |

Example

consumers:
  - type: google_pubsub_v2
    config:
      project_id: "my-gcp-project"
      topic_id: "stellar-events"
      chain_identifier: "StellarTestnet"
      credentials_json: "${GOOGLE_APPLICATION_CREDENTIALS_JSON}"
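
Because credentials_json takes the whole service account key as one string, a multi-line key file is easier to carry in an environment variable or CI secret once it is compacted to a single line. The sketch below validates that the text is a JSON object and re-serializes it without extra whitespace. The helper name compact_credentials is hypothetical, and no Google Cloud library is involved.

```python
import json


def compact_credentials(raw: str) -> str:
    """Check that raw is a JSON object and return it as a single-line string."""
    parsed = json.loads(raw)  # raises json.JSONDecodeError on malformed input
    if not isinstance(parsed, dict):
        raise ValueError("service account key must be a JSON object")
    # Compact separators drop the spaces and newlines used for pretty-printing;
    # newlines inside string values (such as a private key) stay escaped as \n.
    return json.dumps(parsed, separators=(",", ":"))
```

The output can be stored under a name such as GOOGLE_APPLICATION_CREDENTIALS_JSON, matching the placeholder in the example above.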