Skip to content

HeliosDB Streaming - Usage Examples

Practical examples for common streaming use cases


Table of Contents

  1. Real-Time Analytics Dashboard
  2. User Session Analysis
  3. Fraud Detection
  4. IoT Sensor Monitoring
  5. E-Commerce Conversion Funnel
  6. Financial Trading Alerts
  7. Log Aggregation
  8. Anomaly Detection
  9. Production Deployment Tips
  10. Performance Optimization

Real-Time Analytics Dashboard

Use Case: Real-time metrics for application monitoring.

use heliosdb_streaming::*;
use std::time::Duration;

/// Prints per-minute request metrics (volume, error rate, average latency)
/// computed over 1-minute tumbling windows of the `app_metrics` stream.
///
/// NOTE(review): `main` returns right after spawning the consumer task. In a
/// real application keep the runtime alive (for example by awaiting the
/// `JoinHandle`), otherwise the task is cancelled when the runtime shuts down.
#[tokio::main]
async fn main() -> Result<()> {
    // Create stream from logs
    let stream = Stream::new(StreamConfig {
        name: "app_metrics".to_string(),
        buffer_size: 10000,
        ..Default::default()
    });

    // Apply 1-minute tumbling window
    let window = TumblingWindow::new(Duration::from_secs(60));
    let windowed = window.apply(stream).await?;

    // Process windows
    tokio::spawn(async move {
        while let Some(event) = windowed.recv().await {
            if let StreamEvent::Data(window_result) = event {
                let rows = &window_result.rows;

                // Calculate metrics
                let total_requests: usize = rows.len();
                let errors = rows.iter()
                    .filter(|r| r.get("status").and_then(|v| v.as_integer()) == Some(500))
                    .count();

                // An empty window would otherwise produce 0/0 (NaN) in the report.
                let error_rate = if total_requests == 0 {
                    0.0
                } else {
                    (errors as f64 / total_requests as f64) * 100.0
                };

                // Average only over the rows that actually carry a latency value,
                // so rows without `latency_ms` do not drag the mean towards zero.
                let latencies: Vec<f64> = rows.iter()
                    .filter_map(|r| r.get("latency_ms").and_then(|v| v.as_float()))
                    .collect();
                let avg_latency: f64 = if latencies.is_empty() {
                    0.0
                } else {
                    latencies.iter().sum::<f64>() / latencies.len() as f64
                };

                println!("Window [{} - {}]:",
                    window_result.window.start,
                    window_result.window.end
                );
                println!("  Total Requests: {}", total_requests);
                println!("  Errors: {} ({:.2}%)", errors, error_rate);
                println!("  Avg Latency: {:.2}ms", avg_latency);
            }
        }
    });

    Ok(())
}

User Session Analysis

Use Case: Track user sessions with session windows.

use heliosdb_streaming::*;
use std::time::Duration;

/// Summarises each user session (duration, page views, unique pages) using
/// session windows that close after 30 minutes of inactivity.
#[tokio::main]
async fn main() -> Result<()> {
    // Create stream
    let stream = Stream::new(StreamConfig::default());

    // Session window: 30-minute inactivity gap
    let window = SessionWindow::new(Duration::from_secs(1800));
    let sessions = window.apply(stream).await?;

    // Analyze sessions
    tokio::spawn(async move {
        while let Some(event) = sessions.recv().await {
            if let StreamEvent::Data(session) = event {
                // A session with no rows, or whose first row lacks a string
                // `user_id`, cannot be attributed to a user. Skip it instead of
                // panicking and taking the whole consumer task down.
                let user_id = match session.rows
                    .first()
                    .and_then(|r| r.get("user_id"))
                    .and_then(|v| v.as_string())
                {
                    Some(id) => id,
                    None => continue,
                };

                let page_views = session.rows.len();
                let duration = (session.window.end - session.window.start)
                    .num_seconds();

                // Collect into a set so repeated visits to a page count once.
                let pages_visited: std::collections::HashSet<_> = session.rows
                    .iter()
                    .filter_map(|r| r.get("page").and_then(|v| v.as_string()))
                    .collect();

                println!("User {} Session:", user_id);
                println!("  Duration: {} minutes", duration / 60);
                println!("  Page Views: {}", page_views);
                println!("  Unique Pages: {}", pages_visited.len());
            }
        }
    });

    Ok(())
}

Fraud Detection

Use Case: Detect suspicious transaction patterns.

use heliosdb_streaming::*;
use std::time::Duration;

