CDC Webhooks User Guide¶
Version: v5.5 | Last Updated: January 4, 2026
Table of Contents¶
- Introduction
- Getting Started
- Creating CDC Streams
- Event Filtering
- Event Transformation
- Delivery Configuration
- Circuit Breaker
- Exactly-Once Delivery
- Security
- Monitoring
- Best Practices
- API Reference
Introduction¶
HeliosDB's CDC-to-Webhook integration enables real-time streaming of database changes to external systems. Changes are captured from the Write-Ahead Log (WAL) and delivered to HTTP endpoints with exactly-once semantics.
Key Features¶
- Real-time: Sub-50ms delivery latency (P99)
- Reliable: Exactly-once delivery guarantee
- Scalable: 100K+ events per second
- Resilient: Circuit breaker protection
- Flexible: Powerful filtering and transformation
Getting Started¶
Prerequisites¶
- HeliosDB v5.5 or later
- Network access to webhook endpoints
- Webhook endpoints that accept POST requests
Quick Start¶
-- Create a simple CDC stream
CREATE CHANGE DATA CAPTURE ON orders
TO KAFKA 'localhost:9092' TOPIC 'orders-events'
FORMAT JSON;
-- Verify the stream
SHOW CDC STREAMS;
Creating CDC Streams¶
Basic Syntax¶
CREATE CHANGE DATA CAPTURE ON table_name
TO destination_type 'address' [TOPIC 'topic_name']
[FORMAT JSON|AVRO]
[AS 'stream_name'];
Kafka Destination¶
CREATE CHANGE DATA CAPTURE ON users
TO KAFKA 'kafka.example.com:9092' TOPIC 'user-events'
FORMAT JSON
AS 'users_stream';
AWS Kinesis Destination¶
CREATE CHANGE DATA CAPTURE ON transactions
TO KINESIS 'transactions-stream' REGION 'us-east-1'
FORMAT AVRO;
HTTP Webhook Destination¶
use heliosdb_webhooks::{WebhookServer, WebhookRegistration};

let server = WebhookServer::new(config).await?;

// Register webhook endpoint
server.register(WebhookRegistration {
    url: "https://api.example.com/webhooks/db-changes".to_string(),
    secret: "your-webhook-secret".to_string(),
    tables: vec!["users".to_string(), "orders".to_string()],
    operations: vec![OperationType::Insert, OperationType::Update],
}).await?;
Managing Streams¶
-- List all streams
SHOW CDC STREAMS;
-- View stream details
SHOW CDC STREAM STATUS users_stream;
-- Pause a stream
ALTER CDC STREAM users_stream PAUSE;
-- Resume a stream
ALTER CDC STREAM users_stream RESUME;
-- Drop a stream
DROP CHANGE DATA CAPTURE users_stream;
Event Filtering¶
Table Filtering¶
Operation Filtering¶
Column Filtering¶
# Filter by changed columns
filters:
  - name: price_changes
    tables:
      - products
    columns:
      products:
        - price
        - discount
Custom Predicates¶
# Filter with SQL predicate
filters:
  - name: high_value_orders
    tables:
      - orders
    operations:
      - INSERT
    predicate: "total > 1000"
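To make the filter semantics concrete, here is a minimal Python sketch of how a table, operation, and column filter could be evaluated against an event. It is an illustration, not HeliosDB's implementation: the `Filter` class and the `changed_columns` and `matches` helpers are hypothetical names, and the sketch assumes a column filter matches when at least one listed column differs between `old_value` and `value`. SQL predicates are left out.

```python
from dataclasses import dataclass, field


@dataclass
class Filter:
    """Hypothetical in-memory form of one entry under `filters:`."""
    name: str
    tables: list
    operations: list = field(default_factory=list)  # empty means any operation
    columns: dict = field(default_factory=dict)     # table -> watched columns


def changed_columns(event):
    """Return the columns whose value differs between old_value and value."""
    old = event.get("old_value") or {}
    new = event.get("value") or {}
    return {c for c in old.keys() | new.keys() if old.get(c) != new.get(c)}


def matches(f, event):
    """Assumed semantics: every configured criterion must match."""
    if event["table"] not in f.tables:
        return False
    if f.operations and event["operation"] not in f.operations:
        return False
    watched = f.columns.get(event["table"])
    if watched and not (changed_columns(event) & set(watched)):
        return False
    return True


price_changes = Filter("price_changes", ["products"],
                       columns={"products": ["price", "discount"]})
update = {
    "table": "products",
    "operation": "UPDATE",
    "old_value": {"id": 1, "name": "Widget", "price": 10},
    "value": {"id": 1, "name": "Widget", "price": 12},
}
print(matches(price_changes, update))  # True
```

An update that only changes `name` would not match `price_changes` under these assumed semantics.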
Rust API¶
use heliosdb_cdc::EventFilter;

let filter = EventFilter::new()
    .tables(vec!["orders".to_string()])
    .operations(vec![OperationType::Insert, OperationType::Update])
    .columns("orders", vec!["status".to_string()])
    .predicate("total > 100")
    .build();

processor.add_filter(filter).await?;
Event Transformation¶
Event Format¶
Default event structure:
{
  "event_id": "evt_abc123",
  "timestamp": "2026-01-04T12:00:00Z",
  "sequence": 12345,
  "operation": "INSERT",
  "database": "production",
  "table": "orders",
  "key": {"id": 123},
  "value": {
    "id": 123,
    "customer_id": 456,
    "total": 99.99,
    "status": "pending"
  },
  "old_value": null,
  "transaction_id": "txn_xyz789"
}
Jinja2 Templates¶
transformations:
  - name: slack_format
    target: https://hooks.slack.com/services/...
    template: |
      {
        "text": "New {{ operation }} on {{ table }}",
        "blocks": [
          {
            "type": "section",
            "text": {
              "type": "mrkdwn",
              "text": "*{{ operation }}* on `{{ table }}`\nKey: {{ key }}"
            }
          }
        ]
      }
JSONPath Extraction¶
transformations:
  - name: extract_fields
    target: https://analytics.io/webhooks/orders
    jsonpath:
      order_id: "$.key.id"
      customer: "$.value.customer_id"
      amount: "$.value.total"
      operation: "$.operation"
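To preview what a receiver would get from this mapping, the following Python sketch applies it to the default event structure. The `extract` function is a hypothetical helper that only understands simple `$.a.b` member paths; it is not a full JSONPath engine and does not claim to mirror HeliosDB's evaluator.

```python
def extract(event, mapping):
    """Resolve simple '$.a.b' paths against an event dict."""
    out = {}
    for name, path in mapping.items():
        if not path.startswith("$."):
            raise ValueError(f"unsupported path: {path}")
        node = event
        for part in path[2:].split("."):
            node = node[part]  # raises KeyError if the field is missing
        out[name] = node
    return out


event = {
    "operation": "INSERT",
    "key": {"id": 123},
    "value": {"id": 123, "customer_id": 456, "total": 99.99, "status": "pending"},
}
mapping = {
    "order_id": "$.key.id",
    "customer": "$.value.customer_id",
    "amount": "$.value.total",
    "operation": "$.operation",
}
print(extract(event, mapping))
# {'order_id': 123, 'customer': 456, 'amount': 99.99, 'operation': 'INSERT'}
```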
Delivery Configuration¶
Retry Strategy¶
Retry schedule with exponential backoff:
- Attempt 1: Immediate
- Attempt 2: 100ms + jitter
- Attempt 3: 200ms + jitter
- Attempt 4: 400ms + jitter
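The schedule above follows the usual exponential backoff formula, `delay = min(initial_delay * multiplier^(retry), max_delay) + jitter`. The Python sketch below reproduces it. The function name `retry_delays` is hypothetical, the parameter names mirror the retry settings in this guide, and the jitter model (up to 10% of the base delay) is an assumption, since the guide does not specify how jitter is computed.

```python
import random


def retry_delays(max_attempts=4, initial_delay_ms=100, multiplier=2.0,
                 max_delay_ms=30000, jitter=True, rng=None):
    """Return the wait in ms before each attempt; attempt 1 is immediate."""
    rng = rng or random.Random()
    delays = [0.0]
    for retry in range(max_attempts - 1):
        # Exponential growth, capped at max_delay_ms.
        base = min(initial_delay_ms * multiplier ** retry, max_delay_ms)
        # Assumed jitter model: add up to 10% of the base delay.
        extra = rng.uniform(0, base * 0.1) if jitter else 0.0
        delays.append(base + extra)
    return delays


print(retry_delays(jitter=False))  # [0.0, 100.0, 200.0, 400.0]
```

Jitter spreads retries from many workers over time so that a recovering endpoint is not hit by synchronized bursts.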
Worker Pool¶
Rust Configuration¶
use heliosdb_webhooks::{WebhookConfig, RetryConfig};

let config = WebhookConfig {
    worker_pool_size: 100,
    max_concurrent_requests: 1000,
    retry: RetryConfig {
        max_attempts: 3,
        initial_delay_ms: 100,
        max_delay_ms: 30000,
        multiplier: 2.0,
        jitter: true,
    },
    timeout_secs: 30,
    ..Default::default()
};
Circuit Breaker¶
The circuit breaker prevents cascading failures when webhook endpoints are unhealthy.
States¶
| State | Description | Behavior |
|---|---|---|
| Closed | Normal operation | All requests flow through |
| Open | Too many failures | Requests rejected immediately |
| Half-Open | Testing recovery | Limited requests allowed |
Configuration¶
circuit_breaker:
  failure_threshold: 5       # Open after 5 failures
  success_threshold: 2       # Close after 2 successes in half-open
  timeout_secs: 60           # Stay open for 60 seconds
  half_open_max_calls: 3     # Max calls in half-open state
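The interaction between the three states and the four settings can be sketched as a small state machine. The Python class below is an illustrative model only, not HeliosDB's implementation: the class and method names are hypothetical, and it assumes that failures are counted consecutively and that any failure in the half-open state reopens the circuit.

```python
import time


class CircuitBreaker:
    """Illustrative closed / open / half-open state machine."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout_secs=60, half_open_max_calls=3, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout_secs = timeout_secs
        self.half_open_max_calls = half_open_max_calls
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.half_open_calls = 0
        self.opened_at = 0.0

    def allow_request(self):
        if self.state == "open":
            if self.clock() - self.opened_at < self.timeout_secs:
                return False  # still open: reject immediately
            # Timeout elapsed: start probing the endpoint.
            self.state = "half_open"
            self.successes = 0
            self.half_open_calls = 0
        if self.state == "half_open":
            if self.half_open_calls >= self.half_open_max_calls:
                return False  # probe budget used up
            self.half_open_calls += 1
        return True

    def record_success(self):
        if self.state == "half_open":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = "closed"
                self.failures = 0
        else:
            self.failures = 0  # assumption: only consecutive failures count

    def record_failure(self):
        if self.state == "half_open":
            self._open()  # assumption: one failed probe reopens the circuit
        else:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self._open()

    def _open(self):
        self.state = "open"
        self.opened_at = self.clock()
```

A caller would check `allow_request()` before each delivery and report the outcome with `record_success()` or `record_failure()`. Passing a fake `clock` makes the timeout behavior easy to test.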
Monitoring Circuit Breaker¶
// Check circuit state
let state = circuit_breaker.get_state("https://api.example.com").await?;
match state {
    CircuitState::Closed => println!("Operating normally"),
    CircuitState::Open => println!("Circuit open - endpoint unhealthy"),
    CircuitState::HalfOpen => println!("Testing endpoint recovery"),
}
Exactly-Once Delivery¶
How It Works¶
- Each event generates a unique idempotency key
- Delivered events are logged with their idempotency key
- Retries check the delivery log before sending
- Duplicate deliveries are prevented within the delivery window
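The steps above can be sketched in Python as a delivery log keyed by idempotency key. This is a simplified in-memory model under stated assumptions, not HeliosDB's implementation: `DeliveryLog` and `deliver_once` are hypothetical names, the key is assumed to be a hash of the event ID and webhook URL, and the one-hour default window is a placeholder value.

```python
import hashlib
import time


class DeliveryLog:
    """In-memory delivery log with a bounded deduplication window."""

    def __init__(self, window_secs=3600, clock=time.time):
        self.window_secs = window_secs
        self.clock = clock
        self._delivered = {}  # idempotency key -> delivery time

    @staticmethod
    def idempotency_key(event_id, webhook_url):
        # Assumed key derivation: one key per (event, endpoint) pair.
        return hashlib.sha256(f"{event_id}|{webhook_url}".encode()).hexdigest()

    def was_delivered(self, event_id, webhook_url):
        ts = self._delivered.get(self.idempotency_key(event_id, webhook_url))
        return ts is not None and self.clock() - ts < self.window_secs

    def mark_delivered(self, event_id, webhook_url):
        self._delivered[self.idempotency_key(event_id, webhook_url)] = self.clock()


def deliver_once(log, event_id, webhook_url, send):
    """Send unless the log shows a delivery inside the window."""
    if log.was_delivered(event_id, webhook_url):
        return False
    send(event_id, webhook_url)
    log.mark_delivered(event_id, webhook_url)
    return True
```

In this model, a crash between `send` and `mark_delivered` would still cause a repeat delivery on retry, which is one reason receivers should also be idempotent.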
Configuration¶
Verifying Delivery¶
// Check if event was delivered
let delivered = exactly_once.was_delivered(event_id, webhook_url).await?;
if delivered {
    println!("Event already delivered, skipping");
}
Security¶
HMAC Signature¶
All webhook requests include an HMAC-SHA256 signature:
POST /webhooks/db-changes HTTP/1.1
Host: api.example.com
Content-Type: application/json
X-HeliosDB-Signature: sha256=f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8
X-HeliosDB-Event-ID: evt_001
X-HeliosDB-Timestamp: 2026-01-04T12:00:00Z
Verifying Signatures (Receiver Side)¶
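import hmac
import hashlib

def verify_signature(payload, signature, secret):
    # payload must be the raw request body as bytes; signature is the
    # value of the X-HeliosDB-Signature header.
    expected = 'sha256=' + hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    # Constant-time comparison avoids leaking information through timing.
    return hmac.compare_digest(signature, expected)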
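A common pitfall is verifying against re-serialized JSON instead of the raw request body. The self-contained sketch below illustrates this. The `sign` helper is hypothetical and assumes the sender computes the header exactly as the verification code implies (HMAC-SHA256 over the raw body, hex-encoded, prefixed with `sha256=`).

```python
import hashlib
import hmac
import json


def sign(payload: bytes, secret: str) -> str:
    """Assumed sender-side computation of the X-HeliosDB-Signature value."""
    digest = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return "sha256=" + digest


def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    return hmac.compare_digest(signature, sign(payload, secret))


secret = "your-webhook-secret"
raw_body = b'{"event_id": "evt_001", "operation": "INSERT"}'
header = sign(raw_body, secret)

# Verifying the exact bytes that were received succeeds.
assert verify_signature(raw_body, header, secret)

# Parsing and re-serializing changes the bytes, so verification fails.
reserialized = json.dumps(json.loads(raw_body), separators=(",", ":")).encode()
assert not verify_signature(reserialized, header, secret)
```

In a web framework, read the body as bytes before any JSON parsing and pass those bytes to `verify_signature`.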
TLS Configuration¶
security:
  tls:
    enabled: true
    cert_path: /etc/heliosdb/tls/cert.pem
    key_path: /etc/heliosdb/tls/key.pem
  ip_whitelist:
    - 10.0.0.0/8
    - 172.16.0.0/12
mTLS (Mutual TLS)¶
let client = create_mtls_client(
    "/path/to/ca.pem",
    "/path/to/client-cert.pem",
    "/path/to/client-key.pem",
)?;
Monitoring¶
Prometheus Metrics¶
# Event processing rate
rate(heliosdb_cdc_events_processed_total[5m])
# Webhook delivery latency (P99)
histogram_quantile(0.99, rate(heliosdb_webhook_delivery_duration_seconds_bucket[5m]))
# Delivery success rate
sum(rate(heliosdb_webhook_delivery_total{status="success"}[5m])) /
sum(rate(heliosdb_webhook_delivery_total[5m]))
# Circuit breaker state
heliosdb_circuit_breaker_state == 2 # Open circuits
# Event queue backlog
heliosdb_event_queue_size{queue_type="overflow"}
Available Metrics¶
| Metric | Type | Description |
|---|---|---|
| `heliosdb_cdc_events_processed_total` | Counter | Events processed |
| `heliosdb_cdc_events_filtered_total` | Counter | Events filtered out |
| `heliosdb_webhook_delivery_duration_seconds` | Histogram | Delivery latency |
| `heliosdb_webhook_delivery_total` | Counter | Delivery attempts |
| `heliosdb_webhook_retry_total` | Counter | Retry attempts |
| `heliosdb_circuit_breaker_state` | Gauge | Circuit state |
| `heliosdb_event_queue_size` | Gauge | Queue size |
Grafana Dashboard¶
Import the HeliosDB CDC dashboard for real-time monitoring of:
- Event throughput
- Delivery latency
- Success rates
- Circuit breaker status
- Queue backlogs
Best Practices¶
1. Use Appropriate Filters¶
# Filter to reduce volume
filters:
  - name: relevant_changes
    tables:
      - orders          # Only tables you need
    operations:
      - INSERT
      - UPDATE          # Skip DELETEs if not needed
    columns:
      orders:
        - status
        - total         # Only columns you care about
2. Configure Retries Appropriately¶
# Balance between reliability and latency
delivery:
max_retries: 3 # Enough for transient failures
initial_retry_delay_ms: 100 # Quick first retry
max_retry_delay_ms: 5000 # Don't wait too long
3. Monitor Circuit Breakers¶
# Alert on open circuits
ALERT WebhookCircuitOpen
  IF heliosdb_circuit_breaker_state == 2
  FOR 5m
  LABELS { severity = "warning" }
4. Implement Idempotent Receivers¶
def handle_webhook(event):
    event_id = event['event_id']

    # Check if already processed
    if already_processed(event_id):
        return {"status": "duplicate", "event_id": event_id}

    # Process event
    process_event(event)

    # Mark as processed
    mark_processed(event_id)
    return {"status": "success", "event_id": event_id}
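The handler above relies on `already_processed` and `mark_processed`, which the guide does not define. One possible way to back them, sketched here with Python's built-in `sqlite3` module, is a table keyed by event ID. The table name `processed_events` and the in-memory database are assumptions for illustration; a real receiver would use durable storage.

```python
import sqlite3

# In-memory database for illustration; use a file path or a shared
# database in a real deployment so the record survives restarts.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE IF NOT EXISTS processed_events (event_id TEXT PRIMARY KEY)"
)


def already_processed(event_id):
    """Return True if this event ID has been recorded before."""
    row = conn.execute(
        "SELECT 1 FROM processed_events WHERE event_id = ?", (event_id,)
    ).fetchone()
    return row is not None


def mark_processed(event_id):
    """Record the event ID; INSERT OR IGNORE makes repeat calls harmless."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event_id,),
        )
```

The primary key guarantees that each event ID is stored at most once, even if the same event is marked twice.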
5. Set Appropriate Timeouts¶
API Reference¶
CdcConfig¶
pub struct CdcConfig {
    pub wal_path: String,
    pub checkpoint_interval: usize,
    pub batch_size: usize,
    pub poll_interval_ms: u64,
}
WebhookConfig¶
pub struct WebhookConfig {
    pub bind_address: String,
    pub worker_pool_size: usize,
    pub max_concurrent_requests: usize,
    pub timeout_secs: u64,
    pub retry: RetryConfig,
    pub circuit_breaker: CircuitBreakerConfig,
    pub exactly_once: ExactlyOnceConfig,
}
Event Types¶
pub enum OperationType {
    Insert,
    Update,
    Delete,
    Truncate,
    SchemaChange,
}

pub struct CdcEvent {
    pub event_id: String,
    pub timestamp: DateTime<Utc>,
    pub sequence: u64,
    pub operation: OperationType,
    pub database: String,
    pub table: String,
    pub key: Vec<u8>,
    pub value: Option<Vec<u8>>,
    pub old_value: Option<Vec<u8>>,
    pub transaction_id: String,
}
Related Documentation¶
- README.md - Feature overview
- TROUBLESHOOTING.md - Common issues
- Design Document - Technical architecture
- SQL Interface - SQL commands