Skip to content

Time-Series Quick Start Guide

Time to Complete: 10 minutes
Prerequisites: Rust 1.75+, HeliosDB installed
Last Updated: January 4, 2026


Overview

This guide will help you get started with HeliosDB's time-series capabilities. You will learn to:

1. Create a time-series engine
2. Write time-series data points
3. Query time ranges
4. Configure retention policies
5. Set up downsampling


Step 1: Add Dependencies

Add the HeliosDB storage and common crates, along with Tokio, to your Cargo.toml:

[dependencies]
heliosdb-storage = "6.0"
heliosdb-common = "6.0"
tokio = { version = "1.0", features = ["full"] }

Step 2: Create a Time-Series Engine

use heliosdb_storage::timeseries::{
    TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy,
};

/// Quick-start entry point: constructs a `TimeSeriesEngine` for
/// `/data/timeseries` with `PartitionStrategy::Daily` and reports success.
///
/// # Errors
///
/// Propagates any error returned by `TimeSeriesEngine::new`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Daily partitioning, rooted at /data/timeseries.
    let data_dir = "/data/timeseries";
    let strategy = PartitionStrategy::Daily;
    let engine = TimeSeriesEngine::new(data_dir, strategy).await?;

    println!("Time-series engine created successfully!");

    Ok(())
}

Step 3: Write Time-Series Data

Single Point

use heliosdb_storage::timeseries::TimeSeriesEngine;
use std::time::{SystemTime, UNIX_EPOCH};

/// Writes two example points: one without a timestamp and one with an
/// explicit timestamp.
///
/// Passing `None` lets the engine use the current timestamp; `Some(ms)`
/// supplies milliseconds since the Unix epoch.
///
/// # Errors
///
/// Returns an error if either write fails, if the system clock reads earlier
/// than the Unix epoch, or if the millisecond count does not fit in a `u64`.
async fn write_single_point(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // Write a single point (current timestamp is used if None)
    engine.write_point("sensor.temperature", 23.5, None).await?;

    // Write with specific timestamp (milliseconds since epoch).
    // `as_millis` returns a `u128`, so convert with a checked `try_from`
    // instead of a silently truncating `as u64` cast.
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH)?;
    let timestamp = u64::try_from(since_epoch.as_millis())?;

    engine.write_point("sensor.humidity", 65.0, Some(timestamp)).await?;

    println!("Points written successfully!");
    Ok(())
}

Batch Write

use heliosdb_storage::timeseries::{TimeSeriesEngine, TimeSeriesPoint};
use std::collections::HashMap;

/// Builds 1000 synthetic `cpu.usage` points and writes them with a single
/// `write_points` call.
///
/// Timestamps start at 2024-01-01 00:00:00 UTC and advance by 1000 ms per
/// point; values follow `45.0 + sin(i * 0.1) * 10.0`.
///
/// # Errors
///
/// Propagates any error returned by `write_points`.
async fn write_batch(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // 2024-01-01 00:00:00 UTC
    let first_ts: u64 = 1_704_067_200_000;

    // Assemble the batch up front so it can be written in one call.
    let mut batch: Vec<TimeSeriesPoint> = Vec::with_capacity(1000);
    for step in 0..1000u64 {
        let ts = first_ts + step * 1000; // 1 second intervals
        let value = 45.0 + (step as f64 * 0.1).sin() * 10.0;
        batch.push(TimeSeriesPoint::new("cpu.usage", ts, value));
    }

    engine.write_points(&batch).await?;

    println!("Wrote {} points", batch.len());
    Ok(())
}

Points with Tags

use heliosdb_storage::timeseries::TimeSeriesPoint;
use std::collections::HashMap;

/// Returns a `cpu.usage` point at timestamp `1704067200000` with value `45.5`,
/// carrying the tags `host = "server-01"` and `region = "us-west"`.
fn create_tagged_point() -> TimeSeriesPoint {
    // Collect the tag pairs into an owned String -> String map.
    let tags: HashMap<String, String> = [("host", "server-01"), ("region", "us-west")]
        .iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    TimeSeriesPoint::with_tags("cpu.usage", 1704067200000, 45.5, tags)
}

Step 4: Query Time-Series Data

