Examples

Sample applications and code snippets for common use cases.


Real-Time Analytics Pipeline

This example shows how to build a real-time analytics pipeline that ingests click events, aggregates them per page per minute, and writes the results to a summary topic.

```sql
-- Create the source stream
CREATE STREAM clickstream (
  user_id VARCHAR,
  page VARCHAR,
  referrer VARCHAR,
  timestamp TIMESTAMP
) WITH (topic = 'clicks', format = 'json');

-- Create a continuous query for page views per minute
CREATE CONTINUOUS QUERY page_views_per_minute AS
  SELECT
    page,
    COUNT(*) AS views,
    COUNT(DISTINCT user_id) AS unique_visitors,
    window_start,
    window_end
  FROM TUMBLE(clickstream, timestamp, INTERVAL '1 minute')
  GROUP BY page, window_start, window_end
  OUTPUT TO 'page-views-per-minute';
```
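The windowed aggregation above can be sketched in plain Python to make the TUMBLE semantics concrete: each event falls into exactly one one-minute window, and results are keyed by `(page, window_start)`. This is an illustrative sketch of the semantics, not StreamHouse's execution model.

```python
from collections import defaultdict
from datetime import datetime

def aggregate_page_views(events):
    """Assign each click to its 1-minute tumbling window and count
    total views plus distinct visitors per (page, window_start)."""
    views = defaultdict(int)       # (page, window_start) -> view count
    visitors = defaultdict(set)    # (page, window_start) -> distinct user_ids
    for e in events:
        ts = datetime.fromisoformat(e["timestamp"])
        window_start = ts.replace(second=0, microsecond=0)  # truncate to the minute
        key = (e["page"], window_start)
        views[key] += 1
        visitors[key].add(e["user_id"])
    return {k: {"views": views[k], "unique_visitors": len(visitors[k])}
            for k in views}

events = [
    {"user_id": "u1", "page": "/home", "timestamp": "2026-01-15T10:30:05"},
    {"user_id": "u2", "page": "/home", "timestamp": "2026-01-15T10:30:40"},
    {"user_id": "u1", "page": "/home", "timestamp": "2026-01-15T10:31:10"},
]
result = aggregate_page_views(events)
```

Note that both `u1` clicks land in different windows, so each minute counts them independently.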

Change Data Capture (CDC)

Use StreamHouse to capture database changes and propagate them to downstream systems.

First, configure a Debezium PostgreSQL source connector (`debezium-connector.json`). A real deployment also requires `database.password`; a placeholder value is shown here:

```json
{
  "name": "pg-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "changeme",
    "database.dbname": "myapp",
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput"
  }
}
```

Then consume the CDC events in StreamHouse:

```bash
streamctl consume --topic cdc.public.users --from-beginning --format json
```
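Each record on `cdc.public.users` is a Debezium change-event envelope whose `op` field distinguishes creates, updates, deletes, and snapshot reads, with `before`/`after` holding the row images (this assumes the default envelope, i.e. no `ExtractNewRecordState` transform). A minimal sketch of applying those events to an in-memory replica:

```python
def apply_change(state: dict, event: dict) -> None:
    """Apply one Debezium change event to an in-memory replica keyed by id.

    'c' = create, 'u' = update, 'r' = snapshot read (all carry 'after');
    'd' = delete (only 'before' is populated).
    """
    op = event["op"]
    if op in ("c", "u", "r"):
        row = event["after"]
        state[row["id"]] = row
    elif op == "d":
        state.pop(event["before"]["id"], None)

replica = {}
apply_change(replica, {"op": "c", "before": None,
                       "after": {"id": 1, "email": "a@example.com"}})
apply_change(replica, {"op": "u", "before": {"id": 1, "email": "a@example.com"},
                       "after": {"id": 1, "email": "b@example.com"}})
apply_change(replica, {"op": "d", "before": {"id": 1, "email": "b@example.com"},
                       "after": None})
```

After the three events the replica is empty again: the row was created, updated, then deleted.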

Log Aggregation

Collect logs from multiple services into StreamHouse and query them with SQL.

```bash
# Produce logs from your application
echo '{"service":"api","level":"error","message":"Connection timeout","timestamp":"2026-01-15T10:30:00Z"}' | \
  streamctl produce --topic app-logs

# Query errors in the last hour
streamctl sql "
  SELECT service, level, message, timestamp
  FROM app_logs
  WHERE level = 'error'
    AND timestamp > NOW() - INTERVAL '1 hour'
  ORDER BY timestamp DESC
  LIMIT 50
"
```

Python Client Example

Access StreamHouse programmatically over its HTTP API; the example below uses Python with the `requests` library.

```python
import requests

STREAMHOUSE_URL = "http://localhost:8080"

# Produce messages
def produce(topic: str, key: str, value: dict) -> dict:
    """Produce a single keyed message to a topic."""
    resp = requests.post(
        f"{STREAMHOUSE_URL}/api/topics/{topic}/produce",
        json={"key": key, "value": value},
    )
    resp.raise_for_status()
    return resp.json()

# Consume messages
def consume(topic: str, partition: int = 0, offset: int = 0, limit: int = 100) -> list:
    """Fetch up to `limit` messages from one partition, starting at `offset`."""
    resp = requests.get(
        f"{STREAMHOUSE_URL}/api/topics/{topic}/consume",
        params={"partition": partition, "offset": offset, "limit": limit},
    )
    resp.raise_for_status()
    return resp.json()

# Example usage
produce("user-events", "user-42", {"event": "signup", "plan": "pro"})
messages = consume("user-events", partition=0, offset=0)
for msg in messages:
    print(f"Offset {msg['offset']}: {msg['value']}")
```
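Since `consume` returns at most `limit` messages per call, reading a whole partition means paging by offset. A sketch of such a loop follows; `consume_all` is a hypothetical helper, not part of any StreamHouse SDK, and the fetch function is injected so the paging logic can run without a broker:

```python
def consume_all(fetch, topic, partition=0, limit=100):
    """Yield every message in a partition by fetching limit-sized pages,
    resuming from the last offset seen, until an empty page comes back."""
    offset = 0
    while True:
        batch = fetch(topic, partition=partition, offset=offset, limit=limit)
        if not batch:
            return
        yield from batch
        offset = batch[-1]["offset"] + 1

# In-memory stand-in for consume(); same signature, no broker needed.
data = [{"offset": i, "value": {"n": i}} for i in range(5)]
def fake_fetch(topic, partition=0, offset=0, limit=100):
    return data[offset:offset + limit]

msgs = list(consume_all(fake_fetch, "user-events", limit=2))
```

With `limit=2` the loop issues four fetches (2 + 2 + 1 + empty) and yields all five messages in order.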