
Consumers

Consumers are the destination components in Flow pipelines that receive processed blockchain data and deliver it to your applications, databases, or storage systems. Each consumer is optimized for specific use cases and integration patterns.

Available Consumers

Database Storage - PostgreSQL

PostgreSQL (Generic)

General-purpose PostgreSQL consumer that stores ledger data in a flexible JSON format.

Configuration:

type: postgres
config:
  connection_string: "postgresql://user:pass@host:5432/database"
  batch_size: 50

Use Cases: Analytics databases, application backends, historical data storage


PostgreSQL Bronze

Bronze layer data ingestion to PostgreSQL for data lakehouse architecture.

Configuration:

type: postgres_bronze
config:
  host: "localhost"
  database: "bronze"
  username: "user"
  password: "password"

Use Cases: Raw data layer storage, medallion architecture


Buffered PostgreSQL

Buffered batch inserts with retry logic for high-throughput scenarios.

Configuration:

type: buffered_postgres
config:
  buffer_size: 1000
  flush_interval_ms: 5000
  max_retries: 3

Use Cases: High-volume data ingestion with batching
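
The interplay of `buffer_size`, `flush_interval_ms`, and `max_retries` can be illustrated with a minimal Python sketch. This is not the consumer's actual implementation: the `BufferedWriter` class, the `write_batch` callback, and the reading of `max_retries` as "retries after the first attempt" are all assumptions made for illustration.

```python
import time
from typing import Callable, List, Optional


class BufferedWriter:
    """Illustrative buffer that flushes by size or age (hypothetical helper)."""

    def __init__(self, write_batch: Callable[[List[dict]], None],
                 buffer_size: int = 1000, flush_interval_ms: int = 5000,
                 max_retries: int = 3,
                 clock: Callable[[], float] = time.monotonic):
        self.write_batch = write_batch
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval_ms / 1000.0
        self.max_retries = max_retries
        self.clock = clock
        self.buffer: List[dict] = []
        self.last_flush = clock()

    def add(self, record: dict) -> None:
        """Buffer one record; flush when the buffer is full or too old."""
        self.buffer.append(record)
        full = len(self.buffer) >= self.buffer_size
        stale = self.clock() - self.last_flush >= self.flush_interval
        if full or stale:
            self.flush()

    def flush(self) -> None:
        """Write the buffered records as one batch, retrying on failure."""
        if not self.buffer:
            self.last_flush = self.clock()
            return
        batch, self.buffer = self.buffer, []
        last_error: Optional[Exception] = None
        # Assumption: one initial attempt plus up to max_retries retries.
        for _ in range(1 + self.max_retries):
            try:
                self.write_batch(batch)
                self.last_flush = self.clock()
                return
            except Exception as exc:
                last_error = exc
        # Keep the records so a later flush can try again.
        self.buffer = batch + self.buffer
        raise RuntimeError("batch write failed after retries") from last_error
```

In this sketch the age check only runs when a record arrives; a production buffer would typically also flush from a timer so that a quiet stream does not hold records indefinitely.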


Account Data PostgreSQL

Stores Stellar account data with full metadata.

Configuration:

type: account_data_postgres
config:
  host: "localhost"
  port: 5432
  database: "accounts"
  max_open_conns: 20

Use Cases: Account state tracking and historical analysis


Asset PostgreSQL

Stores asset/ticker information with batching.

Configuration:

type: asset_postgres
config:
  batch_size: 100
  connection_string: "postgresql://user:pass@host:5432/assets"

Use Cases: Asset catalog and enrichment data storage


Asset Enrichment PostgreSQL

Updates asset enrichment data (TOML, auth flags).

Configuration:

type: asset_enrichment
config:
  connection_string: "postgresql://user:pass@host:5432/assets"

Use Cases: Asset metadata enrichment from stellar.toml


Payments PostgreSQL

Batched payment transaction storage.

Configuration:

type: payments_postgres
config:
  batch_size: 1000
  connection_string: "postgresql://user:pass@host:5432/payments"

Use Cases: Payment history and analytics


Event Payment PostgreSQL

Event-based payment storage with account relationships.

Configuration:

type: event_payment_postgres
config:
  host: "localhost"
  database: "payments"
  max_open_conns: 20

Use Cases: Event-driven payment tracking with foreign keys


Contract Events PostgreSQL

Soroban contract events storage.

Configuration:

type: contract_events_postgres
config:
  host: "localhost"
  database: "soroban_events"

Use Cases: Smart contract event logging and analysis


Contract Invocations PostgreSQL

Contract invocations stored in dual XDR/decoded format, together with diagnostic events and state changes.

Configuration:

type: contract_invocations_postgres
config:
  host: "localhost"
  database: "contract_invocations"

Use Cases: Complete contract execution tracking


Extracted Contract Invocations PostgreSQL

Contract data extracted according to business logic (funder, recipient, amount, project_id).

Configuration:

type: extracted_contract_invocations_postgres
config:
  host: "localhost"
  database: "business_data"

Use Cases: Business intelligence from smart contracts


Claimable Balance PostgreSQL

Claimable balance tracking with claimants.

Configuration:

type: claimable_balance_postgres
config:
  database_url: "postgresql://user:pass@host:5432/claimable_balances"

Use Cases: Claimable balance lifecycle management


Soroswap PostgreSQL

Soroswap DEX events (pairs, swaps, liquidity).

Configuration:

type: soroswap_postgres
config:
  host: "localhost"
  database: "soroswap"

Use Cases: Soroswap DEX analytics


Wallet Backend PostgreSQL

Wallet backend with state changes and participants.

Configuration:

type: wallet_backend_postgres
config:
  host: "localhost"
  database: "wallet"

Use Cases: Wallet application backend database


Database Storage - DuckDB

DuckDB (Generic)

General-purpose DuckDB storage in an analytics-friendly columnar format.

Configuration:

type: duckdb
config:
  db_path: "/path/to/data.duckdb"

Use Cases: Local analytics, data science workflows


DuckLake

DuckDB lakehouse pattern with schema registry.

Configuration:

type: ducklake
config:
  db_path: "ducklake.duckdb"
  catalog_name: "main"
  schema_name: "bronze"
  table_prefix: "stellar_"

Use Cases: Data lakehouse architecture with managed schemas


DuckLake Enhanced

Enhanced DuckLake consumer with batched, interval-based flushing for better performance.

Configuration:

type: ducklake_enhanced
config:
  db_path: "ducklake_enhanced.duckdb"
  batch_size: 100
  flush_interval: 5

Use Cases: High-performance data lakehouse


Bronze DuckDB

Bronze layer DuckDB ingestion with appenders for medallion architecture.

Configuration:

type: bronze_duckdb
config:
  db_path: "bronze.duckdb"
  batch_size: 100
  flush_interval_seconds: 10

Use Cases: Raw data ingestion for data lakehouse


Account Data DuckDB

Account data with schema validation.

Configuration:

type: account_data_duckdb
config:
  db_path: "accounts.duckdb"

Use Cases: Account analytics in DuckDB


Assets DuckDB

Asset catalog with upsert logic.

Configuration:

type: assets_duckdb
config:
  db_path: "assets.duckdb"

Use Cases: Asset tracking and analytics


Contract Events DuckDB

Soroban contract events with analytics views.

Configuration:

type: contract_events_duckdb
config:
  db_path: "soroban_events.duckdb"

Use Cases: Contract event analytics with built-in views


Soroswap Pairs DuckDB

Soroswap pair and sync events.

Configuration:

type: soroswap_pairs_duckdb
config:
  db_path: "soroswap_pairs.duckdb"

Use Cases: DEX pair tracking and reserves


Soroswap Router DuckDB

Soroswap router transactions.

Configuration:

type: soroswap_router_duckdb
config:
  db_path: "soroswap_router.duckdb"

Use Cases: Router analytics and swap tracking


Database Storage - Other

SQLite (Soroswap)

Lightweight SQLite versions of Soroswap consumers.

Configuration:

type: soroswap_pairs_sqlite
config:
  db_path: "soroswap.db"

Use Cases: Portable DEX data storage, development


ClickHouse

OLAP database with materialized views for real-time analytics.

Configuration:

type: clickhouse
config:
  address: "localhost:9000"
  database: "stellar"
  username: "default"
  password: ""
  max_open_conns: 25

Use Cases: Payment stats, price analytics, trustlines, high-volume analytics


MongoDB

Document-based storage for flexible schemas.

Configuration:

type: mongodb
config:
  uri: "mongodb://localhost:27017"
  database: "stellar"
  collection: "transactions"

Use Cases: Flexible document storage, semi-structured data


TimescaleDB

Time-series database for temporal analytics.

Configuration:

type: timescaledb
config:
  host: "localhost"
  database: "timeseries"

Use Cases: Time-series data and analytics


Caching - Redis

Redis (Generic)

Multi-operation Redis storage (payments, accounts, assets, prices, offers, trustlines).

Configuration:

type: redis
config:
  redis_url: "redis://localhost:6379"
  key_prefix: "flow:"
  ttl_hours: 24
  use_tls: true

Use Cases: Fast lookups, caching, real-time data
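
To make `key_prefix` and `ttl_hours` concrete, the Python sketch below builds the arguments of a Redis `SET ... EX` command. The key layout (`<prefix><kind>:<id>`) and the `build_set_command` helper are assumptions for illustration; the consumer's real key scheme may differ.

```python
import json
from typing import List


def build_set_command(key_prefix: str, kind: str, identifier: str,
                      payload: dict, ttl_hours: int) -> List[str]:
    """Return the arguments of a Redis SET command with an expiry."""
    # Hypothetical key layout: prefix, record kind, then record identifier.
    key = f"{key_prefix}{kind}:{identifier}"
    # Redis EX takes seconds, so convert the configured hours.
    ttl_seconds = ttl_hours * 3600
    value = json.dumps(payload, sort_keys=True)
    return ["SET", key, value, "EX", str(ttl_seconds)]


command = build_set_command("flow:", "payment", "abc123", {"amount": "10"}, 24)
print(command)
```

With `ttl_hours: 24`, each key written this way expires after 86400 seconds unless it is rewritten first.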


Payments Redis

Payment-specific Redis storage with indices.

Configuration:

type: payments_redis
config:
  redis_url: "redis://localhost:6379"
  key_prefix: "payment:"
  ttl_hours: 48

Use Cases: Payment caching and fast retrieval


Latest Ledger Redis

Latest ledger tracking with history.

Configuration:

type: latest_ledger_redis
config:
  redis_url: "redis://localhost:6379"
  key_prefix: "ledger:"
  use_tls: true

Use Cases: Ledger progress tracking


Orderbook Redis

Order book data storage.

Configuration:

type: orderbook_redis
config:
  redis_url: "redis://localhost:6379"

Use Cases: Trading orderbook caching


Market Analytics Redis

Market analytics and metrics.

Configuration:

type: market_analytics_redis
config:
  redis_url: "redis://localhost:6379"

Use Cases: Trading analytics and market data


Cloud Storage

Google Cloud Storage

Upload to Google Cloud Storage buckets.

Configuration:

type: gcs
config:
  bucket_name: "my-stellar-data"
  object_prefix: "ledgers/"
  credentials_file: "/path/to/credentials.json"

Use Cases: Cloud backup and archival


Parquet Files

Parquet file generation with cloud storage support (local/S3/GCS).

Configuration:

type: parquet
config:
  storage_type: "s3"
  s3_bucket: "my-stellar-data"
  output_path: "parquet/ledgers/"

Use Cases: Columnar data export for analytics


Ledger Parquet

Ledger-specific Parquet files.

Configuration:

type: ledger_parquet
config:
  storage_type: "gcs"
  gcs_bucket: "my-stellar-ledgers"
  output_path: "parquet/"

Use Cases: Ledger data archival in Parquet format


Streaming & Messaging

Google Pub/Sub

Publisher for Google Pub/Sub with EventPayment support.

Configuration:

type: pubsub
config:
  project_id: "my-project"
  topic_id: "stellar-events"
  credentials_file: "/path/to/credentials.json"

Use Cases: Event streaming to Google Pub/Sub


Google Pub/Sub V2

V2 format with chain identifier (StellarMainnet/Testnet).

Configuration:

type: pubsub_v2
config:
  project_id: "my-project"
  topic_id: "stellar-events-v2"
  chain_identifier: "StellarMainnet"

Use Cases: Multi-chain Pub/Sub publishing


ZeroMQ

Low-latency message publishing.

Configuration:

type: zeromq
config:
  endpoint: "tcp://localhost:5555"

Use Cases: High-performance IPC, minimal latency


Real-time Communications

WebSocket

WebSocket server with client filtering and queuing.

Configuration:

type: websocket
config:
  port: 8080
  path: "/ws"
  max_queue_size: 1000

Use Cases: Real-time browser/app updates with custom filters
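
Client filtering combined with a bounded queue can be sketched in Python as follows. The `ClientQueue` class and the drop-oldest overflow policy are assumptions for illustration; the source does not say how the WebSocket consumer behaves once `max_queue_size` is reached.

```python
from collections import deque
from typing import Callable, Deque


class ClientQueue:
    """Per-client outbound queue with a filter (hypothetical helper)."""

    def __init__(self, accepts: Callable[[dict], bool],
                 max_queue_size: int = 1000):
        self.accepts = accepts
        # A deque with maxlen discards the oldest item when a new one is
        # appended to a full queue (assumed drop-oldest policy).
        self.queue: Deque[dict] = deque(maxlen=max_queue_size)
        self.dropped = 0

    def offer(self, message: dict) -> bool:
        """Queue the message if the client's filter accepts it."""
        if not self.accepts(message):
            return False
        if len(self.queue) == self.queue.maxlen:
            self.dropped += 1
        self.queue.append(message)
        return True


payments_only = ClientQueue(lambda m: m.get("type") == "payment",
                            max_queue_size=2)
for i in range(1, 4):
    payments_only.offer({"type": "payment", "id": i})
print([m["id"] for m in payments_only.queue], payments_only.dropped)
```

Tracking the `dropped` count is one way to notice slow clients; whether to drop, block, or disconnect on overflow is a design choice that depends on the application.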


File Export

Excel

Excel spreadsheet export.

Configuration:

type: excel
config:
  file_path: "/path/to/output.xlsx"
  sheet_name: "Data"

Use Cases: Manual analysis and reporting


Latest Ledger Excel

Ledger-specific Excel export.

Configuration:

type: latest_ledger_excel
config:
  file_path: "/path/to/ledgers.xlsx"

Use Cases: Ledger progress tracking in Excel


AI/ML Integration

Anthropic Claude

Batches messages and sends them to the Claude API for analysis.

Configuration:

type: claude
config:
  anthropic_api_key: "sk-ant-..."
  batch_size: 10
  flush_interval_seconds: 30

Use Cases: AI-powered transaction analysis and insights


Notifications

Notification Dispatcher

Multi-channel notifications (Slack, email, webhook) with rule-based filtering.

Configuration:

type: notification_dispatcher
config:
  slack_token: "xoxb-..."
  webhook_urls:
    - "https://hooks.slack.com/services/..."
  rules:
    - condition: "amount > 1000000"
      channel: "high-value-payments"

Use Cases: Alert system for threshold-based monitoring
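
Rule-based filtering of this kind can be sketched in Python. The grammar below (`<field> <operator> <number>`) is an assumption inferred from the single `amount > 1000000` example; the dispatcher's real condition language is not documented here and may be richer. The `matches` and `route` helpers are hypothetical.

```python
import operator
import re
from typing import List

# Comparison operators supported by this illustrative grammar.
_OPS = {">": operator.gt, ">=": operator.ge, "<": operator.lt,
        "<=": operator.le, "==": operator.eq, "!=": operator.ne}
_COND = re.compile(r"^\s*(\w+)\s*(>=|<=|==|!=|>|<)\s*(-?\d+(?:\.\d+)?)\s*$")


def matches(condition: str, message: dict) -> bool:
    """Evaluate one '<field> <op> <number>' condition against a message."""
    parsed = _COND.match(condition)
    if parsed is None:
        raise ValueError(f"unsupported condition: {condition!r}")
    field, op, threshold = parsed.groups()
    if field not in message:
        return False  # a rule cannot match a field the message lacks
    return _OPS[op](float(message[field]), float(threshold))


def route(rules: List[dict], message: dict) -> List[str]:
    """Return the channels of every rule whose condition matches."""
    return [r["channel"] for r in rules if matches(r["condition"], message)]


rules = [{"condition": "amount > 1000000", "channel": "high-value-payments"}]
print(route(rules, {"amount": 2500000}))
```

Parsing the condition with a fixed grammar, rather than evaluating it as code, keeps user-supplied rules from executing arbitrary expressions.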


Data Transformation

Silver Ingester

Bronze to Silver layer transformation for medallion architecture.

Configuration:

type: silver_ingester
config:
  source_db: "bronze.duckdb"
  target_db: "silver.duckdb"

Use Cases: Curated data layer processing


Debugging & Development

Stdout

Prints JSON to stdout for debugging.

Configuration:

type: stdout

Use Cases: Development, debugging, testing


Debug Logger

Detailed message inspection with field limits.

Configuration:

type: debug_logger
config:
  name: "pipeline-debug"
  log_prefix: "DEBUG"
  max_fields: 10

Use Cases: Deep debugging of message structure


Log Debug

Logs message types and metadata.

Configuration:

type: log_debug
config:
  log_level: "info"

Use Cases: Pipeline flow debugging


Choosing the Right Consumer

Consider these factors when selecting a consumer:

1. Data Access Patterns

  • Real-time queries: PostgreSQL, Redis
  • Batch analytics: S3, DuckDB
  • Stream processing: Kafka, Webhook
  • Embedded/Edge: SQLite, DuckDB

2. Scale Requirements

  • High volume: Kafka, S3
  • Moderate volume: PostgreSQL, Webhook
  • Low volume: SQLite, ZeroMQ

3. Infrastructure

  • Managed services: PostgreSQL, S3
  • Self-hosted: All options
  • No infrastructure: SQLite, DuckDB

4. Integration Needs

  • SQL queries: PostgreSQL, DuckDB, SQLite
  • REST APIs: Webhook
  • Stream processing: Kafka, ZeroMQ
  • Key-value access: Redis

Consumer Configuration

All consumers support configuration through the Flow pipeline builder:

{
  "consumer_type": "postgres",
  "config": {
    "connection_string": "postgresql://user:pass@host/db",
    "batch_size": 100
  }
}
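
Before submitting a definition like the one above, it can help to check its basic shape locally. The Python sketch below is a hypothetical pre-flight check, not part of Flow: which fields are required and the rule that `batch_size` must be a positive integer are assumptions.

```python
import json


def parse_consumer(raw: str) -> dict:
    """Parse a consumer definition and check its basic shape."""
    doc = json.loads(raw)
    if not isinstance(doc, dict):
        raise ValueError("consumer definition must be a JSON object")
    consumer_type = doc.get("consumer_type")
    if not isinstance(consumer_type, str) or not consumer_type:
        raise ValueError("consumer_type must be a non-empty string")
    config = doc.get("config", {})
    if not isinstance(config, dict):
        raise ValueError("config must be a JSON object")
    batch_size = config.get("batch_size")
    if batch_size is not None:
        # bool is a subclass of int in Python, so reject it explicitly.
        if (isinstance(batch_size, bool) or not isinstance(batch_size, int)
                or batch_size <= 0):
            raise ValueError("batch_size must be a positive integer")
    return {"consumer_type": consumer_type, "config": config}


raw = """
{
  "consumer_type": "postgres",
  "config": {
    "connection_string": "postgresql://user:pass@host/db",
    "batch_size": 100
  }
}
"""
print(parse_consumer(raw)["consumer_type"])
```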

Security Considerations

Credential Management

  • Credentials are stored securely in Vault
  • Never exposed in logs or UI
  • Automatic rotation support
  • Encrypted in transit

Connection Security

  • TLS/SSL support for all network consumers
  • Authentication mechanisms:
    • PostgreSQL: Username/password, SSL certs
    • Kafka: SASL, SSL
    • S3: IAM credentials
    • Webhook: Shared secrets

Performance Optimization

Batch Processing

Most consumers support batch processing for efficiency:

  • Configure appropriate batch sizes
  • Balance between latency and throughput
  • Consider memory constraints
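
The latency, throughput, and memory trade-off can be estimated with simple arithmetic. The Python sketch below assumes a steady message rate and a consumer that flushes when either the batch fills or a flush interval elapses; `batch_profile` and its parameter names are illustrative, not Flow settings.

```python
def batch_profile(rate_per_s: float, batch_size: int,
                  flush_interval_s: float, record_bytes: int) -> dict:
    """Estimate batching behaviour for a steady stream of messages."""
    if rate_per_s <= 0:
        raise ValueError("rate_per_s must be positive")
    # A record waits until the batch fills or the interval elapses,
    # whichever comes first.
    max_wait_s = min(batch_size / rate_per_s, flush_interval_s)
    effective_batch = rate_per_s * max_wait_s
    return {
        "max_wait_s": max_wait_s,
        "effective_batch": effective_batch,
        "writes_per_s": 1.0 / max_wait_s,
        "buffer_bytes": effective_batch * record_bytes,
    }


# Same stream, two batch sizes: larger batches mean fewer writes,
# but longer waits and more buffered memory.
print(batch_profile(200, 1000, 5, 512))
print(batch_profile(200, 100, 5, 512))
```

At 200 messages per second, moving from a batch of 100 to a batch of 1000 cuts writes from two per second to one every five seconds, at the cost of ten times the buffered data and a five-second worst-case delay.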

Connection Pooling

Database consumers maintain connection pools:

  • Reuse connections for better performance
  • Configure pool sizes based on load
  • Monitor connection health
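
The three points above (reuse, a bounded pool size, and health checks) can be sketched as a small Python pool. This is a generic illustration rather than Flow's pooling code: `ConnectionPool`, the `connect` factory, and the `is_healthy` probe are hypothetical, and `max_open_conns` merely borrows the name used in the configuration examples.

```python
import queue
import threading
from contextlib import contextmanager
from typing import Any, Callable, Optional


class ConnectionPool:
    """Bounded pool that reuses idle connections (illustrative only)."""

    def __init__(self, connect: Callable[[], Any],
                 is_healthy: Callable[[Any], bool],
                 max_open_conns: int = 20):
        self._connect = connect
        self._is_healthy = is_healthy
        self._idle: "queue.LifoQueue[Any]" = queue.LifoQueue()
        # The semaphore caps how many connections are in use at once.
        self._slots = threading.BoundedSemaphore(max_open_conns)

    @contextmanager
    def connection(self, timeout: Optional[float] = None):
        """Borrow a connection, returning it to the pool afterwards."""
        if not self._slots.acquire(timeout=timeout):
            raise TimeoutError("no free connection slot")
        try:
            try:
                conn = self._idle.get_nowait()
                if not self._is_healthy(conn):
                    conn = self._connect()  # replace a stale connection
            except queue.Empty:
                conn = self._connect()
            try:
                yield conn
            finally:
                self._idle.put(conn)
        finally:
            self._slots.release()
```

A caller would write `with pool.connection() as conn:` around each batch write. Sizing the pool close to the number of concurrent writers avoids both idle connections and callers waiting for a slot.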

Parallel Processing

Some consumers support parallel writes:

  • Kafka partitions
  • S3 multipart uploads
  • PostgreSQL parallel inserts

Monitoring & Troubleshooting

Health Checks

  • Connection status monitoring
  • Write failure tracking
  • Latency measurements
  • Queue depth (for streaming consumers)

Common Issues

  • Connection failures: Check credentials and network
  • Performance degradation: Adjust batch sizes
  • Data loss: Enable consumer acknowledgments
  • Schema mismatches: Verify data formats

Best Practices

  1. Start Simple: Begin with PostgreSQL or Webhook
  2. Plan for Scale: Choose consumers that can grow with your needs
  3. Monitor Performance: Track metrics and adjust configurations
  4. Handle Failures: Implement retry logic and dead letter queues
  5. Secure Credentials: Use Vault integration for sensitive data
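
Point 4 can be illustrated with a short Python sketch of retry with exponential backoff and a dead letter list. The `deliver` helper, its parameters, and the in-memory `dead_letters` list are assumptions for illustration; a real pipeline would typically persist failed messages in a durable queue or table.

```python
import time
from typing import Callable, List


def deliver(message: dict, send: Callable[[dict], None],
            dead_letters: List[dict], max_retries: int = 3,
            base_delay_s: float = 0.5,
            sleep: Callable[[float], None] = time.sleep) -> bool:
    """Try to send a message; park it in dead_letters if all attempts fail."""
    error = ""
    for attempt in range(max_retries + 1):
        try:
            send(message)
            return True
        except Exception as exc:
            error = repr(exc)
            if attempt < max_retries:
                # Exponential backoff: 0.5 s, 1 s, 2 s, ... by default.
                sleep(base_delay_s * (2 ** attempt))
    # Keep the failed message and its last error for later inspection.
    dead_letters.append({"message": message, "error": error,
                         "attempts": max_retries + 1})
    return False
```

Messages that land in the dead letter list can be replayed once the downstream system recovers, which avoids blocking the rest of the pipeline on a single failing record.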

Next Steps

  • Explore individual consumer documentation for detailed configuration
  • Check our Getting Started Guide for implementation examples
  • Learn about processors to transform your data
  • Review Flow Overview for architecture details