/// Raises a fraud alert when three transactions above 1000 are matched
/// within five minutes, reporting their count and summed amount.
#[tokio::main]
async fn main() -> Result<()> {
    let stream = Stream::new(StreamConfig::default());

    // Pattern: Multiple high-value transactions in short time
    let pattern = PatternBuilder::new("fraud_pattern".to_string())
        .define_variable(
            "HighValue".to_string(),
            PatternCondition::GreaterThan {
                field: "amount".to_string(),
                value: 1000.0,
            },
        )
        .sequence(PatternSequence::Exactly {
            var: "HighValue".to_string(),
            count: 3,
        })
        .within(Duration::from_secs(300)) // Within 5 minutes
        .measure(
            "total_amount".to_string(),
            MeasureExpression::Aggregate {
                var: "HighValue".to_string(),
                field: "amount".to_string(),
                func: "SUM".to_string(),
            },
        )
        .measure(
            "transaction_count".to_string(),
            MeasureExpression::Count {
                var: "HighValue".to_string(),
            },
        )
        .build()?;

    let matcher = PatternMatcher::new(pattern);
    let alerts = matcher.apply(stream).await?;

    // Handle fraud alerts
    tokio::spawn(async move {
        while let Some(event) = alerts.recv().await {
            if let StreamEvent::Data(alert) = event {
                // Look the fields up without unwrapping: one malformed alert
                // must not panic the task and silence every later alert.
                let fields = (
                    alert.get("user_id"),
                    alert.get("total_amount"),
                    alert.get("transaction_count"),
                );

                if let (Some(user), Some(total), Some(count)) = fields {
                    println!("⚠ FRAUD ALERT:");
                    println!("  User: {}", user);
                    println!("  {} transactions totaling {}", count, total);
                    println!("  Action: Account flagged for review");

                    // Trigger alert system
                    // send_alert_to_ops(user, total, count);
                } else {
                    eprintln!("Fraud alert is missing user_id, total_amount or transaction_count; skipped");
                }
            }
        }
    });

    Ok(())
}

IoT Sensor Monitoring

Use Case: Monitor IoT sensors for anomalies.

use heliosdb_streaming::*;
use std::time::Duration;

/// Flags temperature readings that lie more than three standard deviations
/// from the mean of a sliding window (5-minute window, 1-minute slide).
#[tokio::main]
async fn main() -> Result<()> {
    let source = Stream::new(StreamConfig::default());

    // Sliding window: 5-minute window, 1-minute slide
    let window_size = Duration::from_secs(300);
    let slide_interval = Duration::from_secs(60);
    let windowed = SlidingWindow::new(window_size, slide_interval)
        .apply(source)
        .await?;

    // Detect anomalies
    tokio::spawn(async move {
        while let Some(event) = windowed.recv().await {
            // Only data events carry rows; everything else is ignored.
            let window_result = match event {
                StreamEvent::Data(data) => data,
                _ => continue,
            };

            let readings: Vec<f64> = window_result.rows
                .iter()
                .filter_map(|row| row.get("temperature").and_then(|value| value.as_float()))
                .collect();

            // No temperature readings in this window: nothing to evaluate.
            if readings.is_empty() {
                continue;
            }

            // Mean and population standard deviation of the window
            let mean = readings.iter().sum::<f64>() / readings.len() as f64;
            let variance = readings.iter()
                .map(|reading| (reading - mean).powi(2))
                .sum::<f64>() / readings.len() as f64;
            let std_dev = variance.sqrt();

            // 3-sigma rule: report every reading outside mean ± 3·std_dev
            let outliers = readings
                .iter()
                .filter(|reading| (**reading - mean).abs() > 3.0 * std_dev);

            for temp in outliers {
                println!("🚨 ANOMALY DETECTED:");
                println!("  Temperature: {:.2}°C", temp);
                println!("  Mean: {:.2}°C, Std Dev: {:.2}", mean, std_dev);
                println!("  Deviation: {:.2} sigma", (temp - mean).abs() / std_dev);
            }
        }
    });

    Ok(())
}

E-Commerce Conversion Funnel

Use Case: Track user journey from view to purchase.

use heliosdb_streaming::*;
use std::time::Duration;

