Time-Series Quick Start Guide¶
Time to Complete: 10 minutes | Prerequisites: Rust 1.75+, HeliosDB installed | Last Updated: January 4, 2026
Overview¶
This guide will help you get started with HeliosDB's time-series capabilities. You will learn to:
1. Create a time-series engine
2. Write time-series data points
3. Query time ranges
4. Configure retention policies
5. Set up downsampling
Step 1: Add Dependencies¶
Add the HeliosDB storage and common crates, plus Tokio for the async runtime, to your Cargo.toml:
[dependencies]
heliosdb-storage = "6.0"
heliosdb-common = "6.0"
tokio = { version = "1.0", features = ["full"] }
Step 2: Create a Time-Series Engine¶
use heliosdb_storage::timeseries::{
TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy,
};
/// Opens a time-series engine rooted at `/data/timeseries` with daily
/// partitioning and reports success on stdout.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Where the engine stores its data, and how that data is partitioned.
    let storage_path = "/data/timeseries";
    let strategy = PartitionStrategy::Daily;

    let engine = TimeSeriesEngine::new(storage_path, strategy).await?;

    println!("Time-series engine created successfully!");
    Ok(())
}
Step 3: Write Time-Series Data¶
Single Point¶
use heliosdb_storage::timeseries::TimeSeriesEngine;
use std::time::{SystemTime, UNIX_EPOCH};
/// Writes two example points: one with no timestamp supplied and one with an
/// explicit timestamp taken from the system clock.
///
/// Returns an error if either write fails or if the system clock reports a
/// time before the Unix epoch.
async fn write_single_point(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // With `None`, the current timestamp is used for the point.
    engine.write_point("sensor.temperature", 23.5, None).await?;

    // Explicit timestamps are given in milliseconds since the Unix epoch.
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH)?;
    let now_ms = since_epoch.as_millis() as u64;
    engine.write_point("sensor.humidity", 65.0, Some(now_ms)).await?;

    println!("Points written successfully!");
    Ok(())
}
Batch Write¶
use heliosdb_storage::timeseries::{TimeSeriesEngine, TimeSeriesPoint};
use std::collections::HashMap;
/// Builds 1000 synthetic `cpu.usage` points spaced one second apart and
/// writes them to the engine in a single batch call.
async fn write_batch(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // 2024-01-01 00:00:00 UTC, in milliseconds since the Unix epoch.
    let base_time = 1704067200000u64;

    // Assemble the batch up front so it can be handed over in one call.
    let mut points: Vec<TimeSeriesPoint> = Vec::with_capacity(1000);
    for step in 0..1000u64 {
        // Timestamps advance by 1000 ms (one second) per point.
        let timestamp = base_time + (step * 1000);
        // A sine wave around 45.0 with an amplitude of 10.0.
        let value = 45.0 + (step as f64 * 0.1).sin() * 10.0;
        points.push(TimeSeriesPoint::new("cpu.usage", timestamp, value));
    }

    engine.write_points(&points).await?;
    println!("Wrote {} points", points.len());
    Ok(())
}
Points with Tags¶
use heliosdb_storage::timeseries::TimeSeriesPoint;
use std::collections::HashMap;
/// Builds a single `cpu.usage` point carrying `host` and `region` tags.
fn create_tagged_point() -> TimeSeriesPoint {
    // Tags are plain string key/value pairs attached to the point.
    let tags = HashMap::from([
        ("host".to_string(), "server-01".to_string()),
        ("region".to_string(), "us-west".to_string()),
    ]);

    TimeSeriesPoint::with_tags("cpu.usage", 1704067200000, 45.5, tags)
}
Step 4: Query Time-Series Data¶
Basic Time Range Query¶
/// Fetches every `cpu.usage` point for 2024-01-01 and prints the total count
/// followed by the first five points.
async fn query_range(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // The window is one full day, expressed in milliseconds since the epoch.
    let start_time = 1704067200000u64; // 2024-01-01 00:00:00
    let end_time = start_time + 24 * 60 * 60 * 1000; // 2024-01-02 00:00:00

    let points = engine.query_range("cpu.usage", start_time, end_time).await?;
    println!("Retrieved {} points", points.len());

    // Only a short preview is printed, not the whole result set.
    points.iter().take(5).for_each(|sample| {
        println!(" {} @ {} = {}", sample.metric, sample.timestamp, sample.value);
    });

    Ok(())
}
Query with Aggregation¶
use heliosdb_storage::timeseries::query_engine::{
TimeRangeQuery, TimeSeriesQueryEngine, AggregationFunction,
};
use std::time::Duration;
/// Runs a 5-minute average aggregation over `points` for the `cpu.usage`
/// metric and prints how many buckets were produced.
async fn query_with_aggregation(points: &[TimeSeriesPoint]) -> Result<(), Box<dyn std::error::Error>> {
    let mut query_engine = TimeSeriesQueryEngine::new();

    // 300 seconds is the 5-minute aggregation window.
    let bucket_width = Duration::from_secs(300);
    let query = TimeRangeQuery::new("cpu.usage", 1704067200000, 1704153600000)
        .with_aggregation(bucket_width, AggregationFunction::Average);

    let buckets = query_engine.execute_query(&query, points).await?;
    println!("Aggregated to {} buckets", buckets.len());
    Ok(())
}
Step 5: Configure Retention Policies¶
use heliosdb_storage::timeseries::{TimeSeriesEngine, RetentionPolicy};
use std::time::Duration;
/// Installs a 30-day global retention policy on the engine.
async fn configure_retention(engine: &mut TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // 30 days expressed in seconds.
    const RETENTION_SECS: u64 = 30 * 24 * 3600;

    let max_age = Duration::from_secs(RETENTION_SECS);
    // A cleanup interval of 3600 means the policy is checked every hour.
    let policy = RetentionPolicy::new(max_age).with_cleanup_interval(3600);

    engine.set_retention_policy(policy).await;
    println!("Retention policy configured!");
    Ok(())
}
Step 6: Set Up Downsampling¶
use heliosdb_storage::timeseries::{
TimeSeriesEngine, DownsamplingConfig, AggregationFunction,
};
use std::time::Duration;
/// Registers 5-minute average downsampling for metrics matching `cpu.*`.
async fn configure_downsampling(engine: &mut TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // 300 seconds is the 5-minute downsampling interval.
    let interval = Duration::from_secs(300);
    let config = DownsamplingConfig::new(interval).with_aggregation(AggregationFunction::Average);

    // The pattern selects the CPU metrics.
    engine.configure_downsampling("cpu.*", config).await?;
    println!("Downsampling configured!");
    Ok(())
}
Step 7: Start Background Maintenance¶
/// Starts the engine's background tasks (retention cleanup, downsampling and
/// compaction) and reports success on stdout.
async fn start_maintenance(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    // Propagate a startup failure before announcing success.
    let outcome = engine.start_maintenance().await;
    outcome?;

    println!("Background maintenance started!");
    Ok(())
}
Step 8: Check Statistics¶
/// Fetches the engine's statistics and prints the partition count together
/// with the retention and compression figures.
async fn check_stats(engine: &TimeSeriesEngine) -> Result<(), Box<dyn std::error::Error>> {
    let snapshot = engine.stats().await;

    // Retention and compression stats are printed with their Debug output.
    let retention = &snapshot.retention_stats;
    let compression = &snapshot.compression_stats;

    println!("Time-Series Statistics:");
    println!(" Total partitions: {}", snapshot.total_partitions);
    println!(" Retention stats: {:?}", retention);
    println!(" Compression stats: {:?}", compression);
    Ok(())
}
Complete Example¶
use heliosdb_storage::timeseries::{
TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy,
RetentionPolicy, DownsamplingConfig, AggregationFunction,
};
use std::time::Duration;
/// End-to-end walkthrough: create an engine, configure retention and
/// downsampling, start maintenance, write 100 points, read back the last
/// hour, and print the partition count.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Create engine
    let mut engine = TimeSeriesEngine::new("/data/timeseries", PartitionStrategy::Daily).await?;

    // 2. Configure retention (30 days)
    let retention = RetentionPolicy::new(Duration::from_secs(30 * 24 * 3600));
    engine.set_retention_policy(retention).await;

    // 3. Configure downsampling (5-minute averages)
    let downsampling = DownsamplingConfig::new(Duration::from_secs(300))
        .with_aggregation(AggregationFunction::Average);
    engine.configure_downsampling("sensors.*", downsampling).await?;

    // 4. Start background maintenance
    engine.start_maintenance().await?;

    // 5. Write data: 100 readings rising in 0.1 steps from 23.5.
    //    No timestamp is supplied for any of them.
    for i in 0..100 {
        let reading = 23.5 + (i as f64 * 0.1);
        engine.write_point("sensors.temperature", reading, None).await?;
    }

    // 6. Query data: the window is the last hour, in milliseconds since the epoch.
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis() as u64;
    let one_hour_ago = now - 3600000;
    let points = engine.query_range("sensors.temperature", one_hour_ago, now).await?;
    println!("Retrieved {} points", points.len());

    // 7. Check stats
    let stats = engine.stats().await;
    println!("Partitions: {}", stats.total_partitions);

    Ok(())
}
Using SQL Interface¶
If you prefer SQL, HeliosDB supports time-series operations through SQL:
Create Time-Series Table¶
-- Create a time-series optimized table
-- Each row is one reading: when it was taken, which sensor produced it, and
-- two measurements. Only timestamp and sensor_id are declared NOT NULL.
CREATE TABLE sensor_readings (
timestamp TIMESTAMPTZ NOT NULL,
sensor_id VARCHAR(64) NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION
) WITH (
-- Storage options for the table.
-- NOTE(review): presumably these correspond to daily partitioning, a 30-day
-- retention policy and Gorilla compression in the Rust API; confirm against
-- the SQL reference.
partition_by = 'DAY',
retention = '30 days',
compression = 'gorilla'
);
Insert Data¶
-- Insert sensor readings
-- Two rows go in with one statement; both take NOW() as their timestamp.
INSERT INTO sensor_readings (timestamp, sensor_id, temperature, humidity)
VALUES
(NOW(), 'sensor-001', 23.5, 65.0),
(NOW(), 'sensor-002', 24.1, 62.5);
Query with Time Bucket¶
-- Aggregate by 5-minute buckets
-- Returns the average and maximum temperature per sensor per bucket,
-- limited to readings from the last day, newest bucket first.
SELECT
time_bucket('5 minutes', timestamp) AS bucket,
sensor_id,
AVG(temperature) AS avg_temp,
MAX(temperature) AS max_temp
FROM sensor_readings
WHERE timestamp > NOW() - INTERVAL '1 day'
GROUP BY bucket, sensor_id
ORDER BY bucket DESC;
Configure Retention via SQL¶
-- Set retention policy
-- Applies a 30-day retention policy to the existing table.
ALTER TABLE sensor_readings
SET RETENTION POLICY '30 days';
-- Set downsampling
-- Adds a continuous aggregate named sensor_readings_1h that runs every hour
-- using the 'avg' aggregation.
ALTER TABLE sensor_readings
ADD CONTINUOUS AGGREGATE sensor_readings_1h
EVERY '1 hour'
WITH (aggregation = 'avg');
Common Configuration Patterns¶
High-Frequency IoT Data¶
// Ingestion settings for high-frequency writes: batches of 10000 with a
// 100 ms batch timeout and out-of-order handling switched on. Fields not
// listed here keep their defaults.
let config = IngestionConfig {
batch_size: 10000,
batch_timeout: Duration::from_millis(100),
handle_out_of_order: true,
max_time_skew: 60000, // 1 minute tolerance
..Default::default()
};
// NOTE(review): `config` is never handed to the engine in this snippet; the
// call that applies an IngestionConfig is not shown. Confirm against the API.
// Hourly partitions are used here instead of the daily ones used elsewhere.
let engine = TimeSeriesEngine::new(path, PartitionStrategy::Hourly).await?;
Financial Tick Data¶
// Daily partitions for tick data.
let engine = TimeSeriesEngine::new(path, PartitionStrategy::Daily).await?;
// High compression for historical data
// Timestamps, values and metrics are all compressed.
// NOTE(review): `compression_config` is never applied to the engine in this
// snippet; the call that consumes it is not shown. Confirm against the API.
let compression_config = BatchCompressionConfig {
block_size: 4096, // Larger blocks for better compression
compress_timestamps: true,
compress_values: true,
compress_metrics: true,
min_ratio: 1.5,
};
Metrics and Monitoring¶
// The engine is bound mutably because Step 6 and the complete example both
// call `configure_downsampling` through a mutable engine.
let mut engine = TimeSeriesEngine::new(path, PartitionStrategy::Daily).await?;