Basic Time Range Query

/// Queries `cpu.usage` between two fixed timestamps, prints how many points
/// came back, and previews the first five.
///
/// # Errors
///
/// Propagates any error returned by the engine's `query_range`.
async fn query_range(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    const RANGE_START_MS: u64 = 1704067200000; // 2024-01-01 00:00:00
    const RANGE_END_MS: u64 = 1704153600000; // 2024-01-02 00:00:00

    let samples = engine
        .query_range("cpu.usage", RANGE_START_MS, RANGE_END_MS)
        .await?;

    println!("Retrieved {} points", samples.len());

    // Only the first five results are printed.
    samples.iter().take(5).for_each(|sample| {
        println!("  {} @ {} = {}", sample.metric, sample.timestamp, sample.value);
    });

    Ok(())
}

Query with Aggregation

use heliosdb_storage::timeseries::query_engine::{
    TimeRangeQuery, TimeSeriesQueryEngine, AggregationFunction,
};
use std::time::Duration;

/// Runs an averaged, 300-second-bucket query for `cpu.usage` over the supplied
/// points and prints the number of resulting buckets.
///
/// # Errors
///
/// Propagates any error returned by `execute_query`.
async fn query_with_aggregation(points: &[TimeSeriesPoint]) -> Result<(), Box<dyn std::error::Error>> {
    let mut executor = TimeSeriesQueryEngine::new();

    // 5-minute aggregation window, averaged per bucket.
    let bucket_width = Duration::from_secs(300);
    let query = TimeRangeQuery::new("cpu.usage", 1704067200000, 1704153600000)
        .with_aggregation(bucket_width, AggregationFunction::Average);

    let buckets = executor.execute_query(&query, points).await?;

    println!("Aggregated to {} buckets", buckets.len());
    Ok(())
}

Step 5: Configure Retention Policies

use heliosdb_storage::timeseries::{TimeSeriesEngine, RetentionPolicy};
use std::time::Duration;

/// Installs a 30-day retention policy with a cleanup interval of `3600` on the
/// engine.
///
/// # Errors
///
/// This function itself never returns `Err`; the `Result` return type matches
/// the other examples in this guide.
async fn configure_retention(engine: &mut TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // 30 days expressed in seconds.
    const THIRTY_DAYS_SECS: u64 = 30 * 24 * 3600;

    let max_age = Duration::from_secs(THIRTY_DAYS_SECS);
    let retention = RetentionPolicy::new(max_age).with_cleanup_interval(3600); // Check every hour

    engine.set_retention_policy(retention).await;

    println!("Retention policy configured!");
    Ok(())
}

Step 6: Set Up Downsampling

use heliosdb_storage::timeseries::{
    TimeSeriesEngine, DownsamplingConfig, AggregationFunction,
};
use std::time::Duration;

/// Registers 300-second average downsampling for metrics matching `cpu.*`.
///
/// # Errors
///
/// Propagates any error returned by the engine's `configure_downsampling`.
async fn configure_downsampling(engine: &mut TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // 5-minute interval, averaged.
    let interval = Duration::from_secs(300);
    let cpu_rollup =
        DownsamplingConfig::new(interval).with_aggregation(AggregationFunction::Average);

    engine.configure_downsampling("cpu.*", cpu_rollup).await?;

    println!("Downsampling configured!");
    Ok(())
}

Step 7: Start Background Maintenance

/// Asks the engine to start its background maintenance and prints a
/// confirmation once the call succeeds.
///
/// # Errors
///
/// Propagates any error returned by the engine's `start_maintenance`.
async fn start_maintenance(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // Per this guide, maintenance covers retention cleanup, downsampling,
    // and compaction.
    engine.start_maintenance().await?;
    println!("Background maintenance started!");

    Ok(())
}

Step 8: Check Statistics

/// Fetches the engine's statistics and prints the partition count plus the
/// retention and compression statistics.
///
/// # Errors
///
/// This function itself never returns `Err`; the `Result` return type matches
/// the other examples in this guide.
async fn check_stats(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    let snapshot = engine.stats().await;

    // Borrow the individual fields once, then report them.
    let partitions = &snapshot.total_partitions;
    let retention = &snapshot.retention_stats;
    let compression = &snapshot.compression_stats;

    println!("Time-Series Statistics:");
    println!("  Total partitions: {}", partitions);
    println!("  Retention stats: {:?}", retention);
    println!("  Compression stats: {:?}", compression);

    Ok(())
}

