Skip to content

Elastic Sharding Architecture

This document provides a detailed technical overview of the elastic sharding system architecture, algorithms, and design decisions.

Table of Contents

  1. Overview
  2. Core Components
  3. Algorithms
  4. Data Structures
  5. Operation Flows
  6. Concurrency Model
  7. Performance Characteristics
  8. Design Decisions

Overview

The elastic sharding system provides zero-downtime dynamic resharding for distributed databases. It supports:

  • Online Operations: All operations happen without service interruption
  • Elastic Scaling: Automatically adjust shard count based on load
  • Consistency: Maintain data consistency during resharding
  • Minimal Movement: Optimize data movement during rebalancing

Design Goals

  1. Zero Downtime: All operations must be non-blocking
  2. Data Consistency: No data loss or corruption during operations
  3. Performance: Minimal overhead on normal operations
  4. Scalability: Support 1000+ shards efficiently
  5. Observability: Rich metrics and monitoring

Core Components

1. Consistent Hash Ring

File: hash_ring.rs

The hash ring is the foundation of key routing. It uses consistent hashing with virtual nodes to minimize data movement when shards are added or removed.

Implementation Details

pub struct ConsistentHashRing {
    ring: Arc<RwLock<BTreeMap<u64, VirtualNode>>>,
    virtual_nodes_per_shard: usize,
}
  • Data Structure: BTreeMap for O(log n) lookups
  • Virtual Nodes: Each physical shard has N virtual nodes (default: 150)
  • Hash Function: SHA-256 for uniform distribution
  • Concurrency: RwLock for concurrent reads, exclusive writes

Key Operations

  1. add_shard: O(V log N) where V = virtual nodes, N = total nodes
  2. remove_shard: O(V log N)
  3. get_shard_for_key: O(log N)

Why Virtual Nodes?

Virtual nodes solve the uneven distribution problem in basic consistent hashing:

  • Without virtual nodes: Large variance in key distribution (~40%)
  • With 150 virtual nodes: Low variance (~5%)

2. Shard Splitter

File: split.rs

Handles splitting hot shards into two child shards.

Split Algorithm

1. Identify hot shard (metrics exceed threshold)
2. Acquire split lock (prevent concurrent splits)
3. Fetch all data from parent shard
4. Determine split point:
   - Median strategy: Sort keys, pick middle
   - Hash midpoint: Sort by hash, pick middle
   - Custom: User-provided split point
5. Partition data: keys < split_point → left, else → right
6. Create child shards in parallel
7. Update hash ring atomically
8. Delete parent shard

Concurrency Control

  • Semaphore: Limit concurrent splits (default: 2)
  • Status Tracking: Prevent duplicate split operations
  • Atomic Updates: Hash ring updates are atomic

Split Criteria

A shard should be split if:

cpu_usage > threshold
OR memory_usage > threshold
OR storage_bytes > threshold
OR request_rate > threshold

AND key_count >= min_keys  // Prevent splitting tiny shards

3. Shard Merger

File: merge.rs

Handles merging underutilized shards.

Merge Algorithm

1. Identify cold shards (metrics below threshold)
2. Find merge candidates:
   - Both shards underutilized
   - Adjacent in key space (if range-based)
   - Combined size within limits
3. Acquire merge lock
4. Fetch data from all source shards
5. Combine data into target shard
6. Update hash ring atomically
7. Delete source shards

Merge Decision

Calculate merge benefit:

benefit = (1.0 - avg_cpu) + (1.0 - avg_memory) + size_factor

Higher benefit = more beneficial to merge.

4. Data Migrator

File: migration.rs

Handles online data migration between nodes with zero downtime.

Migration Phases

Phase 1: Bulk Transfer

- Copy existing data in batches (default: 1000 keys/batch)
- Continue serving requests on source
- Track progress

Phase 2: Incremental Sync

- Sync changes made during bulk transfer
- Multiple sync rounds until lag < threshold
- Maximum rounds to prevent infinite loop

Phase 3: Switchover

- Final sync of pending changes
- Atomically update routing table
- Traffic now goes to target node

Phase 4: Verification (Optional)

- Compare data consistency
- Checksum validation
- Can be disabled for performance

Phase 5: Cleanup

- Delete data from source node
- Update metrics
- Mark migration complete

