Job Management API Documentation

Feature: F1.3 Flink Streaming - Job Management
Version: 1.0
Date: October 29, 2025
Status: APPROVED
Priority: P1
Implementation Timeline: Week 2 (Nov 11-17, 2025)


Executive Summary

The Job Management API provides complete control over F1.3 Flink Streaming job lifecycle: submission, monitoring, cancellation, savepoint management, and recovery. The API supports both HTTP REST and native Rust interfaces, with Prometheus/Grafana integration for monitoring.

Key Features

  • Job submission with validation
  • Job lifecycle management (submit, run, pause, cancel, restart)
  • Savepoint management (create, restore, delete)
  • Real-time monitoring (metrics, status, logs)
  • Resource management (CPU, memory, parallelism)
  • Failure recovery (automatic restart, checkpointing)

API Overview

Endpoints

Method Endpoint Description
POST /api/v1/jobs Submit new job
GET /api/v1/jobs List all jobs
GET /api/v1/jobs/{job_id} Get job details
DELETE /api/v1/jobs/{job_id} Cancel job
POST /api/v1/jobs/{job_id}/savepoints Create savepoint
GET /api/v1/jobs/{job_id}/metrics Get job metrics
GET /api/v1/jobs/{job_id}/logs Get job logs
POST /api/v1/jobs/{job_id}/restart Restart job

API Reference

1. Submit Job

Submit a new streaming job.

Endpoint: POST /api/v1/jobs

Request Body:

{
  "name": "fraud_detection_pipeline",
  "config": {
    "parallelism": 4,
    "checkpoint_interval_ms": 60000,
    "state_backend": "rocksdb",
    "restart_strategy": "fixed_delay"
  },
  "sources": [
    {
      "type": "kafka",
      "config": {
        "bootstrap_servers": ["localhost:9092"],
        "topic": "transactions",
        "group_id": "fraud_detector"
      }
    }
  ],
  "transformations": [
    {
      "type": "filter",
      "config": {
        "condition": "amount > 10000"
      }
    },
    {
      "type": "cep",
      "config": {
        "pattern": "A B+ C",
        "window": "1h"
      }
    }
  ],
  "sinks": [
    {
      "type": "database",
      "config": {
        "connection_string": "postgresql://...",
        "table": "fraud_alerts"
      }
    }
  ]
}

Response (201 Created):

{
  "job_id": "job-123e4567-e89b-12d3-a456-426614174000",
  "status": "submitted",
  "submitted_at": "2025-10-29T10:00:00Z",
  "message": "Job submitted successfully"
}

Rust API:

let job_config = JobConfig {
    name: "fraud_detection_pipeline".to_string(),
    parallelism: 4,
    checkpoint_interval: Duration::from_secs(60),
    state_backend: StateBackendType::RocksDb,
    restart_strategy: RestartStrategy::FixedDelay {
        attempts: 3,
        delay: Duration::from_secs(10),
    },
};

let job_id = job_manager.submit_job(job_config).await?;


2. List Jobs

Get a list of all jobs.

Endpoint: GET /api/v1/jobs

Query Parameters:

  • status (optional): Filter by status (running, finished, failed, cancelled)
  • limit (optional): Maximum results (default: 100)
  • offset (optional): Pagination offset

Response (200 OK):

{
  "jobs": [
    {
      "job_id": "job-123...",
      "name": "fraud_detection_pipeline",
      "status": "running",
      "submitted_at": "2025-10-29T10:00:00Z",
      "started_at": "2025-10-29T10:00:05Z",
      "uptime_seconds": 3600,
      "parallelism": 4
    },
    {
      "job_id": "job-456...",
      "name": "recommendation_engine",
      "status": "finished",
      "submitted_at": "2025-10-29T08:00:00Z",
      "started_at": "2025-10-29T08:00:03Z",
      "finished_at": "2025-10-29T09:30:00Z",
      "parallelism": 8
    }
  ],
  "total": 2,
  "limit": 100,
  "offset": 0
}

Rust API:

let jobs = job_manager.list_jobs(ListJobsOptions {
    status: Some(JobStatus::Running),
    limit: 100,
    offset: 0,
}).await?;
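The limit/offset parameters support simple page-by-page iteration. A minimal, self-contained sketch of the client-side paging arithmetic (the `next_offset` helper is illustrative, not part of the API):

```rust
/// Compute the next offset for limit/offset paging, or None when the
/// current page already covers the reported total.
fn next_offset(offset: usize, limit: usize, total: usize) -> Option<usize> {
    let next = offset + limit;
    if next < total { Some(next) } else { None }
}

fn main() {
    // 250 jobs paged 100 at a time: offsets 0, 100, 200, then done.
    assert_eq!(next_offset(0, 100, 250), Some(100));
    assert_eq!(next_offset(100, 100, 250), Some(200));
    assert_eq!(next_offset(200, 100, 250), None);
}
```

A caller would feed each returned offset back into `ListJobsOptions` until `None`.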


3. Get Job Details

Get detailed information about a specific job.

Endpoint: GET /api/v1/jobs/{job_id}

Response (200 OK):

{
  "job_id": "job-123...",
  "name": "fraud_detection_pipeline",
  "status": "running",
  "config": {
    "parallelism": 4,
    "checkpoint_interval_ms": 60000,
    "state_backend": "rocksdb",
    "restart_strategy": "fixed_delay"
  },
  "submitted_at": "2025-10-29T10:00:00Z",
  "started_at": "2025-10-29T10:00:05Z",
  "uptime_seconds": 3600,
  "metrics": {
    "events_processed": 1523400,
    "throughput_per_sec": 423,
    "latency_p50_ms": 3.2,
    "latency_p99_ms": 12.5,
    "checkpoint_count": 60,
    "last_checkpoint_duration_ms": 45,
    "backpressure_events": 0,
    "restarts": 0
  },
  "tasks": [
    {
      "task_id": "task-1",
      "name": "kafka-source",
      "status": "running",
      "parallelism": 1
    },
    {
      "task_id": "task-2",
      "name": "filter",
      "status": "running",
      "parallelism": 4
    },
    {
      "task_id": "task-3",
      "name": "cep-matcher",
      "status": "running",
      "parallelism": 4
    },
    {
      "task_id": "task-4",
      "name": "database-sink",
      "status": "running",
      "parallelism": 2
    }
  ],
  "checkpoints": [
    {
      "checkpoint_id": "checkpoint-1",
      "timestamp": "2025-10-29T10:59:00Z",
      "duration_ms": 45,
      "size_bytes": 1048576,
      "status": "completed"
    }
  ]
}

Rust API:

let job_details = job_manager.get_job_details(job_id).await?;
println!("Job status: {:?}", job_details.status);
println!("Events processed: {}", job_details.metrics.events_processed);


4. Cancel Job

Cancel a running or paused job.

Endpoint: DELETE /api/v1/jobs/{job_id}

Query Parameters:

  • savepoint (optional): Create a savepoint before cancelling (true/false, default: false)

Response (200 OK):

{
  "job_id": "job-123...",
  "status": "cancelled",
  "cancelled_at": "2025-10-29T11:00:00Z",
  "savepoint_id": "savepoint-789..." // if savepoint=true
}

Rust API:

// Cancel without savepoint
job_manager.cancel_job(job_id).await?;

// Cancel with savepoint
let savepoint_id = job_manager.cancel_job_with_savepoint(job_id).await?;


5. Create Savepoint

Create a savepoint for a running job (for backup or migration).

Endpoint: POST /api/v1/jobs/{job_id}/savepoints

Request Body (optional):

{
  "description": "Pre-upgrade savepoint"
}

Response (201 Created):

{
  "savepoint_id": "savepoint-789...",
  "job_id": "job-123...",
  "created_at": "2025-10-29T11:00:00Z",
  "size_bytes": 10485760,
  "location": "s3://checkpoints/savepoint-789...",
  "description": "Pre-upgrade savepoint"
}

Rust API:

let savepoint_id = job_manager.create_savepoint(
    job_id,
    Some("Pre-upgrade savepoint".to_string())
).await?;


6. Get Job Metrics

Get real-time metrics for a job.

Endpoint: GET /api/v1/jobs/{job_id}/metrics

Query Parameters:

  • window (optional): Time window (1m, 5m, 1h, 24h; default: 5m)

Response (200 OK):

{
  "job_id": "job-123...",
  "timestamp": "2025-10-29T11:00:00Z",
  "metrics": {
    "events_processed_total": 1523400,
    "throughput_per_sec": 423,
    "latency": {
      "p50_ms": 3.2,
      "p95_ms": 8.7,
      "p99_ms": 12.5,
      "p999_ms": 25.3
    },
    "checkpoint": {
      "count": 60,
      "last_duration_ms": 45,
      "failures": 0
    },
    "backpressure": {
      "events_total": 0,
      "current_level": 0.0
    },
    "resources": {
      "cpu_usage_percent": 45.2,
      "memory_used_mb": 512,
      "memory_total_mb": 2048
    },
    "errors": {
      "count": 0,
      "rate_per_min": 0.0
    }
  },
  "history": [
    {
      "timestamp": "2025-10-29T10:55:00Z",
      "throughput_per_sec": 420
    },
    {
      "timestamp": "2025-10-29T10:56:00Z",
      "throughput_per_sec": 425
    }
  ]
}

Rust API:

let metrics = job_manager.get_job_metrics(
    job_id,
    MetricsWindow::Minutes(5)
).await?;

println!("Throughput: {} events/sec", metrics.throughput_per_sec);
println!("Latency p99: {}ms", metrics.latency.p99_ms);
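Window strings such as "1m", "5m", "1h", or "24h" map directly onto durations. A minimal sketch of that mapping (the `parse_window` helper is hypothetical; the server's actual parsing may differ):

```rust
use std::time::Duration;

/// Parse a metrics window string ("30s", "5m", "24h") into a Duration.
/// Returns None for empty input, a missing number, or an unknown unit.
fn parse_window(s: &str) -> Option<Duration> {
    // Split off the final character as the unit; ASCII inputs only.
    let (num, unit) = s.split_at(s.len().checked_sub(1)?);
    let n: u64 = num.parse().ok()?;
    match unit {
        "s" => Some(Duration::from_secs(n)),
        "m" => Some(Duration::from_secs(n * 60)),
        "h" => Some(Duration::from_secs(n * 3600)),
        _ => None,
    }
}

fn main() {
    assert_eq!(parse_window("5m"), Some(Duration::from_secs(300)));
    assert_eq!(parse_window("24h"), Some(Duration::from_secs(86_400)));
    assert_eq!(parse_window("1x"), None);
}
```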


7. Get Job Logs

Get logs for a job.

Endpoint: GET /api/v1/jobs/{job_id}/logs

Query Parameters:

  • level (optional): Filter by log level (debug, info, warn, error)
  • limit (optional): Max log lines (default: 1000)
  • tail (optional): Return only the most recent limit lines (default: false)

Response (200 OK):

{
  "job_id": "job-123...",
  "logs": [
    {
      "timestamp": "2025-10-29T10:00:05.123Z",
      "level": "info",
      "message": "Job started successfully",
      "task_id": null
    },
    {
      "timestamp": "2025-10-29T10:00:05.456Z",
      "level": "info",
      "message": "Kafka source connected to localhost:9092",
      "task_id": "task-1"
    },
    {
      "timestamp": "2025-10-29T10:01:00.789Z",
      "level": "info",
      "message": "Checkpoint 1 completed (45ms)",
      "task_id": null
    },
    {
      "timestamp": "2025-10-29T10:05:12.345Z",
      "level": "warn",
      "message": "Temporary backpressure detected",
      "task_id": "task-2"
    }
  ],
  "total": 4,
  "limit": 1000
}

Rust API:

let logs = job_manager.get_job_logs(
    job_id,
    LogOptions {
        level: Some(LogLevel::Warn),
        limit: 1000,
        tail: true,
    }
).await?;

for log in logs {
    println!("[{}] {}: {}", log.timestamp, log.level, log.message);
}
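One plausible reading of the level filter is "this severity and above", as in most log systems. A self-contained sketch under that assumption (the enum mirrors the documented levels; the filtering semantics are not confirmed by the API spec):

```rust
/// Log severities in ascending order; deriving Ord makes them comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum LogLevel { Debug, Info, Warn, Error }

/// Keep only entries at or above `min` severity.
fn filter_logs(levels: &[LogLevel], min: LogLevel) -> Vec<LogLevel> {
    levels.iter().copied().filter(|l| *l >= min).collect()
}

fn main() {
    let stream = [LogLevel::Info, LogLevel::Warn, LogLevel::Debug, LogLevel::Error];
    // With level=warn, only warn and error entries remain.
    assert_eq!(filter_logs(&stream, LogLevel::Warn),
               vec![LogLevel::Warn, LogLevel::Error]);
}
```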


8. Restart Job

Restart a failed or cancelled job (optionally from savepoint).

Endpoint: POST /api/v1/jobs/{job_id}/restart

Request Body (optional):

{
  "savepoint_id": "savepoint-789...", // Optional
  "config_overrides": { // Optional
    "parallelism": 8
  }
}

Response (200 OK):

{
  "job_id": "job-123...",
  "status": "running",
  "restarted_at": "2025-10-29T11:05:00Z",
  "restored_from_savepoint": "savepoint-789...",
  "message": "Job restarted successfully"
}

Rust API:

// Restart from latest checkpoint
job_manager.restart_job(job_id, None).await?;

// Restart from specific savepoint
job_manager.restart_job(
    job_id,
    Some(RestartOptions {
        savepoint_id: Some(savepoint_id),
        config_overrides: None,
    })
).await?;


๐Ÿ— Rust API (Native)

JobManager

pub struct JobManager {
    job_registry: Arc<RwLock<HashMap<JobId, JobMetadata>>>,
    scheduler: Arc<Scheduler>,
    checkpoint_coordinator: Arc<CheckpointCoordinator>,
    metrics_collector: Arc<MetricsCollector>,
}

impl JobManager {
    /// Create new job manager
    pub fn new(max_parallelism: usize) -> Result<Self>;

    /// Submit a new job
    pub async fn submit_job(&self, config: JobConfig) -> Result<JobId>;

    /// Cancel a running job
    pub async fn cancel_job(&self, job_id: JobId) -> Result<()>;

    /// Cancel job with savepoint
    pub async fn cancel_job_with_savepoint(&self, job_id: JobId) -> Result<SavepointId>;

    /// Create savepoint
    pub async fn create_savepoint(
        &self,
        job_id: JobId,
        description: Option<String>
    ) -> Result<SavepointId>;

    /// Restart job
    pub async fn restart_job(
        &self,
        job_id: JobId,
        options: Option<RestartOptions>
    ) -> Result<()>;

    /// Get job status
    pub async fn get_job_status(&self, job_id: JobId) -> Result<JobStatus>;

    /// Get job details
    pub async fn get_job_details(&self, job_id: JobId) -> Result<JobDetails>;

    /// Get job metrics
    pub async fn get_job_metrics(
        &self,
        job_id: JobId,
        window: MetricsWindow
    ) -> Result<JobMetrics>;

    /// Get job logs
    pub async fn get_job_logs(
        &self,
        job_id: JobId,
        options: LogOptions
    ) -> Result<Vec<LogEntry>>;

    /// List all jobs
    pub async fn list_jobs(&self, options: ListJobsOptions) -> Result<Vec<JobSummary>>;
}

Data Structures

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobConfig {
    pub name: String,
    pub parallelism: usize,
    pub checkpoint_interval: Duration,
    pub state_backend: StateBackendType,
    pub restart_strategy: RestartStrategy,
    pub sources: Vec<SourceConfig>,
    pub transformations: Vec<TransformationConfig>,
    pub sinks: Vec<SinkConfig>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum JobStatus {
    Submitted,
    Running,
    Paused,
    Finished,
    Failed,
    Cancelled,
}
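The lifecycle states above imply a transition graph (e.g. only failed or cancelled jobs may be restarted). A sketch of one plausible transition table; the scheduler's actual rules may differ:

```rust
// Mirrors the documented JobStatus enum for a standalone example.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobStatus { Submitted, Running, Paused, Finished, Failed, Cancelled }

/// Whether a lifecycle transition is permitted. Assumed table: terminal
/// Finished state cannot restart; Failed/Cancelled can (per the restart API).
fn can_transition(from: JobStatus, to: JobStatus) -> bool {
    use JobStatus::*;
    matches!(
        (from, to),
        (Submitted, Running)
            | (Running, Paused) | (Running, Finished)
            | (Running, Failed) | (Running, Cancelled)
            | (Paused, Running) | (Paused, Cancelled)
            | (Failed, Running)      // automatic or manual restart
            | (Cancelled, Running)   // restart after cancel
    )
}

fn main() {
    use JobStatus::*;
    assert!(can_transition(Submitted, Running));
    assert!(can_transition(Failed, Running));
    assert!(!can_transition(Finished, Running)); // Finished is terminal
}
```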

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobDetails {
    pub job_id: JobId,
    pub name: String,
    pub status: JobStatus,
    pub config: JobConfig,
    pub submitted_at: SystemTime,
    pub started_at: Option<SystemTime>,
    pub finished_at: Option<SystemTime>,
    pub uptime_secs: u64,
    pub metrics: JobMetrics,
    pub tasks: Vec<TaskInfo>,
    pub checkpoints: Vec<CheckpointInfo>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetrics {
    pub events_processed: u64,
    pub throughput_per_sec: f64,
    pub latency_p50_ms: f64,
    pub latency_p99_ms: f64,
    pub checkpoint_count: u64,
    pub last_checkpoint_duration_ms: u64,
    pub backpressure_events: u64,
    pub restarts: u32,
    pub cpu_usage_percent: f64,
    pub memory_used_mb: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RestartStrategy {
    /// No automatic restart
    None,
    /// Restart with fixed delay
    FixedDelay {
        attempts: u32,
        delay: Duration,
    },
    /// Restart with exponential backoff
    ExponentialBackoff {
        max_attempts: u32,
        initial_delay: Duration,
        max_delay: Duration,
    },
}
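The ExponentialBackoff variant's intent can be made concrete with the usual doubling formula. A minimal sketch, assuming delay = initial_delay * 2^(attempt-1) capped at max_delay (the scheduler's exact formula is not specified here):

```rust
use std::time::Duration;

/// Delay before restart attempt `attempt` (1-based) under exponential
/// backoff: initial * 2^(attempt-1), capped at `max`.
fn backoff_delay(attempt: u32, initial: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
    initial.saturating_mul(factor).min(max)
}

fn main() {
    let initial = Duration::from_secs(1);
    let max = Duration::from_secs(60);
    assert_eq!(backoff_delay(1, initial, max), Duration::from_secs(1));
    assert_eq!(backoff_delay(4, initial, max), Duration::from_secs(8));
    assert_eq!(backoff_delay(10, initial, max), Duration::from_secs(60)); // capped
}
```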

Prometheus Metrics

Job Metrics

  • heliosdb_job_count (gauge): Number of active jobs
  • heliosdb_job_status (gauge): Job status by job_id
  • heliosdb_job_events_processed_total (counter): Total events processed
  • heliosdb_job_throughput (gauge): Current throughput (events/sec)
  • heliosdb_job_latency_seconds (histogram): Processing latency
  • heliosdb_job_checkpoint_duration_seconds (histogram): Checkpoint duration
  • heliosdb_job_checkpoint_failures_total (counter): Checkpoint failures
  • heliosdb_job_backpressure_events_total (counter): Backpressure events
  • heliosdb_job_restarts_total (counter): Job restarts
  • heliosdb_job_errors_total (counter): Job errors
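A Prometheus scrape of these metrics would produce standard exposition-format text along these lines (labels and values are illustrative, not captured output):

```text
# HELP heliosdb_job_throughput Current throughput (events/sec)
# TYPE heliosdb_job_throughput gauge
heliosdb_job_throughput{job_id="job-123..."} 423
# HELP heliosdb_job_events_processed_total Total events processed
# TYPE heliosdb_job_events_processed_total counter
heliosdb_job_events_processed_total{job_id="job-123..."} 1523400
```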

Examples

Example 1: Submit and Monitor Job

use heliosdb_streaming::job::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Create job manager
    let job_manager = JobManager::new(16)?;

    // Configure job
    let config = JobConfig {
        name: "realtime_analytics".to_string(),
        parallelism: 4,
        checkpoint_interval: Duration::from_secs(60),
        state_backend: StateBackendType::RocksDb,
        restart_strategy: RestartStrategy::FixedDelay {
            attempts: 3,
            delay: Duration::from_secs(10),
        },
        sources: vec![
            SourceConfig::kafka(...),
        ],
        transformations: vec![
            TransformationConfig::filter(...),
            TransformationConfig::window(...),
        ],
        sinks: vec![
            SinkConfig::database(...),
        ],
    };

    // Submit job
    let job_id = job_manager.submit_job(config).await?;
    println!("Job submitted: {}", job_id);

    // Monitor job
    loop {
        let metrics = job_manager.get_job_metrics(job_id, MetricsWindow::Minutes(1)).await?;
        println!("Throughput: {} events/sec", metrics.throughput_per_sec);
        println!("Latency p99: {}ms", metrics.latency_p99_ms);

        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}

Example 2: Savepoint and Restart

// Create savepoint before maintenance
let savepoint_id = job_manager.create_savepoint(
    job_id,
    Some("Pre-maintenance savepoint".to_string())
).await?;

// Cancel job
job_manager.cancel_job(job_id).await?;

// Perform maintenance...

// Restart job from savepoint
job_manager.restart_job(
    job_id,
    Some(RestartOptions {
        savepoint_id: Some(savepoint_id),
        config_overrides: Some(JobConfigOverrides {
            parallelism: Some(8), // Increase parallelism
        }),
    })
).await?;

Testing

Integration Tests (10 tests)

  1. Submit job and verify status
  2. Cancel job
  3. Create savepoint
  4. Restart from savepoint
  5. Get job metrics
  6. Get job logs
  7. List jobs with filters
  8. Job failure and automatic restart
  9. Concurrent job management
  10. Resource cleanup after job completion

Acceptance Criteria

  • [ ] All 10 integration tests passing
  • [ ] HTTP API endpoints functional
  • [ ] Rust API documented
  • [ ] Prometheus metrics exported
  • [ ] Grafana dashboards created
  • [ ] Performance: API latency <100ms
  • [ ] Documentation complete