Complete Example

use heliosdb_storage::timeseries::{
    TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy,
    RetentionPolicy, DownsamplingConfig, AggregationFunction,
};
use std::time::Duration;

/// End-to-end example: creates an engine, configures retention and
/// downsampling, starts background maintenance, writes 100
/// `sensors.temperature` points, queries the last hour, and prints the
/// partition count.
///
/// # Errors
///
/// Returns an error if any engine call fails, if the system clock reads
/// earlier than the Unix epoch, or if the current time in milliseconds does
/// not fit in a `u64`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Create engine
    let mut engine = TimeSeriesEngine::new(
        "/data/timeseries",
        PartitionStrategy::Daily,
    ).await?;

    // 2. Configure retention (30 days)
    engine.set_retention_policy(
        RetentionPolicy::new(Duration::from_secs(30 * 24 * 3600))
    ).await;

    // 3. Configure downsampling (5-minute averages)
    engine.configure_downsampling(
        "sensors.*",
        DownsamplingConfig::new(Duration::from_secs(300))
            .with_aggregation(AggregationFunction::Average),
    ).await?;

    // 4. Start background maintenance
    engine.start_maintenance().await?;

    // 5. Write data (timestamp `None`: the engine stamps each point)
    for i in 0..100 {
        engine.write_point(
            "sensors.temperature",
            23.5 + (i as f64 * 0.1),
            None,
        ).await?;
    }

    // 6. Query data
    // `as_millis` returns a `u128`; use a checked conversion rather than a
    // truncating `as u64` cast so an out-of-range value becomes an error.
    let now = u64::try_from(
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_millis(),
    )?;

    let points = engine.query_range(
        "sensors.temperature",
        now.saturating_sub(3_600_000),  // Last hour; saturates instead of underflowing
        now,
    ).await?;

    println!("Retrieved {} points", points.len());

    // 7. Check stats
    let stats = engine.stats().await;
    println!("Partitions: {}", stats.total_partitions);

    Ok(())
}

Using SQL Interface

If you prefer SQL, HeliosDB supports time-series operations through SQL:

Create Time-Series Table

-- Create a time-series optimized table
-- Columns: a required timestamp and sensor id, plus nullable temperature
-- and humidity readings.
CREATE TABLE sensor_readings (
    timestamp TIMESTAMPTZ NOT NULL,
    sensor_id VARCHAR(64) NOT NULL,
    temperature DOUBLE PRECISION,
    humidity DOUBLE PRECISION
) WITH (
    -- Table options: day-based partitioning, 30-day retention,
    -- and 'gorilla' compression.
    partition_by = 'DAY',
    retention = '30 days',
    compression = 'gorilla'
);

Insert Data

-- Insert sensor readings
-- Two rows are inserted in one statement; both use NOW() as their timestamp.
INSERT INTO sensor_readings (timestamp, sensor_id, temperature, humidity)
VALUES
    (NOW(), 'sensor-001', 23.5, 65.0),
    (NOW(), 'sensor-002', 24.1, 62.5);

Query with Time Bucket

-- Aggregate by 5-minute buckets
-- Returns the average and maximum temperature per sensor per bucket,
-- limited to rows from the last day, newest bucket first.
SELECT
    time_bucket('5 minutes', timestamp) AS bucket,
    sensor_id,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp
FROM sensor_readings
WHERE timestamp > NOW() - INTERVAL '1 day'
GROUP BY bucket, sensor_id
ORDER BY bucket DESC;

Configure Retention via SQL

-- Set retention policy
-- Uses the same 30-day period as the CREATE TABLE example.
ALTER TABLE sensor_readings
SET RETENTION POLICY '30 days';

-- Set downsampling
-- Defined as a continuous aggregate named sensor_readings_1h, declared with
-- EVERY '1 hour' and the 'avg' aggregation.
ALTER TABLE sensor_readings
ADD CONTINUOUS AGGREGATE sensor_readings_1h
    EVERY '1 hour'
    WITH (aggregation = 'avg');

