Skip to content

CDC Webhooks User Guide

Version: v5.5
Last Updated: January 4, 2026


Table of Contents

  1. Introduction
  2. Getting Started
  3. Creating CDC Streams
  4. Event Filtering
  5. Event Transformation
  6. Delivery Configuration
  7. Circuit Breaker
  8. Exactly-Once Delivery
  9. Security
  10. Monitoring
  11. Best Practices
  12. API Reference

Introduction

HeliosDB's CDC-to-Webhook integration enables real-time streaming of database changes to external systems. Changes are captured from the Write-Ahead Log (WAL) and delivered to HTTP endpoints with exactly-once semantics.

Key Features

  • Real-time: Sub-50ms delivery latency (P99)
  • Reliable: Exactly-once delivery guarantee
  • Scalable: 100K+ events per second
  • Resilient: Circuit breaker protection
  • Flexible: Powerful filtering and transformation

Getting Started

Prerequisites

  • HeliosDB v5.5 or later
  • Network access to webhook endpoints
  • Webhook endpoints that accept POST requests

Quick Start

-- Create a simple CDC stream
CREATE CHANGE DATA CAPTURE ON orders
  TO KAFKA 'localhost:9092' TOPIC 'orders-events'
  FORMAT JSON;

-- Verify the stream
SHOW CDC STREAMS;

Creating CDC Streams

Basic Syntax

CREATE CHANGE DATA CAPTURE ON table_name
  TO destination_type 'address' [TOPIC 'topic_name']
  [FORMAT JSON|AVRO]
  [AS 'stream_name'];

Kafka Destination

CREATE CHANGE DATA CAPTURE ON users
  TO KAFKA 'kafka.example.com:9092' TOPIC 'user-events'
  FORMAT JSON
  AS 'users_stream';

AWS Kinesis Destination

CREATE CHANGE DATA CAPTURE ON transactions
  TO KINESIS 'transactions-stream' REGION 'us-east-1'
  FORMAT AVRO;

HTTP Webhook Destination

use heliosdb_webhooks::{WebhookServer, WebhookRegistration};

let server = WebhookServer::new(config).await?;

// Register webhook endpoint
server.register(WebhookRegistration {
    url: "https://api.example.com/webhooks/db-changes".to_string(),
    secret: "your-webhook-secret".to_string(),
    tables: vec!["users".to_string(), "orders".to_string()],
    operations: vec![OperationType::Insert, OperationType::Update],
}).await?;

Managing Streams

-- List all streams
SHOW CDC STREAMS;

-- View stream details
SHOW CDC STREAM STATUS users_stream;

-- Pause a stream
ALTER CDC STREAM users_stream PAUSE;

-- Resume a stream
ALTER CDC STREAM users_stream RESUME;

-- Drop a stream
DROP CHANGE DATA CAPTURE users_stream;

Event Filtering

Table Filtering

# Filter by table
filters:
  - name: product_updates
    tables:
      - products
      - product_inventory

Operation Filtering

# Filter by operation
filters:
  - name: inserts_only
    tables:
      - orders
    operations:
      - INSERT

Column Filtering

# Filter by changed columns
filters:
  - name: price_changes
    tables:
      - products
    columns:
      products:
        - price
        - discount

Custom Predicates

# Filter with SQL predicate
filters:
  - name: high_value_orders
    tables:
      - orders
    operations:
      - INSERT
    predicate: "total > 1000"

Rust API

use heliosdb_cdc::EventFilter;

let filter = EventFilter::new()
    .tables(vec!["orders".to_string()])
    .operations(vec![OperationType::Insert, OperationType::Update])
    .columns("orders", vec!["status".to_string()])
    .predicate("total > 100")
    .build();

processor.add_filter(filter).await?;

Event Transformation

Event Format

Default event structure:

{
    "event_id": "evt_abc123",
    "timestamp": "2026-01-04T12:00:00Z",
    "sequence": 12345,
    "operation": "INSERT",
    "database": "production",
    "table": "orders",
    "key": {"id": 123},
    "value": {
        "id": 123,
        "customer_id": 456,
        "total": 99.99,
        "status": "pending"
    },
    "old_value": null,
    "transaction_id": "txn_xyz789"
}

Jinja2 Templates

transformations:
  - name: slack_format
    target: https://hooks.slack.com/services/...
    template: |
      {
        "text": "New {{ operation }} on {{ table }}",
        "blocks": [
          {
            "type": "section",
            "text": {
              "type": "mrkdwn",
              "text": "*{{ operation }}* on `{{ table }}`\nKey: {{ key }}"
            }
          }
        ]
      }

JSONPath Extraction

transformations:
  - name: extract_fields
    target: https://analytics.io/webhooks/orders
    jsonpath:
      order_id: "$.key.id"
      customer: "$.value.customer_id"
      amount: "$.value.total"
      operation: "$.operation"

Delivery Configuration

Retry Strategy

delivery:
  max_retries: 3
  initial_retry_delay_ms: 100
  max_retry_delay_ms: 30000
  timeout_secs: 30

Retry schedule with exponential backoff (the delay doubles after each retry):

  • Attempt 1: Immediate
  • Attempt 2: 100ms + jitter
  • Attempt 3: 200ms + jitter
  • Attempt 4: 400ms + jitter

Worker Pool

worker_pool:
  num_workers: 100
  max_concurrent_requests: 1000
  connection_pool_size: 500

Rust Configuration

use heliosdb_webhooks::{WebhookConfig, RetryConfig};

let config = WebhookConfig {
    worker_pool_size: 100,
    max_concurrent_requests: 1000,
    retry: RetryConfig {
        max_attempts: 3,
        initial_delay_ms: 100,
        max_delay_ms: 30000,
        multiplier: 2.0,
        jitter: true,
    },
    timeout_secs: 30,
    ..Default::default()
};

Circuit Breaker

The circuit breaker prevents cascading failures when webhook endpoints are unhealthy.

States

| State     | Description       | Behavior                      |
|-----------|-------------------|-------------------------------|
| Closed    | Normal operation  | All requests flow through     |
| Open      | Too many failures | Requests rejected immediately |
| Half-Open | Testing recovery  | Limited requests allowed      |

Configuration

circuit_breaker:
  failure_threshold: 5      # Open after 5 failures
  success_threshold: 2      # Close after 2 successes in half-open
  timeout_secs: 60          # Stay open for 60 seconds
  half_open_max_calls: 3    # Max calls in half-open state

Monitoring Circuit Breaker

// Check circuit state
let state = circuit_breaker.get_state("https://api.example.com").await?;

match state {
    CircuitState::Closed => println!("Operating normally"),
    CircuitState::Open => println!("Circuit open - endpoint unhealthy"),
    CircuitState::HalfOpen => println!("Testing endpoint recovery"),
}

Exactly-Once Delivery

How It Works

  1. Each event generates a unique idempotency key
  2. Delivered events are logged with their idempotency key
  3. Retries check the delivery log before sending
  4. Duplicate deliveries are prevented within the delivery window

Configuration

exactly_once:
  delivery_window_secs: 86400  # 24 hours

Verifying Delivery

// Check if event was delivered
let delivered = exactly_once.was_delivered(event_id, webhook_url).await?;

if delivered {
    println!("Event already delivered, skipping");
}

Security

HMAC Signature

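All webhook requests include an HMAC-SHA256 signature in the X-HeliosDB-Signature header, prefixed with "sha256=". The signature is computed over the request payload using the webhook's secret: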

POST /webhooks/db-changes HTTP/1.1
Host: api.example.com
Content-Type: application/json
X-HeliosDB-Signature: sha256=f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8
X-HeliosDB-Event-ID: evt_001
X-HeliosDB-Timestamp: 2026-01-04T12:00:00Z

Verifying Signatures (Receiver Side)

import hmac
import hashlib

def verify_signature(payload, signature, secret):
    expected = 'sha256=' + hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(signature, expected)

TLS Configuration

security:
  tls:
    enabled: true
    cert_path: /etc/heliosdb/tls/cert.pem
    key_path: /etc/heliosdb/tls/key.pem
  ip_whitelist:
    - 10.0.0.0/8
    - 172.16.0.0/12

mTLS (Mutual TLS)

let client = create_mtls_client(
    "/path/to/ca.pem",
    "/path/to/client-cert.pem",
    "/path/to/client-key.pem",
)?;

Monitoring

Prometheus Metrics

# Event processing rate
rate(heliosdb_cdc_events_processed_total[5m])

# Webhook delivery latency (P99)
histogram_quantile(0.99, rate(heliosdb_webhook_delivery_duration_seconds_bucket[5m]))

# Delivery success rate
sum(rate(heliosdb_webhook_delivery_total{status="success"}[5m])) /
sum(rate(heliosdb_webhook_delivery_total[5m]))

# Circuit breaker state
heliosdb_circuit_breaker_state == 2  # Open circuits

# Event queue backlog
heliosdb_event_queue_size{queue_type="overflow"}

Available Metrics

| Metric                                     | Type      | Description         |
|--------------------------------------------|-----------|---------------------|
| heliosdb_cdc_events_processed_total        | Counter   | Events processed    |
| heliosdb_cdc_events_filtered_total         | Counter   | Events filtered out |
| heliosdb_webhook_delivery_duration_seconds | Histogram | Delivery latency    |
| heliosdb_webhook_delivery_total            | Counter   | Delivery attempts   |
| heliosdb_webhook_retry_total               | Counter   | Retry attempts      |
| heliosdb_circuit_breaker_state             | Gauge     | Circuit state       |
| heliosdb_event_queue_size                  | Gauge     | Queue size          |

Grafana Dashboard

Import the HeliosDB CDC dashboard for real-time monitoring of:

  • Event throughput
  • Delivery latency
  • Success rates
  • Circuit breaker status
  • Queue backlogs


Best Practices

1. Use Appropriate Filters

# Filter to reduce volume
filters:
  - name: relevant_changes
    tables:
      - orders  # Only tables you need
    operations:
      - INSERT
      - UPDATE  # Skip DELETEs if not needed
    columns:
      orders:
        - status
        - total  # Only columns you care about

2. Configure Retries Appropriately

# Balance between reliability and latency
delivery:
  max_retries: 3           # Enough for transient failures
  initial_retry_delay_ms: 100  # Quick first retry
  max_retry_delay_ms: 5000     # Don't wait too long

3. Monitor Circuit Breakers

# Alert on open circuits
ALERT WebhookCircuitOpen
IF heliosdb_circuit_breaker_state == 2
FOR 5m
LABELS { severity = "warning" }

4. Implement Idempotent Receivers

def handle_webhook(event):
    event_id = event['event_id']

    # Check if already processed
    if already_processed(event_id):
        return {"status": "duplicate", "event_id": event_id}

    # Process event
    process_event(event)

    # Mark as processed
    mark_processed(event_id)

    return {"status": "success", "event_id": event_id}

5. Set Appropriate Timeouts

delivery:
  timeout_secs: 30  # Match your endpoint's expected response time

API Reference

CdcConfig

pub struct CdcConfig {
    pub wal_path: String,
    pub checkpoint_interval: usize,
    pub batch_size: usize,
    pub poll_interval_ms: u64,
}

WebhookConfig

pub struct WebhookConfig {
    pub bind_address: String,
    pub worker_pool_size: usize,
    pub max_concurrent_requests: usize,
    pub timeout_secs: u64,
    pub retry: RetryConfig,
    pub circuit_breaker: CircuitBreakerConfig,
    pub exactly_once: ExactlyOnceConfig,
}

Event Types

pub enum OperationType {
    Insert,
    Update,
    Delete,
    Truncate,
    SchemaChange,
}

pub struct CdcEvent {
    pub event_id: String,
    pub timestamp: DateTime<Utc>,
    pub sequence: u64,
    pub operation: OperationType,
    pub database: String,
    pub table: String,
    pub key: Vec<u8>,
    pub value: Option<Vec<u8>>,
    pub old_value: Option<Vec<u8>>,
    pub transaction_id: String,
}