# ClickHouse Examples

Practical examples for HeliosDB's ClickHouse protocol support.
## Connection Examples

### Python Client

```python
from clickhouse_driver import Client

# Basic connection over the native TCP protocol
client = Client(
    host='localhost',
    port=9000,
    user='default',
    password='',
    database='default'
)

# Execute a query
result = client.execute('SELECT count() FROM events')
print(f"Total events: {result[0][0]}")
```
### Go Client

```go
import (
    "context"
    "log"

    "github.com/ClickHouse/clickhouse-go/v2"
)

conn, err := clickhouse.Open(&clickhouse.Options{
    Addr: []string{"localhost:9000"},
    Auth: clickhouse.Auth{
        Database: "default",
        Username: "default",
        Password: "",
    },
    Compression: &clickhouse.Compression{
        Method: clickhouse.CompressionLZ4,
    },
    MaxOpenConns: 5,
})
if err != nil {
    log.Fatal(err)
}

ctx := context.Background()
rows, err := conn.Query(ctx, "SELECT count() FROM events")
if err != nil {
    log.Fatal(err)
}
defer rows.Close()
```
### Node.js Client

```javascript
const { createClient } = require('@clickhouse/client');

// @clickhouse/client speaks HTTP, so it targets the HTTP port (8123)
// rather than the native TCP port (9000)
const client = createClient({
    url: 'http://localhost:8123',
    database: 'default',
    username: 'default',
    password: '',
    compression: { response: true },
});

const result = await client.query({
    query: 'SELECT count() FROM events',
});
console.log(await result.json());
```
## Table Creation

### MergeTree Table

```sql
CREATE TABLE events (
    timestamp DateTime,
    event_type String,
    user_id UInt32,
    duration Float32
) ENGINE = MergeTree()
ORDER BY (timestamp, user_id)
PRIMARY KEY (timestamp)
PARTITION BY toYYYYMM(timestamp);
```
### ReplacingMergeTree (Upsert)

```sql
CREATE TABLE user_profiles (
    user_id UInt32,
    version UInt32,
    name String,
    email String,
    updated_at DateTime
) ENGINE = ReplacingMergeTree(version)
ORDER BY user_id
PARTITION BY toYYYYMM(updated_at);

-- Query the latest version (FINAL deduplicates at read time)
SELECT user_id, name, email
FROM user_profiles
FINAL
WHERE user_id = 123;
```
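
`FINAL` is convenient but can be expensive on large tables. A common alternative is to collapse to the newest row per key with `argMax`; a minimal sketch against the same `user_profiles` schema:

```sql
-- Latest name/email per user, picked by highest version, without FINAL
SELECT
    user_id,
    argMax(name, version)  AS name,
    argMax(email, version) AS email
FROM user_profiles
WHERE user_id = 123
GROUP BY user_id;
```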
### AggregatingMergeTree

```sql
CREATE TABLE metrics_aggregated (
    timestamp DateTime,
    metric_name String,
    metric_value AggregateFunction(sum, Float64),
    metric_count AggregateFunction(count)
) ENGINE = AggregatingMergeTree()
ORDER BY (timestamp, metric_name)
PARTITION BY toYYYYMMDD(timestamp);

-- Insert aggregate states (note the -State combinators)
INSERT INTO metrics_aggregated
SELECT
    toStartOfMinute(timestamp) AS timestamp,
    metric_name,
    sumState(value) AS metric_value,
    countState() AS metric_count
FROM raw_metrics
GROUP BY timestamp, metric_name;

-- Query merged values with the matching -Merge combinators
SELECT
    timestamp,
    metric_name,
    sumMerge(metric_value) AS total,
    countMerge(metric_count) AS count
FROM metrics_aggregated
GROUP BY timestamp, metric_name;
```
## Query Optimization

### PREWHERE for Early Filtering

```sql
-- FAST: filters before reading heavy columns
SELECT user_id, event_type, timestamp
FROM events
PREWHERE timestamp > '2025-01-01' AND event_type = 'purchase'
WHERE user_id > 0;
```
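
To check how much a filter actually prunes, `EXPLAIN` with `indexes = 1` reports the parts and granules selected at each stage (assuming HeliosDB passes this ClickHouse statement through):

```sql
EXPLAIN indexes = 1
SELECT user_id, event_type, timestamp
FROM events
PREWHERE timestamp > '2025-01-01' AND event_type = 'purchase';
```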
### Sampling for Large Datasets

```sql
-- Analyze a 10% sample of the data
SELECT event_type, count() AS cnt
FROM events
SAMPLE 0.1
GROUP BY event_type;
```
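
`SAMPLE` only works on tables declared with a `SAMPLE BY` clause, and the sampling expression must be part of the sorting key; the `events` DDL above has neither. A sketch of a sampled variant (`intHash32(user_id)` is one common sampling key, not part of the original schema):

```sql
CREATE TABLE events_sampled (
    timestamp DateTime,
    event_type String,
    user_id UInt32,
    duration Float32
) ENGINE = MergeTree()
ORDER BY (timestamp, intHash32(user_id))
SAMPLE BY intHash32(user_id)
PARTITION BY toYYYYMM(timestamp);
```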
### Efficient Aggregations

```sql
-- Basic aggregations
SELECT
    event_type,
    count() AS events,
    count(DISTINCT user_id) AS unique_users,
    sum(value) AS total_value,
    avg(duration) AS avg_duration,
    min(timestamp) AS first_event,
    max(timestamp) AS last_event
FROM events
GROUP BY event_type;
```
## Quantile Functions

### Percentile Calculations

```sql
-- Quick approximate quantiles
SELECT
    event_type,
    quantile(0.5)(duration) AS p50,
    quantile(0.95)(duration) AS p95,
    quantile(0.99)(duration) AS p99
FROM events
GROUP BY event_type;

-- Multiple quantiles in one pass
SELECT
    event_type,
    quantiles(0.25, 0.5, 0.75, 0.95, 0.99)(duration) AS percentiles
FROM events
GROUP BY event_type;
```
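
`quantile()` is approximate (reservoir sampling). Where tail percentiles need tighter accuracy, the `quantileTDigest` and `quantileExact` variants trade CPU and memory for precision; a sketch, assuming these ClickHouse variants are available through HeliosDB:

```sql
SELECT
    event_type,
    quantileTDigest(0.99)(duration) AS p99_tdigest,  -- better tail accuracy
    quantileExact(0.99)(duration)   AS p99_exact     -- exact, highest cost
FROM events
GROUP BY event_type;
```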
### Top-K Queries

```sql
-- Get the top 10 event types
SELECT event_type, count() AS cnt
FROM events
GROUP BY event_type
ORDER BY cnt DESC
LIMIT 10;

-- topK() is approximate but finds heavy hitters in a single pass
SELECT
    user_id,
    topK(10)(event_type) AS top_events
FROM events
GROUP BY user_id;
```
### Unique Counting

```sql
-- Approximate distinct counts (fast, bounded memory)
SELECT
    event_type,
    uniq(user_id) AS unique_users,
    uniqHLL12(user_id) AS uniq_hll
FROM events
GROUP BY event_type;

-- Exact distinct count
SELECT
    event_type,
    uniqExact(user_id) AS unique_users
FROM events
GROUP BY event_type;
```
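
Between those two extremes, `uniqCombined` exposes the HyperLogLog precision directly; a sketch (precision 14 is an illustrative choice, not from the source):

```sql
SELECT
    event_type,
    uniqCombined(14)(user_id) AS unique_users  -- lower precision: less memory, more error
FROM events
GROUP BY event_type;
```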
## Window Functions

### Running Totals

```sql
SELECT
    timestamp,
    event_type,
    value,
    sum(value) OVER (
        PARTITION BY event_type
        ORDER BY timestamp
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS running_total
FROM events
ORDER BY event_type, timestamp;
```
### Row Number and Ranking

```sql
SELECT
    timestamp,
    user_id,
    value,
    row_number() OVER (
        PARTITION BY user_id
        ORDER BY timestamp
    ) AS event_seq,
    rank() OVER (
        PARTITION BY user_id
        ORDER BY value DESC
    ) AS value_rank
FROM events
ORDER BY user_id, timestamp;
```
### Lead/Lag for Time-Series

ClickHouse spells these `lagInFrame`/`leadInFrame`, and they need an explicit frame that spans the whole window:

```sql
SELECT
    timestamp,
    value,
    lagInFrame(value, 1) OVER w AS prev_value,
    leadInFrame(value, 1) OVER w AS next_value,
    value - lagInFrame(value, 1) OVER w AS value_change
FROM metrics
WINDOW w AS (ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
ORDER BY timestamp;
```
## Materialized Views

```sql
-- Base table
CREATE TABLE events (
    timestamp DateTime,
    event_type String,
    user_id UInt32,
    value Float32
) ENGINE = MergeTree()
ORDER BY timestamp;

-- Per-minute rollup. SummingMergeTree sums the numeric columns on merge,
-- so plain count()/sum() results stay correct; the average is derived at
-- query time instead of being stored (stored averages cannot be merged).
CREATE MATERIALIZED VIEW events_per_minute
ENGINE = SummingMergeTree()
ORDER BY (timestamp, event_type)
AS
SELECT
    toStartOfMinute(timestamp) AS timestamp,
    event_type,
    count() AS event_count,
    sum(value) AS total_value
FROM events
GROUP BY timestamp, event_type;

-- Aggregate at query time: background merges are lazy, so a minute may
-- still be split across several parts
SELECT
    timestamp,
    event_type,
    sum(event_count) AS event_count,
    sum(total_value) AS total_value,
    sum(total_value) / sum(event_count) AS avg_value
FROM events_per_minute
WHERE timestamp > '2025-01-01'
GROUP BY timestamp, event_type
ORDER BY timestamp DESC;
```
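
A materialized view only captures rows inserted after it is created. Pre-existing data can be backfilled with a one-off insert that mirrors the view's SELECT; a sketch (the cutoff date is illustrative, and the range must be chosen to avoid double-counting rows the view already captured):

```sql
INSERT INTO events_per_minute
SELECT
    toStartOfMinute(timestamp) AS timestamp,
    event_type,
    count() AS event_count,
    sum(value) AS total_value
FROM events
WHERE timestamp < '2025-01-01'  -- only the pre-existing range
GROUP BY timestamp, event_type;
```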
## External Data Integration

### S3 Integration

```sql
-- Direct query of Parquet files from S3
SELECT * FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events/*.parquet',
    'Parquet'
)
LIMIT 100;

-- Insert from S3
INSERT INTO events
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events/*.csv',
    'CSV',
    'timestamp DateTime, event_type String, user_id UInt32, value Float32'
);
```
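
For private buckets, the `s3` table function also takes inline credentials as its second and third arguments (placeholders shown; named collections or instance roles are preferable where HeliosDB supports them):

```sql
SELECT count()
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events/*.parquet',
    'YOUR_ACCESS_KEY_ID',      -- placeholder
    'YOUR_SECRET_ACCESS_KEY',  -- placeholder
    'Parquet'
);
```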
### Kafka Integration

```sql
-- Create a Kafka source table
CREATE TABLE events_kafka (
    timestamp DateTime,
    event_type String,
    user_id UInt32,
    value Float32
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-1:9092,kafka-2:9092',
    kafka_topic_list = 'events',
    kafka_group_id = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow';

-- Create a materialized view to consume into the events table
CREATE MATERIALIZED VIEW events_mv
TO events
AS SELECT * FROM events_kafka;
```
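
The consuming view need not be a pass-through; filtering and normalization can happen in flight. A sketch that could be written instead of the view above (the `lower()` call and the `user_id != 0` filter are illustrative, not from the source):

```sql
CREATE MATERIALIZED VIEW events_mv
TO events
AS
SELECT
    timestamp,
    lower(event_type) AS event_type,
    user_id,
    value
FROM events_kafka
WHERE user_id != 0;
```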
## Bulk Insert

```python
import datetime
import random

def bulk_insert_events(client, events, batch_size=10000):
    """Insert events efficiently in batches."""
    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]
        client.execute(
            'INSERT INTO events (timestamp, event_type, user_id, value) VALUES',
            batch,
            types_check=True
        )
        print(f"Inserted {i + len(batch)} events")

# Generate sample data
events = [
    (
        datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 24)),
        random.choice(['click', 'view', 'purchase']),
        random.randint(1, 10000),
        random.uniform(0, 100)
    )
    for _ in range(1000000)
]

bulk_insert_events(client, events, batch_size=50000)
```
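
When many small writers make client-side batching awkward, batching can also be pushed server-side with asynchronous inserts, assuming HeliosDB honours these ClickHouse settings:

```sql
-- The server buffers small inserts and flushes them as a single part
INSERT INTO events
SETTINGS async_insert = 1, wait_for_async_insert = 1
VALUES ('2025-01-01 00:00:00', 'click', 42, 1.5);
```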
## Real-World Examples

### Web Analytics Dashboard

```python
# Create tables
client.execute('''
    CREATE TABLE IF NOT EXISTS page_views (
        timestamp DateTime,
        session_id UUID,
        user_id UInt32,
        page_path String,
        duration_ms UInt32,
        status_code UInt16,
        referrer String
    ) ENGINE = MergeTree()
    ORDER BY (timestamp, user_id)
    PARTITION BY toYYYYMMDD(timestamp)
''')

# Query for the dashboard
dashboard_data = client.execute('''
    SELECT
        page_path,
        count() AS views,
        count(DISTINCT user_id) AS unique_users,
        round(avg(duration_ms), 0) AS avg_duration_ms,
        round(quantile(0.95)(duration_ms), 0) AS p95_duration_ms
    FROM page_views
    WHERE timestamp >= now() - INTERVAL 24 HOUR
    GROUP BY page_path
    ORDER BY views DESC
    LIMIT 20
''')
```
### Log Analysis

```sql
CREATE TABLE application_logs (
    timestamp DateTime,
    level Enum8('DEBUG' = 1, 'INFO' = 2, 'WARN' = 3, 'ERROR' = 4),
    service String,
    message String,
    trace_id String,
    duration_ms UInt32
) ENGINE = MergeTree()
ORDER BY (timestamp, service, level)
PARTITION BY toYYYYMMDD(timestamp)
TTL timestamp + INTERVAL 30 DAY;

-- Error pattern analysis (WARN and above)
SELECT
    service,
    level,
    substring(message, 1, 100) AS error_pattern,
    count() AS occurrences,
    max(timestamp) AS last_seen,
    avg(duration_ms) AS avg_duration
FROM application_logs
WHERE timestamp > now() - INTERVAL 24 HOUR
  AND level IN ('WARN', 'ERROR')
GROUP BY service, level, error_pattern
ORDER BY occurrences DESC
LIMIT 20;
```
### Time-Series Metrics

```python
# High-cardinality time-series storage
client.execute('''
    CREATE TABLE metrics (
        timestamp DateTime,
        metric_name String,
        metric_value Float64,
        tags Array(String)
    ) ENGINE = MergeTree()
    ORDER BY (timestamp, metric_name)
    PARTITION BY toYYYYMMDD(timestamp)
''')

# Pre-aggregated hourly view
client.execute('''
    CREATE MATERIALIZED VIEW metrics_1h
    ENGINE = AggregatingMergeTree()
    ORDER BY (hour, metric_name)
    AS
    SELECT
        toStartOfHour(timestamp) AS hour,
        metric_name,
        sumState(metric_value) AS total,
        countState() AS count,
        minState(metric_value) AS min_value,
        maxState(metric_value) AS max_value
    FROM metrics
    GROUP BY hour, metric_name
''')

# Query with -Merge finalization
result = client.execute('''
    SELECT
        metric_name,
        sumMerge(total) AS total_value,
        countMerge(count) AS data_points,
        minMerge(min_value) AS min_value,
        maxMerge(max_value) AS max_value
    FROM metrics_1h
    WHERE hour >= now() - INTERVAL 7 DAY
    GROUP BY metric_name
    ORDER BY metric_name
''')
```
## Troubleshooting Queries

### Check Slow Queries

```sql
SELECT
    query_duration_ms,
    query,
    read_rows,
    result_rows
FROM system.query_log
WHERE type = 'QueryFinish'       -- skip QueryStart and exception entries
  AND query_duration_ms > 1000
ORDER BY query_duration_ms DESC
LIMIT 10;
```
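
`system.query_log` only covers completed queries; ones still running appear in `system.processes`, and a runaway query can be stopped with `KILL QUERY` (assuming HeliosDB exposes both):

```sql
-- Currently executing queries, longest first
SELECT query_id, user, elapsed, read_rows, query
FROM system.processes
ORDER BY elapsed DESC;

-- Terminate a specific one
KILL QUERY WHERE query_id = '<query_id from above>';
```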
### Inspect Data-Skipping Indices

```sql
-- Lists defined skip indices and their on-disk footprint
SELECT
    table,
    name,
    type,
    data_compressed_bytes,
    data_uncompressed_bytes
FROM system.data_skipping_indices;
```
### Check Table Size

```sql
SELECT
    database,
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows) AS rows
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
```
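
When one table dominates, a per-column breakdown via `system.parts_columns` usually points at the column worth re-encoding; a sketch for the `events` table (assuming HeliosDB populates this system table):

```sql
SELECT
    column,
    formatReadableSize(sum(column_data_compressed_bytes))   AS compressed,
    formatReadableSize(sum(column_data_uncompressed_bytes)) AS uncompressed
FROM system.parts_columns
WHERE active AND table = 'events'
GROUP BY column
ORDER BY sum(column_data_compressed_bytes) DESC;
```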
Related: README.md | CONFIGURATION.md | COMPATIBILITY.md
Last Updated: December 2025