// Multi-tier downsampling: raw -> 1m -> 5m -> 1h -> 1d
// The base config uses 60-second buckets. Each added tier has a wider bucket
// and its own age threshold (1 hour, 1 day, 7 days). All tiers average.
engine.configure_downsampling(
    "metrics.*",
    DownsamplingConfig::new(Duration::from_secs(60))
        // 5-minute buckets, age threshold 1 hour.
        .add_tier(
            DownsamplingTier::new(Duration::from_secs(300), AggregationFunction::Average)
                .with_age_threshold(Duration::from_secs(3600)),
        )
        // 1-hour buckets, age threshold 1 day.
        .add_tier(
            DownsamplingTier::new(Duration::from_secs(3600), AggregationFunction::Average)
                .with_age_threshold(Duration::from_secs(86400)),
        )
        // 1-day buckets, age threshold 7 days.
        .add_tier(
            DownsamplingTier::new(Duration::from_secs(86400), AggregationFunction::Average)
                .with_age_threshold(Duration::from_secs(604800)),
        ),
).await?;
Troubleshooting¶
Low Compression Ratio¶
// Ensure data is sorted and has regular intervals
// Check compression configuration
// This config enables timestamp and value compression with a block size of
// 1024; fields not listed here keep their defaults.
let config = BatchCompressionConfig {
compress_timestamps: true,
compress_values: true,
block_size: 1024,
..Default::default()
};
High Write Latency¶
// Increase batch size for better throughput
// Batches of 50000 with a 500 ms batch timeout and 8 write workers; fields
// not listed here keep their defaults.
let config = IngestionConfig {
batch_size: 50000,
batch_timeout: Duration::from_millis(500),
write_workers: 8,
..Default::default()
};
Query Too Slow¶
// Check partition pruning
// Prints how many partitions the given time range touches.
// NOTE(review): `manager`, `start` and `end` are not defined in this snippet;
// presumably `manager` is the engine's partition manager. Confirm how to
// obtain it.
let partitions = manager.get_partitions_for_range(start, end).await?;
println!("Querying {} partitions", partitions.len());
// Add index on frequently queried tags
// Use continuous aggregates for dashboards
Next Steps¶
| Topic | Guide |
|---|---|
| Complete documentation | USER_GUIDE.md |
| Code examples | EXAMPLES.md |
| Performance optimization | ../timeseries/PERFORMANCE_TUNING.md |
| Compression details | ../timeseries/F3.8-timeseries-compression.md |
See Also: README.md | User Guide | Examples