Skip to content

ClickHouse Examples

Practical examples for HeliosDB's ClickHouse protocol support.

Connection Examples

Python Client

from clickhouse_driver import Client

# Basic connection
client = Client(
    host='localhost',
    port=9000,
    user='default',
    password='',
    database='default'
)

# Execute query
result = client.execute('SELECT count() FROM events')
print(f"Total events: {result[0][0]}")

Go Client

import "github.com/ClickHouse/clickhouse-go/v2"

conn, err := clickhouse.Open(&clickhouse.Options{
    Addr: []string{"localhost:9000"},
    Auth: clickhouse.Auth{
        Database: "default",
        Username: "default",
        Password: "",
    },
    Compression: &clickhouse.Compression{
        Method: clickhouse.CompressionLZ4,
    },
    MaxOpenConns: 5,
})

ctx := context.Background()
rows, _ := conn.Query(ctx, "SELECT count() FROM events")

Node.js Client

const { createClient } = require('@clickhouse/client');

const client = createClient({
  host: 'localhost:9000',
  database: 'default',
  username: 'default',
  password: '',
  compression: { type: 'lz4' },
});

const result = await client.query({
  query: 'SELECT count() FROM events',
});
console.log(await result.json());

Table Creation

MergeTree Table

CREATE TABLE events (
  timestamp DateTime,
  event_type String,
  user_id UInt32,
  duration Float32
) ENGINE = MergeTree()
ORDER BY (timestamp, user_id)
PRIMARY KEY (timestamp)
PARTITION BY toYYYYMM(timestamp);

ReplacingMergeTree (Upsert)

CREATE TABLE user_profiles (
  user_id UInt32,
  version UInt32,
  name String,
  email String,
  updated_at DateTime
) ENGINE = ReplacingMergeTree(version)
ORDER BY user_id
PARTITION BY toYYYYMM(updated_at);

-- Query latest version
SELECT user_id, name, email
FROM user_profiles
FINAL
WHERE user_id = 123;

AggregatingMergeTree

CREATE TABLE metrics_aggregated (
  timestamp DateTime,
  metric_name String,
  metric_value AggregateFunction(sum, Float64),
  metric_count AggregateFunction(count, UInt64)
) ENGINE = AggregatingMergeTree()
ORDER BY (timestamp, metric_name)
PARTITION BY toYYYYMMDD(timestamp);

-- Insert aggregated values
INSERT INTO metrics_aggregated
SELECT
    toStartOfMinute(timestamp) as timestamp,
    metric_name,
    sumState(value) as metric_value,
    countState(1) as metric_count
FROM raw_metrics
GROUP BY timestamp, metric_name;

-- Query merged values
SELECT
    timestamp,
    metric_name,
    sumMerge(metric_value) as total,
    countMerge(metric_count) as count
FROM metrics_aggregated
GROUP BY timestamp, metric_name;

Query Optimization

PREWHERE for Early Filtering

-- FAST: Filters before reading heavy columns
SELECT user_id, event_type, timestamp
FROM events
PREWHERE timestamp > '2025-01-01' AND event_type = 'purchase'
WHERE user_id > 0;

Sampling for Large Datasets

-- Analyze 10% of data
SELECT event_type, count() as cnt
FROM events
SAMPLE 0.1
GROUP BY event_type;

Efficient Aggregations

-- Basic aggregations
SELECT
  event_type,
  count() as events,
  count(DISTINCT user_id) as unique_users,
  sum(value) as total_value,
  avg(duration) as avg_duration,
  min(timestamp) as first_event,
  max(timestamp) as last_event
FROM events
GROUP BY event_type;

Quantile Functions

Percentile Calculations

-- Quick approximate quantiles
SELECT
  event_type,
  quantile(0.5)(duration) as p50,
  quantile(0.95)(duration) as p95,
  quantile(0.99)(duration) as p99
FROM events
GROUP BY event_type;