/// Tracks completed conversion funnels (product view -> add to cart ->
/// purchase within two hours) and keeps a running conversion count and
/// revenue total.
#[tokio::main]
async fn main() -> Result<()> {
    let stream = Stream::new(StreamConfig::default());

    // Pattern: View -> Add to Cart -> Purchase
    let pattern = PatternBuilder::new("conversion_funnel".to_string())
        .define_variable(
            "View".to_string(),
            PatternCondition::Equals {
                field: "event_type".to_string(),
                value: Value::String("product_view".to_string()),
            },
        )
        .define_variable(
            "AddToCart".to_string(),
            PatternCondition::Equals {
                field: "event_type".to_string(),
                value: Value::String("add_to_cart".to_string()),
            },
        )
        .define_variable(
            "Purchase".to_string(),
            PatternCondition::Equals {
                field: "event_type".to_string(),
                value: Value::String("purchase".to_string()),
            },
        )
        .sequence(PatternSequence::Sequence {
            patterns: vec![
                PatternSequence::Single { var: "View".to_string() },
                PatternSequence::Single { var: "AddToCart".to_string() },
                PatternSequence::Single { var: "Purchase".to_string() },
            ],
        })
        .within(Duration::from_secs(7200)) // 2-hour window
        .measure(
            "product_id".to_string(),
            MeasureExpression::First {
                var: "View".to_string(),
                field: "product_id".to_string(),
            },
        )
        .measure(
            "revenue".to_string(),
            MeasureExpression::Last {
                var: "Purchase".to_string(),
                field: "amount".to_string(),
            },
        )
        .build()?;

    let matcher = PatternMatcher::new(pattern);
    let conversions = matcher.apply(stream).await?;

    // Track conversions
    tokio::spawn(async move {
        let mut total_revenue = 0.0;
        let mut conversion_count = 0;

        while let Some(event) = conversions.recv().await {
            if let StreamEvent::Data(conversion) = event {
                conversion_count += 1;

                // A missing or non-numeric revenue counts as 0.0.
                let revenue = conversion.get("revenue")
                    .and_then(|v| v.as_float())
                    .unwrap_or(0.0);
                total_revenue += revenue;

                println!(" Conversion #{}", conversion_count);
                // Do not unwrap here: a panic would end the task and discard
                // the running totals accumulated so far.
                match conversion.get("product_id") {
                    Some(product_id) => println!("  Product: {}", product_id),
                    None => println!("  Product: <unknown>"),
                }
                println!("  Revenue: ${:.2}", revenue);
                println!("  Total Revenue: ${:.2}", total_revenue);
            }
        }
    });

    Ok(())
}

Financial Trading Alerts

Use Case: Alert on rapid price movements.

use heliosdb_streaming::*;
use std::time::Duration;

/// Alerts when exactly three price increases of more than 2% are matched
/// within one minute, reporting the first and last price of the run.
#[tokio::main]
async fn main() -> Result<()> {
    let stream = Stream::new(StreamConfig::default());

    // Pattern: Price spike (3 consecutive increases of more than 2%)
    let pattern = PatternBuilder::new("price_spike".to_string())
        .define_variable(
            "Increase".to_string(),
            PatternCondition::GreaterThan {
                field: "price_change_pct".to_string(),
                value: 2.0, // > 2% increase
            },
        )
        .sequence(PatternSequence::Exactly {
            var: "Increase".to_string(),
            count: 3,
        })
        .within(Duration::from_secs(60)) // Within 1 minute
        .measure(
            "start_price".to_string(),
            MeasureExpression::First {
                var: "Increase".to_string(),
                field: "price".to_string(),
            },
        )
        .measure(
            "end_price".to_string(),
            MeasureExpression::Last {
                var: "Increase".to_string(),
                field: "price".to_string(),
            },
        )
        .build()?;

    let matcher = PatternMatcher::new(pattern);
    let spikes = matcher.apply(stream).await?;

    // Handle alerts
    tokio::spawn(async move {
        while let Some(event) = spikes.recv().await {
            if let StreamEvent::Data(spike) = event {
                // Gather the fields without unwrapping so that one malformed
                // match cannot panic the alerting task.
                let fields = (
                    spike.get("symbol"),
                    spike.get("start_price").and_then(|v| v.as_float()),
                    spike.get("end_price").and_then(|v| v.as_float()),
                );
                let (symbol, start, end) = match fields {
                    (Some(symbol), Some(start), Some(end)) => (symbol, start, end),
                    _ => {
                        eprintln!("Price spike is missing symbol, start_price or end_price; skipped");
                        continue;
                    }
                };

                // The relative change is undefined for a zero start price
                // (it would print inf or NaN), so skip such matches.
                if start == 0.0 {
                    eprintln!("Price spike has a zero start price; skipped");
                    continue;
                }
                let change_pct = ((end - start) / start) * 100.0;

                println!(" PRICE SPIKE DETECTED:");
                println!("  Symbol: {}", symbol);
                println!("  Price: ${:.2} -> ${:.2}", start, end);
                println!("  Change: {:.2}%", change_pct);
                println!("  Time: < 1 minute");
            }
        }
    });

    Ok(())
}

