
Streaming & Real-Time Analytics User Guide

Table of Contents

  1. Overview
  2. Stream Processing Fundamentals
  3. Setting Up Streaming
  4. Continuous Queries
  5. Stream-Stream Joins
  6. Stream-Table Joins
  7. Aggregations & Complex Event Processing
  8. Exactly-Once Semantics
  9. Backpressure & Flow Control
  10. Integration Patterns
  11. Watermarks & Late Data
  12. State Management
  13. Performance Tuning
  14. Monitoring Streams
  15. Real-World Examples
  16. Troubleshooting

Overview

HeliosDB's streaming and real-time analytics capabilities enable you to process infinite streams of data with sub-second latency. Unlike batch systems that process data in discrete chunks, HeliosDB streaming processes events as they arrive, maintaining state across distributed operators.

Key Concepts

Event Time vs. Processing Time: Event time is when an event occurred in the real world, while processing time is when the stream processor sees it. HeliosDB supports both semantics, with event-time being the default for accurate time-windowed analytics.

Continuous Queries: Unlike traditional SQL queries that terminate, continuous queries run indefinitely, emitting results as new data arrives. This is essential for real-time dashboards and monitoring.

Windowing: Data arrives as an unbounded stream, so we aggregate using windows—fixed time intervals (tumbling windows), overlapping periods (sliding windows), or dynamic gaps (session windows).

Joins: HeliosDB supports two join patterns: stream-stream joins (correlating events from two streams) and stream-table joins (enriching stream events with reference data). Joined results are persisted by writing the output stream to a sink.

Exactly-Once Semantics: Critical for applications requiring zero data loss and no duplicates. HeliosDB uses two-phase commit protocols with deduplication to guarantee exactly-once processing.

Use Cases

  • Real-time Event Analytics: Track user behavior and convert pageviews into revenue metrics in real time
  • IoT & Sensor Monitoring: Ingest millions of sensor readings, detect anomalies instantly
  • Financial Transactions: Pattern detection for fraud, real-time risk scoring
  • Network Traffic Analysis: Detect DDoS attacks, analyze flow patterns
  • Application Metrics: Monitor API performance, request latencies, error rates
  • Anomaly Detection: Apply machine learning models to streams for continuous predictions

Stream Processing Fundamentals

Time Semantics

HeliosDB supports two time semantics configured at stream creation:

// Event-time semantics (recommended for time-windowed analytics)
let config = StreamConfig {
    name: "events".to_string(),
    time_semantics: TimeSemantics::EventTime,
    allowed_lateness: Duration::from_secs(60),
    watermark_interval: Duration::from_secs(1),
};
let stream = Stream::new(config);

// Processing-time semantics (simpler, no late data handling)
let config = StreamConfig {
    name: "metrics".to_string(),
    time_semantics: TimeSemantics::ProcessingTime,
    allowed_lateness: Duration::from_secs(0),
    watermark_interval: Duration::from_secs(1),
};

Event-time processing gives you accurate results regardless of arrival delays, but requires watermarks to determine when a window is complete. Processing-time semantics are simpler, but results depend on execution speed rather than on when events actually occurred.

Stream Data Model

Data flows as rows with typed fields:

use heliosdb_streaming::{Row, Value};
use std::collections::HashMap;
use chrono::Utc;

// Create a row with typed fields
let mut fields = HashMap::new();
fields.insert("user_id".to_string(), Value::Integer(12345));
fields.insert("event_type".to_string(), Value::String("purchase".to_string()));
fields.insert("amount".to_string(), Value::Float(99.99));
fields.insert("timestamp".to_string(), Value::Timestamp(Utc::now()));

let row = Row::new(fields)
    .with_event_time(Utc::now());  // Set event timestamp

// Access values
if let Some(Value::Integer(user_id)) = row.get("user_id") {
    println!("User: {}", user_id);
}

Values support: Null, Boolean, Integer, Float, String, Bytes, Timestamp, Array, and Object types.

Window Types

Tumbling Windows (fixed, non-overlapping):

  • Best for: Hourly/daily aggregations, periodic summaries
  • Example: 1-minute windows for request count tracking

Sliding Windows (overlapping):

  • Best for: Detecting trends, moving averages
  • Example: 1-minute window slides every 10 seconds

Session Windows (dynamic gaps):

  • Best for: User sessions, activity bursts
  • Example: 30-minute gap between events marks session end

Count Windows (event-count based):

  • Best for: Micro-batching, load balancing
  • Example: Emit results after every 1000 events
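The window-assignment arithmetic behind these types is worth seeing concretely. The sketch below (std-only, illustrative; HeliosDB performs this internally once a window is configured) shows how a timestamp maps to one tumbling window but to several sliding windows:

```rust
/// Start of the tumbling window containing `ts` (seconds), for windows of `size` seconds.
fn tumbling_window_start(ts: u64, size: u64) -> u64 {
    ts - ts % size
}

/// Starts of every sliding window of `size` seconds (advancing by `slide`) that contains `ts`.
fn sliding_window_starts(ts: u64, size: u64, slide: u64) -> Vec<u64> {
    let last = ts - ts % slide;                    // latest window start at or before ts
    let first = last.saturating_sub(size - slide); // earliest start whose window still covers ts
    (first..=last).step_by(slide as usize).collect()
}

fn main() {
    // An event at t=125s falls in the 60s tumbling window [120, 180).
    assert_eq!(tumbling_window_start(125, 60), 120);
    // With a 60s window sliding every 10s, the same event belongs to 6 windows.
    assert_eq!(sliding_window_starts(125, 60, 10), vec![70, 80, 90, 100, 110, 120]);
    println!("ok");
}
```

This is why sliding windows cost more state than tumbling windows: each event contributes to size/slide windows at once.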


Setting Up Streaming

Initialize Streaming Engine

use heliosdb_streaming::StreamingEngine;

#[tokio::main]
async fn main() -> Result<()> {
    // Create the streaming engine
    let engine = StreamingEngine::new().await?;

    // Create streams and queries
    // ... your streaming logic ...

    // Shutdown gracefully
    engine.shutdown().await?;
    Ok(())
}

Create a Stream

use heliosdb_streaming::{Stream, StreamConfig, TimeSemantics};
use std::time::Duration;

// Create an input stream
let config = StreamConfig {
    name: "api_requests".to_string(),
    time_semantics: TimeSemantics::EventTime,
    allowed_lateness: Duration::from_secs(10),
    watermark_interval: Duration::from_secs(1),
};

let stream = Stream::new(config)
    .with_schema(vec![
        "user_id".to_string(),
        "endpoint".to_string(),
        "response_time".to_string(),
        "status".to_string(),
    ]);

Define Stream Schemas

Schemas are metadata describing stream fields. While HeliosDB is schema-flexible, defining schemas enables validation:

// Schema definition (for documentation and validation)
let schema = vec![
    "request_id".to_string(),      // String
    "timestamp".to_string(),        // Timestamp
    "method".to_string(),          // String (GET, POST, etc)
    "path".to_string(),            // String
    "status_code".to_string(),     // Integer
    "response_ms".to_string(),     // Integer
    "user_id".to_string(),         // Integer
];

let stream = Stream::new(config).with_schema(schema);

Continuous Queries

Basic Continuous Query

A continuous query runs indefinitely, emitting results as new data arrives:

use heliosdb_streaming::{CallbackSink, ContinuousQuery, Stream, Row, Value};
use std::collections::HashMap;
use std::sync::Arc;

// Create output sink
let sink = Arc::new(CallbackSink::new(|rows| {
    for row in rows {
        println!("Result: {:?}", row.fields());
    }
}));

// Create continuous query
let query = ContinuousQuery::new(
    "my_query".to_string(),
    stream.clone(),
)
.with_sink(sink);

// Execute
let handle = engine.create_continuous_query(query).await?;

// Query is now running continuously...

// Stop when done
engine.stop_continuous_query("my_query").await?;

Query Sinks

Continuous queries emit results to sinks:

Table Sink - Writes to a database table:

use heliosdb_streaming::TableSink;

let sink = Arc::new(TableSink::new("realtime_metrics".to_string()));

Callback Sink - Calls a Rust function:

use heliosdb_streaming::CallbackSink;

let sink = Arc::new(CallbackSink::new(|rows| {
    for row in rows {
        // Handle result rows (update dashboard, send alert, etc)
        println!("Alert: {:?}", row);
    }
}));

Memory Sink - Stores in memory (testing):

use heliosdb_streaming::MemorySink;

let sink = Arc::new(MemorySink::new());
let results = sink.get_results();


Stream-Stream Joins

Join two streams to correlate events. Requires a time window to match events:

use heliosdb_streaming::StreamStreamJoin;
use std::time::Duration;

// Create second stream
let clicks = Stream::new(StreamConfig {
    name: "clicks".to_string(),
    ..Default::default()
});

let impressions = Stream::new(StreamConfig {
    name: "impressions".to_string(),
    ..Default::default()
});

// Join with 5-second window
let joiner = StreamStreamJoin::new(Duration::from_secs(5));
let joined = joiner.join(impressions, clicks).await?;

// Joined stream emits events where impression and click occur within 5 seconds

Stream-Stream Join Patterns

Inner Join - Only matching pairs:

// Both streams have event within window - emits combined row
// Example: Match ad impression with subsequent click

Left Outer Join - All left events, matching right when available:

// Impression with click = emits combined
// Impression without click = emits impression row with null click fields

Full Outer Join - All events from both streams:

// Click without impression = emits click row with null impression fields

Join Key Management

Specify how to match events (usually a user_id or correlation_id):

// Join on user_id field
let joiner = StreamStreamJoin::new(Duration::from_secs(10))
    .with_left_key("user_id".to_string())
    .with_right_key("user_id".to_string());

Without explicit keys, all events in the window are considered for joining.
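To make the keyed-join semantics concrete, here is a std-only sketch (not the real StreamStreamJoin operator, which is stateful and incremental): an impression and a click pair up only when their keys match and the click lands within the join window.

```rust
use std::collections::HashMap;

struct Event { key: u64, ts: u64 }

/// Emit (key, left_ts, right_ts) for every right event that follows a
/// left event with the same key within `window` seconds.
fn interval_join(left: &[Event], right: &[Event], window: u64) -> Vec<(u64, u64, u64)> {
    // Index the right side by key so each left event only scans candidates.
    let mut by_key: HashMap<u64, Vec<&Event>> = HashMap::new();
    for e in right {
        by_key.entry(e.key).or_default().push(e);
    }
    let mut out = Vec::new();
    for l in left {
        if let Some(cands) = by_key.get(&l.key) {
            for r in cands {
                if r.ts >= l.ts && r.ts - l.ts <= window {
                    out.push((l.key, l.ts, r.ts));
                }
            }
        }
    }
    out
}

fn main() {
    let impressions = vec![Event { key: 1, ts: 100 }, Event { key: 2, ts: 100 }];
    let clicks = vec![Event { key: 1, ts: 103 }, Event { key: 2, ts: 130 }];
    // Only user 1 clicked within the 5-second window.
    assert_eq!(interval_join(&impressions, &clicks, 5), vec![(1, 100, 103)]);
    println!("ok");
}
```

Note how dropping the key index would force every left event to compare against every right event in the window, which is exactly the join-explosion failure mode covered in Troubleshooting.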


Stream-Table Joins

Enrich stream events with reference data from a static table:

use heliosdb_streaming::{StreamTableJoin, JoinType};

// Load reference data
let products = vec![
    Row::new(maplit::hashmap!{
        "product_id".to_string() => Value::Integer(1),
        "name".to_string() => Value::String("Widget".to_string()),
        "category".to_string() => Value::String("Hardware".to_string()),
        "price".to_string() => Value::Float(19.99),
    }),
    // ... more products
];

// Create join
let joiner = StreamTableJoin::new(
    "products".to_string(),  // Table identifier
    "product_id".to_string()  // Join key
)
.with_join_type(JoinType::LeftOuter);

// Load data into cache
joiner.load_table(products)?;

// Join stream with table
let enriched = joiner.join(purchase_stream).await?;

Join Types for Stream-Table

  • Inner: only emit if the product is found (filtered pipeline)
  • Left Outer: emit all purchases, null product fields if not found (catch missing products)
  • Right Outer: not typical for stream-table joins

Table Update Patterns

For frequently-changing reference data:

// Periodic reload
let joiner = Arc::new(joiner);
tokio::spawn({
    let joiner = joiner.clone();
    async move {
        loop {
            tokio::time::sleep(Duration::from_secs(3600)).await;

            // Reload products from database
            let products = fetch_products_from_db().await;
            joiner.load_table(products).ok();
        }
    }
});

Aggregations & Complex Event Processing

Windowed Aggregations

Aggregate stream data within windows:

use heliosdb_streaming::{WindowedAggregation, WindowType, AggregateFunction};
use std::time::Duration;

let aggregation = WindowedAggregation::new(
    WindowType::Tumbling {
        size: Duration::from_secs(60),
    },
    vec![
        AggregateFunction::Count,
        AggregateFunction::Sum {
            field: "amount".to_string(),
        },
        AggregateFunction::Avg {
            field: "response_time".to_string(),
        },
        AggregateFunction::Min {
            field: "response_time".to_string(),
        },
        AggregateFunction::Max {
            field: "response_time".to_string(),
        },
    ],
    stream.clone(),
)
.with_group_by(vec!["status".to_string(), "endpoint".to_string()]);

let results = aggregation.execute().await?;

Supported Aggregations

  • Count: Number of events
  • Sum: Total of numeric field
  • Avg: Average of numeric field
  • Min: Minimum value
  • Max: Maximum value
  • Distinct: Unique values
  • List: Collect all values
  • ApproxDistinct: HyperLogLog for cardinality
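These functions are incremental: a windowed aggregation keeps running values per window rather than buffering all rows. A std-only accumulator sketch for Count/Sum/Avg/Min/Max (illustrative, not HeliosDB's internal representation):

```rust
/// Running accumulator: each event updates O(1) state per window.
#[derive(Default)]
struct Acc { count: u64, sum: f64, min: f64, max: f64 }

impl Acc {
    fn add(&mut self, v: f64) {
        if self.count == 0 {
            self.min = v;
            self.max = v;
        } else {
            self.min = self.min.min(v);
            self.max = self.max.max(v);
        }
        self.count += 1;
        self.sum += v;
    }
    fn avg(&self) -> f64 { self.sum / self.count as f64 }
}

fn main() {
    let mut a = Acc::default();
    for v in [10.0, 30.0, 20.0] { a.add(v); }
    assert_eq!(a.count, 3);
    assert_eq!(a.avg(), 20.0);
    assert_eq!((a.min, a.max), (10.0, 30.0));
    println!("ok");
}
```

Distinct and List are the exceptions: they must retain values, which is why ApproxDistinct (HyperLogLog) exists for high-cardinality fields.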

Complex Event Processing (CEP)

Detect patterns in event sequences using MATCH_RECOGNIZE:

use heliosdb_streaming::{Pattern, PatternVariable, PatternCondition, PatternSequence, Measure, MeasureExpression, PatternMatcher};
use std::time::Duration;

// Define pattern: Detect fraud (3+ failed logins in 5 minutes)
let pattern = Pattern {
    name: "fraud_pattern".to_string(),
    variables: vec![
        PatternVariable {
            name: "failed".to_string(),
            condition: PatternCondition::Equals {
                field: "result".to_string(),
                value: Value::String("failed".to_string()),
            },
        },
    ],
    sequence: PatternSequence::Sequence {
        patterns: vec![
            PatternSequence::Single { var: "failed".to_string() },
            PatternSequence::Single { var: "failed".to_string() },
            PatternSequence::Single { var: "failed".to_string() },
        ],
    },
    within: Some(Duration::from_secs(300)),
    measures: vec![
        Measure {
            name: "login_attempts".to_string(),
            expression: MeasureExpression::Count,
        },
    ],
};

let matcher = PatternMatcher::new(pattern);
let matches = matcher.match_stream(login_stream).await?;

CEP Pattern Syntax

Patterns follow SQL's MATCH_RECOGNIZE:

PATTERN (A B+ C)  - A followed by one or more B, then C
PATTERN (A{3})    - Exactly 3 consecutive A
PATTERN (A? B)    - Optional A, then B
PATTERN (A*)      - Zero or more A

Variables bind events that match their conditions; measures compute aggregate values across the matched events.
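The PATTERN (A{3}) + WITHIN combination used in the fraud example boils down to a scan for three consecutive matches inside a time bound. A std-only sketch of that semantics (assuming strict contiguity, as in the Sequence-of-Singles pattern above):

```rust
/// Given (timestamp, matched) pairs, return the timestamp that completes
/// a run of 3 consecutive matches falling within `within` seconds.
fn three_in_a_row(events: &[(u64, bool)], within: u64) -> Option<u64> {
    for w in events.windows(3) {
        if w.iter().all(|&(_, matched)| matched) && w[2].0 - w[0].0 <= within {
            return Some(w[2].0); // third event completes the match
        }
    }
    None
}

fn main() {
    // The non-matching event at t=10 breaks the first run; the match
    // completes at t=40.
    let events = [(0, true), (10, false), (20, true), (30, true), (40, true)];
    assert_eq!(three_in_a_row(&events, 300), Some(40));
    // Three matches spread over more than `within` seconds do not fire.
    assert_eq!(three_in_a_row(&[(0, true), (200, true), (400, true)], 300), None);
    println!("ok");
}
```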


Exactly-Once Semantics

Critical guarantee: each event is processed exactly once, no data loss, no duplicates.

Enable Exactly-Once Processing

use heliosdb_streaming::{TransactionCoordinator, IdempotentProducer, MessageDeduplicator};

// Create coordinator for transactions
let coordinator = TransactionCoordinator::new(Duration::from_secs(30))?;

// Create deduplicator for idempotent writes
let dedup = MessageDeduplicator::new(10_000); // 10K message capacity

// When producing results, use deduplication
let msg_id = uuid::Uuid::new_v4();
if !dedup.is_duplicate(&msg_id)? {
    sink.emit(result_rows).await?;
    dedup.mark_processed(&msg_id)?;
}
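The "10K message capacity" means the deduplicator only remembers a bounded window of recent IDs. A std-only sketch of that eviction behavior (illustrative; MessageDeduplicator's internals may differ):

```rust
use std::collections::{HashSet, VecDeque};

/// Bounded deduplicator: remembers the last `capacity` ids, oldest evicted first.
struct Dedup {
    seen: HashSet<u64>,
    order: VecDeque<u64>,
    capacity: usize,
}

impl Dedup {
    fn new(capacity: usize) -> Self {
        Dedup { seen: HashSet::new(), order: VecDeque::new(), capacity }
    }

    /// Returns true the first time an id is seen, false on duplicates.
    fn first_time(&mut self, id: u64) -> bool {
        if !self.seen.insert(id) {
            return false; // duplicate within the remembered window
        }
        self.order.push_back(id);
        if self.order.len() > self.capacity {
            if let Some(old) = self.order.pop_front() {
                self.seen.remove(&old); // forget the oldest id
            }
        }
        true
    }
}

fn main() {
    let mut d = Dedup::new(2);
    assert!(d.first_time(1));
    assert!(!d.first_time(1)); // duplicate rejected
    assert!(d.first_time(2));
    assert!(d.first_time(3));  // capacity exceeded, id 1 evicted
    assert!(d.first_time(1));  // id 1 forgotten, accepted again
    println!("ok");
}
```

The last assertion is the practical caveat: a duplicate that arrives after more than `capacity` newer messages slips through, so size the capacity to your replay window.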

Two-Phase Commit Protocol

Ensures atomicity across multiple systems:

// Phase 1: Prepare (all participants acknowledge)
let txn = coordinator.begin_transaction().await?;
txn.add_participant("kafka_producer".to_string());
txn.add_participant("postgres_sink".to_string());

// Process events
for event in &events {
    process_event(event)?;
}

// Phase 2: Commit
coordinator.commit(txn.id).await?;
// If any participant fails, automatic rollback

Offset Management

Track positions in source streams to enable recovery:

use heliosdb_streaming::{OffsetManager, Checkpoint};

let offset_mgr = OffsetManager::new()?;

// Save offset after processing
let checkpoint = Checkpoint {
    stream_id: "kafka-topic".to_string(),
    partition: 0,
    offset: 12345,
    timestamp: Utc::now(),
};

offset_mgr.save_checkpoint(&checkpoint).await?;

// On recovery, resume from saved offset
let last = offset_mgr.get_checkpoint("kafka-topic", 0).await?;
kafka_source.seek_to_offset(last.offset).await?;

Failure Recovery

Recovery guarantees:

  1. Duplicate Detection: Message IDs prevent re-processing after crash
  2. Idempotent Operations: Writing same message twice produces same result
  3. Offset Tracking: Know exactly where to resume

// Process with idempotency
let msg_id = event.id.clone();

// Write with deduplication (is_duplicate/mark_processed as shown above)
if !dedup.is_duplicate(&msg_id)? {
    write_to_sink(&event).await?;
    dedup.mark_processed(&msg_id)?;
}

// Checkpoint only after the write succeeds; on recovery, replayed
// events are filtered out by the deduplicator
offset_mgr.save_checkpoint(&checkpoint).await?;

Backpressure & Flow Control

Backpressure Strategies

Handle cases where downstream can't keep up:

use heliosdb_streaming::{BackpressureController, BackpressureStrategy};

let controller = BackpressureController::new(
    BackpressureStrategy::Block,  // Wait for space
    10_000,  // Max buffer size
);

// Or drop old data
let controller = BackpressureController::new(
    BackpressureStrategy::DropOldest,
    10_000,
);

Strategies:

  • Block: Pause source until buffer drains (no data loss, latency increases)
  • DropOldest: Remove oldest events when full (data loss, latency stable)
  • DropNewest: Drop incoming events when full (prevents memory explosion)
  • Signal: Track backpressure but don't drop (for monitoring)
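The difference between Block and DropOldest comes down to what a bounded buffer does when it is full. A std-only sketch of the DropOldest case (illustrative; the real BackpressureController manages this asynchronously):

```rust
use std::collections::VecDeque;

/// Push into a bounded buffer with DropOldest semantics:
/// when full, evict and return the oldest element instead of blocking.
fn push_drop_oldest<T>(buf: &mut VecDeque<T>, item: T, cap: usize) -> Option<T> {
    let dropped = if buf.len() == cap { buf.pop_front() } else { None };
    buf.push_back(item);
    dropped // Some(oldest) if anything was evicted
}

fn main() {
    let mut buf = VecDeque::new();
    assert_eq!(push_drop_oldest(&mut buf, 1, 2), None);
    assert_eq!(push_drop_oldest(&mut buf, 2, 2), None);
    assert_eq!(push_drop_oldest(&mut buf, 3, 2), Some(1)); // oldest evicted
    assert_eq!(buf, VecDeque::from([2, 3]));
    println!("ok");
}
```

Block would instead await space before pushing, trading latency for completeness; DropNewest would return the incoming item unpushed.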

Flow Control in Practice

// Acquire space before accepting event
let decision = controller.acquire().await;

match decision {
    BackpressureDecision::Accept => {
        // Process event
        process(event).await?;
    }
    BackpressureDecision::Reject => {
        // Drop or retry
        eprintln!("Dropping event due to backpressure");
    }
}

// Release space when done
controller.release();

Adaptive Backpressure

Dynamically adjust based on downstream lag:

use heliosdb_streaming::AdaptiveBackpressureController;

let adaptive = AdaptiveBackpressureController::new(
    10_000,  // Initial buffer
    20_000,  // Max buffer
);

// System automatically adjusts buffer based on lag
let decision = adaptive.acquire().await;

Integration Patterns

Kafka Integration

Consume from Kafka topics:

use heliosdb_streaming::{KafkaSourceConnector, KafkaConfig};

let kafka_config = KafkaConfig {
    bootstrap_servers: vec!["localhost:9092".to_string()],
    topic: "user_events".to_string(),
    group_id: Some("heliosdb-consumer".to_string()),
    client_id: Some("processor-1".to_string()),
    auto_offset_reset: "latest".to_string(),
    enable_auto_commit: false,  // Manual commit for exactly-once
    max_poll_records: 500,
};

let source = KafkaSourceConnector::new(kafka_config);
let stream = source.start().await?;

// Process stream
// ... filtering, joining, aggregating ...

// Commit offset after successful processing
offset_mgr.commit_offset(&stream_id, &offset).await?;

Kafka Sink

Write results to Kafka:

use heliosdb_streaming::KafkaSinkConnector;

let sink_config = KafkaConfig {
    bootstrap_servers: vec!["localhost:9092".to_string()],
    topic: "alerts".to_string(),
    ..Default::default()
};

let sink = KafkaSinkConnector::new(sink_config);

// Write results
sink.write(result_rows).await?;
sink.flush().await?;

Database Integration

Write results to PostgreSQL/MySQL:

use heliosdb_streaming::{DatabaseSinkConnector, DatabaseType, WriteMode};

let db_sink = DatabaseSinkConnector::new(
    DatabaseSinkConfig {
        database_type: DatabaseType::PostgreSQL,
        host: "localhost".to_string(),
        port: 5432,
        database: "analytics".to_string(),
        table: "real_time_metrics".to_string(),
        write_mode: WriteMode::Upsert,
    }
);

db_sink.write(result_rows).await?;

REST API Integration

Expose streaming metrics via HTTP:

use axum::{Router, routing::get, Json};
use std::sync::Arc;

let metrics = Arc::new(sink.get_results());

let app = Router::new()
    .route("/metrics", get({
        let metrics = metrics.clone();
        move || {
            let m = metrics.clone();
            async move { Json(m.clone()) }
        }
    }));

axum::Server::bind(&"0.0.0.0:3000".parse()?)
    .serve(app.into_make_service())
    .await?;

Watermarks & Late Data

Watermark Strategy

Watermarks signal that no earlier events will arrive:

use heliosdb_streaming::{WatermarkGenerator, WatermarkStrategy};
use std::time::Duration;

// Bounded out-of-order: assume max 10-second delay
let generator = WatermarkGenerator::new(
    WatermarkStrategy::BoundedOutOfOrderness {
        max_out_of_order: Duration::from_secs(10),
    },
    Duration::from_secs(60),  // allowed_lateness
);

// Update watermark as events arrive
for event in events {
    if let Some(new_watermark) = generator.update(event.event_time) {
        println!("Watermark advanced to: {}", new_watermark);
        // This triggers window completion
    }
}
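The BoundedOutOfOrderness strategy is simple arithmetic: the watermark trails the largest event time seen so far by the configured maximum delay. A std-only sketch (illustrative of the strategy, not WatermarkGenerator's implementation):

```rust
/// Watermark = max event time observed - allowed out-of-orderness.
struct BoundedWatermark {
    max_ts: u64,
    max_out_of_order: u64,
}

impl BoundedWatermark {
    fn update(&mut self, event_ts: u64) -> u64 {
        self.max_ts = self.max_ts.max(event_ts);
        self.max_ts.saturating_sub(self.max_out_of_order)
    }
}

fn main() {
    let mut wm = BoundedWatermark { max_ts: 0, max_out_of_order: 10 };
    assert_eq!(wm.update(100), 90);  // watermark = 100 - 10
    assert_eq!(wm.update(95), 90);   // out-of-order event never moves it back
    assert_eq!(wm.update(120), 110);
    println!("ok");
}
```

Because the watermark only advances when the maximum observed timestamp advances, an idle source freezes it, which is the "watermark stuck" symptom covered in Troubleshooting.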

Handling Late Data

Data arriving after window close:

use heliosdb_streaming::{LateDataHandler, LateDataDecision};

let handler = LateDataHandler::new(Duration::from_secs(60));

// On late arrival
match handler.handle_late_event(&event)? {
    LateDataDecision::IncludeInWindow => {
        // Add to window, emit correction
        update_window(&event)?;
        sink.emit(correction_rows).await?;
    }
    LateDataDecision::Discard => {
        // Event is too late, drop it
    }
}

Allowed Lateness Configuration

let config = StreamConfig {
    name: "events".to_string(),
    time_semantics: TimeSemantics::EventTime,
    allowed_lateness: Duration::from_secs(60),  // Accept events up to 1 min late
    watermark_interval: Duration::from_secs(1),
};

Tradeoff: Larger allowed_lateness keeps windows open longer (more corrections possible, higher state cost).


State Management

Checkpoint Basics

Persist operator state for failure recovery:

use heliosdb_streaming::{CheckpointCoordinator, StateSnapshot};

let coordinator = CheckpointCoordinator::new()?;

// Trigger checkpoint periodically
tokio::spawn({
    let coordinator = coordinator.clone();
    async move {
        loop {
            tokio::time::sleep(Duration::from_secs(60)).await;

            // `?` is unavailable in a task returning (), so handle errors inline
            match coordinator.trigger_checkpoint().await {
                Ok(snapshot) => println!("Checkpoint {}: {:?}", snapshot.id, snapshot.status),
                Err(e) => eprintln!("Checkpoint failed: {}", e),
            }
        }
    }
});

State Backends

Store operator state locally or remotely:

In-Memory (fast, no persistence):

use heliosdb_streaming::InMemoryStateBackend;

let backend = InMemoryStateBackend::new();

File-based (persistent, local):

use heliosdb_streaming::FileStateBackend;

let backend = FileStateBackend::new("/data/checkpoints".to_string())?;

Encrypted (secure storage):

use heliosdb_streaming::EncryptedStateBackend;

let backend = EncryptedStateBackend::new(
    "/data/encrypted_state".to_string(),
    encryption_key.clone(),
)?;

Operator State

Manage mutable state within operators:

use heliosdb_streaming::OperatorState;

struct SessionAggregator {
    state: OperatorState,
}

impl SessionAggregator {
    fn process_event(&mut self, event: &Row) -> Result<()> {
        // Get value state
        let mut count = self.state.get_value("count")
            .and_then(|v| v.as_i64())
            .unwrap_or(0);

        count += 1;

        // Update state
        self.state.set_value("count".to_string(), Value::Integer(count))?;

        Ok(())
    }
}

Performance Tuning

Parallelism

Increase processing throughput with parallel tasks:

// Process multiple streams in parallel
let handles: Vec<_> = (0..4).map(|partition| {
    let stream = streams[partition].clone();
    tokio::spawn(async move {
        while let Some(event) = stream.recv().await {
            process_event(event).await.ok();
        }
    })
}).collect();

futures::future::join_all(handles).await;

Batching

Reduce overhead by processing events in batches:

use heliosdb_streaming::DynamicBatchSizer;

let batcher = DynamicBatchSizer::new(
    100,   // Min batch
    1_000, // Max batch
);

let mut batch = Vec::new();
while let Some(event) = stream.recv().await {
    batch.push(event);

    if batch.len() >= batcher.ideal_batch_size() {
        sink.write(batch.clone()).await?;
        batch.clear();
    }
}

State Size Management

Limit state growth in session windows:

// Session window with 30-minute timeout
let window = SessionWindow::new(Duration::from_secs(1800))
    .with_max_sessions(10_000);  // Limit concurrent sessions

Bloom Filter Optimization

Speed up stream-stream joins with Bloom filters:

use bloomfilter::Bloom;

let mut bloom = Bloom::new_for_fp_rate(100_000, 0.01);  // 100K elements, 1% FPR

// Add keys from first stream
for event in &stream1 {
    if let Some(Value::Integer(id)) = event.get("id") {
        bloom.set(&id.to_string());
    }
}

// Only look up in second stream if Bloom says "maybe present"
for event in &stream2 {
    if let Some(Value::Integer(id)) = event.get("id") {
        if bloom.check(&id.to_string()) {
            // Probable match, do actual lookup
        }
    }
}

Monitoring Streams

Metrics Collection

Track stream health:

use heliosdb_streaming::BackpressureMetrics;

let metrics = controller.metrics();
println!("Total records: {}", metrics.total_records);
println!("Dropped: {}", metrics.dropped_records);
println!("Backpressure events: {}", metrics.backpressure_events);
println!("Avg buffer: {:.2}", metrics.average_buffer_size);

Watermark Monitoring

let wm_gen = WatermarkGenerator::new(strategy, Duration::from_secs(60));
let current = wm_gen.current_watermark();
println!("Current watermark: {}", current);

Consumer Lag Monitoring

Track how far behind source you are:

use heliosdb_streaming::ConsumerLagMonitor;

let lag_monitor = ConsumerLagMonitor::new();

// Periodically check lag
tokio::spawn({
    let monitor = lag_monitor.clone();
    async move {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;

            let lag = monitor.estimate_lag().await;
            if lag > Duration::from_secs(30) {
                eprintln!("WARNING: Consumer lag {} seconds", lag.as_secs());
            }
        }
    }
});

Query Status

// Check if continuous query is running
if let Some(handle) = engine.get_continuous_query("my_query") {
    let metrics = handle.metrics();
    println!("Processed: {} rows", metrics.total_rows);
    println!("Errors: {}", metrics.error_count);
}

Real-World Examples

Example 1: Real-Time E-Commerce Analytics

Track the conversion funnel in real time and detect drop-offs immediately:

#[tokio::main]
async fn main() -> Result<()> {
    let engine = StreamingEngine::new().await?;

    // Create streams for each funnel step
    let views = Stream::new(StreamConfig {
        name: "product_views".to_string(),
        ..Default::default()
    });

    let carts = Stream::new(StreamConfig {
        name: "add_to_cart".to_string(),
        ..Default::default()
    });

    let purchases = Stream::new(StreamConfig {
        name: "purchases".to_string(),
        ..Default::default()
    });

    // Join views → cart adds (30-min window)
    let view_to_cart = engine.stream_stream_join(
        views.clone(),
        carts.clone(),
        Duration::from_secs(1800),
    ).await?;

    // Join cart → purchase (24-hour window)
    let view_to_purchase = engine.stream_stream_join(
        view_to_cart,
        purchases.clone(),
        Duration::from_secs(86400),
    ).await?;

    // Aggregate by product per hour
    let hourly_funnel = WindowedAggregation::new(
        WindowType::Tumbling {
            size: Duration::from_secs(3600),
        },
        vec![
            AggregateFunction::Count,
            AggregateFunction::Sum {
                field: "purchase_amount".to_string(),
            },
        ],
        view_to_purchase,
    )
    .with_group_by(vec!["product_id".to_string()])
    .execute()
    .await?;

    // Create query to emit results
    let sink = Arc::new(CallbackSink::new(|rows| {
        for row in rows {
            let product_id = row.get("product_id");
            let count = row.get("count");
            let revenue = row.get("sum_purchase_amount");
            println!("Funnel: product={:?}, count={:?}, revenue={:?}",
                     product_id, count, revenue);
        }
    }));

    let query = ContinuousQuery::new(
        "hourly_funnel".to_string(),
        hourly_funnel,
    ).with_sink(sink);

    engine.create_continuous_query(query).await?;

    // Run forever
    std::future::pending().await
}

Example 2: IoT Sensor Anomaly Detection

Monitor temperature/humidity sensors, detect anomalies:

#[tokio::main]
async fn main() -> Result<()> {
    let engine = StreamingEngine::new().await?;

    let sensor_stream = Stream::new(StreamConfig {
        name: "sensor_readings".to_string(),
        time_semantics: TimeSemantics::EventTime,
        ..Default::default()
    });

    // Detect: temperature > 100°F for 3+ consecutive readings
    let pattern = Pattern {
        name: "overheat".to_string(),
        variables: vec![
            PatternVariable {
                name: "high_temp".to_string(),
                condition: PatternCondition::GreaterThan {
                    field: "temperature_f".to_string(),
                    value: 100.0,
                },
            },
        ],
        sequence: PatternSequence::Sequence {
            patterns: vec![
                PatternSequence::Single { var: "high_temp".to_string() },
                PatternSequence::Single { var: "high_temp".to_string() },
                PatternSequence::Single { var: "high_temp".to_string() },
            ],
        },
        within: Some(Duration::from_secs(300)),  // 5-minute window
        measures: vec![
            Measure {
                name: "max_temp".to_string(),
                expression: MeasureExpression::Max {
                    field: "temperature_f".to_string(),
                },
            },
        ],
    };

    let matcher = PatternMatcher::new(pattern);
    let anomalies = matcher.match_stream(sensor_stream).await?;

    // Enrich with location data
    let locations = vec![
        // Load sensor locations from DB
    ];

    let enriched = StreamTableJoin::new(
        "sensor_locations".to_string(),
        "sensor_id".to_string(),
    )
    .with_join_type(JoinType::Inner);
    enriched.load_table(locations)?;

    let enriched_anomalies = enriched.join(anomalies).await?;

    // Alert via callback
    let sink = Arc::new(CallbackSink::new(|rows| {
        for row in rows {
            if let (Some(Value::String(location)), Some(Value::Float(temp))) =
                (row.get("location"), row.get("max_temp")) {
                eprintln!("ALERT: Overheat at {} - {:.1}°F", location, temp);
                // Send email, SMS, etc
            }
        }
    }));

    engine.create_continuous_query(
        ContinuousQuery::new("overheat_detector".to_string(), enriched_anomalies)
            .with_sink(sink)
    ).await?;

    std::future::pending().await
}

Example 3: Financial Transaction Fraud Detection

Detect payment fraud with time-windowed pattern heuristics:

#[tokio::main]
async fn main() -> Result<()> {
    let engine = StreamingEngine::new().await?;

    let transactions = Stream::new(StreamConfig {
        name: "transactions".to_string(),
        time_semantics: TimeSemantics::EventTime,
        allowed_lateness: Duration::from_secs(30),
        watermark_interval: Duration::from_secs(1),
    });

    // Detect: 3+ transactions from different locations in 10 minutes
    let pattern = Pattern {
        name: "velocity_fraud".to_string(),
        variables: vec![
            PatternVariable {
                name: "txn".to_string(),
                condition: PatternCondition::Always,
            },
        ],
        sequence: PatternSequence::Sequence {
            patterns: vec![
                PatternSequence::Single { var: "txn".to_string() },
                PatternSequence::Single { var: "txn".to_string() },
                PatternSequence::Single { var: "txn".to_string() },
            ],
        },
        within: Some(Duration::from_secs(600)),
        measures: vec![
            Measure {
                name: "location_count".to_string(),
                expression: MeasureExpression::Distinct {
                    field: "location".to_string(),
                },
            },
            Measure {
                name: "total_amount".to_string(),
                expression: MeasureExpression::Sum {
                    field: "amount".to_string(),
                },
            },
        ],
    };

    let matcher = PatternMatcher::new(pattern);
    let fraud_signals = matcher.match_stream(transactions.clone()).await?;

    // Enrich with customer info
    let customer_profiles = vec![
        // Load from DB
    ];

    let enriched = StreamTableJoin::new(
        "customers".to_string(),
        "customer_id".to_string(),
    );
    enriched.load_table(customer_profiles)?;

    let enriched_fraud = enriched.join(fraud_signals).await?;

    // Score and alert
    let sink = Arc::new(CallbackSink::new(|rows| {
        for row in rows {
            if let (Some(Value::Integer(customer_id)), Some(Value::Float(amount))) =
                (row.get("customer_id"), row.get("total_amount")) {
                let risk_score = calculate_risk_score(&row);

                if risk_score > 0.8 {
                    // Block transaction
                    println!("BLOCK: Customer {} suspicious ${:.2}", customer_id, amount);
                } else if risk_score > 0.5 {
                    // Flag for review
                    println!("FLAG: Customer {} suspicious ${:.2}", customer_id, amount);
                }
            }
        }
    }));

    engine.create_continuous_query(
        ContinuousQuery::new("fraud_detector".to_string(), enriched_fraud)
            .with_sink(sink)
    ).await?;

    std::future::pending().await
}

fn calculate_risk_score(row: &Row) -> f64 {
    // Implement scoring logic
    0.7
}

Troubleshooting

High Latency

Symptom: Results arrive slowly, watermarks lag behind real-time.

Diagnosis:

let lag_monitor = ConsumerLagMonitor::new();
let lag = lag_monitor.estimate_lag().await;
if lag > Duration::from_secs(30) {
    println!("Consumer lag: {:?}", lag);
}

Solutions:

  • Increase parallelism: Process partitions in parallel
  • Reduce batch size: Lower latency per batch
  • Check backpressure: Monitor BackpressureMetrics
  • Profile operators: Identify slow joins/aggregations

Data Loss

Symptom: Output row count doesn't match input.

Diagnosis:

  • Check the backpressure strategy: DropOldest/DropNewest drop data
  • Verify the sink is writing successfully: sink.flush().await?
  • Check offset commits: the process may have crashed before committing

Solutions:

// Use exactly-once with offset tracking
let coordinator = TransactionCoordinator::new(Duration::from_secs(30))?;

// Commit only after successful write
offset_mgr.save_checkpoint(&checkpoint).await?;
sink.write(rows).await?;
coordinator.commit(txn.id).await?;  // Atomic

Duplicate Results

Symptom: Same row appears multiple times.

Solutions:

  • Enable message deduplication: MessageDeduplicator
  • Implement idempotent sinks: writing the same row twice is safe
  • Check the exactly-once configuration

Memory Leak in Session Windows

Symptom: Memory grows unbounded with session window.

Solutions:

let window = SessionWindow::new(Duration::from_secs(1800))
    .with_max_sessions(10_000)  // Limit concurrent
    .with_cleanup_interval(Duration::from_secs(60));  // Periodic cleanup

Watermark Stuck

Symptom: Windows don't complete, data piles up in memory.

Diagnosis:

let current_wm = wm_gen.current_watermark();
let max_ts = wm_gen.max_timestamp();
println!("Watermark: {}, Max Event: {}", current_wm, max_ts);

Solutions:

  • Check for idle sources (no events for a long time)
  • Lower allowed_lateness to close windows sooner
  • Monitor the watermark_interval configuration

Join Explosion (Many Cartesian Products)

Symptom: Output rows >> input rows, memory bloats.

Cause: Stream-stream join matches all events in window.

Solutions:

// Add explicit join keys
let joiner = StreamStreamJoin::new(Duration::from_secs(10))
    .with_left_key("user_id".to_string())
    .with_right_key("user_id".to_string());

// Narrow window
let joiner = StreamStreamJoin::new(Duration::from_secs(1));  // Shorter window

Backpressure Causing Delays

Symptom: Consistent 1-2 second latency even with fast sink.

Solutions:

// Increase buffer
let controller = BackpressureController::new(
    BackpressureStrategy::Block,
    50_000,  // Larger buffer
);

// Or use adaptive
let adaptive = AdaptiveBackpressureController::new(10_000, 100_000);



Summary

HeliosDB's streaming analytics capabilities enable:

  • Real-time processing with sub-second latency
  • Event-time semantics for accurate time-windowed analytics
  • Multiple join types (stream-stream, stream-table)
  • Complex event processing for pattern detection
  • Exactly-once guarantees with transactional semantics
  • Flow control with adaptive backpressure
  • Flexible integration (Kafka, databases, REST APIs)
  • Production-grade monitoring and observability

Start with simple tumbling windows and aggregations, gradually add joins and CEP patterns as your needs evolve. Monitor closely during the first deployments to tune window sizes, batch parameters, and parallelism for your workload.