-- Multiple quantiles at once
SELECT
  event_type,
  quantiles(0.25, 0.5, 0.75, 0.95, 0.99)(duration) as percentiles
FROM events
GROUP BY event_type;

Top-K Queries

-- Get top 10 event types
SELECT event_type, count() as cnt
FROM events
GROUP BY event_type
ORDER BY cnt DESC
LIMIT 10;

-- TopK function (more efficient)
SELECT
  user_id,
  topK(10)(event_type) as top_events
FROM events
GROUP BY user_id;

Unique Counting

-- Approximate distinct count (very fast!)
SELECT
  event_type,
  uniq(user_id) as unique_users,
  uniqHLL12(user_id) as uniq_hll
FROM events
GROUP BY event_type;

-- Exact distinct count
SELECT
  event_type,
  uniqExact(user_id) as unique_users
FROM events
GROUP BY event_type;

Window Functions

Running Totals

SELECT
  timestamp,
  event_type,
  value,
  sum(value) OVER (
    PARTITION BY event_type
    ORDER BY timestamp
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) as running_total
FROM events
ORDER BY event_type, timestamp;

Row Number and Ranking

SELECT
  timestamp,
  user_id,
  value,
  row_number() OVER (
    PARTITION BY user_id
    ORDER BY timestamp
  ) as event_seq,
  rank() OVER (
    PARTITION BY user_id
    ORDER BY value DESC
  ) as value_rank
FROM events
ORDER BY user_id, timestamp;

Lead/Lag for Time-Series

SELECT
  timestamp,
  value,
  lag(value, 1) OVER (ORDER BY timestamp) as prev_value,
  lead(value, 1) OVER (ORDER BY timestamp) as next_value,
  value - lag(value, 1) OVER (ORDER BY timestamp) as value_change
FROM metrics
ORDER BY timestamp;

Materialized Views

-- Base table
CREATE TABLE events (
  timestamp DateTime,
  event_type String,
  user_id UInt32,
  value Float32
) ENGINE = MergeTree()
ORDER BY timestamp;

-- Aggregated view
CREATE MATERIALIZED VIEW events_per_minute
ENGINE = AggregatingMergeTree()
ORDER BY (timestamp, event_type)
AS
SELECT
  toStartOfMinute(timestamp) as timestamp,
  event_type,
  count() as event_count,
  sum(value) as total_value,
  avg(value) as avg_value
FROM events
GROUP BY toStartOfMinute(timestamp), event_type;

-- Query view directly
SELECT * FROM events_per_minute
WHERE timestamp > '2025-01-01'
ORDER BY timestamp DESC;

External Data Integration

S3 Integration