Log Aggregation

Use Case: Aggregate logs with SQL streaming.

use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Create SQL table environment
    let config = TableConfig {
        idle_state_retention: Duration::from_secs(3600),
        max_parallelism: 64,
        event_time_characteristic: TimeCharacteristic::EventTime,
    };

    let env = StreamTableEnvironment::new(config);

    // Create stream
    let stream = Stream::new(StreamConfig::default());

    // Define schema
    let schema = TableSchema {
        columns: vec![
            ColumnDef {
                name: "timestamp".to_string(),
                data_type: DataType::Timestamp,
                nullable: false,
            },
            ColumnDef {
                name: "level".to_string(),
                data_type: DataType::String,
                nullable: false,
            },
            ColumnDef {
                name: "service".to_string(),
                data_type: DataType::String,
                nullable: false,
            },
            ColumnDef {
                name: "message".to_string(),
                data_type: DataType::String,
                nullable: false,
            },
        ],
        primary_key: None,
        watermark_strategy: Some(WatermarkStrategy::BoundedOutOfOrderness {
            max_out_of_orderness: Duration::from_secs(5),
        }),
    };

    // Register stream as table
    env.register_stream("logs", stream, schema);

    // Query for errors
    let errors = env.sql_query(
        "SELECT service, COUNT(*) as error_count \
         FROM logs \
         WHERE level = 'ERROR' \
         GROUP BY service"
    ).await?;

    // Process results
    tokio::spawn(async move {
        while let Some(event) = errors.recv().await {
            if let StreamEvent::Data(row) = event {
                println!("Service: {}, Errors: {}",
                    row.get("service").unwrap(),
                    row.get("error_count").unwrap()
                );
            }
        }
    });

    Ok(())
}

Anomaly Detection

Use Case: Detect anomalies with stream-stream join and pattern matching.

use heliosdb_streaming::*;
use std::time::Duration;

/// Joins current metrics with their baseline on `metric_id` and reports
/// metrics whose current value exceeds three times the baseline.
#[tokio::main]
async fn main() -> Result<()> {
    // Two streams: current metrics and historical baseline
    let current_stream = Stream::new(StreamConfig::default());
    let baseline_stream = Stream::new(StreamConfig::default());

    // Join current with baseline
    let join = StreamStreamJoin::new(
        Duration::from_secs(60),
        JoinType::Inner,
        "metric_id",
        "metric_id"
    );

    let joined = join.apply(current_stream, baseline_stream).await?;

    // Pre-filter: keep joined rows with a positive current value. The
    // "current > 3 * baseline" comparison relates two fields, which this
    // pattern does not express, so it is enforced in the handler below.
    let pattern = PatternBuilder::new("anomaly".to_string())
        .define_variable(
            "Spike".to_string(),
            PatternCondition::And {
                conditions: vec![
                    PatternCondition::GreaterThan {
                        field: "current_value".to_string(),
                        value: 0.0,
                    },
                ],
            },
        )
        .sequence(PatternSequence::Single {
            var: "Spike".to_string(),
        })
        .build()?;

    let matcher = PatternMatcher::new(pattern);
    let anomalies = matcher.apply(joined).await?;

    // Handle anomalies
    tokio::spawn(async move {
        while let Some(event) = anomalies.recv().await {
            if let StreamEvent::Data(anomaly) = event {
                // NOTE(review): assumes both values are readable via
                // `as_float()`; rows where either is not are skipped.
                let current = anomaly.get("current_value").and_then(|v| v.as_float());
                let baseline = anomaly.get("baseline_value").and_then(|v| v.as_float());
                let (current, baseline) = match (current, baseline) {
                    (Some(current), Some(baseline)) => (current, baseline),
                    _ => continue,
                };

                // Without this check every positive metric would be reported
                // as an anomaly.
                if current <= 3.0 * baseline {
                    continue;
                }

                println!("🔴 ANOMALY:");
                match anomaly.get("metric_id") {
                    Some(metric_id) => println!("  Metric: {}", metric_id),
                    None => println!("  Metric: <unknown>"),
                }
                println!("  Current: {}", current);
                println!("  Baseline: {}", baseline);
            }
        }
    });

    Ok(())
}

Production Deployment Tips

1. Error Handling

