Federated Learning User Guide¶
Table of Contents¶
- Introduction
- Quick Start
- HIPAA Compliance
- GDPR Compliance
- 100+ Node Deployment
- Best Practices
- Troubleshooting
- API Reference
Introduction¶
HeliosDB Federated Learning enables distributed machine learning training across multiple nodes while preserving data privacy and ensuring regulatory compliance (HIPAA, GDPR).
Key Features¶
- Privacy-Preserving: Differential privacy, secure aggregation
- Compliant: Built-in HIPAA and GDPR compliance
- Scalable: Tested with 150+ nodes
- Robust: Automatic failure recovery
- Transparent: Comprehensive audit logging
When to Use Federated Learning¶
✅ Good Use Cases:

- Healthcare data across multiple hospitals
- Financial data across bank branches
- User data across geographic regions
- Sensitive data that cannot be centralized

❌ Not Recommended:

- Small datasets (<1000 samples)
- Real-time predictions (use centralized ML)
- When data can be safely centralized
Quick Start¶
1. Basic Setup¶
use heliosdb_federated_learning::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Create the federated learning engine
    let config = Config {
        aggregation_strategy: AggregationStrategy::FedAvg,
        round_config: RoundConfig {
            max_rounds: 100,
            min_nodes_per_round: 10,
            selection_strategy: NodeSelectionStrategy::All,
            node_timeout_secs: 60,
            target_accuracy: Some(0.95),
            learning_rate: 0.01,
            convergence_threshold: 0.001,
        },
        enable_privacy: true,
        privacy_config: Some(DifferentialPrivacyConfig {
            epsilon: 0.1,
            delta: 1e-5,
            mechanism: NoiseMechanism::Gaussian,
        }),
    };

    let engine = FederatedLearningEngine::new(config).await?;

    // Register the model
    let initial_params = vec![0.0; 1000];
    engine.register_model(
        "my_model".to_string(),
        "neural_network".to_string(),
        initial_params,
    ).await?;

    // Create the coordinator
    let coordinator = engine.create_coordinator("my_model".to_string()).await?;

    // Create worker nodes
    for i in 0..10 {
        let worker_config = WorkerConfig {
            node_id: format!("node_{}", i),
            local_epochs: 1,
            batch_size: 32,
            learning_rate: 0.01,
        };
        engine.create_worker(worker_config.clone()).await?;
        coordinator.register_node(worker_config.node_id, 1000).await?;
    }

    // Start training
    engine.start_training("my_model".to_string()).await?;
    println!("Federated learning started!");
    Ok(())
}
2. Training Loop¶
use heliosdb_federated_learning::*;
use std::sync::Arc;

async fn training_loop(
    coordinator: &Coordinator,
    workers: Vec<Arc<FederatedWorker>>,
) -> Result<()> {
    for round in 0..100 {
        println!("Round {}/100", round + 1);

        // Select nodes
        let selected_nodes = coordinator.select_nodes_for_round().await?;

        // Collect updates
        let mut updates = Vec::new();
        for worker in &workers {
            if selected_nodes.contains(&worker.node_id()) {
                let update = worker.train_local_model().await?;
                updates.push(update);
            }
        }

        // Aggregate
        coordinator.execute_round(updates).await?;

        // Check convergence
        if coordinator.has_converged().await {
            println!("Converged at round {}", round + 1);
            break;
        }
    }
    Ok(())
}
HIPAA Compliance¶
Setup HIPAA Compliance¶
use heliosdb_federated_learning::*;

async fn setup_hipaa_compliance() -> Result<()> {
    // Configure HIPAA compliance
    let hipaa_config = HipaaConfig {
        encrypt_phi_at_rest: true,
        encrypt_phi_in_transit: true,
        enable_audit_logging: true,
        data_residency: "US".to_string(),
        enable_access_controls: true,
        enable_integrity_checks: true,
        min_key_size_bits: 256,
        auto_detect_phi: true,
    };

    let mut hipaa_manager = HipaaComplianceManager::new(hipaa_config)?;

    // Encrypt PHI data
    let patient_data = b"Patient: John Doe, MRN: 12345";
    let encrypted_phi = hipaa_manager.encrypt_phi(
        patient_data,
        "patient_12345".to_string(),
    )?;
    println!("PHI encrypted with AES-256-GCM");

    // Verify gradients don't leak PHI
    let gradients = vec![0.1, 0.2, 0.3, 0.4, 0.5];
    let is_safe = hipaa_manager.verify_gradient_privacy(&gradients)?;
    if !is_safe {
        return Err(FederatedLearningError::ComplianceError(
            "Gradients may leak PHI".to_string(),
        ));
    }

    // Generate compliance report
    let report = hipaa_manager.verify_compliance();
    report.print();
    if !report.overall_compliant {
        return Err(FederatedLearningError::ComplianceError(
            "HIPAA compliance failed".to_string(),
        ));
    }
    Ok(())
}
HIPAA Audit Logging¶
use heliosdb_federated_learning::*;
use std::collections::HashMap;

fn audit_phi_access(
    hipaa_manager: &mut HipaaComplianceManager,
    user_id: String,
    resource_id: String,
) {
    hipaa_manager.log_audit_event(
        user_id,
        AuditAction::PhiAccess,
        resource_id,
        true, // success
        HashMap::new(),
    );
}

fn export_audit_log(
    hipaa_manager: &HipaaComplianceManager,
) -> Vec<AuditLogEntry> {
    let start_date = chrono::Utc::now() - chrono::Duration::days(30);
    let end_date = chrono::Utc::now();
    hipaa_manager.export_audit_log(start_date, end_date)
}
HIPAA Best Practices¶
- Encrypt All PHI: Never store PHI in plaintext
- Audit Everything: Log all PHI access and modifications
- Verify Gradients: Always check gradients before aggregation
- Regular Reports: Generate compliance reports monthly
- Data Residency: Ensure US-only storage for HIPAA data
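The "verify gradients" practice can be illustrated with a self-contained sketch. HeliosDB's actual `verify_gradient_privacy` check is internal; the norm-bound heuristic below is an assumption, shown only because outlier gradients with unusually large magnitudes are one well-known way individual records can leak into model updates:

```rust
// Illustrative heuristic only (not HeliosDB's real check): flag updates whose
// L2 norm exceeds a clipping bound, since outlier gradients can encode
// information about individual records.
fn gradient_within_bound(gradients: &[f64], max_l2_norm: f64) -> bool {
    let norm = gradients.iter().map(|g| g * g).sum::<f64>().sqrt();
    norm <= max_l2_norm
}

fn main() {
    let ok = vec![0.1, 0.2, 0.3, 0.4, 0.5];
    let suspicious = vec![90.0, -120.0, 45.0];
    println!("ok: {}", gradient_within_bound(&ok, 1.0));                 // true
    println!("suspicious: {}", gradient_within_bound(&suspicious, 1.0)); // false
}
```

In production, rejected updates would also be written to the audit log (via `log_audit_event`) rather than silently dropped.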
GDPR Compliance¶
Setup GDPR Compliance¶
use heliosdb_federated_learning::*;

async fn setup_gdpr_compliance() -> Result<()> {
    // Configure GDPR compliance
    let gdpr_config = GdprConfig {
        enable_right_to_be_forgotten: true,
        enable_data_portability: true,
        enable_consent_management: true,
        retention_period_days: 365,
        enable_data_minimization: true,
        enable_purpose_limitation: true,
        enable_explainability: true,
    };

    let mut gdpr_manager = GdprComplianceManager::new(gdpr_config);

    // Record user consent
    gdpr_manager.record_consent(
        "user123".to_string(),
        vec![
            "model_training".to_string(),
            "data_analytics".to_string(),
        ],
        "v1.0".to_string(),
    )?;
    println!("User consent recorded");

    // Add user data
    gdpr_manager.add_user_data(
        "user123".to_string(),
        vec![1, 2, 3, 4, 5],
        "model_training".to_string(),
        DataCategory::TrainingData,
    )?;
    println!("User data added");
    Ok(())
}
Right to be Forgotten¶
use heliosdb_federated_learning::*;

async fn delete_user_data(
    gdpr_manager: &mut GdprComplianceManager,
    user_id: &str,
    node_ids: Vec<String>,
) -> Result<()> {
    println!("Processing deletion request for {}", user_id);

    // Delete user data from all nodes
    let deletion = gdpr_manager.delete_user_data(
        user_id,
        node_ids,
    ).await?;
    println!("Deleted from {} nodes", deletion.deleted_from_nodes.len());

    // Verify deletion
    let verified = gdpr_manager.verify_deletion(user_id).await?;
    if !verified {
        return Err(FederatedLearningError::ComplianceError(
            format!("Failed to verify deletion for {}", user_id),
        ));
    }
    println!("Deletion verified for {}", user_id);
    Ok(())
}
Data Portability¶
use heliosdb_federated_learning::*;

async fn export_user_data(
    gdpr_manager: &GdprComplianceManager,
    user_id: &str,
) -> Result<String> {
    // Export user data
    let export = gdpr_manager.export_user_data(user_id)?;
    println!("Exported data for {}", user_id);
    println!("  User data records: {}", export.user_data.len());
    println!("  Model contributions: {}", export.model_contributions.len());

    // Convert to JSON
    let json = serde_json::to_string_pretty(&export)?;
    Ok(json)
}
GDPR Best Practices¶
- Get Consent First: Always record consent before using data
- Purpose Limitation: Only use data for stated purposes
- Data Minimization: Collect only necessary data
- Respond Quickly: Process deletion requests within 30 days
- Audit Regularly: Review consent and data usage quarterly
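The retention practice pairs with `retention_period_days: 365` from the config above. The sketch below shows how a periodic retention sweep might identify overdue records; `StoredRecord` and the age field are hypothetical types invented for illustration, not part of the HeliosDB API:

```rust
// Hypothetical record type for illustration; the real store tracks creation
// timestamps per DataCategory internally.
struct StoredRecord {
    user_id: String,
    age_days: u32,
}

// Return the users whose data has exceeded the retention period and is due
// for deletion (matching retention_period_days: 365 in GdprConfig).
fn records_due_for_deletion(records: &[StoredRecord], retention_days: u32) -> Vec<&str> {
    records.iter()
        .filter(|r| r.age_days > retention_days)
        .map(|r| r.user_id.as_str())
        .collect()
}

fn main() {
    let records = vec![
        StoredRecord { user_id: "user123".into(), age_days: 400 },
        StoredRecord { user_id: "user456".into(), age_days: 30 },
    ];
    // Only the 400-day-old record is past the 365-day retention period.
    println!("{:?}", records_due_for_deletion(&records, 365)); // ["user123"]
}
```

Each flagged user would then go through the `delete_user_data` / `verify_deletion` flow shown earlier.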
100+ Node Deployment¶
Production Deployment¶
use heliosdb_federated_learning::*;

async fn deploy_100_node_cluster() -> Result<()> {
    // Configure for production
    let config = Config {
        aggregation_strategy: AggregationStrategy::FedAvg,
        round_config: RoundConfig {
            max_rounds: 1000,
            min_nodes_per_round: 80, // 80% quorum
            selection_strategy: NodeSelectionStrategy::All,
            node_timeout_secs: 30,
            target_accuracy: Some(0.95),
            learning_rate: 0.01,
            convergence_threshold: 0.001,
        },
        enable_privacy: true,
        privacy_config: Some(DifferentialPrivacyConfig {
            epsilon: 0.1,
            delta: 1e-5,
            mechanism: NoiseMechanism::Gaussian,
        }),
    };

    let engine = FederatedLearningEngine::new(config).await?;

    // Register the production model
    let initial_params = vec![0.0; 10000]; // Large model
    engine.register_model(
        "production_model".to_string(),
        "deep_neural_network".to_string(),
        initial_params,
    ).await?;

    // Create the coordinator
    let coordinator = engine.create_coordinator(
        "production_model".to_string()
    ).await?;

    // Deploy 100 worker nodes
    println!("Deploying 100 worker nodes...");
    for i in 0..100 {
        let worker_config = WorkerConfig {
            node_id: format!("prod_node_{:03}", i),
            local_epochs: 1,
            batch_size: 32,
            learning_rate: 0.01,
        };
        engine.create_worker(worker_config.clone()).await?;
        coordinator.register_node(worker_config.node_id, 10000).await?;
        if (i + 1) % 10 == 0 {
            println!("  Deployed {}/100 nodes", i + 1);
        }
    }
    println!("All nodes deployed successfully");
    Ok(())
}
Monitoring and Health Checks¶
use heliosdb_federated_learning::*;

async fn monitor_cluster_health(
    coordinator: &Coordinator,
) -> Result<()> {
    // Get cluster state
    let state = coordinator.get_state().await;
    println!("Cluster Health:");
    println!("  Current round: {}/{}", state.current_round, state.total_rounds);
    println!("  Is training: {}", state.is_training);
    println!("  Converged: {}", state.converged);
    if let Some(accuracy) = state.best_accuracy {
        println!("  Best accuracy: {:.2}%", accuracy * 100.0);
    }
    if let Some(loss) = state.best_loss {
        println!("  Best loss: {:.4}", loss);
    }

    // Get node information
    let nodes = coordinator.list_nodes().await;
    let active_nodes = nodes.iter().filter(|n| n.available).count();
    let uptime_pct = (active_nodes as f64 / nodes.len() as f64) * 100.0;
    println!("\nNode Status:");
    println!("  Total nodes: {}", nodes.len());
    println!("  Active nodes: {}", active_nodes);
    println!("  Uptime: {:.1}%", uptime_pct);

    // Alert if uptime is low
    if uptime_pct < 95.0 {
        eprintln!("WARNING: Cluster uptime below 95% ({:.1}%)", uptime_pct);
    }
    Ok(())
}
Failure Recovery¶
use heliosdb_federated_learning::*;

async fn handle_node_failure(
    coordinator: &Coordinator,
    failed_node_id: &str,
) -> Result<()> {
    println!("Node {} failed", failed_node_id);

    // Unregister the failed node
    coordinator.unregister_node(failed_node_id).await?;

    // Check if we still have quorum
    let nodes = coordinator.list_nodes().await;
    let active_count = nodes.iter().filter(|n| n.available).count();
    if active_count < 80 { // Assuming 100 nodes, 80% quorum
        eprintln!("CRITICAL: Lost quorum ({} active nodes)", active_count);
        return Err(FederatedLearningError::InsufficientNodes {
            required: 80,
            available: active_count,
        });
    }
    println!("Continuing with {} active nodes", active_count);
    Ok(())
}

async fn recover_failed_node(
    engine: &FederatedLearningEngine,
    coordinator: &Coordinator,
    node_id: String,
) -> Result<()> {
    println!("Recovering node {}", node_id);

    // Re-create the worker
    let worker_config = WorkerConfig {
        node_id: node_id.clone(),
        local_epochs: 1,
        batch_size: 32,
        learning_rate: 0.01,
    };
    engine.create_worker(worker_config.clone()).await?;

    // Re-register with the coordinator
    coordinator.register_node(node_id.clone(), 10000).await?;
    println!("Node {} recovered", node_id);
    Ok(())
}
Best Practices¶
1. Privacy Configuration¶
// For sensitive data (healthcare, financial)
let privacy_config = DifferentialPrivacyConfig {
    epsilon: 0.1, // Strong privacy
    delta: 1e-5,  // Very small failure probability
    mechanism: NoiseMechanism::Gaussian,
};

// For less sensitive data
let privacy_config = DifferentialPrivacyConfig {
    epsilon: 1.0, // Weaker privacy, better accuracy
    delta: 1e-3,  // Larger failure probability
    mechanism: NoiseMechanism::Gaussian,
};
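To see what these `epsilon`/`delta` choices mean in practice, the classical Gaussian-mechanism calibration (stated for ε < 1) sets the noise scale to σ = Δ·sqrt(2·ln(1.25/δ))/ε, where Δ is the L2 sensitivity of the update (e.g. a gradient clipping bound). This sketch is generic differential-privacy math, not HeliosDB's internal implementation, which may use a tighter accountant:

```rust
// Noise standard deviation for the Gaussian mechanism under (epsilon, delta)-DP.
// `sensitivity` is the L2 sensitivity of the released value (e.g. the gradient
// clipping bound). Classical bound; stated for epsilon < 1.
fn gaussian_sigma(sensitivity: f64, epsilon: f64, delta: f64) -> f64 {
    sensitivity * (2.0 * (1.25 / delta).ln()).sqrt() / epsilon
}

fn main() {
    // Strong-privacy config (epsilon = 0.1, delta = 1e-5) needs ~10x more
    // noise than the relaxed config, which is why accuracy suffers.
    let strong = gaussian_sigma(1.0, 0.1, 1e-5);
    let relaxed = gaussian_sigma(1.0, 1.0, 1e-3);
    println!("strong sigma = {:.2}, relaxed sigma = {:.2}", strong, relaxed);
}
```

The takeaway: shrinking `epsilon` by 10x inflates the injected noise by 10x, so the strong-privacy setting trades model accuracy for privacy.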
2. Node Selection Strategies¶
// Use all nodes (best for convergence)
NodeSelectionStrategy::All
// Random sampling (faster rounds)
NodeSelectionStrategy::Random { k: 50 }
// Data-weighted sampling (better accuracy)
NodeSelectionStrategy::MostData { k: 50 }
// Performance-based sampling (faster convergence)
NodeSelectionStrategy::BestPerformance { k: 50 }
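To make the data-weighted option concrete, a minimal sketch of what `MostData { k }` selection amounts to is shown below. `NodeInfo` is a hypothetical stand-in for the registry the coordinator keeps (node IDs and the sample counts passed to `register_node`); the real selection logic lives inside `select_nodes_for_round`:

```rust
// Hypothetical node record for illustration: id plus the local sample count
// reported at registration time.
#[derive(Clone)]
struct NodeInfo {
    node_id: String,
    num_samples: usize,
}

// MostData { k }: pick the k nodes holding the most local training data.
fn select_most_data(nodes: &[NodeInfo], k: usize) -> Vec<String> {
    let mut sorted: Vec<NodeInfo> = nodes.to_vec();
    sorted.sort_by(|a, b| b.num_samples.cmp(&a.num_samples));
    sorted.into_iter().take(k).map(|n| n.node_id).collect()
}

fn main() {
    let nodes = vec![
        NodeInfo { node_id: "node_0".into(), num_samples: 500 },
        NodeInfo { node_id: "node_1".into(), num_samples: 2000 },
        NodeInfo { node_id: "node_2".into(), num_samples: 1200 },
    ];
    // The two most data-rich nodes win the round.
    println!("{:?}", select_most_data(&nodes, 2)); // ["node_1", "node_2"]
}
```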
3. Convergence Configuration¶
let round_config = RoundConfig {
    max_rounds: 100,
    min_nodes_per_round: 10,
    selection_strategy: NodeSelectionStrategy::All,
    node_timeout_secs: 60,
    target_accuracy: Some(0.95),      // Stop when accuracy is reached
    learning_rate: 0.01,
    convergence_threshold: 0.001,     // Stop when loss change < 0.001
};
4. Error Handling¶
use heliosdb_federated_learning::*;
use std::sync::Arc;
use std::time::Duration;

async fn robust_training(
    coordinator: &Coordinator,
    workers: Vec<Arc<FederatedWorker>>,
) -> Result<()> {
    for round in 0..100 {
        // Select nodes
        let selected_nodes = match coordinator.select_nodes_for_round().await {
            Ok(nodes) => nodes,
            Err(e) => {
                eprintln!("Failed to select nodes: {}", e);
                continue; // Skip this round
            }
        };

        // Collect updates with a timeout per worker
        let mut updates = Vec::new();
        for worker in &workers {
            if selected_nodes.contains(&worker.node_id()) {
                match tokio::time::timeout(
                    Duration::from_secs(60),
                    worker.train_local_model(),
                ).await {
                    Ok(Ok(update)) => updates.push(update),
                    Ok(Err(e)) => {
                        eprintln!("Worker {} failed: {}", worker.node_id(), e);
                    }
                    Err(_) => {
                        eprintln!("Worker {} timed out", worker.node_id());
                    }
                }
            }
        }

        // Only aggregate if we have enough updates
        if updates.len() < selected_nodes.len() / 2 {
            eprintln!("Not enough updates, skipping round {}", round);
            continue;
        }

        // Aggregate
        match coordinator.execute_round(updates).await {
            Ok(_) => println!("Round {} completed", round),
            Err(e) => eprintln!("Round {} failed: {}", round, e),
        }
    }
    Ok(())
}
Troubleshooting¶
Common Issues¶
Issue: "Insufficient nodes"¶
Cause: Not enough active nodes to satisfy the minimum required for a training round
Solution:
// Reduce the minimum nodes per round
let round_config = RoundConfig {
    min_nodes_per_round: 5, // Reduced from 10
    ..Default::default()
};

// Or wait for nodes to recover
tokio::time::sleep(Duration::from_secs(30)).await;
Issue: "Privacy budget exhausted"¶
Cause: Too many training rounds consumed a tight privacy budget
Solution:
// Increase epsilon (weaker privacy)
let privacy_config = DifferentialPrivacyConfig {
    epsilon: 0.5, // Increased from 0.1
    delta: 1e-5,
    mechanism: NoiseMechanism::Gaussian,
};

// Or reduce the number of rounds
let round_config = RoundConfig {
    max_rounds: 50, // Reduced from 100
    ..Default::default()
};
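Why rounds exhaust the budget: under basic sequential composition, T rounds at ε each consume roughly T·ε of the total budget. (Advanced composition and moments accounting, which HeliosDB may or may not use internally, give tighter bounds; the back-of-the-envelope version below is the conservative one.)

```rust
// Basic sequential composition: T rounds at epsilon each spend ~T * epsilon.
fn max_rounds(total_budget: f64, epsilon_per_round: f64) -> u64 {
    // The small tolerance guards against floating-point artifacts
    // such as 5.0 / 0.1 evaluating to 49.999....
    ((total_budget / epsilon_per_round) + 1e-9).floor() as u64
}

fn main() {
    // With a total budget of epsilon = 5.0:
    println!("{}", max_rounds(5.0, 0.1)); // 0.1 per round -> 50 rounds
    println!("{}", max_rounds(5.0, 0.5)); // 0.5 per round -> 10 rounds
}
```

This is why the two fixes above are interchangeable: raising per-round `epsilon` and lowering `max_rounds` both keep total consumption within the budget.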
Issue: "Convergence not reached"¶
Cause: Model not converging within max rounds
Solution:
// Increase max rounds
let round_config = RoundConfig {
    max_rounds: 200, // Increased from 100
    ..Default::default()
};

// Or adjust the learning rate
let round_config = RoundConfig {
    learning_rate: 0.001, // Reduced from 0.01
    ..Default::default()
};

// Or relax the convergence threshold
let round_config = RoundConfig {
    convergence_threshold: 0.01, // Increased from 0.001
    ..Default::default()
};
Debugging Tools¶
// Enable verbose logging
use tracing::Level;
use tracing_subscriber;

tracing_subscriber::fmt()
    .with_max_level(Level::DEBUG)
    .init();

// Monitor round history
let history = coordinator.get_round_history().await;
for round in history {
    println!("Round {}: {} nodes, loss: {:?}",
        round.round_number,
        round.participating_nodes.len(),
        round.aggregated_loss,
    );
}

// Check node health
for node in coordinator.list_nodes().await {
    println!("Node {}: available={}, rounds={}",
        node.node_id,
        node.available,
        node.rounds_participated,
    );
}
API Reference¶
Core Types¶
// Federated learning engine
pub struct FederatedLearningEngine { /* ... */ }
// Training coordinator
pub struct Coordinator { /* ... */ }
// Worker node
pub struct FederatedWorker { /* ... */ }
// HIPAA compliance manager
pub struct HipaaComplianceManager { /* ... */ }
// GDPR compliance manager
pub struct GdprComplianceManager { /* ... */ }
Configuration Types¶
pub struct Config {
    pub aggregation_strategy: AggregationStrategy,
    pub round_config: RoundConfig,
    pub enable_privacy: bool,
    pub privacy_config: Option<DifferentialPrivacyConfig>,
}

pub struct RoundConfig {
    pub max_rounds: u64,
    pub min_nodes_per_round: usize,
    pub selection_strategy: NodeSelectionStrategy,
    pub node_timeout_secs: u64,
    pub target_accuracy: Option<f64>,
    pub learning_rate: f32,
    pub convergence_threshold: f64,
}

pub struct DifferentialPrivacyConfig {
    pub epsilon: f64,
    pub delta: f64,
    pub mechanism: NoiseMechanism,
}
Enums¶
pub enum AggregationStrategy {
    FedAvg,  // Federated averaging
    FedProx, // Proximal term regularization
    FedOpt,  // Adaptive optimization
}
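The default `FedAvg` strategy averages parameters weighted by each node's local sample count. A minimal standalone sketch of that computation (the engine's internal aggregation is more elaborate, handling privacy noise and stragglers):

```rust
// FedAvg-style aggregation: each node contributes (parameters, sample_count),
// and the global model is the sample-weighted average of the parameters.
fn fed_avg(updates: &[(Vec<f64>, usize)]) -> Vec<f64> {
    let total: f64 = updates.iter().map(|(_, n)| *n as f64).sum();
    let dim = updates[0].0.len();
    let mut avg = vec![0.0; dim];
    for (params, n) in updates {
        let weight = *n as f64 / total;
        for (a, p) in avg.iter_mut().zip(params) {
            *a += weight * p;
        }
    }
    avg
}

fn main() {
    // Node A: params [1.0, 2.0] from 100 samples; Node B: [3.0, 4.0] from 300.
    let updates = vec![(vec![1.0, 2.0], 100), (vec![3.0, 4.0], 300)];
    // The global model is pulled toward node B, which holds 3x the data.
    println!("{:?}", fed_avg(&updates));
}
```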
pub enum NodeSelectionStrategy {
    All,
    Random { k: usize },
    MostData { k: usize },
    BestPerformance { k: usize },
}

pub enum NoiseMechanism {
    Gaussian,
    Laplace,
}
For complete API documentation, see API.md.
Version: 1.0.0 · Last Updated: 2025-11-15 · Module: heliosdb-federated-learning