-- Direct query from S3
SELECT * FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events/*.parquet',
    'PARQUET'
)
LIMIT 100;

-- Insert from S3
INSERT INTO events
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events/*.csv',
    'CSV',
    'timestamp DateTime, event_type String, user_id UInt32, value Float32'
);

Kafka Integration

-- Create Kafka source table
CREATE TABLE events_kafka (
  timestamp DateTime,
  event_type String,
  user_id UInt32,
  value Float32
) ENGINE = Kafka()
SETTINGS
  kafka_broker_list = 'kafka-1:9092,kafka-2:9092',
  kafka_topic_list = 'events',
  kafka_group_id = 'clickhouse_consumer',
  kafka_format = 'JSONEachRow';

-- Create materialized view to consume
CREATE MATERIALIZED VIEW events_mv
TO events
AS SELECT * FROM events_kafka;

Bulk Insert

def bulk_insert_events(client, events, batch_size=10000):
    """Insert events efficiently in batches."""
    for i in range(0, len(events), batch_size):
        batch = events[i:i+batch_size]
        client.execute(
            'INSERT INTO events (timestamp, event_type, user_id, value) VALUES',
            batch,
            types_check=True
        )
        print(f"Inserted {i+len(batch)} events")

# Generate sample data
import datetime
import random

events = [
    (
        datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 24)),
        random.choice(['click', 'view', 'purchase']),
        random.randint(1, 10000),
        random.uniform(0, 100)
    )
    for _ in range(1000000)
]

bulk_insert_events(client, events, batch_size=50000)

Real-World Examples

Web Analytics Dashboard

# Create tables
client.execute('''
CREATE TABLE IF NOT EXISTS page_views (
  timestamp DateTime,
  session_id UUID,
  user_id UInt32,
  page_path String,
  duration_ms UInt32,
  status_code UInt16,
  referrer String
) ENGINE = MergeTree()
ORDER BY (timestamp, user_id)
PARTITION BY toYYYYMMDD(timestamp)
''')

# Query for dashboard
dashboard_data = client.execute('''
SELECT
  page_path,
  count() as views,
  count(DISTINCT user_id) as unique_users,
  round(avg(duration_ms), 0) as avg_duration_ms,
  round(quantile(0.95)(duration_ms), 0) as p95_duration_ms
FROM page_views
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY page_path
ORDER BY views DESC
LIMIT 20
''')

Log Analysis

CREATE TABLE application_logs (
  timestamp DateTime,
  level Enum8('DEBUG' = 1, 'INFO' = 2, 'WARN' = 3, 'ERROR' = 4),
  service String,
  message String,
  trace_id String,
  duration_ms UInt32
) ENGINE = MergeTree()
ORDER BY (timestamp, service, level)
PARTITION BY toYYYYMMDD(timestamp)
TTL timestamp + INTERVAL 30 DAY;

-- Error pattern analysis
SELECT
  service,
  level,
  substring(message, 1, 100) as error_pattern,
  count() as occurrences,
  max(timestamp) as last_seen,
  avg(duration_ms) as avg_duration
FROM application_logs
WHERE timestamp > now() - INTERVAL 24 HOUR
  AND level >= 3
GROUP BY service, level, error_pattern
ORDER BY occurrences DESC
LIMIT 20;

Time-Series Metrics

# High-cardinality time-series storage
client.execute('''
CREATE TABLE metrics (
  timestamp DateTime,
  metric_name String,
  metric_value Float64,
  tags Array(String)
) ENGINE = MergeTree()
ORDER BY (timestamp, metric_name)
PARTITION BY toYYYYMMDD(timestamp)
''')

# Pre-aggregated hourly table
client.execute('''
CREATE MATERIALIZED VIEW metrics_1h
ENGINE = AggregatingMergeTree()
ORDER BY (hour, metric_name)
AS
SELECT
  toStartOfHour(timestamp) as hour,
  metric_name,
  sumState(metric_value) as total,
  countState(1) as count,
  minState(metric_value) as min_value,
  maxState(metric_value) as max_value
FROM metrics
GROUP BY hour, metric_name
''')

# Query with finalization
result = client.execute('''
SELECT
  metric_name,
  sumMerge(total) as total_value,
  countMerge(count) as data_points,
  minMerge(min_value) as min_value,
  maxMerge(max_value) as max_value
FROM metrics_1h
WHERE hour >= now() - INTERVAL 7 DAY
GROUP BY metric_name
ORDER BY metric_name
''')

Troubleshooting Queries

Check Slow Queries

SELECT
  query_duration_ms,
  query,
  read_rows,
  result_rows
FROM system.query_log
WHERE query_duration_ms > 1000
ORDER BY query_duration_ms DESC
LIMIT 10;

Analyze Index Usage

SELECT
  table,
  name,
  type,
  data_compressed_bytes,
  data_uncompressed_bytes
FROM system.data_skipping_indices;

Check Table Size

SELECT
  database,
  table,
  formatReadableSize(sum(bytes)) as size,
  sum(rows) as rows
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;

Related: README.md | CONFIGURATION.md | COMPATIBILITY.md

Last Updated: December 2025