Examples
Sample applications and code snippets for common use cases.
Real-Time Analytics Pipeline
This example shows how to build a real-time analytics pipeline that ingests click events, aggregates them per page per minute, and writes the results to a summary topic.
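To make the window semantics concrete, here is a plain-Python sketch of what the tumbling aggregation below computes: events are bucketed into fixed one-minute windows keyed by page, then counted. The helper names are illustrative, not part of StreamHouse.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Align a timestamp to the start of its fixed (tumbling) window."""
    epoch = datetime(1970, 1, 1)
    return epoch + ((ts - epoch) // size) * size

def page_views(events, size=timedelta(minutes=1)):
    """Aggregate click events into (page, window_start) buckets."""
    buckets = defaultdict(lambda: {"views": 0, "users": set()})
    for e in events:
        key = (e["page"], tumble_start(e["timestamp"], size))
        buckets[key]["views"] += 1
        buckets[key]["users"].add(e["user_id"])
    return {
        key: {"views": b["views"], "unique_visitors": len(b["users"])}
        for key, b in buckets.items()
    }
```

The continuous query below performs the same computation incrementally, emitting one row per page per closed window.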
```sql
-- Create the source stream
CREATE STREAM clickstream (
    user_id VARCHAR,
    page VARCHAR,
    referrer VARCHAR,
    timestamp TIMESTAMP
) WITH (topic = 'clicks', format = 'json');

-- Create a continuous query for page views per minute
CREATE CONTINUOUS QUERY page_views_per_minute AS
SELECT
    page,
    count(*) AS views,
    count(DISTINCT user_id) AS unique_visitors,
    window_start,
    window_end
FROM TUMBLE(clickstream, timestamp, INTERVAL '1 minute')
GROUP BY page, window_start, window_end
OUTPUT TO 'page-views-per-minute';
```

Change Data Capture (CDC)
Use StreamHouse to capture database changes and propagate them to downstream systems.
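Each Debezium change event carries an `op` code plus `before`/`after` row images. As a sketch, the events consumed below can be replayed into an in-memory copy of the table like this (the envelope fields are standard Debezium; keying rows by an `id` column is an assumption about the source schema):

```python
def apply_change(table: dict, event: dict) -> dict:
    """Replay one Debezium-style change event onto an in-memory table.

    `event` may be a bare payload or wrapped in a {"payload": ...} envelope.
    """
    payload = event.get("payload", event)
    op = payload["op"]
    if op in ("c", "r", "u"):   # create, snapshot read, update
        row = payload["after"]
        table[row["id"]] = row
    elif op == "d":             # delete
        table.pop(payload["before"]["id"], None)
    return table
```

Feeding every event from the CDC topic through `apply_change` keeps the dict in sync with the source table.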
1. Set up CDC from PostgreSQL using Debezium (`debezium-connector.json`):

```json
{
  "name": "pg-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "replicator",
    "database.dbname": "myapp",
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput"
  }
}
```

2. Consume the CDC events in StreamHouse:

```bash
streamctl consume --topic cdc.public.users --from-beginning --format json
```

Log Aggregation
Collect logs from multiple services into StreamHouse and query them with SQL.
```bash
# Produce logs from your application
echo '{"service":"api","level":"error","message":"Connection timeout","timestamp":"2026-01-15T10:30:00Z"}' | \
  streamctl produce --topic app-logs

# Query errors in the last hour
streamctl sql "
  SELECT service, level, message, timestamp
  FROM app_logs
  WHERE level = 'error'
    AND timestamp > NOW() - INTERVAL '1 hour'
  ORDER BY timestamp DESC
  LIMIT 50
"
```

Python Client Example
Use the StreamHouse Python client for programmatic access.
```python
import requests

STREAMHOUSE_URL = "http://localhost:8080"

# Produce messages
def produce(topic: str, key: str, value: dict):
    resp = requests.post(
        f"{STREAMHOUSE_URL}/api/topics/{topic}/produce",
        json={"key": key, "value": value},
    )
    resp.raise_for_status()
    return resp.json()

# Consume messages
def consume(topic: str, partition: int = 0, offset: int = 0, limit: int = 100):
    resp = requests.get(
        f"{STREAMHOUSE_URL}/api/topics/{topic}/consume",
        params={"partition": partition, "offset": offset, "limit": limit},
    )
    resp.raise_for_status()
    return resp.json()

# Example usage
produce("user-events", "user-42", {"event": "signup", "plan": "pro"})
messages = consume("user-events", partition=0, offset=0)
for msg in messages:
    print(f"Offset {msg['offset']}: {msg['value']}")
```
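Building on the `consume()` helper, pagination through a partition can be sketched as a generator that advances its offset past the last message of each batch. The `fetch` parameter is injected so the loop works with the HTTP helper above or a stub in tests; treating an empty batch as "caught up" is an assumption, not documented API behavior:

```python
def stream_messages(fetch, topic: str, partition: int = 0, offset: int = 0, limit: int = 100):
    """Yield messages in order, paging through a partition via `fetch`.

    `fetch` has the same signature as consume() above. Stops when a batch
    comes back empty; a long-running client might sleep and poll again instead.
    """
    while True:
        batch = fetch(topic, partition=partition, offset=offset, limit=limit)
        if not batch:
            return
        yield from batch
        offset = batch[-1]["offset"] + 1  # resume after the last message seen
```

For example, `for msg in stream_messages(consume, "user-events"):` drains the partition from offset 0 in `limit`-sized pages.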