Time-Series User Guide¶
Version: 6.0 Last Updated: January 4, 2026 Audience: Developers and Database Administrators
Table of Contents¶
- Time-Series Data Types
- Compression Options
- Retention Policies
- Continuous Aggregates
- Gap Filling
- Time-Zone Handling
- Integration with MVCC
- Query Engine
- Ingestion Pipeline
- Partition Management
- Best Practices
Time-Series Data Types¶
Core Data Structures¶
TimeSeriesPoint¶
The fundamental data structure for time-series data:
pub struct TimeSeriesPoint {
/// Metric identifier (e.g., "cpu.usage", "sensor.temperature")
pub metric: String,
/// Timestamp in milliseconds since Unix epoch (UTC)
pub timestamp: u64,
/// Numeric value
pub value: f64,
/// Optional dimensional tags for filtering
pub tags: HashMap<String, String>,
}
TSValue Enum¶
For mixed-type time-series:
pub enum TSValue {
Null, // Missing value
Integer(i64), // Integer values
Float(f64), // Floating-point values
Boolean(bool), // Boolean flags
}
Creating Data Points¶
// Simple point
let point = TimeSeriesPoint::new(
"server.cpu_percent",
1704067200000, // Timestamp in ms
75.5,
);
// Point with tags (dimensions)
let mut tags = HashMap::new();
tags.insert("host".to_string(), "web-server-01".to_string());
tags.insert("datacenter".to_string(), "us-east-1".to_string());
tags.insert("environment".to_string(), "production".to_string());
let tagged_point = TimeSeriesPoint::with_tags(
"server.cpu_percent",
1704067200000,
75.5,
tags,
);
Timestamp Conventions¶
| Format | Example | Usage |
|---|---|---|
| Unix milliseconds | 1704067200000 | Primary format in the Rust API |
| Unix seconds | 1704067200 | Convert with * 1000 |
| ISO 8601 | 2024-01-01T00:00:00Z | SQL queries |
| RFC 3339 | 2024-01-01T00:00:00.000Z | API responses |
Compression Options¶
HeliosDB provides multiple compression strategies optimized for different data patterns.
Gorilla Compression¶
Facebook's Gorilla algorithm is the default for time-series data:
use heliosdb_storage::timeseries::compression::{
GorillaCompressor, CompressionConfig,
};
let config = CompressionConfig {
compress_timestamps: true, // Delta-of-delta encoding
compress_values: true, // XOR + bit-packing
block_size: 1024, // Points per block
min_ratio: 1.5, // Minimum compression ratio
};
let compressor = GorillaCompressor::new(config);
Algorithm Details¶
Timestamp Compression (Delta-of-Delta):
- First timestamp: stored in full (64 bits)
- Second timestamp: delta from the first (variable bits)
- Subsequent timestamps: delta-of-delta (1-32 bits)
Timestamp sequence: [1000, 1060, 1120, 1180, 1240]
Delta sequence: [ 60, 60, 60, 60]
Delta-of-delta: [ 0, 0, 0]
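The delta-of-delta sequence above can be reproduced in a few lines of plain Rust. This is an illustration of the encoding idea, not the `GorillaCompressor` internals:

```rust
/// Compute the delta-of-delta sequence for a sorted timestamp series,
/// as used by Gorilla-style timestamp compression. For regularly
/// sampled data the result is mostly zeros, which pack into ~1 bit each.
fn delta_of_delta(timestamps: &[i64]) -> Vec<i64> {
    let deltas: Vec<i64> = timestamps.windows(2).map(|w| w[1] - w[0]).collect();
    deltas.windows(2).map(|w| w[1] - w[0]).collect()
}

fn main() {
    let ts = [1000, 1060, 1120, 1180, 1240];
    assert_eq!(delta_of_delta(&ts), vec![0, 0, 0]);
    println!("{:?}", delta_of_delta(&ts)); // [0, 0, 0]
}
```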
Value Compression (XOR + Bit-packing):
- XOR each value with the previous value
- Compress leading/trailing zeros of the XOR result
- Typically 4-8 bits per value for slowly changing data
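To see why the XOR step works, the snippet below counts the meaningful bits left after XOR-ing adjacent float values. It sketches only the XOR stage, not the full bit-packed format:

```rust
/// Illustrate the XOR step of Gorilla value compression: XOR each value's
/// bit pattern with the previous one, then count the meaningful bits
/// (64 minus leading and trailing zeros). Slowly changing series leave
/// only a few significant bits per point.
fn xor_meaningful_bits(values: &[f64]) -> Vec<u32> {
    values
        .windows(2)
        .map(|w| {
            let x = w[0].to_bits() ^ w[1].to_bits();
            if x == 0 {
                0 // identical value: a single control bit in the real encoding
            } else {
                64 - x.leading_zeros() - x.trailing_zeros()
            }
        })
        .collect()
}

fn main() {
    let values = [75.5, 75.5, 75.5, 76.0];
    let bits = xor_meaningful_bits(&values);
    assert_eq!(bits[0], 0); // a repeated value costs almost nothing
    println!("{:?}", bits);
}
```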
Batch Compression¶
For high-throughput scenarios:
use heliosdb_storage::timeseries::compression::{
BatchCompressor, BatchCompressionConfig,
};
let config = BatchCompressionConfig {
block_size: 4096, // Larger blocks for better ratio
compress_timestamps: true,
compress_values: true,
compress_metrics: true, // Dictionary compression for metric names
enable_delta_encoding: true,
target_ratio: 10.0, // Target 10x compression
};
let compressor = BatchCompressor::with_config(config);
// Compress a batch of points
let compressed = compressor.compress_batch(&points)?;
// Decompress
let decompressed = compressor.decompress_batch(&compressed)?;
Dictionary Compression¶
For repeated strings (metric names, tags):
use heliosdb_storage::timeseries::compression::DictionaryCompressor;
let mut dict = DictionaryCompressor::new();
// Encode metrics to compact IDs
let id1 = dict.encode("server.cpu_percent")?; // Returns 0
let id2 = dict.encode("server.memory_used")?; // Returns 1
let id3 = dict.encode("server.cpu_percent")?; // Returns 0 (reused)
// Decode back
let metric = dict.decode(0)?; // "server.cpu_percent"
Compression Configuration Guide¶
| Use Case | Block Size | Recommended Settings |
|---|---|---|
| Real-time streaming | 256-512 | Low latency, moderate ratio |
| Batch ingestion | 4096+ | High ratio, higher latency |
| Long-term storage | 8192+ | Maximum compression |
| High-frequency data | 1024 | Balance of ratio and latency |
Retention Policies¶
Policy Configuration¶
use heliosdb_storage::timeseries::retention::{
RetentionPolicy, RetentionEngine,
};
use std::time::Duration;
// Time-based retention
let policy = RetentionPolicy::new(Duration::from_secs(30 * 24 * 3600)) // 30 days
.with_cleanup_interval(3600) // Check every hour
.with_auto_cleanup(true);
// Size-based retention
let policy = RetentionPolicy::new(Duration::from_secs(365 * 24 * 3600)) // 1 year
.with_max_size(100_000_000_000) // 100 GB limit
.with_cleanup_interval(3600);
Per-Metric Policies¶
let mut retention_engine = RetentionEngine::new(storage);
// Global policy: 30 days
retention_engine.set_policy(
RetentionPolicy::new(Duration::from_secs(30 * 24 * 3600))
);
// High-frequency metrics: 7 days
retention_engine.set_metric_policy(
"metrics.high_freq.*",
RetentionPolicy::new(Duration::from_secs(7 * 24 * 3600))
);
// Critical metrics: 90 days
retention_engine.set_metric_policy(
"metrics.critical.*",
RetentionPolicy::new(Duration::from_secs(90 * 24 * 3600))
);
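Conceptually, time-based retention reduces to a cutoff comparison against the policy duration. A minimal self-contained sketch of that check (not the `RetentionEngine` internals):

```rust
use std::time::Duration;

/// A point is expired when it is older than `retention` relative to `now`.
/// Timestamps are Unix milliseconds, matching the storage convention.
fn is_expired(timestamp_ms: u64, now_ms: u64, retention: Duration) -> bool {
    now_ms.saturating_sub(timestamp_ms) > retention.as_millis() as u64
}

fn main() {
    let now = 1_704_067_200_000; // 2024-01-01T00:00:00Z
    let thirty_days = Duration::from_secs(30 * 24 * 3600);
    // A point 31 days old is expired; one 29 days old is retained.
    assert!(is_expired(now - 31 * 86_400_000, now, thirty_days));
    assert!(!is_expired(now - 29 * 86_400_000, now, thirty_days));
    println!("retention check ok");
}
```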
Manual Cleanup¶
// Run immediate cleanup
retention_engine.cleanup_expired().await?;
// Cleanup by size constraint
retention_engine.cleanup_by_size(50_000_000_000).await?; // Keep under 50 GB
// Delete specific metric data
let deleted = retention_engine.delete_range(
"old.metric",
start_timestamp,
end_timestamp,
).await?;
println!("Deleted {} records", deleted);
Retention Statistics¶
let stats = retention_engine.stats();
println!("Total deleted records: {}", stats.total_deleted_records);
println!("Total freed bytes: {}", stats.total_deleted_bytes);
println!("Cleanup operations: {}", stats.cleanup_count);
Estimate Savings¶
let estimate = retention_engine.estimate_savings(&policy).await?;
println!("Expired records: {}", estimate.expired_records);
println!("Bytes to free: {}", estimate.expired_bytes);
println!("Retention rate: {:.1}%", estimate.retention_rate * 100.0);
Continuous Aggregates¶
Continuous aggregates pre-compute rollups for fast dashboard queries.
Configuration¶
use heliosdb_storage::timeseries::downsampling::{
DownsamplingEngine, DownsamplingConfig, DownsamplingTier, AggregationFunction,
};
let mut engine = DownsamplingEngine::new(storage);
// Configure multi-tier downsampling
let config = DownsamplingConfig::new(Duration::from_secs(60)) // 1-minute base
.with_aggregation(AggregationFunction::Average)
.add_tier(
DownsamplingTier::new(Duration::from_secs(300), AggregationFunction::Average)
.with_age_threshold(Duration::from_secs(3600)) // After 1 hour
.with_retention(Duration::from_secs(7 * 24 * 3600)) // Keep 7 days
)
.add_tier(
DownsamplingTier::new(Duration::from_secs(3600), AggregationFunction::Average)
.with_age_threshold(Duration::from_secs(24 * 3600)) // After 1 day
.with_retention(Duration::from_secs(30 * 24 * 3600)) // Keep 30 days
)
.add_tier(
DownsamplingTier::new(Duration::from_secs(86400), AggregationFunction::Average)
.with_age_threshold(Duration::from_secs(7 * 24 * 3600)) // After 7 days
);
engine.configure_metric("cpu.usage", config).await?;
Aggregation Functions¶
pub enum AggregationFunction {
Average, // Mean value
Min, // Minimum value
Max, // Maximum value
Sum, // Sum of values
Count, // Number of points
First, // First value in bucket
Last, // Last value in bucket
StdDev, // Standard deviation
}
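The rollup semantics of these variants can be sketched with a small self-contained function (a subset shown; `Agg` here is an illustrative stand-in for `AggregationFunction`):

```rust
/// Mirrors a subset of the AggregationFunction variants above.
enum Agg {
    Average,
    Min,
    Max,
    Sum,
    Count,
    First,
    Last,
}

/// Apply a rollup function to the raw values in one time bucket.
fn aggregate(values: &[f64], func: Agg) -> Option<f64> {
    if values.is_empty() {
        return None; // empty buckets yield no aggregate
    }
    Some(match func {
        Agg::Average => values.iter().sum::<f64>() / values.len() as f64,
        Agg::Min => values.iter().cloned().fold(f64::INFINITY, f64::min),
        Agg::Max => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
        Agg::Sum => values.iter().sum(),
        Agg::Count => values.len() as f64,
        Agg::First => values[0],
        Agg::Last => values[values.len() - 1],
    })
}

fn main() {
    let bucket = [10.0, 20.0, 30.0];
    assert_eq!(aggregate(&bucket, Agg::Average), Some(20.0));
    assert_eq!(aggregate(&bucket, Agg::Min), Some(10.0));
    assert_eq!(aggregate(&[], Agg::Sum), None);
    println!("aggregation ok");
}
```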
Process Downsampling¶
// Manual processing
engine.process_pending().await?;
// On-demand downsampling for a range
let downsampled = engine.downsample_range(
"cpu.usage",
start_time,
end_time,
AggregationFunction::Average,
Duration::from_secs(3600), // 1-hour buckets
).await?;
Gap Filling¶
Handle missing data points with various interpolation strategies.
Fill Strategies¶
pub enum FillMethod {
Null, // Leave gaps as null
Zero, // Fill with zero
Forward, // Use previous value (LOCF)
Backward, // Use next value
Linear, // Linear interpolation
}
Using Gap Filling¶
use heliosdb_storage::timeseries::TimeSeriesOps;
let filled = TimeSeriesOps::fill_missing(
&time_series,
TimeInterval::Minute(1), // Expected interval
FillMethod::Linear, // Interpolation strategy
)?;
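To make the linear strategy concrete, here is a self-contained sketch that reconstructs missing samples on a fixed grid. It illustrates the interpolation only; `TimeSeriesOps::fill_missing` is the actual API:

```rust
/// Fill gaps in a regularly sampled series. `series` holds (timestamp_ms,
/// value) pairs; missing slots on the `interval_ms` grid are reconstructed
/// by linear interpolation between the surrounding points.
fn fill_linear(series: &[(u64, f64)], interval_ms: u64) -> Vec<(u64, f64)> {
    let mut out = Vec::new();
    for pair in series.windows(2) {
        let ((t0, v0), (t1, v1)) = (pair[0], pair[1]);
        out.push((t0, v0));
        let mut t = t0 + interval_ms;
        while t < t1 {
            // Interpolate based on the position within the gap.
            let frac = (t - t0) as f64 / (t1 - t0) as f64;
            out.push((t, v0 + frac * (v1 - v0)));
            t += interval_ms;
        }
    }
    if let Some(&last) = series.last() {
        out.push(last);
    }
    out
}

fn main() {
    // One missing sample at t=120 between 10.0 and 30.0 -> filled with 20.0.
    let filled = fill_linear(&[(60, 10.0), (180, 30.0)], 60);
    assert_eq!(filled, vec![(60, 10.0), (120, 20.0), (180, 30.0)]);
    println!("{:?}", filled);
}
```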
Gap Filling in SQL¶
-- Fill gaps with linear interpolation
SELECT
time_bucket_gapfill('5 minutes', timestamp) AS bucket,
sensor_id,
interpolate(avg(temperature)) AS temperature
FROM sensor_readings
WHERE timestamp BETWEEN '2025-01-01' AND '2025-01-02'
GROUP BY bucket, sensor_id
ORDER BY bucket;
-- Forward fill (Last Observation Carried Forward)
SELECT
time_bucket_gapfill('1 hour', timestamp) AS bucket,
locf(avg(value)) AS filled_value
FROM metrics
WHERE timestamp > NOW() - INTERVAL '1 day'
GROUP BY bucket;
Time-Zone Handling¶
Storage Convention¶
All timestamps are stored in UTC. Time-zone conversion happens at query time.
Rust API¶
use chrono::{DateTime, Utc, TimeZone};
use chrono_tz::America::New_York;
// Store timestamp in UTC
let utc_time = Utc::now();
let timestamp_ms = utc_time.timestamp_millis() as u64;
// Convert from local time to UTC for storage
let local_time = New_York.with_ymd_and_hms(2025, 1, 1, 9, 0, 0).unwrap();
let utc_for_storage = local_time.with_timezone(&Utc);
SQL Queries¶
-- Query with timezone conversion
SELECT
timestamp AT TIME ZONE 'America/New_York' AS local_time,
value
FROM metrics
WHERE timestamp > NOW() - INTERVAL '24 hours'
ORDER BY timestamp;
-- Aggregate by local day (accounting for DST)
SELECT
date_trunc('day', timestamp AT TIME ZONE 'America/New_York') AS local_day,
AVG(value) AS daily_avg
FROM metrics
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY local_day;
Time Interval Truncation¶
use heliosdb_storage::timeseries::TimeInterval;
let interval = TimeInterval::Hour(1);
let truncated = interval.truncate(datetime)?;
// 2025-01-01 14:37:42 -> 2025-01-01 14:00:00
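For fixed-width intervals, truncation is plain modular arithmetic on the millisecond timestamp. A sketch of that arithmetic (calendar-aware intervals such as months need real calendar logic instead):

```rust
/// Truncate a millisecond timestamp down to an interval boundary --
/// the arithmetic behind fixed-width interval truncation.
fn truncate_to_interval(timestamp_ms: u64, interval_ms: u64) -> u64 {
    timestamp_ms - timestamp_ms % interval_ms
}

fn main() {
    // 2025-01-01 14:37:42 UTC -> 14:00:00 with a 1-hour interval.
    let ts = 1_735_742_262_000u64;
    let hour = 3_600_000;
    assert_eq!(truncate_to_interval(ts, hour), 1_735_740_000_000);
    println!("{}", truncate_to_interval(ts, hour));
}
```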
Integration with MVCC¶
HeliosDB's time-series storage integrates with MVCC for transactional guarantees.
Point-in-Time Queries¶
// Query data as it existed at a specific time
let snapshot_time = 1704067200000; // 2024-01-01 00:00:00 UTC
let historical_points = engine.query_at_snapshot(
"cpu.usage",
start_time,
end_time,
snapshot_time, // MVCC snapshot
).await?;
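The visibility rule behind snapshot reads can be sketched independently of the engine: each stored version carries a commit timestamp, and a snapshot sees only the newest version committed at or before it. This is a simplified model (the engine's actual version metadata is richer):

```rust
/// A stored version of a point with its MVCC commit timestamp.
struct Version {
    commit_ts: u64,
    value: f64,
}

/// Return the value visible at `snapshot_ts`: the newest version whose
/// commit timestamp is <= the snapshot.
fn visible_at(versions: &[Version], snapshot_ts: u64) -> Option<f64> {
    versions
        .iter()
        .filter(|v| v.commit_ts <= snapshot_ts)
        .max_by_key(|v| v.commit_ts)
        .map(|v| v.value)
}

fn main() {
    let versions = [
        Version { commit_ts: 100, value: 75.5 },
        Version { commit_ts: 200, value: 80.0 },
    ];
    assert_eq!(visible_at(&versions, 150), Some(75.5)); // later write not yet visible
    assert_eq!(visible_at(&versions, 250), Some(80.0));
    assert_eq!(visible_at(&versions, 50), None);
    println!("snapshot visibility ok");
}
```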
Transactional Writes¶
// Write multiple metrics atomically
engine.begin_transaction().await?;
engine.write_point("cpu.usage", 75.5, None).await?;
engine.write_point("memory.usage", 62.3, None).await?;
engine.write_point("disk.usage", 45.0, None).await?;
engine.commit().await?;
Consistent Reads¶
// Read consistent snapshot across multiple metrics
let snapshot = engine.get_snapshot().await?;
let cpu = snapshot.query_range("cpu.usage", start, end).await?;
let memory = snapshot.query_range("memory.usage", start, end).await?;
// Both queries see the same consistent state
Query Engine¶
Time Range Queries¶
use heliosdb_storage::timeseries::query_engine::{
TimeRangeQuery, TimeSeriesQueryEngine, AggregationFunction,
};
let mut engine = TimeSeriesQueryEngine::new();
let query = TimeRangeQuery::new("cpu.usage", start_time, end_time)
.with_tag("host", "server-01")
.with_limit(1000)
.with_aggregation(Duration::from_secs(300), AggregationFunction::Average);
let results = engine.execute_query(&query, &data_points).await?;
Window Functions¶
use heliosdb_storage::timeseries::query_engine::WindowType;
// Tumbling windows (non-overlapping)
let tumbling = engine.execute_windowed_query(
&points,
WindowType::Tumbling { size: Duration::from_secs(300) },
AggregationFunction::Average,
)?;
// Sliding windows (overlapping)
let sliding = engine.execute_windowed_query(
&points,
WindowType::Sliding {
size: Duration::from_secs(300),
slide: Duration::from_secs(60),
},
AggregationFunction::Average,
)?;
// Session windows (gap-based)
let sessions = engine.execute_windowed_query(
&points,
WindowType::Session { gap: Duration::from_secs(1800) },
AggregationFunction::Count,
)?;
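The tumbling-window semantics can be shown with a small self-contained sketch: each point lands in exactly one window, and each window is aggregated independently (illustration only, not the query engine):

```rust
use std::collections::BTreeMap;

/// Group points into non-overlapping (tumbling) windows and average each.
/// `points` are (timestamp_ms, value) pairs; returns (window_start, mean).
fn tumbling_average(points: &[(u64, f64)], window_ms: u64) -> Vec<(u64, f64)> {
    let mut windows: BTreeMap<u64, (f64, u64)> = BTreeMap::new();
    for &(ts, v) in points {
        let start = ts - ts % window_ms; // the window this point falls into
        let entry = windows.entry(start).or_insert((0.0, 0));
        entry.0 += v;
        entry.1 += 1;
    }
    windows
        .into_iter()
        .map(|(start, (sum, n))| (start, sum / n as f64))
        .collect()
}

fn main() {
    let points = [(0, 10.0), (100, 20.0), (300, 40.0)];
    // 300 ms windows: [0,300) averages 15.0, [300,600) averages 40.0.
    assert_eq!(tumbling_average(&points, 300), vec![(0, 15.0), (300, 40.0)]);
    println!("tumbling windows ok");
}
```

Sliding windows differ only in that a point may contribute to several overlapping windows; session windows close a group whenever the gap to the next point exceeds the configured threshold.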
Time-Based Joins¶
use heliosdb_storage::timeseries::query_engine::{
TimeJoinConfig, TimeJoinType, FillStrategy,
};
let config = TimeJoinConfig::new(TimeJoinType::AsOf, 5000) // 5 second tolerance
.with_fill_strategy(FillStrategy::Forward);
let joined = engine.execute_time_join(
&left_series,
&right_series,
&config,
)?;
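The as-of semantics above can be sketched in isolation: each left-hand point takes the most recent right-hand value within the tolerance, or none. A naive O(n*m) illustration of the join rule (the engine uses a merge scan):

```rust
/// As-of join: for each left timestamp, take the most recent right value
/// within `tolerance_ms`. Both inputs are (timestamp_ms, value) pairs
/// sorted by timestamp.
fn asof_join(
    left: &[(u64, f64)],
    right: &[(u64, f64)],
    tolerance_ms: u64,
) -> Vec<(u64, f64, Option<f64>)> {
    left.iter()
        .map(|&(lt, lv)| {
            let rv = right
                .iter()
                .take_while(|&&(rt, _)| rt <= lt)
                .last()
                .and_then(|&(rt, rv)| (lt - rt <= tolerance_ms).then_some(rv));
            (lt, lv, rv)
        })
        .collect()
}

fn main() {
    let cpu = [(1000, 50.0), (9000, 70.0)];
    let mem = [(800, 30.0)];
    // 5000 ms tolerance: the 800 ms sample matches t=1000 but not t=9000.
    let joined = asof_join(&cpu, &mem, 5000);
    assert_eq!(joined, vec![(1000, 50.0, Some(30.0)), (9000, 70.0, None)]);
    println!("{:?}", joined);
}
```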
Query Result Caching¶
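As a general pattern, range-query results can be memoized keyed by (metric, time range), with entries invalidated when a write lands inside a cached range. A minimal self-contained sketch of that pattern (the `QueryCache` type here is illustrative, not the HeliosDB API):

```rust
use std::collections::HashMap;

/// Cache key: metric name plus the queried time range (ms).
type QueryKey = (String, u64, u64);

/// A toy result cache for range queries. Real caches also bound memory
/// and evict least-recently-used entries.
struct QueryCache {
    entries: HashMap<QueryKey, Vec<(u64, f64)>>,
}

impl QueryCache {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Return the cached result, or compute and store it.
    fn get_or_compute<F>(&mut self, key: QueryKey, compute: F) -> &Vec<(u64, f64)>
    where
        F: FnOnce() -> Vec<(u64, f64)>,
    {
        self.entries.entry(key).or_insert_with(compute)
    }

    /// Drop any entry whose cached range covers a newly written timestamp.
    fn invalidate(&mut self, metric: &str, timestamp: u64) {
        self.entries
            .retain(|(m, start, end), _| m != metric || timestamp < *start || timestamp > *end);
    }
}

fn main() {
    let mut cache = QueryCache::new();
    let key = ("cpu.usage".to_string(), 0, 1000);
    cache.get_or_compute(key.clone(), || vec![(500, 75.5)]);
    // A write at t=600 falls inside the cached range, so the entry is dropped.
    cache.invalidate("cpu.usage", 600);
    assert!(cache.entries.is_empty());
    println!("cache invalidated");
}
```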
Ingestion Pipeline¶
Configuration¶
use heliosdb_storage::timeseries::ingestion::{
IngestionPipeline, IngestionConfig,
};
let config = IngestionConfig {
batch_size: 10000, // Points per batch
batch_timeout: Duration::from_millis(100), // Max wait time
write_workers: 4, // Parallel write threads
buffer_capacity: 100000, // Out-of-order buffer size
handle_out_of_order: true, // Handle late-arriving data
max_time_skew: 60000, // 1 minute tolerance
backfill_mode: false, // Normal mode
};
let pipeline = IngestionPipeline::new(config, storage, compressor);
Ingesting Data¶
// Single point
pipeline.ingest(point).await?;
// Batch ingestion (preferred)
pipeline.ingest_batch(&points).await?;
// Historical data backfill
pipeline.ingest_backfill(&historical_points).await?;
// Force flush pending batches
pipeline.flush().await?;
Ingestion Statistics¶
let stats = pipeline.stats().await;
println!("Total points: {}", stats.total_points);
println!("Throughput: {:.0} pts/sec", stats.points_per_second);
println!("Batches written: {}", stats.batches_written);
println!("Avg batch size: {:.0}", stats.avg_batch_size);
println!("Out-of-order: {}", stats.out_of_order_count);
println!("Avg latency: {:.2}ms", stats.avg_write_latency_ms);
println!("Buffer utilization: {:.1}%", stats.buffer_utilization * 100.0);
Graceful Shutdown¶
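The usual shutdown ordering is: stop accepting new points, flush buffered batches (the pipeline's `flush()` above), then stop the write workers. A self-contained sketch of that drain-then-stop ordering (the `Pipeline` type here is illustrative, not the real API):

```rust
/// A toy pipeline that buffers points until flushed, illustrating the
/// drain-then-stop shutdown ordering.
struct Pipeline {
    buffer: Vec<f64>,
    written: Vec<f64>,
    accepting: bool,
}

impl Pipeline {
    fn new() -> Self {
        Self { buffer: Vec::new(), written: Vec::new(), accepting: true }
    }

    fn ingest(&mut self, v: f64) -> Result<(), &'static str> {
        if !self.accepting {
            return Err("pipeline is shutting down");
        }
        self.buffer.push(v);
        Ok(())
    }

    fn flush(&mut self) {
        self.written.append(&mut self.buffer);
    }

    /// Graceful shutdown: refuse new work, then drain what was accepted.
    fn shutdown(&mut self) {
        self.accepting = false; // 1. stop accepting new points
        self.flush(); //            2. drain buffered batches to storage
        //                          3. (real pipelines then join worker threads)
    }
}

fn main() {
    let mut p = Pipeline::new();
    p.ingest(75.5).unwrap();
    p.shutdown();
    assert!(p.ingest(1.0).is_err()); // rejected after shutdown
    assert_eq!(p.written, vec![75.5]); // but nothing accepted was lost
    println!("graceful shutdown ok");
}
```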
Partition Management¶
Partition Strategies¶
use heliosdb_storage::timeseries::partitioning::{
PartitionManager, PartitionStrategy,
};
// Choose strategy based on data volume
let strategy = PartitionStrategy::Daily; // Most common
// Available strategies:
// - Hourly: For very high volume (1M+ points/hour)
// - Daily: Standard metrics (recommended)
// - Weekly: Lower volume data
// - Monthly: Long-term storage
// - Yearly: Archive data
// - Custom(seconds): Flexible intervals
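For fixed-width strategies, mapping a timestamp to its partition is integer division on the interval width. A sketch of that arithmetic for the daily case:

```rust
/// Map a millisecond timestamp to a daily partition id (days since epoch) --
/// the arithmetic behind fixed-width partition strategies.
fn daily_partition_id(timestamp_ms: u64) -> u64 {
    timestamp_ms / 86_400_000 // ms per day
}

fn main() {
    // 2024-01-01T00:00:00Z and 23:59:59.999 land in the same partition;
    // the next millisecond starts a new one.
    let day_start = 1_704_067_200_000u64;
    assert_eq!(daily_partition_id(day_start), daily_partition_id(day_start + 86_399_999));
    assert_eq!(daily_partition_id(day_start + 86_400_000), daily_partition_id(day_start) + 1);
    println!("partition id: {}", daily_partition_id(day_start));
}
```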
Partition Operations¶
let manager = PartitionManager::new(
"/data/partitions",
PartitionStrategy::Daily,
).await?;
// Compute partition for timestamp
let partition_id = manager.compute_partition_id(timestamp).await?;
// Get or create partition
let partition = manager.get_or_create_partition(timestamp).await?;
// Query partitions for time range
let partitions = manager.get_partitions_for_range(start, end).await?;
// List all partitions
let all_partitions = manager.list_partitions().await;
// Archive old partition
manager.archive_partition(partition_id).await?;
// Delete partition
manager.delete_partition(partition_id).await?;
Partition Metadata¶
let partition = manager.get_partition(partition_id).await?;
if let Some(metadata) = partition {
println!("Partition ID: {}", metadata.partition_id);
println!("Path: {:?}", metadata.path);
println!("Records: {}", metadata.record_count);
println!("Size: {} bytes", metadata.size_bytes);
println!("Time range: {} - {}", metadata.min_timestamp, metadata.max_timestamp);
println!("Compressed: {}", metadata.compressed);
println!("Archived: {}", metadata.archived);
}
Storage Statistics¶
let total_size = manager.total_size().await;
let total_records = manager.total_records().await;
println!("Total storage: {:.2} GB", total_size as f64 / 1e9);
println!("Total records: {}", total_records);
Best Practices¶
Naming Conventions¶
# Hierarchical metric names
service.component.metric_name
# Examples
web.api.request_count
database.queries.latency_ms
cache.redis.hit_rate
Tag Design¶
// Good: Low cardinality, useful for filtering
tags.insert("environment", "production");
tags.insert("region", "us-east-1");
tags.insert("service", "api-gateway");
// Avoid: High cardinality tags (creates too many series)
// tags.insert("user_id", user_id); // Don't do this
// tags.insert("request_id", request_id); // Don't do this
Batch Size Tuning¶
| Scenario | Recommended Batch Size |
|---|---|
| Real-time streaming | 1,000 - 5,000 |
| Batch processing | 10,000 - 50,000 |
| Historical backfill | 50,000 - 100,000 |
Compression vs. Query Speed¶
| Priority | Configuration |
|---|---|
| Maximum compression | Large blocks (8192+), all compression enabled |
| Fast queries | Smaller blocks (512-1024), skip dictionary compression |
| Balanced | 1024-2048 block size, all compression enabled |
See Also: README | Quick Start | Examples