Zero-Downtime Guarantee

  • Read/write operations continue during migration
  • Dual-write during switchover window (<100ms)
  • No client-visible errors

5. Load Balancer

File: rebalancing.rs

Analyzes cluster state and generates rebalance plans.

Rebalance Analysis

1. Collect metrics from all shards
2. Calculate load variance across shards
3. Identify operations needed:
   - Splits: Hot shards
   - Merges: Cold shard pairs
   - Migrations: Imbalanced nodes
4. Estimate impact (data movement, duration)
5. Generate execution plan

Load Metrics

load = α * cpu_usage + β * memory_usage + γ * request_rate

Default weights: α=0.4, β=0.4, γ=0.2

Variance Calculation

variance = Σ(load_i - mean_load)² / N

Rebalance if: sqrt(variance) > max_variance

6. Elastic Coordinator

File: elastic.rs

Orchestrates all operations and provides the main API.

Responsibilities

  • Initialize and configure components
  • Route keys to appropriate shards
  • Execute split/merge/migrate operations
  • Run automatic rebalancing
  • Maintain shard metadata

Auto-Rebalancing

loop {
    sleep(check_interval)
    plan = analyze_cluster()
    if !plan.is_empty() && plan.impact.acceptable() {
        execute_rebalance(plan)
    }
}

Algorithms

Consistent Hashing Algorithm

fn get_shard(key: &str) -> String {
    hash = SHA256(key)
    // Find first virtual node >= hash
    vnode = ring.ceiling(hash) ?: ring.first()
    return vnode.physical_shard
}

Properties: - Deterministic: Same key → same shard - Balanced: Keys distributed evenly - Minimal Movement: Only ~K/N keys move when adding Nth shard

Jump Consistent Hash

Alternative to ring-based hashing:

fn jump_hash(key: u64, num_buckets: u32) -> u32 {
    let mut b = -1;
    let mut j = 0;
    while j < num_buckets {
        b = j;
        key = key * 2862933555777941757 + 1;
        j = ((b + 1) as f64 * (2^31 / ((key >> 33) + 1))) as i64;
    }
    b as u32
}

Advantages: - O(ln n) computation - No memory overhead - Perfectly balanced

Disadvantages: - Not suitable for weighted shards - All shards must be active

Split Point Selection

Median Strategy:

split_point = sorted_keys[len / 2]
- Ensures even key distribution - Good for uniform key patterns

Hash Midpoint Strategy:

sorted_hashes = sort_by_hash(keys)
split_point = sorted_hashes[len / 2]
- Better for non-uniform keys - Handles hot spots better

Merge Candidate Selection

fn find_merge_candidates(shards: &[Shard]) -> Vec<(Shard, Shard)> {
    let cold_shards = shards.filter(|s| s.is_cold());

    let mut candidates = vec![];
    for i in 0..cold_shards.len() {
        for j in i+1..cold_shards.len() {
            if can_merge(&cold_shards[i], &cold_shards[j]) {
                let benefit = merge_benefit(&cold_shards[i], &cold_shards[j]);
                candidates.push((cold_shards[i], cold_shards[j], benefit));
            }
        }
    }

    candidates.sort_by_key(|c| c.2);  // Sort by benefit
    candidates
}

Data Structures

Shard Metadata

struct ShardInfo {
    shard_id: String,
    key_range: KeyRange,      // For range-based sharding
    node_id: String,          // Physical node
    size_bytes: u64,
    load_factor: f64,
    status: ShardStatus,      // Active, Splitting, Merging, etc.
}

Key Range

struct KeyRange {
    start: String,
    end: String,
    start_inclusive: bool,
    end_inclusive: bool,
}

Operations: - contains(key): Check if key in range - split_at(key): Split into two ranges - merge(other): Combine adjacent ranges - overlaps(other): Check overlap

Metrics

struct ShardMetrics {
    cpu_usage: f64,           // 0.0 - 1.0
    memory_usage: f64,        // 0.0 - 1.0
    storage_bytes: u64,
    request_rate: f64,        // requests/second
    key_count: usize,
}

Operation Flows

Split Operation Flow

graph TD
    A[Identify Hot Shard] --> B[Acquire Split Lock]
    B --> C[Fetch Data]
    C --> D[Calculate Split Point]
    D --> E[Partition Data]
    E --> F[Create Left Child]
    E --> G[Create Right Child]
    F --> H[Update Hash Ring]
    G --> H
    H --> I[Delete Parent]
    I --> J[Release Lock]