Common Configuration Patterns

High-Frequency IoT Data

// Ingestion settings for high-frequency data: a batch size of 10000, a
// 100 ms batch timeout, and out-of-order handling enabled. Fields not listed
// here take their `Default` values.
// NOTE(review): `config` is not passed to the engine anywhere in this
// snippet — confirm how `IngestionConfig` is meant to be applied.
let config = IngestionConfig {
    batch_size: 10000,
    batch_timeout: Duration::from_millis(100),
    handle_out_of_order: true,
    max_time_skew: 60000,  // 1 minute tolerance
    ..Default::default()
};

// Hourly partitions, unlike the daily strategy used in the other examples.
// `path` is assumed to be defined by the caller.
let engine = TimeSeriesEngine::new(path, PartitionStrategy::Hourly).await?;

Financial Tick Data

// Daily partitions; `path` is assumed to be defined by the caller.
let engine = TimeSeriesEngine::new(path, PartitionStrategy::Daily).await?;

// High compression for historical data
// All three compress_* switches are enabled and every field is set
// explicitly (no `..Default::default()`).
// NOTE(review): `compression_config` is not handed to the engine in this
// snippet — confirm where `BatchCompressionConfig` is consumed.
let compression_config = BatchCompressionConfig {
    block_size: 4096,  // Larger blocks for better compression
    compress_timestamps: true,
    compress_values: true,
    compress_metrics: true,
    min_ratio: 1.5,
};

Metrics and Monitoring

// `mut` because the other examples in this guide call
// `configure_downsampling` through a mutable engine (`let mut engine` /
// `&mut TimeSeriesEngine`). `path` is assumed to be defined by the caller.
let mut engine = TimeSeriesEngine::new(path, PartitionStrategy::Daily).await?;

// Multi-tier downsampling: raw -> 1m -> 5m -> 1h -> 1d
// Base interval is 60 s; each added tier pairs a coarser interval with an
// age threshold (5 m after 1 h, 1 h after 1 day, 1 day after 7 days).
engine.configure_downsampling("metrics.*",
    DownsamplingConfig::new(Duration::from_secs(60))
        .add_tier(DownsamplingTier::new(Duration::from_secs(300), AggregationFunction::Average)
            .with_age_threshold(Duration::from_secs(3600)))
        .add_tier(DownsamplingTier::new(Duration::from_secs(3600), AggregationFunction::Average)
            .with_age_threshold(Duration::from_secs(86400)))
        .add_tier(DownsamplingTier::new(Duration::from_secs(86400), AggregationFunction::Average)
            .with_age_threshold(Duration::from_secs(604800)))
).await?;

Troubleshooting

Low Compression Ratio

// Ensure data is sorted and has regular intervals
// Check compression configuration
// This example turns on timestamp and value compression with a block size
// of 1024; the remaining fields come from `Default`.
let config = BatchCompressionConfig {
    compress_timestamps: true,
    compress_values: true,
    block_size: 1024,
    ..Default::default()
};

High Write Latency

// Increase batch size for better throughput
// Compared with the high-frequency IoT example (10000 / 100 ms), this uses
// a larger batch (50000), a longer batch timeout (500 ms), and sets
// `write_workers` to 8; the remaining fields come from `Default`.
let config = IngestionConfig {
    batch_size: 50000,
    batch_timeout: Duration::from_millis(500),
    write_workers: 8,
    ..Default::default()
};

Query Too Slow

// Check partition pruning
// Prints how many partitions the given time range touches.
// NOTE(review): `manager`, `start`, and `end` are not defined in this
// snippet — confirm which type provides `get_partitions_for_range`.
let partitions = manager.get_partitions_for_range(start, end).await?;
println!("Querying {} partitions", partitions.len());

// Add index on frequently queried tags
// Use continuous aggregates for dashboards

Next Steps

| Topic | Guide |
|-------|-------|
| Complete documentation | USER_GUIDE.md |
| Code examples | EXAMPLES.md |
| Performance optimization | ../timeseries/PERFORMANCE_TUNING.md |
| Compression details | ../timeseries/F3.8-timeseries-compression.md |

See Also: README.md | User Guide | Examples