HeliosDB Streaming - Usage Examples¶
Practical examples for common streaming use cases
Table of Contents¶
- Real-Time Analytics Dashboard
- User Session Analysis
- Fraud Detection
- IoT Sensor Monitoring
- E-Commerce Conversion Funnel
- Financial Trading Alerts
- Log Aggregation
- Anomaly Detection
- Production Deployment Tips
- Performance Optimization
Real-Time Analytics Dashboard¶
Use Case: Real-time metrics for application monitoring.
use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Create stream from logs
    let stream = Stream::new(StreamConfig {
        name: "app_metrics".to_string(),
        buffer_size: 10000,
        ..Default::default()
    });

    // Apply 1-minute tumbling window
    let window = TumblingWindow::new(Duration::from_secs(60));
    let windowed = window.apply(stream).await?;

    // Process windows. Note: these examples return right after spawning;
    // a real service would keep this JoinHandle and await it.
    tokio::spawn(async move {
        while let Some(event) = windowed.recv().await {
            if let StreamEvent::Data(window_result) = event {
                let rows = &window_result.rows;
                if rows.is_empty() {
                    continue;
                }

                // Calculate metrics
                let total_requests = rows.len();
                let errors = rows.iter()
                    .filter(|r| r.get("status").and_then(|v| v.as_integer()) == Some(500))
                    .count();
                let latencies: Vec<f64> = rows.iter()
                    .filter_map(|r| r.get("latency_ms").and_then(|v| v.as_float()))
                    .collect();
                // Average only over rows that actually carry a latency value
                let avg_latency = if latencies.is_empty() {
                    0.0
                } else {
                    latencies.iter().sum::<f64>() / latencies.len() as f64
                };

                println!("Window [{} - {}]:",
                    window_result.window.start,
                    window_result.window.end
                );
                println!("  Total Requests: {}", total_requests);
                println!("  Errors: {} ({:.2}%)", errors,
                    (errors as f64 / total_requests as f64) * 100.0);
                println!("  Avg Latency: {:.2}ms", avg_latency);
            }
        }
    });

    Ok(())
}
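Tumbling windows are fixed-size and non-overlapping, so each request is counted in exactly one window and per-minute totals can be summed without double counting. For overlapping views of the same data, see the sliding window in the IoT Sensor Monitoring example below.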
User Session Analysis¶
Use Case: Track user sessions with session windows.
use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Create stream
    let stream = Stream::new(StreamConfig::default());

    // Session window: 30-minute inactivity gap
    let window = SessionWindow::new(Duration::from_secs(1800));
    let sessions = window.apply(stream).await?;

    // Analyze sessions
    tokio::spawn(async move {
        while let Some(event) = sessions.recv().await {
            if let StreamEvent::Data(session) = event {
                // A session should contain at least one event, but guard
                // against an empty one rather than panicking
                let Some(first) = session.rows.first() else {
                    continue;
                };
                let user_id = first
                    .get("user_id")
                    .and_then(|v| v.as_string())
                    .unwrap_or_default();

                let page_views = session.rows.len();
                let duration = (session.window.end - session.window.start)
                    .num_seconds();
                let pages_visited: std::collections::HashSet<_> = session.rows
                    .iter()
                    .filter_map(|r| r.get("page").and_then(|v| v.as_string()))
                    .collect();

                println!("User {} Session:", user_id);
                println!("  Duration: {} minutes", duration / 60);
                println!("  Page Views: {}", page_views);
                println!("  Unique Pages: {}", pages_visited.len());
            }
        }
    });

    Ok(())
}
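A session window stays open for as long as events keep arriving, so the analysis above runs only once a user has been inactive for the full 30-minute gap and the session is complete.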
Fraud Detection¶
Use Case: Detect suspicious transaction patterns.
use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let stream = Stream::new(StreamConfig::default());

    // Pattern: Multiple high-value transactions in short time
    let pattern = PatternBuilder::new("fraud_pattern".to_string())
        .define_variable(
            "HighValue".to_string(),
            PatternCondition::GreaterThan {
                field: "amount".to_string(),
                value: 1000.0,
            },
        )
        .sequence(PatternSequence::Exactly {
            var: "HighValue".to_string(),
            count: 3,
        })
        .within(Duration::from_secs(300)) // Within 5 minutes
        .measure(
            "total_amount".to_string(),
            MeasureExpression::Aggregate {
                var: "HighValue".to_string(),
                field: "amount".to_string(),
                func: "SUM".to_string(),
            },
        )
        .measure(
            "transaction_count".to_string(),
            MeasureExpression::Count {
                var: "HighValue".to_string(),
            },
        )
        .build()?;

    let matcher = PatternMatcher::new(pattern);
    let alerts = matcher.apply(stream).await?;

    // Handle fraud alerts
    tokio::spawn(async move {
        while let Some(event) = alerts.recv().await {
            if let StreamEvent::Data(alert) = event {
                let user = alert.get("user_id").unwrap();
                let total = alert.get("total_amount").unwrap();
                let count = alert.get("transaction_count").unwrap();

                println!("⚠ FRAUD ALERT:");
                println!("  User: {}", user);
                println!("  {} transactions totaling {}", count, total);
                println!("  Action: Account flagged for review");

                // Trigger alert system
                // send_alert_to_ops(user, total, count);
            }
        }
    });

    Ok(())
}
IoT Sensor Monitoring¶
Use Case: Monitor IoT sensors for anomalies.
use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let stream = Stream::new(StreamConfig::default());

    // Sliding window: 5-minute window, 1-minute slide
    let window = SlidingWindow::new(
        Duration::from_secs(300),
        Duration::from_secs(60)
    );
    let windowed = window.apply(stream).await?;

    // Detect anomalies
    tokio::spawn(async move {
        while let Some(event) = windowed.recv().await {
            if let StreamEvent::Data(window_result) = event {
                // Calculate statistics
                let temperatures: Vec<f64> = window_result.rows
                    .iter()
                    .filter_map(|r| r.get("temperature").and_then(|v| v.as_float()))
                    .collect();
                if temperatures.is_empty() {
                    continue;
                }

                let mean = temperatures.iter().sum::<f64>() / temperatures.len() as f64;
                let variance = temperatures.iter()
                    .map(|t| (t - mean).powi(2))
                    .sum::<f64>() / temperatures.len() as f64;
                let std_dev = variance.sqrt();

                // Check for anomalies (3-sigma rule)
                for temp in &temperatures {
                    if (temp - mean).abs() > 3.0 * std_dev {
                        println!("🚨 ANOMALY DETECTED:");
                        println!("  Temperature: {:.2}°C", temp);
                        println!("  Mean: {:.2}°C, Std Dev: {:.2}", mean, std_dev);
                        println!("  Deviation: {:.2} sigma", (temp - mean).abs() / std_dev);
                    }
                }
            }
        }
    });

    Ok(())
}
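The anomaly check above is the standard 3-sigma rule: with mean μ and standard deviation σ computed over the current window, a reading x is flagged when

|x − μ| > 3σ

For approximately normal data, fewer than 0.3% of readings fall that far from the mean, so a flagged reading is a strong outlier candidate. One caveat: the outliers themselves are included in μ and σ, which inflates σ and slightly dampens sensitivity in small windows.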
E-Commerce Conversion Funnel¶
Use Case: Track user journey from view to purchase.
use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let stream = Stream::new(StreamConfig::default());

    // Pattern: View -> Add to Cart -> Purchase
    let pattern = PatternBuilder::new("conversion_funnel".to_string())
        .define_variable(
            "View".to_string(),
            PatternCondition::Equals {
                field: "event_type".to_string(),
                value: Value::String("product_view".to_string()),
            },
        )
        .define_variable(
            "AddToCart".to_string(),
            PatternCondition::Equals {
                field: "event_type".to_string(),
                value: Value::String("add_to_cart".to_string()),
            },
        )
        .define_variable(
            "Purchase".to_string(),
            PatternCondition::Equals {
                field: "event_type".to_string(),
                value: Value::String("purchase".to_string()),
            },
        )
        .sequence(PatternSequence::Sequence {
            patterns: vec![
                PatternSequence::Single { var: "View".to_string() },
                PatternSequence::Single { var: "AddToCart".to_string() },
                PatternSequence::Single { var: "Purchase".to_string() },
            ],
        })
        .within(Duration::from_secs(7200)) // 2-hour window
        .measure(
            "product_id".to_string(),
            MeasureExpression::First {
                var: "View".to_string(),
                field: "product_id".to_string(),
            },
        )
        .measure(
            "revenue".to_string(),
            MeasureExpression::Last {
                var: "Purchase".to_string(),
                field: "amount".to_string(),
            },
        )
        .build()?;

    let matcher = PatternMatcher::new(pattern);
    let conversions = matcher.apply(stream).await?;

    // Track conversions
    tokio::spawn(async move {
        let mut total_revenue = 0.0;
        let mut conversion_count = 0;

        while let Some(event) = conversions.recv().await {
            if let StreamEvent::Data(conversion) = event {
                conversion_count += 1;
                let revenue = conversion.get("revenue")
                    .and_then(|v| v.as_float())
                    .unwrap_or(0.0);
                total_revenue += revenue;

                println!("Conversion #{}", conversion_count);
                println!("  Product: {}", conversion.get("product_id").unwrap());
                println!("  Revenue: ${:.2}", revenue);
                println!("  Total Revenue: ${:.2}", total_revenue);
            }
        }
    });

    Ok(())
}
Financial Trading Alerts¶
Use Case: Alert on rapid price movements.
use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let stream = Stream::new(StreamConfig::default());

    // Pattern: Price spike (3 consecutive increases, matching Exactly below)
    let pattern = PatternBuilder::new("price_spike".to_string())
        .define_variable(
            "Increase".to_string(),
            PatternCondition::GreaterThan {
                field: "price_change_pct".to_string(),
                value: 2.0, // > 2% increase
            },
        )
        .sequence(PatternSequence::Exactly {
            var: "Increase".to_string(),
            count: 3,
        })
        .within(Duration::from_secs(60)) // Within 1 minute
        .measure(
            "start_price".to_string(),
            MeasureExpression::First {
                var: "Increase".to_string(),
                field: "price".to_string(),
            },
        )
        .measure(
            "end_price".to_string(),
            MeasureExpression::Last {
                var: "Increase".to_string(),
                field: "price".to_string(),
            },
        )
        .build()?;

    let matcher = PatternMatcher::new(pattern);
    let spikes = matcher.apply(stream).await?;

    // Handle alerts
    tokio::spawn(async move {
        while let Some(event) = spikes.recv().await {
            if let StreamEvent::Data(spike) = event {
                let symbol = spike.get("symbol").unwrap();
                let start = spike.get("start_price").and_then(|v| v.as_float()).unwrap();
                let end = spike.get("end_price").and_then(|v| v.as_float()).unwrap();
                let change_pct = ((end - start) / start) * 100.0;

                println!("PRICE SPIKE DETECTED:");
                println!("  Symbol: {}", symbol);
                println!("  Price: ${:.2} -> ${:.2}", start, end);
                println!("  Change: {:.2}%", change_pct);
                println!("  Time: < 1 minute");
            }
        }
    });

    Ok(())
}
Log Aggregation¶
Use Case: Aggregate logs with SQL streaming.
use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Create SQL table environment
    let config = TableConfig {
        idle_state_retention: Duration::from_secs(3600),
        max_parallelism: 64,
        event_time_characteristic: TimeCharacteristic::EventTime,
    };
    let env = StreamTableEnvironment::new(config);

    // Create stream
    let stream = Stream::new(StreamConfig::default());

    // Define schema
    let schema = TableSchema {
        columns: vec![
            ColumnDef {
                name: "timestamp".to_string(),
                data_type: DataType::Timestamp,
                nullable: false,
            },
            ColumnDef {
                name: "level".to_string(),
                data_type: DataType::String,
                nullable: false,
            },
            ColumnDef {
                name: "service".to_string(),
                data_type: DataType::String,
                nullable: false,
            },
            ColumnDef {
                name: "message".to_string(),
                data_type: DataType::String,
                nullable: false,
            },
        ],
        primary_key: None,
        watermark_strategy: Some(WatermarkStrategy::BoundedOutOfOrderness {
            max_out_of_orderness: Duration::from_secs(5),
        }),
    };

    // Register stream as table
    env.register_stream("logs", stream, schema);

    // Query for errors
    let errors = env.sql_query(
        "SELECT service, COUNT(*) as error_count \
         FROM logs \
         WHERE level = 'ERROR' \
         GROUP BY service"
    ).await?;

    // Process results
    tokio::spawn(async move {
        while let Some(event) = errors.recv().await {
            if let StreamEvent::Data(row) = event {
                println!("Service: {}, Errors: {}",
                    row.get("service").unwrap(),
                    row.get("error_count").unwrap()
                );
            }
        }
    });

    Ok(())
}
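Note that this query has no window clause, so the GROUP BY aggregates over the unbounded stream; expect an updated (service, error_count) row to be emitted as counts change, with the idle_state_retention setting above bounding how long state for a quiet service is retained.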
Anomaly Detection¶
Use Case: Detect anomalies with stream-stream join and pattern matching.
use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Two streams: current metrics and historical baseline
    let current_stream = Stream::new(StreamConfig::default());
    let baseline_stream = Stream::new(StreamConfig::default());

    // Join current with baseline
    let join = StreamStreamJoin::new(
        Duration::from_secs(60),
        JoinType::Inner,
        "metric_id",
        "metric_id"
    );
    let joined = join.apply(current_stream, baseline_stream).await?;

    // Detect anomalies where current > 3x baseline
    let pattern = PatternBuilder::new("anomaly".to_string())
        .define_variable(
            "Spike".to_string(),
            PatternCondition::And {
                conditions: vec![
                    PatternCondition::GreaterThan {
                        field: "current_value".to_string(),
                        value: 0.0,
                    },
                    // Custom condition: current > 3 * baseline
                    // (simplified - would need a custom condition type;
                    // see the consumer-side workaround below)
                ],
            },
        )
        .sequence(PatternSequence::Single {
            var: "Spike".to_string(),
        })
        .build()?;

    let matcher = PatternMatcher::new(pattern);
    let anomalies = matcher.apply(joined).await?;

    // Handle anomalies
    tokio::spawn(async move {
        while let Some(event) = anomalies.recv().await {
            if let StreamEvent::Data(anomaly) = event {
                println!("🔴 ANOMALY:");
                println!("  Metric: {}", anomaly.get("metric_id").unwrap());
                println!("  Current: {}", anomaly.get("current_value").unwrap());
                println!("  Baseline: {}", anomaly.get("baseline_value").unwrap());
            }
        }
    });

    Ok(())
}
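Since the pattern above cannot express the current-vs-baseline comparison directly (as the comment notes), one workaround is to do the comparison on the consumer side instead. A minimal sketch, as a variant of the anomaly handler above; the 3x threshold and zero-baseline guard are illustrative:

// Consumer-side check: flag rows where current exceeds 3x baseline.
// Uses only the row accessors shown above; the threshold is illustrative.
while let Some(event) = anomalies.recv().await {
    if let StreamEvent::Data(row) = event {
        let current = row.get("current_value").and_then(|v| v.as_float());
        let baseline = row.get("baseline_value").and_then(|v| v.as_float());
        if let (Some(current), Some(baseline)) = (current, baseline) {
            // Guard against a zero baseline before comparing
            if baseline > 0.0 && current > 3.0 * baseline {
                println!("🔴 ANOMALY: {} is {:.1}x baseline",
                    row.get("metric_id").unwrap(),
                    current / baseline
                );
            }
        }
    }
}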
Production Deployment Tips¶
1. Error Handling¶
use heliosdb_streaming::*;
use std::time::Duration;

async fn robust_pipeline() -> Result<()> {
    let stream = Stream::new(StreamConfig::default());

    // Configure restart strategy (passed along when the job is submitted,
    // as in the monitoring example below)
    let config = JobConfig {
        restart_strategy: RestartStrategy::FixedDelay {
            max_attempts: 3,
            delay: Duration::from_secs(10),
        },
        ..Default::default()
    };

    // Proper error handling around the processing loop
    // (`process_stream` stands in for your pipeline logic; see the sketch below)
    match process_stream(stream).await {
        Ok(_) => println!("Pipeline completed successfully"),
        Err(e) => {
            eprintln!("Pipeline error: {}", e);
            // Trigger alerting
            // Fall back to a backup pipeline
        }
    }

    Ok(())
}
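For completeness, `process_stream` above is only a placeholder. A minimal hypothetical version, assuming the raw stream exposes the same recv()/StreamEvent interface as the windowed streams in the earlier examples:

// Hypothetical placeholder: drain the stream until it closes.
// The recv()/StreamEvent shape is an assumption carried over from
// the windowed examples; substitute your real pipeline logic here.
async fn process_stream(stream: Stream) -> Result<()> {
    while let Some(event) = stream.recv().await {
        if let StreamEvent::Data(row) = event {
            // ... handle the row ...
            let _ = row;
        }
    }
    Ok(())
}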
2. Monitoring¶
use heliosdb_streaming::*;
use std::time::Duration;

async fn monitored_pipeline() -> Result<()> {
    let job_manager = JobManager::new(JobManagerConfig::default())?;

    // Submit with monitoring (`config` and `graph` are assumed to be built
    // elsewhere, e.g. the JobConfig from the error-handling example)
    let job_id = job_manager.submit_job(config, graph).await?;

    // Periodic health checks
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(30)).await;
            let metrics = job_manager.get_metrics(job_id).await.unwrap();
            println!("Job {}: {} records/sec, {:.2}ms latency",
                job_id,
                metrics.throughput,
                metrics.avg_latency_ms
            );

            // Alert if degraded
            if metrics.avg_latency_ms > 100.0 {
                println!("⚠ High latency detected!");
            }
        }
    });

    Ok(())
}
3. Checkpointing¶
use heliosdb_streaming::*;
use std::time::Duration;

async fn fault_tolerant_pipeline() -> Result<()> {
    // Enable checkpointing
    let backend = StateBackend::new(StateBackendConfig {
        storage_type: StorageType::RocksDB,
        checkpoint_dir: "/var/heliosdb/checkpoints".to_string(),
        enable_encryption: true,
        ..Default::default()
    })?;

    let coordinator = CheckpointCoordinator::new(
        backend,
        Duration::from_secs(60) // Checkpoint every minute
    )?;

    // Create a savepoint on shutdown
    tokio::signal::ctrl_c().await?;
    println!("Shutting down, creating savepoint...");
    let savepoint = coordinator.create_savepoint().await?;
    println!("Savepoint created: {}", savepoint);

    Ok(())
}
Performance Optimization¶
1. Buffer Tuning¶
// High throughput: larger buffers
let stream = Stream::new(StreamConfig {
    buffer_size: 10000,
    ..Default::default()
});

// Low latency: smaller buffers
let stream = Stream::new(StreamConfig {
    buffer_size: 100,
    ..Default::default()
});
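As a rule of thumb, larger buffers absorb bursts and improve throughput at the cost of memory and queuing delay, while smaller buffers apply backpressure sooner and keep end-to-end latency low.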
2. Parallelism¶
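The `max_parallelism` field on `TableConfig`, already shown in the Log Aggregation example, is the parallelism control these examples exercise. A minimal sketch; the value of 64 is illustrative, so size it to your cores and partition count:

// Bound the number of parallel tasks available to SQL operators.
// Fields mirror the Log Aggregation example; 64 is illustrative.
let config = TableConfig {
    idle_state_retention: Duration::from_secs(3600),
    max_parallelism: 64,
    event_time_characteristic: TimeCharacteristic::EventTime,
};
let env = StreamTableEnvironment::new(config);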
3. Batch Processing¶
let sink = DatabaseSink::new(DatabaseConfig {
    batch_size: 5000, // Batch writes for efficiency
    batch_timeout: Duration::from_millis(100),
    ..Default::default()
})?;
More examples are available in the test suite: /heliosdb-streaming/tests/
API Documentation: HELIOSDB_STREAMING_API.md
Generated: October 28, 2025