Use Case · March 19, 2024 · 10 min read

Real-Time CDC Pipelines: Streaming Database Changes with StreamHouse

Every INSERT, UPDATE, and DELETE in your database can be a stream event. Here's how to build Change Data Capture pipelines with StreamHouse for real-time data synchronization, search indexing, and cache invalidation.

What is CDC?

Change Data Capture (CDC) turns database changes into a stream of events. Every time a row is inserted, updated, or deleted, a corresponding event is emitted. This enables real-time data synchronization between systems without polling or batch ETL.

PostgreSQL → CDC → StreamHouse → Downstream Systems
                                    ├── Elasticsearch (search index)
                                    ├── Redis (cache invalidation)
                                    ├── Data warehouse (analytics)
                                    └── Other microservices

Why StreamHouse for CDC?

CDC generates high-volume, ordered event streams — exactly what StreamHouse is built for. Key advantages:

  • Ordered delivery: CDC events must be processed in order per key. StreamHouse's partition-key ordering guarantees this.
  • Replay: If a downstream system needs to rebuild its state, consumers can replay from the beginning of the topic.
  • Fan-out: Multiple downstream systems can independently consume the same CDC stream.
  • Cost: CDC data is often high-volume but rarely queried. S3 Standard storage at roughly $0.023 per GB-month keeps long retention cheap.
  • SQL processing: Filter, transform, and aggregate CDC events using SQL before they reach downstream systems.

Setting Up CDC with Debezium

Debezium is a widely used open-source CDC tool. It reads changes from PostgreSQL's WAL (Write-Ahead Log) through logical replication and produces them to a streaming platform.

Step 1: Configure PostgreSQL for Logical Replication

# In postgresql.conf (changing wal_level requires a server restart)
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

-- Create a replication user
CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'secret';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;

Step 2: Create StreamHouse Topics

Create topics that mirror your database tables:

# One topic per table, keyed by primary key
streamctl topic create --name cdc.public.users --partitions 6 --retention 30d
streamctl topic create --name cdc.public.orders --partitions 12 --retention 30d
streamctl topic create --name cdc.public.products --partitions 6 --retention 30d

Step 3: Configure Debezium

{
  "name": "pg-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "secret",
    "database.dbname": "myapp",
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.users,public.orders,public.products"
  }
}
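Debezium names each topic <topic.prefix>.<schema>.<table>, which is why the topics created in Step 2 all start with cdc.public. As a minimal sketch in Python (the helper name expected_topics is hypothetical, not part of Debezium or StreamHouse), the topic names can be derived from the connector config and compared with the topics you created:

```python
import json

def expected_topics(connector: dict) -> list[str]:
    """Derive Debezium topic names: <topic.prefix>.<schema>.<table>."""
    config = connector["config"]
    prefix = config["topic.prefix"]
    tables = [t.strip() for t in config["table.include.list"].split(",") if t.strip()]
    return [f"{prefix}.{table}" for table in tables]

# Trimmed copy of the connector config above; only the two relevant keys are kept.
connector = json.loads("""
{
  "name": "pg-source",
  "config": {
    "topic.prefix": "cdc",
    "table.include.list": "public.users,public.orders,public.products"
  }
}
""")

for topic in expected_topics(connector):
    print(topic)
```

If a name printed here has no matching topic from Step 2, the prefix or the table list is the first thing to check.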

Step 4: Consume CDC Events

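CDC events include the row data plus metadata. The before image is fully populated only when the table is set to REPLICA IDENTITY FULL; with PostgreSQL's default replica identity, only the primary-key columns of the old row are logged. The example below assumes REPLICA IDENTITY FULL: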

{
  "op": "u",
  "before": {"id": 1, "name": "Alice", "email": "alice@old.com"},
  "after": {"id": 1, "name": "Alice", "email": "alice@new.com"},
  "source": {
    "table": "users",
    "lsn": 123456789,
    "txId": 42
  },
  "ts_ms": 1706000000000
}

The op field indicates the operation: c (create), u (update), d (delete), r (snapshot read).
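To make the op values concrete, here is a small Python sketch that applies events to an in-memory copy of a table. It assumes the flat event shape shown above (no schema/payload wrapper) and a primary-key column named id; apply_change is a hypothetical helper, not a StreamHouse or Debezium API.

```python
import json

def apply_change(state: dict, raw_event: str) -> None:
    """Apply one change event to `state`, a dict mapping primary key -> row."""
    event = json.loads(raw_event)
    op = event["op"]
    if op in ("c", "u", "r"):
        # Creates, updates, and snapshot reads all carry the current row in "after".
        row = event["after"]
        state[row["id"]] = row
    elif op == "d":
        # Deletes have a null "after"; the key comes from "before".
        state.pop(event["before"]["id"], None)
    # Any other op value is ignored in this sketch.

users = {}
apply_change(users, json.dumps({
    "op": "c", "before": None,
    "after": {"id": 1, "name": "Alice", "email": "alice@old.com"},
}))
apply_change(users, json.dumps({
    "op": "u",
    "before": {"id": 1, "name": "Alice", "email": "alice@old.com"},
    "after": {"id": 1, "name": "Alice", "email": "alice@new.com"},
}))
print(users[1]["email"])
apply_change(users, json.dumps({"op": "d", "before": {"id": 1}, "after": None}))
print(users)
```

Because each event sets or removes a whole row, applying the same event twice leaves the state unchanged, which is convenient when a consumer replays a topic.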

Use Case 1: Search Index Sync

Keep Elasticsearch in sync with your database:

-- Create a continuous query that transforms CDC events for Elasticsearch
CREATE CONTINUOUS QUERY user_search_updates AS
  SELECT
    -- Delete events have a null "after", so fall back to the key in "before"
    COALESCE(after->>'id', before->>'id') as user_id,
    after->>'name' as name,
    after->>'email' as email,
    after->>'bio' as bio,
    CASE op
      WHEN 'd' THEN 'delete'
      ELSE 'upsert'
    END as action
  FROM cdc_users
  WHERE op IN ('c', 'u', 'd')
  OUTPUT TO 'search-user-updates';

A downstream consumer reads from search-user-updates and applies changes to Elasticsearch.
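One way such a consumer might translate records into Elasticsearch writes is to build a request body for the Bulk API, which takes newline-delimited JSON: an action line, followed by a document line for index actions. The sketch below is Python with only the standard library; the index name users and the helper to_bulk_body are assumptions for illustration, and how records are read from the topic is left out.

```python
import json

def to_bulk_body(records, index="users"):
    """Turn search-user-updates records into an Elasticsearch _bulk NDJSON body."""
    lines = []
    for rec in records:
        meta = {"_index": index, "_id": rec["user_id"]}
        if rec["action"] == "delete":
            # Delete actions consist of the action line only.
            lines.append(json.dumps({"delete": meta}))
        else:
            # "index" replaces the whole document, so replays are harmless.
            doc = {field: rec.get(field) for field in ("name", "email", "bio")}
            lines.append(json.dumps({"index": meta}))
            lines.append(json.dumps(doc))
    # The Bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n" if lines else ""

records = [
    {"user_id": "1", "name": "Alice", "email": "alice@new.com", "bio": None, "action": "upsert"},
    {"user_id": "2", "name": None, "email": None, "bio": None, "action": "delete"},
]
print(to_bulk_body(records), end="")
```

The resulting string would be sent as the body of a POST to the cluster's _bulk endpoint with Content-Type application/x-ndjson.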

Use Case 2: Cache Invalidation

Invalidate Redis cache entries when database rows change:

CREATE CONTINUOUS QUERY cache_invalidations AS
  SELECT
    -- Delete events have a null "after", so fall back to the key in "before"
    'product:' || COALESCE(after->>'id', before->>'id') as cache_key,
    op as operation
  FROM cdc_products
  OUTPUT TO 'cache-invalidations';

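A consumer reads from cache-invalidations and calls DEL on the corresponding Redis keys. Entries are evicted shortly after the change commits instead of lingering until a TTL expires; a TTL is still a sensible backstop in case an invalidation is missed.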
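The invalidation step itself can be sketched in Python as follows. The function only needs a client with a delete(*names) method that returns the number of keys removed, which is the shape redis-py's Redis.delete has; FakeRedis is a stand-in so the example runs without a server, and invalidate is a hypothetical helper name.

```python
class FakeRedis:
    """In-memory stand-in exposing a delete(*names) call like redis-py's."""
    def __init__(self, data):
        self.data = dict(data)

    def delete(self, *names):
        removed = 0
        for name in names:
            if name in self.data:
                del self.data[name]
                removed += 1
        return removed

def invalidate(client, records, batch_size=100):
    """Delete the cache keys named in invalidation records; return keys removed."""
    # Drop records without a key and de-duplicate while keeping first-seen order.
    keys = list(dict.fromkeys(r["cache_key"] for r in records if r.get("cache_key")))
    removed = 0
    for i in range(0, len(keys), batch_size):
        removed += client.delete(*keys[i:i + batch_size])
    return removed

cache = FakeRedis({"product:1": "cached-1", "product:2": "cached-2"})
batch = [
    {"cache_key": "product:1", "operation": "u"},
    {"cache_key": "product:1", "operation": "u"},
    {"cache_key": None, "operation": "d"},
]
print(invalidate(cache, batch))
print(sorted(cache.data))
```

DEL on a missing key is a no-op, so re-processing the same records after a consumer restart does no harm.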

Use Case 3: Real-Time Analytics

Stream database changes to your data warehouse without batch ETL:

-- Aggregate order metrics in real-time
CREATE CONTINUOUS QUERY order_metrics AS
  SELECT
    count(*) FILTER (WHERE op = 'c') as new_orders,
    sum(CAST(after->>'amount' AS DOUBLE)) FILTER (WHERE op = 'c') as total_revenue,
    window_start
  FROM TUMBLE(cdc_orders, ts_ms, INTERVAL '1 minute')
  GROUP BY window_start, window_end
  OUTPUT TO 'order-metrics-per-minute';
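To show what a one-minute tumbling window computes, here is a plain Python sketch of the same aggregation over a list of events. It illustrates the windowing arithmetic only and says nothing about how StreamHouse evaluates the query; the name order_metrics is reused for readability, and windows without creates report 0.0 revenue here where SQL would yield NULL.

```python
from collections import defaultdict

WINDOW_MS = 60_000  # one minute, matching INTERVAL '1 minute'

def order_metrics(events):
    """Aggregate events into tumbling windows keyed by window start (epoch ms)."""
    windows = defaultdict(lambda: {"new_orders": 0, "total_revenue": 0.0})
    for event in events:
        # Each timestamp belongs to exactly one window: round down to the minute.
        start = event["ts_ms"] - event["ts_ms"] % WINDOW_MS
        window = windows[start]
        if event["op"] == "c":  # mirrors FILTER (WHERE op = 'c')
            window["new_orders"] += 1
            window["total_revenue"] += float(event["after"]["amount"])
    return dict(windows)

events = [
    {"op": "c", "ts_ms": 0, "after": {"amount": "10.0"}},
    {"op": "c", "ts_ms": 30_000, "after": {"amount": "5.5"}},
    {"op": "u", "ts_ms": 59_999, "after": {"amount": "7.0"}},
    {"op": "c", "ts_ms": 60_000, "after": {"amount": "1.0"}},
]
for start, metrics in sorted(order_metrics(events).items()):
    print(start, metrics)
```

The update at 59,999 ms falls in the first window but is not counted, while the create at exactly 60,000 ms opens the second window.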

Ordering Guarantees

CDC events must be processed in order for each row (primary key). StreamHouse guarantees this because:

  1. Debezium produces events with the primary key as the message key
  2. StreamHouse routes all events with the same key to the same partition
  3. Within a partition, events are strictly ordered
  4. Consumer groups assign entire partitions (not individual keys) to consumers

This means all changes to the users row with id=123 land in one partition and are read by a single consumer at a time, in the order they were committed in the database.
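The four points above can be illustrated with a short Python sketch of key-based routing. The hash used here (CRC32 modulo the partition count) is only a stand-in, since the partitioner StreamHouse actually uses is not described in this post; what matters is that the mapping from key to partition is deterministic.

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition; the same key always maps to the same one."""
    return zlib.crc32(key) % num_partitions

def route(events, num_partitions):
    """Append (key, value) events to per-partition logs in arrival order."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in events:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions

events = [
    (b"123", "c"),
    (b"456", "c"),
    (b"123", "u"),
    (b"456", "u"),
    (b"123", "d"),
]
logs = route(events, num_partitions=6)
home = partition_for(b"123", 6)
# Every event for key 123 sits in one log, still in create/update/delete order.
print([value for key, value in logs[home] if key == b"123"])
```

Ordering across different keys is not preserved once they land in different partitions, which is acceptable for CDC because consistency is needed per row.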

Handling Schema Changes

When your database schema evolves (ALTER TABLE), Debezium emits events with the new schema. StreamHouse's Schema Registry can validate these changes:

# Set backward compatibility on CDC topics
curl -X PUT http://localhost:8081/config/cdc.public.users-value \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "BACKWARD"}'

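Adding a nullable column (an optional field with a default) is backward compatible. Incompatible type changes are caught by the compatibility check before any bad data reaches consumers. In Confluent-style registries, whose API this endpoint resembles, BACKWARD also permits removing fields; if column removals should be rejected as well, FULL is the stricter setting to consider.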

Getting Started

# 1. Deploy StreamHouse + Debezium
docker compose up -d

# 2. Register the Debezium connector
curl -X POST http://debezium:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector.json

# 3. Watch CDC events flow
streamctl consume --topic cdc.public.users --from-beginning --format json

CDC with StreamHouse gives you a real-time, replayable, SQL-queryable mirror of every change in your database. Build search indexes, invalidate caches, feed analytics, and sync microservices — all from a single, ordered event stream.

Tags: cdc, postgresql, debezium, use-case, data-sync