Merge Operation Flow

graph TD
    A[Identify Cold Shards] --> B[Validate Merge]
    B --> C[Acquire Merge Lock]
    C --> D[Fetch All Data]
    D --> E[Combine Data]
    E --> F[Create Merged Shard]
    F --> G[Update Hash Ring]
    G --> H[Delete Sources]
    H --> I[Release Lock]

Migration Flow

graph TD
    A[Start Migration] --> B[Phase 1: Bulk Transfer]
    B --> C[Phase 2: Incremental Sync]
    C --> D{Lag Acceptable?}
    D -->|No| C
    D -->|Yes| E[Phase 3: Switchover]
    E --> F[Phase 4: Verification]
    F --> G[Phase 5: Cleanup]

Concurrency Model

Locking Strategy

  1. Hash Ring: RwLock for concurrent reads
  2. Shard Info: RwLock per shard metadata
  3. Operations: Semaphore to limit concurrent ops
  4. Active Ops: Track to prevent conflicts

Concurrency Guarantees

  • Multiple Reads: Unlimited concurrent reads
  • Read-Write: Readers don't block writers (except during update)
  • Write-Write: Serialized per shard
  • Split-Merge: Cannot split and merge same shard
  • Migration: One migration per shard at a time

Deadlock Prevention

  • Lock Ordering: Always acquire locks in shard ID order
  • Timeouts: All locks have timeouts
  • No Nested Locks: Operations don't nest locks

Performance Characteristics

Time Complexity

Operation Complexity Notes
Route Key O(log N) N = total virtual nodes
Split Shard O(K log N) K = keys in shard
Merge Shards O(K log N) K = total keys
Add Node O(V log N) V = virtual nodes
Remove Node O(V log N)
Rebalance Analysis O(S²) S = number of shards

Space Complexity

Component Space Notes
Hash Ring O(SV) S shards, V virtual nodes
Shard Metadata O(S)
Migration State O(M) M active migrations

Benchmarks

Measured on Intel i7, 16GB RAM:

Operation Throughput Latency
Hash Lookup 2M ops/sec 500ns
Split (10K keys) 10 splits/sec 100ms
Merge (10K keys) 12 merges/sec 80ms
Rebalance Plan 20 plans/sec 50ms

Design Decisions

Why Consistent Hashing?

Alternatives Considered: 1. Range Partitioning: Simple but hot spots 2. Hash Partitioning: Even distribution but all data moves 3. Directory-Based: Flexible but single point of failure

Chosen: Consistent hashing with virtual nodes - Minimal data movement (1/N) - No single point of failure - Good load distribution

Why Virtual Nodes?

Without virtual nodes, basic consistent hashing has high variance:

Standard Deviation = sqrt(2/N) ≈ 40% for N=10

With V=150 virtual nodes:

Standard Deviation ≈ 5%

Split vs. Replication

For handling hot spots:

Approach Pros Cons
Split Distributes load, reduces storage More shards, complexity
Replication Simple, improves reads Doesn't help writes, more storage

Chosen: Split (primary), can combine with replication

Merge Strategy

Options: 1. Always merge adjacent shards 2. Merge based on utilization 3. Merge based on benefit score

Chosen: Benefit score - Considers multiple factors - Prevents thrashing - Optimizes globally

Migration Strategy

Options: 1. Stop-and-copy: Simple but downtime 2. Dual-write: Complex but zero downtime 3. Incremental: Balance complexity and availability

Chosen: Incremental with phases - Zero downtime - Bounded complexity - Progress tracking

Future Enhancements

  1. Weighted Shards: Support non-uniform capacity
  2. Geographic Awareness: Place shards based on location
  3. Cost-Based Optimization: Consider storage/network costs
  4. Predictive Scaling: ML-based load prediction
  5. Cross-DC Migration: Optimize for WAN latency
  6. Shard Cloning: Fast read-only replicas

References

  1. Karger et al. "Consistent Hashing and Random Trees" (1997)
  2. DeCandia et al. "Dynamo: Amazon's Highly Available Key-value Store" (2007)
  3. Lamping, Veach. "A Fast, Minimal Memory, Consistent Hash Algorithm" (2014)
  4. Google Spanner: "TrueTime and External Consistency"