Time-Series User Guide

Version: 6.0
Last Updated: January 4, 2026
Audience: Developers and Database Administrators


Table of Contents

  1. Time-Series Data Types
  2. Compression Options
  3. Retention Policies
  4. Continuous Aggregates
  5. Gap Filling
  6. Time-Zone Handling
  7. Integration with MVCC
  8. Query Engine
  9. Ingestion Pipeline
  10. Partition Management
  11. Best Practices

Time-Series Data Types

Core Data Structures

TimeSeriesPoint

The fundamental data structure for time-series data:

pub struct TimeSeriesPoint {
    /// Metric identifier (e.g., "cpu.usage", "sensor.temperature")
    pub metric: String,

    /// Timestamp in milliseconds since Unix epoch (UTC)
    pub timestamp: u64,

    /// Numeric value
    pub value: f64,

    /// Optional dimensional tags for filtering
    pub tags: HashMap<String, String>,
}

TSValue Enum

For mixed-type time-series:

pub enum TSValue {
    Null,              // Missing value
    Integer(i64),      // Integer values
    Float(f64),        // Floating-point values
    Boolean(bool),     // Boolean flags
}

Creating Data Points

// Simple point
let point = TimeSeriesPoint::new(
    "server.cpu_percent",
    1704067200000,  // Timestamp in ms
    75.5,
);

// Point with tags (dimensions)
let mut tags = HashMap::new();
tags.insert("host".to_string(), "web-server-01".to_string());
tags.insert("datacenter".to_string(), "us-east-1".to_string());
tags.insert("environment".to_string(), "production".to_string());

let tagged_point = TimeSeriesPoint::with_tags(
    "server.cpu_percent",
    1704067200000,
    75.5,
    tags,
);

Timestamp Conventions

Format              Example                     Usage
Unix milliseconds   1704067200000               Primary format in Rust API
Unix seconds        1704067200                  Convert with * 1000
ISO 8601            2024-01-01T00:00:00Z        SQL queries
RFC 3339            2024-01-01T00:00:00.000Z    API responses
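
The conversions between these formats can be sketched with the standard library alone. The helper names below (`secs_to_millis`, `millis_to_rfc3339`) are illustrative and are not part of the HeliosDB API; production code would more likely use a date-time crate such as chrono, which this guide uses in the Time-Zone Handling section.

```rust
/// Convert Unix seconds to Unix milliseconds, returning None on overflow.
fn secs_to_millis(secs: u64) -> Option<u64> {
    secs.checked_mul(1000)
}

/// Format Unix milliseconds (UTC) as an RFC 3339 string with millisecond precision.
/// Hypothetical helper for illustration only.
fn millis_to_rfc3339(ms: u64) -> String {
    let secs = ms / 1000;
    let millis = ms % 1000;
    let days = (secs / 86_400) as i64;
    let rem = secs % 86_400;
    let (h, m, s) = (rem / 3600, (rem % 3600) / 60, rem % 60);

    // Convert days since 1970-01-01 to a proleptic Gregorian date.
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z / 146_097; // 400-year cycles
    let doe = z - era * 146_097; // day of era, 0..=146096
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153; // March-based month, 0..=11
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };

    format!("{year:04}-{month:02}-{day:02}T{h:02}:{m:02}:{s:02}.{millis:03}Z")
}
```

With these helpers, `secs_to_millis(1704067200)` yields the millisecond value from the table, and formatting that value gives the RFC 3339 string shown in the last row.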

Compression Options

HeliosDB provides multiple compression strategies optimized for different data patterns.

Gorilla Compression

Facebook's Gorilla algorithm is the default for time-series data:

use heliosdb_storage::timeseries::compression::{
    GorillaCompressor, CompressionConfig,
};

let config = CompressionConfig {
    compress_timestamps: true,    // Delta-of-delta encoding
    compress_values: true,        // XOR + bit-packing
    block_size: 1024,             // Points per block
    min_ratio: 1.5,               // Minimum compression ratio
};

let compressor = GorillaCompressor::new(config);

Algorithm Details

Timestamp Compression (Delta-of-Delta):

- First timestamp: stored in full (64 bits)
- Second timestamp: delta from first (variable bits)
- Subsequent timestamps: delta-of-delta (1-32 bits)

Timestamp sequence: [1000, 1060, 1120, 1180, 1240]
Delta sequence:     [     60,   60,   60,   60]
Delta-of-delta:     [         0,    0,    0]
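
The sequence above can be reproduced with a few lines of arithmetic. This is a minimal sketch of the delta-of-delta transform only; it does not perform the variable-width bit packing, and the function names are illustrative rather than taken from `GorillaCompressor`.

```rust
/// Split a timestamp sequence into (first timestamp, first delta, delta-of-delta values).
/// Returns None when fewer than two timestamps are given.
fn delta_of_delta(timestamps: &[i64]) -> Option<(i64, i64, Vec<i64>)> {
    if timestamps.len() < 2 {
        return None;
    }
    let first = timestamps[0];
    let first_delta = timestamps[1] - timestamps[0];
    let mut prev_delta = first_delta;
    let mut dods = Vec::with_capacity(timestamps.len() - 2);
    for pair in timestamps[1..].windows(2) {
        let delta = pair[1] - pair[0];
        dods.push(delta - prev_delta);
        prev_delta = delta;
    }
    Some((first, first_delta, dods))
}

/// Rebuild the original timestamps from the encoded form.
fn reconstruct(first: i64, first_delta: i64, dods: &[i64]) -> Vec<i64> {
    let mut out = vec![first, first + first_delta];
    let mut delta = first_delta;
    let mut last = first + first_delta;
    for &dod in dods {
        delta += dod;
        last += delta;
        out.push(last);
    }
    out
}
```

For regularly sampled data every delta-of-delta is zero, which is why the encoded form is so compact; jitter of a millisecond or two produces small nonzero values instead.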

Value Compression (XOR + Bit-packing):

- XOR with previous value
- Leading/trailing zero compression
- Typical 4-8 bits per value for slowly changing data
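
To make the XOR step concrete, the sketch below computes what an encoder would need to store for each value: nothing beyond a marker when the value repeats, otherwise only the "meaningful" bits between the leading and trailing zeros of the XOR result. The `XorBlock` type and `xor_block` function are illustrative names, and the exact bit layout HeliosDB writes is not shown here.

```rust
/// What an XOR-based encoder needs to store for one value (illustrative type).
#[derive(Debug, PartialEq)]
enum XorBlock {
    /// Value is bit-identical to the previous one; a single marker suffices.
    Same,
    /// Only `meaningful` bits differ, framed by runs of zero bits.
    Bits { leading: u32, trailing: u32, meaningful: u32 },
}

/// XOR the IEEE 754 bit patterns of two consecutive values and measure the zero runs.
fn xor_block(prev: f64, cur: f64) -> XorBlock {
    let x = prev.to_bits() ^ cur.to_bits();
    if x == 0 {
        return XorBlock::Same;
    }
    let leading = x.leading_zeros();
    let trailing = x.trailing_zeros();
    XorBlock::Bits {
        leading,
        trailing,
        meaningful: 64 - leading - trailing,
    }
}
```

For example, going from 1.0 to 2.0 changes only the exponent field, so 11 meaningful bits need to be stored instead of 64.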

Batch Compression

For high-throughput scenarios:

use heliosdb_storage::timeseries::compression::{
    BatchCompressor, BatchCompressionConfig,
};

let config = BatchCompressionConfig {
    block_size: 4096,           // Larger blocks for better ratio
    compress_timestamps: true,
    compress_values: true,
    compress_metrics: true,      // Dictionary compression for metric names
    enable_delta_encoding: true,
    target_ratio: 10.0,          // Target 10x compression
};

let compressor = BatchCompressor::with_config(config);

// Compress a batch of points
let compressed = compressor.compress_batch(&points)?;

// Decompress
let decompressed = compressor.decompress_batch(&compressed)?;

Dictionary Compression

For repeated strings (metric names, tags):

use heliosdb_storage::timeseries::compression::DictionaryCompressor;

let mut dict = DictionaryCompressor::new();

// Encode metrics to compact IDs
let id1 = dict.encode("server.cpu_percent")?;  // Returns 0
let id2 = dict.encode("server.memory_used")?;  // Returns 1
let id3 = dict.encode("server.cpu_percent")?;  // Returns 0 (reused)

// Decode back
let metric = dict.decode(0)?;  // "server.cpu_percent"
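
The idea behind dictionary encoding can be sketched with a map for lookups and a vector for reverse lookups. The `Dictionary` type below is a standalone illustration under that assumption, not the implementation of `DictionaryCompressor`; in particular, its methods return plain values rather than `Result`.

```rust
use std::collections::HashMap;

/// Minimal string dictionary: each distinct string gets the next free integer ID.
struct Dictionary {
    ids: HashMap<String, u32>,
    names: Vec<String>,
}

impl Dictionary {
    fn new() -> Self {
        Self { ids: HashMap::new(), names: Vec::new() }
    }

    /// Return the existing ID for `s`, or assign a new one.
    fn encode(&mut self, s: &str) -> u32 {
        if let Some(&id) = self.ids.get(s) {
            return id;
        }
        let id = self.names.len() as u32;
        self.ids.insert(s.to_string(), id);
        self.names.push(s.to_string());
        id
    }

    /// Look up the string for an ID, if that ID was ever assigned.
    fn decode(&self, id: u32) -> Option<&str> {
        self.names.get(id as usize).map(String::as_str)
    }
}
```

Each repeated metric name then costs a small integer per point instead of the full string.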

Compression Configuration Guide

Use Case              Block Size   Recommended Settings
Real-time streaming   256-512      Low latency, moderate ratio
Batch ingestion       4096+        High ratio, higher latency
Long-term storage     8192+        Maximum compression
High-frequency data   1024         Balance of ratio and latency

Retention Policies

Policy Configuration

use heliosdb_storage::timeseries::retention::{
    RetentionPolicy, RetentionEngine,
};
use std::time::Duration;

// Time-based retention
let policy = RetentionPolicy::new(Duration::from_secs(30 * 24 * 3600))  // 30 days
    .with_cleanup_interval(3600)   // Check every hour
    .with_auto_cleanup(true);

// Size-based retention
let policy = RetentionPolicy::new(Duration::from_secs(365 * 24 * 3600))  // 1 year
    .with_max_size(100_000_000_000)  // 100 GB limit
    .with_cleanup_interval(3600);

Per-Metric Policies

let mut retention_engine = RetentionEngine::new(storage);

// Global policy: 30 days
retention_engine.set_policy(
    RetentionPolicy::new(Duration::from_secs(30 * 24 * 3600))
);

// High-frequency metrics: 7 days
retention_engine.set_metric_policy(
    "metrics.high_freq.*",
    RetentionPolicy::new(Duration::from_secs(7 * 24 * 3600))
);

// Critical metrics: 90 days
retention_engine.set_metric_policy(
    "metrics.critical.*",
    RetentionPolicy::new(Duration::from_secs(90 * 24 * 3600))
);
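
How a metric name is matched against these patterns can be sketched as follows. This guide does not state how overlapping patterns are resolved, so the sketch assumes that `*` is only valid as a trailing wildcard and that the longest matching pattern wins, with the global policy as the fallback. `PolicyTable` and `pattern_matches` are hypothetical names.

```rust
use std::time::Duration;

/// Illustrative policy table: one global retention plus per-pattern overrides.
struct PolicyTable {
    global: Duration,
    per_metric: Vec<(String, Duration)>, // (pattern, retention)
}

/// Match a pattern with an optional trailing `*` against a metric name.
fn pattern_matches(pattern: &str, metric: &str) -> bool {
    match pattern.strip_suffix('*') {
        Some(prefix) => metric.starts_with(prefix),
        None => pattern == metric,
    }
}

impl PolicyTable {
    /// Pick the retention of the longest matching pattern (assumed rule),
    /// falling back to the global retention.
    fn retention_for(&self, metric: &str) -> Duration {
        self.per_metric
            .iter()
            .filter(|(pattern, _)| pattern_matches(pattern, metric))
            .max_by_key(|(pattern, _)| pattern.len())
            .map(|(_, retention)| *retention)
            .unwrap_or(self.global)
    }
}
```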

Manual Cleanup

// Run immediate cleanup
retention_engine.cleanup_expired().await?;

// Cleanup by size constraint
retention_engine.cleanup_by_size(50_000_000_000).await?;  // Keep under 50 GB

// Delete specific metric data
let deleted = retention_engine.delete_range(
    "old.metric",
    start_timestamp,
    end_timestamp,
).await?;

println!("Deleted {} records", deleted);

Retention Statistics

let stats = retention_engine.stats();
println!("Total deleted records: {}", stats.total_deleted_records);
println!("Total freed bytes: {}", stats.total_deleted_bytes);
println!("Cleanup operations: {}", stats.cleanup_count);

Estimate Savings

let estimate = retention_engine.estimate_savings(&policy).await?;
println!("Expired records: {}", estimate.expired_records);
println!("Bytes to free: {}", estimate.expired_bytes);
println!("Retention rate: {:.1}%", estimate.retention_rate * 100.0);

Continuous Aggregates

Continuous aggregates pre-compute rollups for fast dashboard queries.

Configuration

use heliosdb_storage::timeseries::downsampling::{
    DownsamplingEngine, DownsamplingConfig, DownsamplingTier, AggregationFunction,
};
use std::time::Duration;

let mut engine = DownsamplingEngine::new(storage);

// Configure multi-tier downsampling
let config = DownsamplingConfig::new(Duration::from_secs(60))  // 1-minute base
    .with_aggregation(AggregationFunction::Average)
    .add_tier(
        DownsamplingTier::new(Duration::from_secs(300), AggregationFunction::Average)
            .with_age_threshold(Duration::from_secs(3600))     // After 1 hour
            .with_retention(Duration::from_secs(7 * 24 * 3600)) // Keep 7 days
    )
    .add_tier(
        DownsamplingTier::new(Duration::from_secs(3600), AggregationFunction::Average)
            .with_age_threshold(Duration::from_secs(24 * 3600)) // After 1 day
            .with_retention(Duration::from_secs(30 * 24 * 3600)) // Keep 30 days
    )
    .add_tier(
        DownsamplingTier::new(Duration::from_secs(86400), AggregationFunction::Average)
            .with_age_threshold(Duration::from_secs(7 * 24 * 3600)) // After 7 days
    );

engine.configure_metric("cpu.usage", config).await?;

Aggregation Functions

pub enum AggregationFunction {
    Average,    // Mean value
    Min,        // Minimum value
    Max,        // Maximum value
    Sum,        // Sum of values
    Count,      // Number of points
    First,      // First value in bucket
    Last,       // Last value in bucket
    StdDev,     // Standard deviation
}

Process Downsampling

// Manual processing
engine.process_pending().await?;

// On-demand downsampling for a range
let downsampled = engine.downsample_range(
    "cpu.usage",
    start_time,
    end_time,
    AggregationFunction::Average,
    Duration::from_secs(3600),  // 1-hour buckets
).await?;
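
Conceptually, downsampling groups points into fixed-width buckets and applies the aggregation function to each bucket. The sketch below shows this for `Average` over `(timestamp_ms, value)` pairs; `downsample_avg` is an illustrative function, not the body of `downsample_range`, and it assumes buckets are aligned to the Unix epoch.

```rust
use std::collections::BTreeMap;

/// Average `(timestamp_ms, value)` points into epoch-aligned buckets of `bucket_ms`.
/// Returns `(bucket_start_ms, average)` pairs in ascending time order.
fn downsample_avg(points: &[(u64, f64)], bucket_ms: u64) -> Vec<(u64, f64)> {
    if bucket_ms == 0 {
        return Vec::new();
    }
    // bucket start -> (running sum, point count)
    let mut buckets: BTreeMap<u64, (f64, u64)> = BTreeMap::new();
    for &(ts, value) in points {
        let start = ts - ts % bucket_ms;
        let entry = buckets.entry(start).or_insert((0.0, 0));
        entry.0 += value;
        entry.1 += 1;
    }
    buckets
        .into_iter()
        .map(|(start, (sum, count))| (start, sum / count as f64))
        .collect()
}
```

Buckets that contain no points are simply absent from the result, which is where the gap-filling strategies in the next section come in.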

Gap Filling

Gap filling replaces missing data points using one of several fill strategies, from leaving gaps as null to linear interpolation.

Fill Strategies

pub enum FillMethod {
    Null,       // Leave gaps as null
    Zero,       // Fill with zero
    Forward,    // Use previous value (LOCF)
    Backward,   // Use next value
    Linear,     // Linear interpolation
}
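
The behavior of each strategy can be illustrated on a regularly spaced series where missing slots are `None`. The `Fill` enum and `fill_gaps` function below mirror the variants above but are a standalone sketch, not the implementation behind `TimeSeriesOps::fill_missing`; how HeliosDB treats gaps at the very start or end of a series is an assumption here (they stay `None` when no neighbor exists).

```rust
/// Local mirror of the fill strategies, for illustration only.
#[derive(Clone, Copy)]
enum Fill {
    Null,
    Zero,
    Forward,
    Backward,
    Linear,
}

/// Fill missing slots in an evenly spaced series.
fn fill_gaps(series: &[Option<f64>], method: Fill) -> Vec<Option<f64>> {
    (0..series.len())
        .map(|i| {
            if series[i].is_some() {
                return series[i];
            }
            // Nearest known neighbors on each side, as (index, value).
            let prev = (0..i).rev().find_map(|j| series[j].map(|v| (j, v)));
            let next = (i + 1..series.len()).find_map(|j| series[j].map(|v| (j, v)));
            match method {
                Fill::Null => None,
                Fill::Zero => Some(0.0),
                Fill::Forward => prev.map(|(_, v)| v),
                Fill::Backward => next.map(|(_, v)| v),
                Fill::Linear => match (prev, next) {
                    (Some((p, a)), Some((n, b))) => {
                        let t = (i - p) as f64 / (n - p) as f64;
                        Some(a + (b - a) * t)
                    }
                    _ => None, // cannot interpolate without both neighbors
                },
            }
        })
        .collect()
}
```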

Using Gap Filling

use heliosdb_storage::timeseries::TimeSeriesOps;

let filled = TimeSeriesOps::fill_missing(
    &time_series,
    TimeInterval::Minute(1),  // Expected interval
    FillMethod::Linear,       // Interpolation strategy
)?;

Gap Filling in SQL

-- Fill gaps with linear interpolation
SELECT
    time_bucket_gapfill('5 minutes', timestamp) AS bucket,
    sensor_id,
    interpolate(avg(temperature)) AS temperature
FROM sensor_readings
WHERE timestamp BETWEEN '2025-01-01' AND '2025-01-02'
GROUP BY bucket, sensor_id
ORDER BY bucket;

-- Forward fill (Last Observation Carried Forward)
SELECT
    time_bucket_gapfill('1 hour', timestamp) AS bucket,
    locf(avg(value)) AS filled_value
FROM metrics
WHERE timestamp > NOW() - INTERVAL '1 day'
GROUP BY bucket;

Time-Zone Handling

Storage Convention

All timestamps are stored in UTC. Time-zone conversion happens at query time.

Rust API

use chrono::{DateTime, Utc, TimeZone};
use chrono_tz::America::New_York;

// Store timestamp in UTC
let utc_time = Utc::now();
let timestamp_ms = utc_time.timestamp_millis() as u64;

// Convert from local time to UTC for storage
let local_time = New_York.with_ymd_and_hms(2025, 1, 1, 9, 0, 0).unwrap();
let utc_for_storage = local_time.with_timezone(&Utc);

SQL Queries

-- Query with timezone conversion
SELECT
    timestamp AT TIME ZONE 'America/New_York' AS local_time,
    value
FROM metrics
WHERE timestamp > NOW() - INTERVAL '24 hours'
ORDER BY timestamp;

-- Aggregate by local day (accounting for DST)
SELECT
    date_trunc('day', timestamp AT TIME ZONE 'America/New_York') AS local_day,
    AVG(value) AS daily_avg
FROM metrics
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY local_day;

Time Interval Truncation

use heliosdb_storage::timeseries::TimeInterval;

let interval = TimeInterval::Hour(1);
let truncated = interval.truncate(datetime)?;
// 2025-01-01 14:37:42 -> 2025-01-01 14:00:00
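
For fixed-width intervals on UTC millisecond timestamps, truncation reduces to integer arithmetic. The helper below is a sketch of that arithmetic, not the implementation of `TimeInterval::truncate`; it does not cover calendar-based intervals such as months, or truncation to a local-time day, both of which need time-zone and calendar rules.

```rust
/// Round a Unix-millisecond timestamp down to the start of its fixed-width interval.
/// Returns None for a zero-width interval.
fn truncate_to_interval(timestamp_ms: u64, interval_ms: u64) -> Option<u64> {
    if interval_ms == 0 {
        return None;
    }
    Some(timestamp_ms - timestamp_ms % interval_ms)
}
```

For example, 1735742262000 (2025-01-01 14:37:42 UTC) truncated with a 3,600,000 ms interval gives 1735740000000 (2025-01-01 14:00:00 UTC).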

Integration with MVCC

HeliosDB's time-series storage integrates with MVCC for transactional guarantees.

Point-in-Time Queries

// Query data as it existed at a specific time
let snapshot_time = 1704067200000;  // 2024-01-01 00:00:00 UTC

let historical_points = engine.query_at_snapshot(
    "cpu.usage",
    start_time,
    end_time,
    snapshot_time,  // MVCC snapshot
).await?;
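
The general MVCC visibility rule behind a point-in-time read can be sketched as: among all committed versions of a record, return the newest one whose commit time is not later than the snapshot. The `Version` struct and `visible_at` function are a generic illustration of that rule; HeliosDB's internal version layout is not described in this guide.

```rust
/// One committed version of a single data point (illustrative layout).
struct Version {
    commit_ts: u64, // when this version was committed
    value: f64,
}

/// Return the value a reader at `snapshot_ts` would see, if any version is visible.
fn visible_at(versions: &[Version], snapshot_ts: u64) -> Option<f64> {
    versions
        .iter()
        .filter(|v| v.commit_ts <= snapshot_ts)
        .max_by_key(|v| v.commit_ts)
        .map(|v| v.value)
}
```

A snapshot taken before the first commit sees nothing, and later writes never change what an older snapshot returns.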

Transactional Writes

// Write multiple metrics atomically
engine.begin_transaction().await?;

engine.write_point("cpu.usage", 75.5, None).await?;
engine.write_point("memory.usage", 62.3, None).await?;
engine.write_point("disk.usage", 45.0, None).await?;

engine.commit().await?;

Consistent Reads

// Read consistent snapshot across multiple metrics
let snapshot = engine.get_snapshot().await?;

let cpu = snapshot.query_range("cpu.usage", start, end).await?;
let memory = snapshot.query_range("memory.usage", start, end).await?;
// Both queries see the same consistent state

Query Engine

Time Range Queries

use heliosdb_storage::timeseries::query_engine::{
    TimeRangeQuery, TimeSeriesQueryEngine, AggregationFunction,
};
use std::time::Duration;

let mut engine = TimeSeriesQueryEngine::new();

let query = TimeRangeQuery::new("cpu.usage", start_time, end_time)
    .with_tag("host", "server-01")
    .with_limit(1000)
    .with_aggregation(Duration::from_secs(300), AggregationFunction::Average);

let results = engine.execute_query(&query, &data_points).await?;

Window Functions

use heliosdb_storage::timeseries::query_engine::WindowType;

// Tumbling windows (non-overlapping)
let tumbling = engine.execute_windowed_query(
    &points,
    WindowType::Tumbling { size: Duration::from_secs(300) },
    AggregationFunction::Average,
)?;

// Sliding windows (overlapping)
let sliding = engine.execute_windowed_query(
    &points,
    WindowType::Sliding {
        size: Duration::from_secs(300),
        slide: Duration::from_secs(60),
    },
    AggregationFunction::Average,
)?;

// Session windows (gap-based)
let sessions = engine.execute_windowed_query(
    &points,
    WindowType::Session { gap: Duration::from_secs(1800) },
    AggregationFunction::Count,
)?;
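
Of the three window types, session windows are the least obvious, so here is a minimal sketch of how gap-based grouping works. It assumes timestamps are sorted ascending and that a new session starts only when the gap is strictly greater than the threshold; whether HeliosDB treats an exactly equal gap the same way is not stated. `session_windows` is an illustrative function.

```rust
/// Group ascending timestamps into sessions separated by gaps larger than `gap_ms`.
fn session_windows(timestamps_ms: &[u64], gap_ms: u64) -> Vec<Vec<u64>> {
    let mut sessions: Vec<Vec<u64>> = Vec::new();
    for &ts in timestamps_ms {
        // Compare against the last timestamp of the current session, if any.
        let starts_new = match sessions.last().and_then(|s| s.last()) {
            Some(&prev) => ts.saturating_sub(prev) > gap_ms,
            None => true,
        };
        if starts_new {
            sessions.push(vec![ts]);
        } else if let Some(current) = sessions.last_mut() {
            current.push(ts);
        }
    }
    sessions
}
```

Applying `AggregationFunction::Count` to the result then amounts to taking the length of each session.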

Time-Based Joins

use heliosdb_storage::timeseries::query_engine::{
    TimeJoinConfig, TimeJoinType, FillStrategy,
};

let config = TimeJoinConfig::new(TimeJoinType::AsOf, 5000)  // Tolerance: 5000 ms (5 seconds)
    .with_fill_strategy(FillStrategy::Forward);

let joined = engine.execute_time_join(
    &left_series,
    &right_series,
    &config,
)?;
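
An as-of join pairs each left-hand point with the most recent right-hand point at or before it, provided the two are within the tolerance. The sketch below assumes both series are sorted ascending, that the tolerance is in milliseconds, and that matching only looks backward in time; `asof_join` is an illustrative function and does not model `FillStrategy`.

```rust
/// For each left point, find the latest right point with `right_ts <= left_ts`
/// and `left_ts - right_ts <= tolerance_ms`. Unmatched points get None.
fn asof_join(
    left: &[(u64, f64)],
    right: &[(u64, f64)],
    tolerance_ms: u64,
) -> Vec<(u64, f64, Option<f64>)> {
    let mut out = Vec::with_capacity(left.len());
    let mut j = 0; // number of right points with timestamp <= current left timestamp
    for &(left_ts, left_value) in left {
        while j < right.len() && right[j].0 <= left_ts {
            j += 1;
        }
        let matched = if j > 0 && left_ts - right[j - 1].0 <= tolerance_ms {
            Some(right[j - 1].1)
        } else {
            None
        };
        out.push((left_ts, left_value, matched));
    }
    out
}
```

Because both inputs are sorted, the scan is a single linear pass over the two series.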

Query Result Caching

// Caching is automatic with 5-minute TTL
// Clear cache if needed
engine.clear_cache();

Ingestion Pipeline

Configuration

use heliosdb_storage::timeseries::ingestion::{
    IngestionPipeline, IngestionConfig,
};
use std::time::Duration;

let config = IngestionConfig {
    batch_size: 10000,           // Points per batch
    batch_timeout: Duration::from_millis(100),  // Max wait time
    write_workers: 4,            // Parallel write threads
    buffer_capacity: 100000,     // Out-of-order buffer size
    handle_out_of_order: true,   // Handle late-arriving data
    max_time_skew: 60000,        // 1 minute tolerance
    backfill_mode: false,        // Normal mode
};

let pipeline = IngestionPipeline::new(config, storage, compressor);
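
One common way to handle late-arriving data within a skew tolerance is a reorder buffer with a watermark. The sketch below is an assumption about how `handle_out_of_order` and `max_time_skew` could interact, not a description of the pipeline's internals: points older than the newest seen timestamp minus the skew are rejected, and buffered points are released in order once they fall behind that watermark. For brevity it tracks timestamps only.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Illustrative reorder buffer keyed on timestamps (a real one would hold whole points).
struct ReorderBuffer {
    heap: BinaryHeap<Reverse<u64>>, // min-heap of buffered timestamps
    max_seen_ms: u64,
    max_skew_ms: u64,
}

impl ReorderBuffer {
    fn new(max_skew_ms: u64) -> Self {
        Self { heap: BinaryHeap::new(), max_seen_ms: 0, max_skew_ms }
    }

    /// Buffer a timestamp; returns false if it is older than the skew tolerance allows.
    fn push(&mut self, timestamp_ms: u64) -> bool {
        if timestamp_ms.saturating_add(self.max_skew_ms) < self.max_seen_ms {
            return false;
        }
        self.max_seen_ms = self.max_seen_ms.max(timestamp_ms);
        self.heap.push(Reverse(timestamp_ms));
        true
    }

    /// Release, in ascending order, timestamps that no accepted late point can precede.
    fn drain_ready(&mut self) -> Vec<u64> {
        let watermark = self.max_seen_ms.saturating_sub(self.max_skew_ms);
        let mut ready = Vec::new();
        while let Some(&Reverse(ts)) = self.heap.peek() {
            if ts >= watermark {
                break;
            }
            self.heap.pop();
            ready.push(ts);
        }
        ready
    }
}
```

With a 60,000 ms skew, a point that arrives 10 seconds late is reordered into place, while one that arrives 100 seconds behind the newest point is rejected.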

Ingesting Data

// Single point
pipeline.ingest(point).await?;

// Batch ingestion (preferred)
pipeline.ingest_batch(&points).await?;

// Historical data backfill
pipeline.ingest_backfill(&historical_points).await?;

// Force flush pending batches
pipeline.flush().await?;

Ingestion Statistics

let stats = pipeline.stats().await;

println!("Total points: {}", stats.total_points);
println!("Throughput: {:.0} pts/sec", stats.points_per_second);
println!("Batches written: {}", stats.batches_written);
println!("Avg batch size: {:.0}", stats.avg_batch_size);
println!("Out-of-order: {}", stats.out_of_order_count);
println!("Avg latency: {:.2}ms", stats.avg_write_latency_ms);
println!("Buffer utilization: {:.1}%", stats.buffer_utilization * 100.0);

Graceful Shutdown

pipeline.shutdown().await?;

Partition Management

Partition Strategies

use heliosdb_storage::timeseries::partitioning::{
    PartitionManager, PartitionStrategy,
};

// Choose strategy based on data volume
let strategy = PartitionStrategy::Daily;  // Most common

// Available strategies:
// - Hourly: For very high volume (1M+ points/hour)
// - Daily: Standard metrics (recommended)
// - Weekly: Lower volume data
// - Monthly: Long-term storage
// - Yearly: Archive data
// - Custom(seconds): Flexible intervals
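
For the fixed-width strategies, mapping a timestamp to a partition is integer division by the partition width. The sketch below covers `Hourly`, `Daily`, and `Custom(seconds)` only, since weekly, monthly, and yearly partitions depend on calendar alignment. The `Strategy` enum and `partition_index` function are illustrative; the ID format returned by `compute_partition_id` is not specified in this guide.

```rust
/// Fixed-width subset of the partition strategies (illustrative mirror).
enum Strategy {
    Hourly,
    Daily,
    Custom(u64), // width in seconds
}

/// Index of the epoch-aligned partition containing `timestamp_ms`.
/// Returns None for a zero-width or overflowing custom interval.
fn partition_index(timestamp_ms: u64, strategy: Strategy) -> Option<u64> {
    let width_ms: u64 = match strategy {
        Strategy::Hourly => 3_600_000,
        Strategy::Daily => 86_400_000,
        Strategy::Custom(secs) => secs.checked_mul(1000)?,
    };
    timestamp_ms.checked_div(width_ms)
}
```

With daily partitions, 1704067200000 (2024-01-01 00:00:00 UTC) falls into partition 19723, the number of whole days since the Unix epoch.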

Partition Operations

let manager = PartitionManager::new(
    "/data/partitions",
    PartitionStrategy::Daily,
).await?;

// Compute partition for timestamp
let partition_id = manager.compute_partition_id(timestamp).await?;

// Get or create partition
let partition = manager.get_or_create_partition(timestamp).await?;

// Query partitions for time range
let partitions = manager.get_partitions_for_range(start, end).await?;

// List all partitions
let all_partitions = manager.list_partitions().await;

// Archive old partition
manager.archive_partition(partition_id).await?;

// Delete partition
manager.delete_partition(partition_id).await?;

Partition Metadata

let partition = manager.get_partition(partition_id).await?;

if let Some(metadata) = partition {
    println!("Partition ID: {}", metadata.partition_id);
    println!("Path: {:?}", metadata.path);
    println!("Records: {}", metadata.record_count);
    println!("Size: {} bytes", metadata.size_bytes);
    println!("Time range: {} - {}", metadata.min_timestamp, metadata.max_timestamp);
    println!("Compressed: {}", metadata.compressed);
    println!("Archived: {}", metadata.archived);
}

Storage Statistics

let total_size = manager.total_size().await;
let total_records = manager.total_records().await;

println!("Total storage: {:.2} GB", total_size as f64 / 1e9);  // Float division avoids truncating to whole GB
println!("Total records: {}", total_records);

Best Practices

Naming Conventions

# Hierarchical metric names
service.component.metric_name

# Examples
web.api.request_count
database.queries.latency_ms
cache.redis.hit_rate

Tag Design

// Good: Low cardinality, useful for filtering
tags.insert("environment".to_string(), "production".to_string());
tags.insert("region".to_string(), "us-east-1".to_string());
tags.insert("service".to_string(), "api-gateway".to_string());

// Avoid: High cardinality tags (creates too many series)
// tags.insert("user_id", user_id);  // Don't do this
// tags.insert("request_id", request_id);  // Don't do this
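
The cost of a high-cardinality tag is easy to estimate: in the worst case, the number of distinct series for one metric is the product of the number of distinct values of each tag. The helper below is a back-of-the-envelope sketch with an illustrative name, not a HeliosDB API.

```rust
/// Upper bound on the number of series for one metric, given the number of
/// distinct values of each tag. Saturates instead of overflowing.
fn max_series(distinct_values_per_tag: &[u64]) -> u64 {
    distinct_values_per_tag
        .iter()
        .fold(1u64, |acc, &n| acc.saturating_mul(n))
}
```

With 2 environments, 4 regions, and 10 services, a metric has at most 80 series; adding a `user_id` tag with one million users raises that bound to 80 million.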

Batch Size Tuning

Scenario              Recommended Batch Size
Real-time streaming   1,000 - 5,000
Batch processing      10,000 - 50,000
Historical backfill   50,000 - 100,000

Compression vs. Query Speed

Priority              Configuration
Maximum compression   Large blocks (8192+), all compression enabled
Fast queries          Smaller blocks (512-1024), skip dictionary compression
Balanced              1024-2048 block size, all compression enabled

See Also: README | Quick Start | Examples