Streaming & Real-Time Analytics User Guide¶
Table of Contents¶
- Overview
- Stream Processing Fundamentals
- Setting Up Streaming
- Continuous Queries
- Stream-Stream Joins
- Stream-Table Joins
- Aggregations & Complex Event Processing
- Exactly-Once Semantics
- Backpressure & Flow Control
- Integration Patterns
- Watermarks & Late Data
- State Management
- Performance Tuning
- Monitoring Streams
- Real-World Examples
- Troubleshooting
Overview¶
HeliosDB's streaming and real-time analytics capabilities let you process unbounded streams of data with sub-second latency. Unlike batch systems, which process data in discrete chunks, HeliosDB processes events as they arrive, maintaining state across distributed operators.
Key Concepts¶
Event Time vs. Processing Time: Event time is when an event occurred in the real world, while processing time is when the stream processor sees it. HeliosDB supports both semantics, with event-time being the default for accurate time-windowed analytics.
Continuous Queries: Unlike traditional SQL queries that terminate, continuous queries run indefinitely, emitting results as new data arrives. This is essential for real-time dashboards and monitoring.
Windowing: Data arrives as an unbounded stream, so we aggregate using windows—fixed time intervals (tumbling windows), overlapping periods (sliding windows), or dynamic gaps (session windows).
Joins: HeliosDB supports two join patterns: stream-stream joins correlate events from two streams, while stream-table joins enrich stream events with reference data. Results are then persisted through sinks.
Exactly-Once Semantics: Critical for applications requiring zero data loss and no duplicates. HeliosDB uses two-phase commit protocols with deduplication to guarantee exactly-once processing.
Use Cases¶
- Real-time Event Analytics: Track user behavior and turn pageviews into revenue metrics in real time
- IoT & Sensor Monitoring: Ingest millions of sensor readings, detect anomalies instantly
- Financial Transactions: Pattern detection for fraud, real-time risk scoring
- Network Traffic Analysis: Detect DDoS attacks, analyze flow patterns
- Application Metrics: Monitor API performance, request latencies, error rates
- Anomaly Detection: Run streaming machine learning models for continuous predictions
Stream Processing Fundamentals¶
Time Semantics¶
HeliosDB supports two time semantics configured at stream creation:
// Event-time semantics (recommended for time-windowed analytics)
let config = StreamConfig {
name: "events".to_string(),
time_semantics: TimeSemantics::EventTime,
allowed_lateness: Duration::from_secs(60),
watermark_interval: Duration::from_secs(1),
};
let stream = Stream::new(config);
// Processing-time semantics (simpler, no late data handling)
let config = StreamConfig {
name: "metrics".to_string(),
time_semantics: TimeSemantics::ProcessingTime,
allowed_lateness: Duration::from_secs(0),
watermark_interval: Duration::from_secs(1),
};
Event-time processing gives accurate results regardless of arrival delays, but requires watermarks to decide when a window is complete. Processing-time processing is simpler, but results depend on when events happen to be processed rather than when they occurred.
Stream Data Model¶
Data flows as rows with typed fields:
use heliosdb_streaming::{Row, Value};
use std::collections::HashMap;
use chrono::Utc;
// Create a row with typed fields
let mut fields = HashMap::new();
fields.insert("user_id".to_string(), Value::Integer(12345));
fields.insert("event_type".to_string(), Value::String("purchase".to_string()));
fields.insert("amount".to_string(), Value::Float(99.99));
fields.insert("timestamp".to_string(), Value::Timestamp(Utc::now()));
let row = Row::new(fields)
.with_event_time(Utc::now()); // Set event timestamp
// Access values
if let Some(Value::Integer(user_id)) = row.get("user_id") {
println!("User: {}", user_id);
}
Values support: Null, Boolean, Integer, Float, String, Bytes, Timestamp, Array, and Object types.
Window Types¶
Tumbling Windows (fixed, non-overlapping):
- Best for: Hourly/daily aggregations, periodic summaries
- Example: 1-minute windows for request count tracking

Sliding Windows (overlapping):
- Best for: Detecting trends, moving averages
- Example: 1-minute window slides every 10 seconds

Session Windows (dynamic gaps):
- Best for: User sessions, activity bursts
- Example: 30-minute gap between events marks session end

Count Windows (event-count based):
- Best for: Micro-batching, load balancing
- Example: Emit results after every 1000 events
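Window assignment itself is simple arithmetic, independent of any particular engine. The sketch below (plain Rust with timestamps in seconds; function names are illustrative, not HeliosDB API) shows which tumbling window an event falls into, which sliding windows contain it, and how session gaps split an event sequence:

```rust
/// Start of the tumbling window containing `ts` (timestamps and sizes in seconds).
fn tumbling_window_start(ts: u64, size: u64) -> u64 {
    ts - ts % size
}

/// Starts of every sliding window that contains `ts`.
fn sliding_window_starts(ts: u64, size: u64, slide: u64) -> Vec<u64> {
    let mut starts = Vec::new();
    let mut s = ts - ts % slide; // latest window start at or before ts
    loop {
        starts.push(s);
        // The next older window (s - slide) contains ts only if it ends after ts.
        if s < slide || s + size - slide <= ts {
            break;
        }
        s -= slide;
    }
    starts.reverse();
    starts
}

/// Number of session windows in sorted event times: a gap larger than
/// `gap` between consecutive events closes the current session.
fn count_sessions(sorted_ts: &[u64], gap: u64) -> usize {
    if sorted_ts.is_empty() {
        return 0;
    }
    1 + sorted_ts.windows(2).filter(|w| w[1] - w[0] > gap).count()
}
```

Note that a 1-minute window sliding every 10 seconds places each event into 6 overlapping windows, which is why sliding windows cost proportionally more state than tumbling ones.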
Setting Up Streaming¶
Initialize Streaming Engine¶
use heliosdb_streaming::StreamingEngine;
#[tokio::main]
async fn main() -> Result<()> {
// Create the streaming engine
let engine = StreamingEngine::new().await?;
// Create streams and queries
// ... your streaming logic ...
// Shutdown gracefully
engine.shutdown().await?;
Ok(())
}
Create a Stream¶
use heliosdb_streaming::{Stream, StreamConfig, TimeSemantics};
use std::time::Duration;
// Create an input stream
let config = StreamConfig {
name: "api_requests".to_string(),
time_semantics: TimeSemantics::EventTime,
allowed_lateness: Duration::from_secs(10),
watermark_interval: Duration::from_secs(1),
};
let stream = Stream::new(config)
.with_schema(vec![
"user_id".to_string(),
"endpoint".to_string(),
"response_time".to_string(),
"status".to_string(),
]);
Define Stream Schemas¶
Schemas are metadata describing stream fields. While HeliosDB is schema-flexible, defining schemas enables validation:
// Schema definition (for documentation and validation)
let schema = vec![
"request_id".to_string(), // String
"timestamp".to_string(), // Timestamp
"method".to_string(), // String (GET, POST, etc)
"path".to_string(), // String
"status_code".to_string(), // Integer
"response_ms".to_string(), // Integer
"user_id".to_string(), // Integer
];
let stream = Stream::new(config).with_schema(schema);
Continuous Queries¶
Basic Continuous Query¶
A continuous query runs indefinitely, emitting results as new data arrives:
use heliosdb_streaming::{CallbackSink, ContinuousQuery, Stream, Row, Value};
use std::collections::HashMap;
use std::sync::Arc;
// Create output sink
let sink = Arc::new(CallbackSink::new(|rows| {
for row in rows {
println!("Result: {:?}", row.fields());
}
}));
// Create continuous query
let query = ContinuousQuery::new(
"my_query".to_string(),
stream.clone(),
)
.with_sink(sink);
// Execute
let handle = engine.create_continuous_query(query).await?;
// Query is now running continuously...
// Stop when done
engine.stop_continuous_query("my_query").await?;
Query Sinks¶
Continuous queries emit results to sinks:
Table Sink - Writes to a database table:
use heliosdb_streaming::TableSink;
let sink = Arc::new(TableSink::new("realtime_metrics".to_string()));
Callback Sink - Calls a Rust function:
use heliosdb_streaming::CallbackSink;
let sink = Arc::new(CallbackSink::new(|rows| {
for row in rows {
// Handle result rows (update dashboard, send alert, etc)
println!("Alert: {:?}", row);
}
}));
Memory Sink - Stores in memory (testing):
use heliosdb_streaming::MemorySink;
let sink = Arc::new(MemorySink::new());
let results = sink.get_results();
Stream-Stream Joins¶
Join two streams to correlate events; a time window bounds which events can match:
use heliosdb_streaming::StreamStreamJoin;
use std::time::Duration;
// Create second stream
let clicks = Stream::new(StreamConfig {
name: "clicks".to_string(),
..Default::default()
});
let impressions = Stream::new(StreamConfig {
name: "impressions".to_string(),
..Default::default()
});
// Join with 5-second window
let joiner = StreamStreamJoin::new(Duration::from_secs(5));
let joined = joiner.join(impressions, clicks).await?;
// Joined stream emits events where impression and click occur within 5 seconds
Stream-Stream Join Patterns¶
Inner Join - Only matching pairs:
// Both streams have event within window - emits combined row
// Example: Match ad impression with subsequent click
Left Outer Join - All left events, matching right when available:
// Impression with click = emits combined
// Impression without click = emits impression row with null click fields
Full Outer Join - All events from both streams, matched where possible.
Join Key Management¶
Specify how to match events (usually a user_id or correlation_id):
// Join on user_id field
let joiner = StreamStreamJoin::new(Duration::from_secs(10))
.with_left_key("user_id".to_string())
.with_right_key("user_id".to_string());
Without explicit keys, all events in the window are considered for joining.
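Under the hood, a keyed stream-stream join buffers events per key and pairs those whose timestamps fall within the window. A minimal, self-contained sketch of that matching logic (plain Rust with illustrative names, not the HeliosDB API):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Event {
    key: i64, // e.g. user_id
    ts: u64,  // event time, seconds
}

/// Inner stream-stream join: pair left/right events with equal keys
/// whose timestamps differ by at most `window` seconds.
fn interval_join(left: &[Event], right: &[Event], window: u64) -> Vec<(Event, Event)> {
    // Index the right side by key so each left event only scans candidates.
    let mut by_key: HashMap<i64, Vec<&Event>> = HashMap::new();
    for r in right {
        by_key.entry(r.key).or_default().push(r);
    }
    let mut out = Vec::new();
    for l in left {
        if let Some(cands) = by_key.get(&l.key) {
            for r in cands {
                if l.ts.abs_diff(r.ts) <= window {
                    out.push((l.clone(), (*r).clone()));
                }
            }
        }
    }
    out
}
```

Without keys, the `by_key` index degenerates into one bucket, and every pair of events inside the window is a candidate match, which is the join-explosion case covered in Troubleshooting.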
Stream-Table Joins¶
Enrich stream events with reference data from a static table:
use heliosdb_streaming::{JoinType, StreamTableJoin};
// Load reference data
let products = vec![
Row::new(maplit::hashmap!{
"product_id".to_string() => Value::Integer(1),
"name".to_string() => Value::String("Widget".to_string()),
"category".to_string() => Value::String("Hardware".to_string()),
"price".to_string() => Value::Float(19.99),
}),
// ... more products
];
// Create join
let joiner = StreamTableJoin::new(
"products".to_string(), // Table identifier
"product_id".to_string() // Join key
)
.with_join_type(JoinType::LeftOuter);
// Load data into cache
joiner.load_table(products)?;
// Join stream with table
let enriched = joiner.join(purchase_stream).await?;
Join Types for Stream-Table¶
| Type | Behavior | Use Case |
|---|---|---|
| Inner | Only emit if product found | Filtered pipeline |
| Left Outer | Emit all purchases, null product fields if not found | Catch missing products |
| Right Outer | Not typical for stream-table | N/A |
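Mechanically, a stream-table join is a hash lookup: the cached table is a map, and each stream event probes it by key. A sketch of left-outer enrichment under those assumptions (plain Rust, not the HeliosDB API):

```rust
use std::collections::HashMap;

/// Left-outer stream-table join: every purchase is emitted; the product
/// name is Some(..) on a match and None when the key is missing.
fn enrich<'a>(
    purchases: &[(i64, f64)],           // (product_id, amount)
    products: &'a HashMap<i64, String>, // product_id -> name
) -> Vec<(i64, f64, Option<&'a str>)> {
    purchases
        .iter()
        .map(|&(id, amount)| (id, amount, products.get(&id).map(String::as_str)))
        .collect()
}
```

An inner join would simply filter out the `None` rows instead of emitting them.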
Table Update Patterns¶
For frequently-changing reference data:
// Periodic reload
let joiner = Arc::new(joiner);
tokio::spawn({
let joiner = joiner.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(3600)).await;
// Reload products from database
let products = fetch_products_from_db().await;
joiner.load_table(products).ok();
}
}
});
Aggregations & Complex Event Processing¶
Windowed Aggregations¶
Aggregate stream data within windows:
use heliosdb_streaming::{WindowedAggregation, WindowType, AggregateFunction};
use std::time::Duration;
let aggregation = WindowedAggregation::new(
WindowType::Tumbling {
size: Duration::from_secs(60),
},
vec![
AggregateFunction::Count,
AggregateFunction::Sum {
field: "amount".to_string(),
},
AggregateFunction::Avg {
field: "response_time".to_string(),
},
AggregateFunction::Min {
field: "response_time".to_string(),
},
AggregateFunction::Max {
field: "response_time".to_string(),
},
],
stream.clone(),
)
.with_group_by(vec!["status".to_string(), "endpoint".to_string()]);
let results = aggregation.execute().await?;
Supported Aggregations¶
- Count: Number of events
- Sum: Total of numeric field
- Avg: Average of numeric field
- Min: Minimum value
- Max: Maximum value
- Distinct: Unique values
- List: Collect all values
- ApproxDistinct: HyperLogLog for cardinality
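Count, Sum, Avg, Min, and Max can all be maintained incrementally, which is why windowed aggregation needs only constant state per group rather than buffering raw events. A minimal accumulator sketch (plain Rust, illustrative only):

```rust
/// Running aggregate state updated one event at a time, so a window
/// never needs to retain its raw events.
#[derive(Default)]
struct Agg {
    count: u64,
    sum: f64,
    min: Option<f64>,
    max: Option<f64>,
}

impl Agg {
    fn update(&mut self, v: f64) {
        self.count += 1;
        self.sum += v;
        self.min = Some(self.min.map_or(v, |m| m.min(v)));
        self.max = Some(self.max.map_or(v, |m| m.max(v)));
    }

    fn avg(&self) -> Option<f64> {
        (self.count > 0).then(|| self.sum / self.count as f64)
    }
}
```

Distinct and List are the exceptions: their state grows with cardinality, which is what ApproxDistinct's HyperLogLog sketch bounds.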
Complex Event Processing (CEP)¶
Detect patterns in event sequences using MATCH_RECOGNIZE:
use heliosdb_streaming::{Pattern, PatternVariable, PatternCondition, PatternSequence, Measure, MeasureExpression, PatternMatcher};
use std::time::Duration;
// Define pattern: Detect fraud (3+ failed logins in 5 minutes)
let pattern = Pattern {
name: "fraud_pattern".to_string(),
variables: vec![
PatternVariable {
name: "failed".to_string(),
condition: PatternCondition::Equals {
field: "result".to_string(),
value: Value::String("failed".to_string()),
},
},
],
sequence: PatternSequence::Sequence {
patterns: vec![
PatternSequence::Single { var: "failed".to_string() },
PatternSequence::Single { var: "failed".to_string() },
PatternSequence::Single { var: "failed".to_string() },
],
},
within: Some(Duration::from_secs(300)),
measures: vec![
Measure {
name: "login_attempts".to_string(),
expression: MeasureExpression::Count,
},
],
};
let matcher = PatternMatcher::new(pattern);
let matches = matcher.match_stream(login_stream).await?;
CEP Pattern Syntax¶
Patterns follow SQL's MATCH_RECOGNIZE:
- PATTERN (A B+ C) - A followed by one or more B, then C
- PATTERN (A{3}) - Exactly 3 consecutive A
- PATTERN (A? B) - Optional A, then B
- PATTERN (A*) - Zero or more A
Variables bind events that match their conditions; measures compute aggregate values across the matched events.
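The core of matching a pattern like PATTERN (failed{3}) within a time bound is a run-length check over the event sequence. A self-contained sketch of just that case (plain Rust; a real matcher compiles patterns into a state machine, this only covers the {n}-within-duration shape):

```rust
/// True if `events` (time-ordered (timestamp, failed) pairs) contain `n`
/// consecutive failures whose first and last occur within `within` seconds.
fn detect_run(events: &[(u64, bool)], n: usize, within: u64) -> bool {
    let mut run: Vec<u64> = Vec::new(); // timestamps of the current failure run
    for &(ts, failed) in events {
        if !failed {
            run.clear(); // a success breaks the consecutive sequence
            continue;
        }
        run.push(ts);
        if run.len() >= n && ts - run[run.len() - n] <= within {
            return true;
        }
    }
    false
}
```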
Exactly-Once Semantics¶
Critical guarantee: each event is processed exactly once, no data loss, no duplicates.
Enable Exactly-Once Processing¶
use heliosdb_streaming::{TransactionCoordinator, IdempotentProducer, MessageDeduplicator};
// Create coordinator for transactions
let coordinator = TransactionCoordinator::new(Duration::from_secs(30))?;
// Create deduplicator for idempotent writes
let dedup = MessageDeduplicator::new(10_000); // 10K message capacity
// When producing results, use deduplication
let msg_id = uuid::Uuid::new_v4();
if !dedup.is_duplicate(&msg_id)? {
sink.emit(result_rows).await?;
dedup.mark_processed(&msg_id)?;
}
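A fixed-capacity deduplicator is essentially a set plus an eviction queue. A minimal sketch of that structure (plain Rust; the actual MessageDeduplicator implementation and its ID type may differ):

```rust
use std::collections::{HashSet, VecDeque};

/// Bounded-capacity message deduplicator: remembers the most recent
/// `capacity` IDs and evicts the oldest once full.
struct Deduplicator {
    seen: HashSet<u64>,
    order: VecDeque<u64>,
    capacity: usize,
}

impl Deduplicator {
    fn new(capacity: usize) -> Self {
        Self { seen: HashSet::new(), order: VecDeque::new(), capacity }
    }

    /// Returns true if `id` was already processed; otherwise records it.
    fn check_and_mark(&mut self, id: u64) -> bool {
        if self.seen.contains(&id) {
            return true;
        }
        if self.order.len() == self.capacity {
            if let Some(old) = self.order.pop_front() {
                self.seen.remove(&old);
            }
        }
        self.order.push_back(id);
        self.seen.insert(id);
        false
    }
}
```

The bounded capacity is the tradeoff: a duplicate older than the retained window slips through, so size the capacity to cover your maximum redelivery delay.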
Two-Phase Commit Protocol¶
Ensures atomicity across multiple systems:
// Phase 1: Prepare (all participants acknowledge)
let txn = coordinator.begin_transaction().await?;
txn.add_participant("kafka_producer".to_string());
txn.add_participant("postgres_sink".to_string());
// Process events
for event in &events {
process_event(event)?;
}
// Phase 2: Commit
coordinator.commit(txn.id).await?;
// If any participant fails, automatic rollback
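The decision rule of two-phase commit is simple even though the failure handling around it is not: commit only when every participant voted yes in the prepare phase. A toy sketch of that rule (plain Rust, illustrative only):

```rust
#[derive(Debug, PartialEq)]
enum Outcome {
    Commit,
    Abort,
}

/// Phase 1 collects one prepare vote per participant; phase 2 commits
/// only if every participant voted yes, otherwise all roll back.
fn decide(prepare_votes: &[bool]) -> Outcome {
    if !prepare_votes.is_empty() && prepare_votes.iter().all(|&v| v) {
        Outcome::Commit
    } else {
        Outcome::Abort
    }
}
```

An empty participant list is treated as Abort here; real coordinators also need timeouts and a durable decision log to survive crashes between the two phases.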
Offset Management¶
Track positions in source streams to enable recovery:
use heliosdb_streaming::{OffsetManager, Checkpoint};
let offset_mgr = OffsetManager::new()?;
// Save offset after processing
let checkpoint = Checkpoint {
stream_id: "kafka-topic".to_string(),
partition: 0,
offset: 12345,
timestamp: Utc::now(),
};
offset_mgr.save_checkpoint(&checkpoint).await?;
// On recovery, resume from saved offset
let last = offset_mgr.get_checkpoint("kafka-topic", 0).await?;
kafka_source.seek_to_offset(last.offset).await?;
Failure Recovery¶
Recovery guarantees:
- Duplicate Detection: Message IDs prevent re-processing after crash
- Idempotent Operations: Writing same message twice produces same result
- Offset Tracking: Know exactly where to resume
// Process with idempotency
let msg_id = event.id.clone();
// Checkpoint before processing
offset_mgr.save_offset(&msg_id).await?;
// Process with deduplication
if !dedup.seen(&msg_id)? {
write_to_sink(&event).await?;
}
Backpressure & Flow Control¶
Backpressure Strategies¶
Handle cases where downstream can't keep up:
use heliosdb_streaming::{BackpressureController, BackpressureStrategy};
let controller = BackpressureController::new(
BackpressureStrategy::Block, // Wait for space
10_000, // Max buffer size
);
// Or drop old data
let controller = BackpressureController::new(
BackpressureStrategy::DropOldest,
10_000,
);
Strategies:
- Block: Pause the source until the buffer drains (no data loss, latency increases)
- DropOldest: Remove the oldest events when full (data loss, latency stays stable)
- DropNewest: Drop incoming events when full (prevents memory growth)
- Signal: Track backpressure but don't drop (for monitoring)
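The strategies differ only in what happens when the buffer is full. A compact sketch of the overflow behavior (plain Rust; the Block case just reports that the caller should wait, where a real controller would await space):

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy)]
enum Strategy {
    Block,
    DropOldest,
    DropNewest,
}

/// Bounded buffer applying a backpressure strategy on overflow.
/// Returns true if the new event ended up in the buffer.
fn offer(buf: &mut VecDeque<u64>, cap: usize, strategy: Strategy, event: u64) -> bool {
    if buf.len() < cap {
        buf.push_back(event);
        return true;
    }
    match strategy {
        // A real Block strategy would wait for space; here we just signal it.
        Strategy::Block => false,
        Strategy::DropOldest => {
            buf.pop_front(); // sacrifice the oldest event
            buf.push_back(event);
            true
        }
        Strategy::DropNewest => false, // sacrifice the incoming event
    }
}
```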
Flow Control in Practice¶
// Acquire space before accepting event
let decision = controller.acquire().await;
match decision {
BackpressureDecision::Accept => {
// Process event
process(event).await?;
}
BackpressureDecision::Reject => {
// Drop or retry
eprintln!("Dropping event due to backpressure");
}
}
// Release space when done
controller.release();
Adaptive Backpressure¶
Dynamically adjust based on downstream lag:
use heliosdb_streaming::AdaptiveBackpressureController;
let adaptive = AdaptiveBackpressureController::new(
10_000, // Initial buffer
20_000, // Max buffer
);
// System automatically adjusts buffer based on lag
let decision = adaptive.acquire().await;
Integration Patterns¶
Kafka Integration¶
Consume from Kafka topics:
use heliosdb_streaming::{KafkaSourceConnector, KafkaConfig};
let kafka_config = KafkaConfig {
bootstrap_servers: vec!["localhost:9092".to_string()],
topic: "user_events".to_string(),
group_id: Some("heliosdb-consumer".to_string()),
client_id: Some("processor-1".to_string()),
auto_offset_reset: "latest".to_string(),
enable_auto_commit: false, // Manual commit for exactly-once
max_poll_records: 500,
};
let source = KafkaSourceConnector::new(kafka_config);
let stream = source.start().await?;
// Process stream
// ... filtering, joining, aggregating ...
// Commit offset after successful processing
offset_mgr.commit_offset(&stream_id, &offset).await?;
Kafka Sink¶
Write results to Kafka:
use heliosdb_streaming::KafkaSinkConnector;
let sink_config = KafkaConfig {
bootstrap_servers: vec!["localhost:9092".to_string()],
topic: "alerts".to_string(),
..Default::default()
};
let sink = KafkaSinkConnector::new(sink_config);
// Write results
sink.write(result_rows).await?;
sink.flush().await?;
Database Integration¶
Write results to PostgreSQL/MySQL:
use heliosdb_streaming::{DatabaseSinkConnector, DatabaseType, WriteMode};
let db_sink = DatabaseSinkConnector::new(
DatabaseSinkConfig {
database_type: DatabaseType::PostgreSQL,
host: "localhost".to_string(),
port: 5432,
database: "analytics".to_string(),
table: "real_time_metrics".to_string(),
write_mode: WriteMode::Upsert,
}
);
db_sink.write(result_rows).await?;
REST API Integration¶
Expose streaming metrics via HTTP:
use axum::{Router, routing::get, Json};
use std::sync::Arc;
let metrics = Arc::new(sink.get_results());
let app = Router::new()
.route("/metrics", get({
let metrics = metrics.clone();
move || {
let m = metrics.clone();
async move { Json(m.clone()) }
}
}));
axum::Server::bind(&"0.0.0.0:3000".parse()?)
.serve(app.into_make_service())
.await?;
Watermarks & Late Data¶
Watermark Strategy¶
A watermark signals that no events with earlier timestamps are expected to arrive:
use heliosdb_streaming::{WatermarkGenerator, WatermarkStrategy};
use std::time::Duration;
// Bounded out-of-order: assume max 10-second delay
let generator = WatermarkGenerator::new(
WatermarkStrategy::BoundedOutOfOrderness {
max_out_of_order: Duration::from_secs(10),
},
Duration::from_secs(60), // allowed_lateness
);
// Update watermark as events arrive
for event in events {
if let Some(new_watermark) = generator.update(event.event_time) {
println!("Watermark advanced to: {}", new_watermark);
// This triggers window completion
}
}
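The BoundedOutOfOrderness strategy boils down to: the watermark trails the maximum event time seen so far by the configured delay, and never moves backwards. A self-contained sketch, including the allowed-lateness acceptance check (plain Rust with timestamps in seconds; illustrative, not the HeliosDB API):

```rust
/// Bounded out-of-orderness watermark: trails the maximum event time
/// seen so far by `max_delay`, and never regresses.
struct Watermark {
    max_ts: u64,
    max_delay: u64,
}

impl Watermark {
    fn new(max_delay: u64) -> Self {
        Self { max_ts: 0, max_delay }
    }

    /// Observe an event and return the current watermark.
    fn update(&mut self, event_ts: u64) -> u64 {
        self.max_ts = self.max_ts.max(event_ts);
        self.max_ts.saturating_sub(self.max_delay)
    }

    /// A late event is still accepted while it is within allowed lateness
    /// of the watermark.
    fn accepts(&self, event_ts: u64, allowed_lateness: u64) -> bool {
        event_ts + allowed_lateness >= self.max_ts.saturating_sub(self.max_delay)
    }
}
```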
Handling Late Data¶
Data arriving after window close:
use heliosdb_streaming::{LateDataHandler, LateDataDecision};
let handler = LateDataHandler::new(Duration::from_secs(60));
// On late arrival
match handler.handle_late_event(&event)? {
LateDataDecision::IncludeInWindow => {
// Add to window, emit correction
update_window(&event)?;
sink.emit(correction_rows).await?;
}
LateDataDecision::Discard => {
// Event is too late, drop it
}
}
Allowed Lateness Configuration¶
let config = StreamConfig {
name: "events".to_string(),
time_semantics: TimeSemantics::EventTime,
allowed_lateness: Duration::from_secs(60), // Accept events up to 1 min late
watermark_interval: Duration::from_secs(1),
};
Tradeoff: Larger allowed_lateness keeps windows open longer (more corrections possible, higher state cost).
State Management¶
Checkpoint Basics¶
Persist operator state for failure recovery:
use heliosdb_streaming::{CheckpointCoordinator, StateSnapshot};
let coordinator = CheckpointCoordinator::new()?;
// Trigger checkpoint periodically
tokio::spawn({
let coordinator = coordinator.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
match coordinator.trigger_checkpoint().await {
Ok(snapshot) => println!("Checkpoint {}: {:?}", snapshot.id, snapshot.status),
Err(e) => eprintln!("Checkpoint failed: {}", e),
}
}
}
});
State Backends¶
Store operator state locally or remotely:
In-Memory (fast, no persistence):
File-based (persistent, local):
use heliosdb_streaming::FileStateBackend;
let backend = FileStateBackend::new("/data/checkpoints".to_string())?;
Encrypted (secure storage):
use heliosdb_streaming::EncryptedStateBackend;
let backend = EncryptedStateBackend::new(
"/data/encrypted_state".to_string(),
encryption_key.clone(),
)?;
Operator State¶
Manage mutable state within operators:
use heliosdb_streaming::OperatorState;
struct SessionAggregator {
state: OperatorState,
}
impl SessionAggregator {
fn process_event(&mut self, event: &Row) -> Result<()> {
// Get value state
let mut count = self.state.get_value("count")
.and_then(|v| v.as_i64())
.unwrap_or(0);
count += 1;
// Update state
self.state.set_value("count".to_string(), Value::Integer(count))?;
Ok(())
}
}
Performance Tuning¶
Parallelism¶
Increase processing throughput with parallel tasks:
// Process multiple streams in parallel
let handles: Vec<_> = (0..4).map(|partition| {
let stream = streams[partition].clone();
tokio::spawn(async move {
while let Some(event) = stream.recv().await {
process_event(event).await.ok();
}
})
}).collect();
futures::future::join_all(handles).await;
Batching¶
Reduce overhead by processing events in batches:
use heliosdb_streaming::DynamicBatchSizer;
let batcher = DynamicBatchSizer::new(
100, // Min batch
1_000, // Max batch
);
let mut batch = Vec::new();
while let Some(event) = stream.recv().await {
batch.push(event);
if batch.len() >= batcher.ideal_batch_size() {
sink.write(batch.clone()).await?;
batch.clear();
}
}
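The DynamicBatchSizer's exact policy isn't specified here; one plausible heuristic is to adjust the batch size toward a flush-latency target, sketched below (plain Rust, hypothetical logic rather than the actual implementation):

```rust
/// Adjust the batch size toward a latency target: grow when flushes are
/// faster than the target, shrink when slower, clamped to [min, max].
fn next_batch_size(current: usize, min: usize, max: usize,
                   flush_ms: u64, target_ms: u64) -> usize {
    let next = if flush_ms < target_ms {
        current + current / 4 // grow by 25%
    } else {
        current.saturating_sub(current / 4) // shrink by 25%
    };
    next.clamp(min, max)
}
```

Proportional adjustment with a clamp keeps the batch size stable under steady load while reacting quickly to a slow sink.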
State Size Management¶
Limit state growth in session windows:
// Session window with 30-minute timeout
let window = SessionWindow::new(Duration::from_secs(1800))
.with_max_sessions(10_000); // Limit concurrent sessions
Bloom Filter Optimization¶
Speed up stream-stream joins with Bloom filters:
use bloomfilter::Bloom;
let mut bloom = Bloom::new_for_fp_rate(100_000, 0.01); // sized for 100K elements at 1% FPR (newer bloomfilter releases return a Result here)
// Add keys from first stream
for event in &stream1 {
if let Some(Value::Integer(id)) = event.get("id") {
bloom.set(&id.to_string());
}
}
// Only look up in second stream if Bloom says "maybe present"
for event in &stream2 {
if let Some(Value::Integer(id)) = event.get("id") {
if bloom.check(&id.to_string()) {
// Probable match, do actual lookup
}
}
}
Monitoring Streams¶
Metrics Collection¶
Track stream health:
use heliosdb_streaming::BackpressureMetrics;
let metrics = controller.metrics();
println!("Total records: {}", metrics.total_records);
println!("Dropped: {}", metrics.dropped_records);
println!("Backpressure events: {}", metrics.backpressure_events);
println!("Avg buffer: {:.2}", metrics.average_buffer_size);
Watermark Monitoring¶
let wm_gen = WatermarkGenerator::new(strategy, Duration::from_secs(60));
let current = wm_gen.current_watermark();
println!("Current watermark: {}", current);
Consumer Lag Monitoring¶
Track how far behind the source you are:
use heliosdb_streaming::ConsumerLagMonitor;
let lag_monitor = ConsumerLagMonitor::new();
// Periodically check lag
tokio::spawn({
let monitor = lag_monitor.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
let lag = monitor.estimate_lag().await;
if lag > Duration::from_secs(30) {
eprintln!("WARNING: Consumer lag {} seconds", lag.as_secs());
}
}
}
});
Query Status¶
// Check if continuous query is running
if let Some(handle) = engine.get_continuous_query("my_query") {
let metrics = handle.metrics();
println!("Processed: {} rows", metrics.total_rows);
println!("Errors: {}", metrics.error_count);
}
Real-World Examples¶
Example 1: Real-Time E-Commerce Analytics¶
Track the conversion funnel in real time and detect drop-offs immediately:
#[tokio::main]
async fn main() -> Result<()> {
let engine = StreamingEngine::new().await?;
// Create streams for each funnel step
let views = Stream::new(StreamConfig {
name: "product_views".to_string(),
..Default::default()
});
let carts = Stream::new(StreamConfig {
name: "add_to_cart".to_string(),
..Default::default()
});
let purchases = Stream::new(StreamConfig {
name: "purchases".to_string(),
..Default::default()
});
// Join views → cart adds (30-min window)
let view_to_cart = engine.stream_stream_join(
views.clone(),
carts.clone(),
Duration::from_secs(1800),
).await?;
// Join cart → purchase (24-hour window)
let view_to_purchase = engine.stream_stream_join(
view_to_cart,
purchases.clone(),
Duration::from_secs(86400),
).await?;
// Aggregate by product per hour
let hourly_funnel = WindowedAggregation::new(
WindowType::Tumbling {
size: Duration::from_secs(3600),
},
vec![
AggregateFunction::Count,
AggregateFunction::Sum {
field: "purchase_amount".to_string(),
},
],
view_to_purchase,
)
.with_group_by(vec!["product_id".to_string()])
.execute()
.await?;
// Create query to emit results
let sink = Arc::new(CallbackSink::new(|rows| {
for row in rows {
let product_id = row.get("product_id");
let count = row.get("count");
let revenue = row.get("sum_purchase_amount");
println!("Funnel: product={:?}, count={:?}, revenue={:?}",
product_id, count, revenue);
}
}));
let query = ContinuousQuery::new(
"hourly_funnel".to_string(),
hourly_funnel,
).with_sink(sink);
engine.create_continuous_query(query).await?;
// Run forever
std::future::pending().await
}
Example 2: IoT Sensor Anomaly Detection¶
Monitor temperature/humidity sensors, detect anomalies:
#[tokio::main]
async fn main() -> Result<()> {
let engine = StreamingEngine::new().await?;
let sensor_stream = Stream::new(StreamConfig {
name: "sensor_readings".to_string(),
time_semantics: TimeSemantics::EventTime,
..Default::default()
});
// Detect: temperature > 100°F for 3+ consecutive readings
let pattern = Pattern {
name: "overheat".to_string(),
variables: vec![
PatternVariable {
name: "high_temp".to_string(),
condition: PatternCondition::GreaterThan {
field: "temperature_f".to_string(),
value: 100.0,
},
},
],
sequence: PatternSequence::Sequence {
patterns: vec![
PatternSequence::Single { var: "high_temp".to_string() },
PatternSequence::Single { var: "high_temp".to_string() },
PatternSequence::Single { var: "high_temp".to_string() },
],
},
within: Some(Duration::from_secs(300)), // 5-minute window
measures: vec![
Measure {
name: "max_temp".to_string(),
expression: MeasureExpression::Max {
field: "temperature_f".to_string(),
},
},
],
};
let matcher = PatternMatcher::new(pattern);
let anomalies = matcher.match_stream(sensor_stream).await?;
// Enrich with location data
let locations = vec![
// Load sensor locations from DB
];
let location_join = StreamTableJoin::new(
"sensor_locations".to_string(),
"sensor_id".to_string(),
)
.with_join_type(JoinType::Inner);
location_join.load_table(locations)?;
let enriched_anomalies = location_join.join(anomalies).await?;
// Alert via callback
let sink = Arc::new(CallbackSink::new(|rows| {
for row in rows {
if let (Some(Value::String(location)), Some(Value::Float(temp))) =
(row.get("location"), row.get("max_temp")) {
eprintln!("ALERT: Overheat at {} - {:.1}°F", location, temp);
// Send email, SMS, etc
}
}
}));
engine.create_continuous_query(
ContinuousQuery::new("overheat_detector".to_string(), enriched_anomalies)
.with_sink(sink)
).await?;
std::future::pending().await
}
Example 3: Financial Transaction Fraud Detection¶
Detect payment fraud with sliding window heuristics:
#[tokio::main]
async fn main() -> Result<()> {
let engine = StreamingEngine::new().await?;
let transactions = Stream::new(StreamConfig {
name: "transactions".to_string(),
time_semantics: TimeSemantics::EventTime,
allowed_lateness: Duration::from_secs(30),
watermark_interval: Duration::from_secs(1),
});
// Detect: 3+ transactions from different locations in 10 minutes
let pattern = Pattern {
name: "velocity_fraud".to_string(),
variables: vec![
PatternVariable {
name: "txn".to_string(),
condition: PatternCondition::Always,
},
],
sequence: PatternSequence::Sequence {
patterns: vec![
PatternSequence::Single { var: "txn".to_string() },
PatternSequence::Single { var: "txn".to_string() },
PatternSequence::Single { var: "txn".to_string() },
],
},
within: Some(Duration::from_secs(600)),
measures: vec![
Measure {
name: "location_count".to_string(),
expression: MeasureExpression::Distinct {
field: "location".to_string(),
},
},
Measure {
name: "total_amount".to_string(),
expression: MeasureExpression::Sum {
field: "amount".to_string(),
},
},
],
};
let matcher = PatternMatcher::new(pattern);
let fraud_signals = matcher.match_stream(transactions.clone()).await?;
// Enrich with customer info
let customer_profiles = vec![
// Load from DB
];
let customer_join = StreamTableJoin::new(
"customers".to_string(),
"customer_id".to_string(),
);
customer_join.load_table(customer_profiles)?;
let enriched_fraud = customer_join.join(fraud_signals).await?;
// Score and alert
let sink = Arc::new(CallbackSink::new(|rows| {
for row in rows {
if let (Some(Value::Integer(customer_id)), Some(Value::Float(amount))) =
(row.get("customer_id"), row.get("total_amount")) {
let risk_score = calculate_risk_score(&row);
if risk_score > 0.8 {
// Block transaction
println!("BLOCK: Customer {} suspicious ${:.2}", customer_id, amount);
} else if risk_score > 0.5 {
// Flag for review
println!("FLAG: Customer {} suspicious ${:.2}", customer_id, amount);
}
}
}
}));
engine.create_continuous_query(
ContinuousQuery::new("fraud_detector".to_string(), enriched_fraud)
.with_sink(sink)
).await?;
std::future::pending().await
}
fn calculate_risk_score(row: &Row) -> f64 {
// Implement scoring logic
0.7
}
Troubleshooting¶
High Latency¶
Symptom: Results arrive slowly, watermarks lag behind real-time.
Diagnosis:
let lag_monitor = ConsumerLagMonitor::new();
let lag = lag_monitor.estimate_lag().await;
if lag > Duration::from_secs(30) {
println!("Consumer lag: {:?}", lag);
}
Solutions:
- Increase parallelism: Process partitions in parallel
- Reduce batch size: Lower latency per batch
- Check backpressure: Monitor BackpressureMetrics
- Profile operators: Identify slow joins/aggregations
Data Loss¶
Symptom: Output row count doesn't match input.
Diagnosis:
- Check backpressure strategy—DropOldest/DropNewest drop data
- Verify sink is writing successfully: sink.flush().await?
- Check offset commits: May have crashed before committing
Solutions:
// Use exactly-once with offset tracking
let coordinator = TransactionCoordinator::new(Duration::from_secs(30))?;
// Checkpoint and write inside the transaction; the commit makes both durable atomically
offset_mgr.save_checkpoint(&checkpoint).await?;
sink.write(rows).await?;
coordinator.commit(txn.id).await?; // Atomic
Duplicate Results¶
Symptom: Same row appears multiple times.
Solutions:
- Enable message deduplication: MessageDeduplicator
- Implement idempotent sinks: Writing same row is safe
- Check exactly-once configuration
Memory Leak in Session Windows¶
Symptom: Memory grows unbounded with session window.
Solutions:
let window = SessionWindow::new(Duration::from_secs(1800))
.with_max_sessions(10_000) // Limit concurrent
.with_cleanup_interval(Duration::from_secs(60)); // Periodic cleanup
Watermark Stuck¶
Symptom: Windows don't complete, data piles up in memory.
Diagnosis:
let current_wm = wm_gen.current_watermark();
let max_ts = wm_gen.max_timestamp();
println!("Watermark: {}, Max Event: {}", current_wm, max_ts);
Solutions:
- Check for idle sources (no events for long time)
- Lower allowed_lateness so windows can close sooner
- Monitor watermark_interval configuration
Join Explosion (Many Cartesian Products)¶
Symptom: Output rows >> input rows, memory bloats.
Cause: Stream-stream join matches all events in window.
Solutions:
// Add explicit join keys
let joiner = StreamStreamJoin::new(Duration::from_secs(10))
.with_left_key("user_id".to_string())
.with_right_key("user_id".to_string());
// Narrow window
StreamStreamJoin::new(Duration::from_secs(1)) // Shorter window
Backpressure Causing Delays¶
Symptom: Consistent 1-2 second latency even with fast sink.
Solutions:
// Increase buffer
let controller = BackpressureController::new(
BackpressureStrategy::Block,
50_000, // Larger buffer
);
// Or use adaptive
let adaptive = AdaptiveBackpressureController::new(10_000, 100_000);
Related Documentation¶
- Continuous Queries Specification
- Stream Processing Performance Tuning
- Kafka Integration Guide
- Time-Windowed Analytics Tutorial
- Exactly-Once Semantics Implementation
- Backpressure & Flow Control Design
Summary¶
HeliosDB's streaming analytics capabilities enable:
- Real-time processing with sub-second latency
- Event-time semantics for accurate time-windowed analytics
- Multiple join types (stream-stream, stream-table)
- Complex event processing for pattern detection
- Exactly-once guarantees with transactional semantics
- Flow control with adaptive backpressure
- Flexible integration (Kafka, databases, REST APIs)
- Production-grade monitoring and observability
Start with simple tumbling windows and aggregations, gradually add joins and CEP patterns as your needs evolve. Monitor closely during the first deployments to tune window sizes, batch parameters, and parallelism for your workload.