use heliosdb_streaming::*;

/// Runs a pipeline and reports its outcome instead of propagating a failure.
///
/// A pipeline error is logged to stderr and the function still returns
/// `Ok(())`, leaving room for alerting or a fallback pipeline.
///
/// NOTE(review): `process_stream` is not defined in this snippet; it is
/// assumed to be the application's own pipeline function.
async fn robust_pipeline() -> Result<()> {
    let stream = Stream::new(StreamConfig::default());

    // Configure restart strategy: up to 3 attempts, 10 seconds apart.
    // `Duration` is spelled out in full because this snippet does not import it.
    // NOTE(review): `config` is built but never used here; pass it to the job
    // submission call in a real pipeline.
    let config = JobConfig {
        restart_strategy: RestartStrategy::FixedDelay {
            max_attempts: 3,
            delay: std::time::Duration::from_secs(10),
        },
        ..Default::default()
    };

    // Proper error handling
    match process_stream(stream).await {
        Ok(_) => println!("Pipeline completed successfully"),
        Err(e) => {
            eprintln!("Pipeline error: {}", e);
            // Trigger alerting
            // Fallback to backup pipeline
        }
    }

    Ok(())
}

2. Monitoring

use heliosdb_streaming::*;

/// Submits a job and logs its throughput and average latency every 30 seconds.
///
/// NOTE(review): `config` and `graph` are not defined in this snippet; they
/// are assumed to be supplied by the surrounding application.
async fn monitored_pipeline() -> Result<()> {
    let job_manager = JobManager::new(JobManagerConfig::default())?;

    // Submit with monitoring
    let job_id = job_manager.submit_job(config, graph).await?;

    // Periodic health checks
    tokio::spawn(async move {
        loop {
            // `Duration` is spelled out in full because this snippet does not import it.
            tokio::time::sleep(std::time::Duration::from_secs(30)).await;

            // A failed metrics lookup must not end the monitor: report it and
            // try again on the next tick.
            let metrics = match job_manager.get_metrics(job_id).await {
                Ok(metrics) => metrics,
                Err(e) => {
                    eprintln!("Failed to fetch metrics for job {}: {}", job_id, e);
                    continue;
                }
            };

            println!("Job {}: {} records/sec, {:.2}ms latency",
                job_id,
                metrics.throughput,
                metrics.avg_latency_ms
            );

            // Alert if degraded
            if metrics.avg_latency_ms > 100.0 {
                println!("⚠ High latency detected!");
            }
        }
    });

    Ok(())
}

3. Checkpointing

use heliosdb_streaming::*;

/// Sets up a checkpoint coordinator on a RocksDB state backend, then waits
/// for Ctrl-C and creates a savepoint before returning.
async fn fault_tolerant_pipeline() -> Result<()> {
    // Enable checkpointing
    let backend = StateBackend::new(StateBackendConfig {
        storage_type: StorageType::RocksDB,
        checkpoint_dir: "/var/heliosdb/checkpoints".to_string(),
        enable_encryption: true,
        ..Default::default()
    })?;

    // `Duration` is spelled out in full because this snippet does not import it.
    let coordinator = CheckpointCoordinator::new(
        backend,
        std::time::Duration::from_secs(60) // Checkpoint every minute
    )?;

    // Checkpoint on shutdown: block until Ctrl-C is received
    tokio::signal::ctrl_c().await?;
    println!("Shutting down, creating savepoint...");
    let savepoint = coordinator.create_savepoint().await?;
    println!("Savepoint created: {}", savepoint);

    Ok(())
}

Performance Optimization

1. Buffer Tuning

// High throughput: larger buffers
let high_throughput = StreamConfig {
    buffer_size: 10000,
    ..Default::default()
};
let stream = Stream::new(high_throughput);

// Low latency: smaller buffers
let low_latency = StreamConfig {
    buffer_size: 100,
    ..Default::default()
};
let stream = Stream::new(low_latency);

2. Parallelism

// Twice the CPU count reported by `num_cpus`
let worker_count = num_cpus::get() * 2;
let config = JobConfig {
    parallelism: worker_count,
    ..Default::default()
};

3. Batch Processing

// Batch writes for efficiency
let sink_config = DatabaseConfig {
    batch_size: 5000,
    batch_timeout: Duration::from_millis(100),
    ..Default::default()
};
let sink = DatabaseSink::new(sink_config)?;

More examples are available in the test files under /heliosdb-streaming/tests/

API Documentation: HELIOSDB_STREAMING_API.md

Generated: October 28, 2025