Job Management API Documentation¶
Feature: F1.3 Flink Streaming - Job Management
Version: 1.0
Date: October 29, 2025
Status: APPROVED
Priority: P1
Implementation Timeline: Week 2 (Nov 11-17, 2025)
Executive Summary¶
The Job Management API provides complete control over F1.3 Flink Streaming job lifecycle: submission, monitoring, cancellation, savepoint management, and recovery. The API supports both HTTP REST and native Rust interfaces, with Prometheus/Grafana integration for monitoring.
Key Features¶
- Job submission with validation
- Job lifecycle management (submit, run, pause, cancel, restart)
- Savepoint management (create, restore, delete)
- Real-time monitoring (metrics, status, logs)
- Resource management (CPU, memory, parallelism)
- Failure recovery (automatic restart, checkpointing)
API Overview¶
Endpoints¶
| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/jobs | Submit new job |
| GET | /api/v1/jobs | List all jobs |
| GET | /api/v1/jobs/{job_id} | Get job details |
| DELETE | /api/v1/jobs/{job_id} | Cancel job |
| POST | /api/v1/jobs/{job_id}/savepoints | Create savepoint |
| GET | /api/v1/jobs/{job_id}/metrics | Get job metrics |
| GET | /api/v1/jobs/{job_id}/logs | Get job logs |
| POST | /api/v1/jobs/{job_id}/restart | Restart job |
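All job-scoped endpoints share the `/api/v1/jobs/{job_id}` prefix shown in the table. As an illustration only, a client might centralize path construction in a small helper; the function below is a hypothetical sketch, not part of the documented API:

```rust
/// Hypothetical client-side helper that builds job-scoped endpoint
/// paths. Only the route layout is taken from the endpoint table;
/// the helper itself is illustrative.
fn job_endpoint(job_id: &str, action: Option<&str>) -> String {
    match action {
        // e.g. "savepoints", "metrics", "logs", "restart"
        Some(a) => format!("/api/v1/jobs/{}/{}", job_id, a),
        None => format!("/api/v1/jobs/{}", job_id),
    }
}

fn main() {
    println!("{}", job_endpoint("job-123", None));
    println!("{}", job_endpoint("job-123", Some("metrics")));
}
```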
API Reference¶
1. Submit Job¶
Submit a new streaming job.
Endpoint: POST /api/v1/jobs
Request Body:
{
"name": "fraud_detection_pipeline",
"config": {
"parallelism": 4,
"checkpoint_interval_ms": 60000,
"state_backend": "rocksdb",
"restart_strategy": "fixed_delay"
},
"sources": [
{
"type": "kafka",
"config": {
"bootstrap_servers": ["localhost:9092"],
"topic": "transactions",
"group_id": "fraud_detector"
}
}
],
"transformations": [
{
"type": "filter",
"config": {
"condition": "amount > 10000"
}
},
{
"type": "cep",
"config": {
"pattern": "A B+ C",
"window": "1h"
}
}
],
"sinks": [
{
"type": "database",
"config": {
"connection_string": "postgresql://...",
"table": "fraud_alerts"
}
}
]
}
Response (201 Created):
{
"job_id": "job-123e4567-e89b-12d3-a456-426614174000",
"status": "submitted",
"submitted_at": "2025-10-29T10:00:00Z",
"message": "Job submitted successfully"
}
Rust API:
let job_config = JobConfig {
    name: "fraud_detection_pipeline".to_string(),
    parallelism: 4,
    checkpoint_interval: Duration::from_secs(60),
    state_backend: StateBackendType::RocksDb,
    restart_strategy: RestartStrategy::FixedDelay {
        attempts: 3,
        delay: Duration::from_secs(10),
    },
    sources: vec![/* SourceConfig entries */],
    transformations: vec![/* TransformationConfig entries */],
    sinks: vec![/* SinkConfig entries */],
};
let job_id = job_manager.submit_job(job_config).await?;
2. List Jobs¶
List all jobs, with optional status filtering and pagination.
Endpoint: GET /api/v1/jobs
Query Parameters:
- status (optional): Filter by status (running, finished, failed, cancelled)
- limit (optional): Max results (default: 100)
- offset (optional): Pagination offset
Response (200 OK):
{
"jobs": [
{
"job_id": "job-123...",
"name": "fraud_detection_pipeline",
"status": "running",
"submitted_at": "2025-10-29T10:00:00Z",
"started_at": "2025-10-29T10:00:05Z",
"uptime_seconds": 3600,
"parallelism": 4
},
{
"job_id": "job-456...",
"name": "recommendation_engine",
"status": "finished",
"submitted_at": "2025-10-29T08:00:00Z",
"started_at": "2025-10-29T08:00:03Z",
"finished_at": "2025-10-29T09:30:00Z",
"parallelism": 8
}
],
"total": 2,
"limit": 100,
"offset": 0
}
Rust API:
let jobs = job_manager.list_jobs(ListJobsOptions {
status: Some(JobStatus::Running),
limit: 100,
offset: 0,
}).await?;
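The `limit`/`offset` pair in the response supports simple offset-based pagination. A sketch of how a client might compute the next page to request, assuming the server-reported `total` is authoritative (the helper is hypothetical, not part of the API):

```rust
/// Hypothetical pagination helper: given the current offset, the page
/// size, and the server-reported total, return the next offset to
/// request, or None once every job has been fetched.
fn next_offset(offset: usize, limit: usize, total: usize) -> Option<usize> {
    let next = offset + limit;
    if next < total { Some(next) } else { None }
}

fn main() {
    // 250 jobs fetched 100 at a time: pages start at offsets 0, 100, 200.
    let mut offset = 0;
    while let Some(next) = next_offset(offset, 100, 250) {
        offset = next;
    }
    println!("last page offset: {}", offset);
}
```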
3. Get Job Details¶
Get detailed information about a specific job.
Endpoint: GET /api/v1/jobs/{job_id}
Response (200 OK):
{
"job_id": "job-123...",
"name": "fraud_detection_pipeline",
"status": "running",
"config": {
"parallelism": 4,
"checkpoint_interval_ms": 60000,
"state_backend": "rocksdb",
"restart_strategy": "fixed_delay"
},
"submitted_at": "2025-10-29T10:00:00Z",
"started_at": "2025-10-29T10:00:05Z",
"uptime_seconds": 3600,
"metrics": {
"events_processed": 1523400,
"throughput_per_sec": 423,
"latency_p50_ms": 3.2,
"latency_p99_ms": 12.5,
"checkpoint_count": 60,
"last_checkpoint_duration_ms": 45,
"backpressure_events": 0,
"restarts": 0
},
"tasks": [
{
"task_id": "task-1",
"name": "kafka-source",
"status": "running",
"parallelism": 1
},
{
"task_id": "task-2",
"name": "filter",
"status": "running",
"parallelism": 4
},
{
"task_id": "task-3",
"name": "cep-matcher",
"status": "running",
"parallelism": 4
},
{
"task_id": "task-4",
"name": "database-sink",
"status": "running",
"parallelism": 2
}
],
"checkpoints": [
{
"checkpoint_id": "checkpoint-1",
"timestamp": "2025-10-29T10:59:00Z",
"duration_ms": 45,
"size_bytes": 1048576,
"status": "completed"
}
]
}
Rust API:
let job_details = job_manager.get_job_details(job_id).await?;
println!("Job status: {:?}", job_details.status);
println!("Events processed: {}", job_details.metrics.events_processed);
4. Cancel Job¶
Cancel a running or paused job.
Endpoint: DELETE /api/v1/jobs/{job_id}
Query Parameters:
- savepoint (optional): Create savepoint before cancelling (true/false, default: false)
Response (200 OK):
{
"job_id": "job-123...",
"status": "cancelled",
"cancelled_at": "2025-10-29T11:00:00Z",
"savepoint_id": "savepoint-789..." // if savepoint=true
}
Rust API:
// Cancel without savepoint
job_manager.cancel_job(job_id).await?;
// Cancel with savepoint
let savepoint_id = job_manager.cancel_job_with_savepoint(job_id).await?;
5. Create Savepoint¶
Create a savepoint for a running job (for backup or migration).
Endpoint: POST /api/v1/jobs/{job_id}/savepoints
Request Body (optional):
{
  "description": "Pre-upgrade savepoint"
}
Response (201 Created):
{
"savepoint_id": "savepoint-789...",
"job_id": "job-123...",
"created_at": "2025-10-29T11:00:00Z",
"size_bytes": 10485760,
"location": "s3://checkpoints/savepoint-789...",
"description": "Pre-upgrade savepoint"
}
Rust API:
let savepoint_id = job_manager.create_savepoint(
job_id,
Some("Pre-upgrade savepoint".to_string())
).await?;
6. Get Job Metrics¶
Get real-time metrics for a job.
Endpoint: GET /api/v1/jobs/{job_id}/metrics
Query Parameters:
- window (optional): Time window (1m, 5m, 1h, 24h, default: 5m)
Response (200 OK):
{
"job_id": "job-123...",
"timestamp": "2025-10-29T11:00:00Z",
"metrics": {
"events_processed_total": 1523400,
"throughput_per_sec": 423,
"latency": {
"p50_ms": 3.2,
"p95_ms": 8.7,
"p99_ms": 12.5,
"p999_ms": 25.3
},
"checkpoint": {
"count": 60,
"last_duration_ms": 45,
"failures": 0
},
"backpressure": {
"events_total": 0,
"current_level": 0.0
},
"resources": {
"cpu_usage_percent": 45.2,
"memory_used_mb": 512,
"memory_total_mb": 2048
},
"errors": {
"count": 0,
"rate_per_min": 0.0
}
},
"history": [
{
"timestamp": "2025-10-29T10:55:00Z",
"throughput_per_sec": 420
},
{
"timestamp": "2025-10-29T10:56:00Z",
"throughput_per_sec": 425
}
]
}
Rust API:
let metrics = job_manager.get_job_metrics(
job_id,
MetricsWindow::Minutes(5)
).await?;
println!("Throughput: {} events/sec", metrics.throughput_per_sec);
println!("Latency p99: {}ms", metrics.latency.p99_ms);
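The `window` query parameter accepts the shorthand values 1m, 5m, 1h, and 24h. A sketch of how those strings might map onto a `MetricsWindow`-style enum; the parser below is illustrative, not the documented implementation, and only the four listed values are handled:

```rust
/// Illustrative parser for the window shorthand accepted by the
/// metrics endpoint; unknown values yield None so callers can fall
/// back to the documented default of 5m.
#[derive(Debug, PartialEq)]
enum MetricsWindow {
    Minutes(u32),
    Hours(u32),
}

fn parse_window(s: &str) -> Option<MetricsWindow> {
    match s {
        "1m" => Some(MetricsWindow::Minutes(1)),
        "5m" => Some(MetricsWindow::Minutes(5)),
        "1h" => Some(MetricsWindow::Hours(1)),
        "24h" => Some(MetricsWindow::Hours(24)),
        _ => None,
    }
}

fn main() {
    // Unrecognized input falls back to the documented 5m default.
    let window = parse_window("bogus").unwrap_or(MetricsWindow::Minutes(5));
    println!("{:?}", window);
}
```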
7. Get Job Logs¶
Get logs for a job.
Endpoint: GET /api/v1/jobs/{job_id}/logs
Query Parameters:
- level (optional): Filter by log level (debug, info, warn, error)
- limit (optional): Max log lines (default: 1000)
- tail (optional): Return only the most recent lines (true/false, default: false)
Response (200 OK):
{
"job_id": "job-123...",
"logs": [
{
"timestamp": "2025-10-29T10:00:05.123Z",
"level": "info",
"message": "Job started successfully",
"task_id": null
},
{
"timestamp": "2025-10-29T10:00:05.456Z",
"level": "info",
"message": "Kafka source connected to localhost:9092",
"task_id": "task-1"
},
{
"timestamp": "2025-10-29T10:01:00.789Z",
"level": "info",
"message": "Checkpoint 1 completed (45ms)",
"task_id": null
},
{
"timestamp": "2025-10-29T10:05:12.345Z",
"level": "warn",
"message": "Temporary backpressure detected",
"task_id": "task-2"
}
],
"total": 4,
"limit": 1000
}
Rust API:
let logs = job_manager.get_job_logs(
job_id,
LogOptions {
level: Some(LogLevel::Warn),
limit: 1000,
tail: true,
}
).await?;
for log in logs {
println!("[{}] {}: {}", log.timestamp, log.level, log.message);
}
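The Rust example filters at `LogLevel::Warn`, which suggests the filter keeps entries at or above the requested severity. Assuming the conventional debug < info < warn < error ranking (which the API reference does not spell out), that filtering can be sketched as:

```rust
/// Log severity in ascending order; deriving PartialOrd/Ord gives the
/// conventional debug < info < warn < error ranking. This is an
/// assumption for illustration, not the documented LogLevel type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum LogLevel { Debug, Info, Warn, Error }

/// Keep only entries at or above the requested minimum level.
fn filter_at_least(levels: &[LogLevel], min: LogLevel) -> Vec<LogLevel> {
    levels.iter().copied().filter(|l| *l >= min).collect()
}

fn main() {
    let stream = [LogLevel::Info, LogLevel::Warn, LogLevel::Debug, LogLevel::Error];
    println!("{:?}", filter_at_least(&stream, LogLevel::Warn));
}
```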
8. Restart Job¶
Restart a failed or cancelled job (optionally from savepoint).
Endpoint: POST /api/v1/jobs/{job_id}/restart
Request Body (optional):
{
"savepoint_id": "savepoint-789...", // Optional
"config_overrides": { // Optional
"parallelism": 8
}
}
Response (200 OK):
{
"job_id": "job-123...",
"status": "running",
"restarted_at": "2025-10-29T11:05:00Z",
"restored_from_savepoint": "savepoint-789...",
"message": "Job restarted successfully"
}
Rust API:
// Restart from latest checkpoint
job_manager.restart_job(job_id, None).await?;
// Restart from specific savepoint
job_manager.restart_job(
job_id,
Some(RestartOptions {
savepoint_id: Some(savepoint_id),
config_overrides: None,
})
).await?;
Rust API (Native)¶
JobManager¶
pub struct JobManager {
job_registry: Arc<RwLock<HashMap<JobId, JobMetadata>>>,
scheduler: Arc<Scheduler>,
checkpoint_coordinator: Arc<CheckpointCoordinator>,
metrics_collector: Arc<MetricsCollector>,
}
impl JobManager {
/// Create new job manager
pub fn new(max_parallelism: usize) -> Result<Self>;
/// Submit a new job
pub async fn submit_job(&self, config: JobConfig) -> Result<JobId>;
/// Cancel a running job
pub async fn cancel_job(&self, job_id: JobId) -> Result<()>;
/// Cancel job with savepoint
pub async fn cancel_job_with_savepoint(&self, job_id: JobId) -> Result<SavepointId>;
/// Create savepoint
pub async fn create_savepoint(
&self,
job_id: JobId,
description: Option<String>
) -> Result<SavepointId>;
/// Restart job
pub async fn restart_job(
&self,
job_id: JobId,
options: Option<RestartOptions>
) -> Result<()>;
/// Get job status
pub async fn get_job_status(&self, job_id: JobId) -> Result<JobStatus>;
/// Get job details
pub async fn get_job_details(&self, job_id: JobId) -> Result<JobDetails>;
/// Get job metrics
pub async fn get_job_metrics(
&self,
job_id: JobId,
window: MetricsWindow
) -> Result<JobMetrics>;
/// Get job logs
pub async fn get_job_logs(
&self,
job_id: JobId,
options: LogOptions
) -> Result<Vec<LogEntry>>;
/// List all jobs
pub async fn list_jobs(&self, options: ListJobsOptions) -> Result<Vec<JobSummary>>;
}
Data Structures¶
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobConfig {
pub name: String,
pub parallelism: usize,
pub checkpoint_interval: Duration,
pub state_backend: StateBackendType,
pub restart_strategy: RestartStrategy,
pub sources: Vec<SourceConfig>,
pub transformations: Vec<TransformationConfig>,
pub sinks: Vec<SinkConfig>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum JobStatus {
Submitted,
Running,
Paused,
Finished,
Failed,
Cancelled,
}
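The lifecycle implied by the endpoints above (cancel applies to running or paused jobs, restart to failed or cancelled ones) can be sketched as a pair of transition checks. The helpers are illustrative; the real scheduler may enforce additional rules:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum JobStatus { Submitted, Running, Paused, Finished, Failed, Cancelled }

/// A running or paused job may be cancelled
/// (see DELETE /api/v1/jobs/{job_id}).
fn can_cancel(s: JobStatus) -> bool {
    matches!(s, JobStatus::Running | JobStatus::Paused)
}

/// A failed or cancelled job may be restarted
/// (see POST /api/v1/jobs/{job_id}/restart).
fn can_restart(s: JobStatus) -> bool {
    matches!(s, JobStatus::Failed | JobStatus::Cancelled)
}

fn main() {
    println!("cancel running: {}", can_cancel(JobStatus::Running));
    println!("restart failed: {}", can_restart(JobStatus::Failed));
}
```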
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobDetails {
pub job_id: JobId,
pub name: String,
pub status: JobStatus,
pub config: JobConfig,
pub submitted_at: SystemTime,
pub started_at: Option<SystemTime>,
pub finished_at: Option<SystemTime>,
pub uptime_secs: u64,
pub metrics: JobMetrics,
pub tasks: Vec<TaskInfo>,
pub checkpoints: Vec<CheckpointInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetrics {
pub events_processed: u64,
pub throughput_per_sec: f64,
pub latency_p50_ms: f64,
pub latency_p99_ms: f64,
pub checkpoint_count: u64,
pub last_checkpoint_duration_ms: u64,
pub backpressure_events: u64,
pub restarts: u32,
pub cpu_usage_percent: f64,
pub memory_used_mb: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RestartStrategy {
/// No automatic restart
None,
/// Restart with fixed delay
FixedDelay {
attempts: u32,
delay: Duration,
},
/// Restart with exponential backoff
ExponentialBackoff {
max_attempts: u32,
initial_delay: Duration,
max_delay: Duration,
},
}
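For ExponentialBackoff, a common convention doubles the delay after each failed attempt and caps it at max_delay. The arithmetic below sketches that convention; the growth factor actually used by the scheduler is not specified in this document:

```rust
use std::time::Duration;

/// Illustrative backoff schedule: the delay doubles per attempt and is
/// capped at `max`. Attempt numbering starts at 0 for the first retry.
/// The doubling factor is an assumption, not the documented behavior.
fn backoff_delay(initial: Duration, max: Duration, attempt: u32) -> Duration {
    let scaled = initial.saturating_mul(2u32.saturating_pow(attempt));
    scaled.min(max)
}

fn main() {
    let (initial, max) = (Duration::from_secs(1), Duration::from_secs(30));
    for attempt in 0..6 {
        println!("retry {attempt}: {:?}", backoff_delay(initial, max, attempt));
    }
}
```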
Prometheus Metrics¶
Job Metrics¶
- heliosdb_job_count (gauge): Number of active jobs
- heliosdb_job_status (gauge): Job status by job_id
- heliosdb_job_events_processed_total (counter): Total events processed
- heliosdb_job_throughput (gauge): Current throughput (events/sec)
- heliosdb_job_latency_seconds (histogram): Processing latency
- heliosdb_job_checkpoint_duration_seconds (histogram): Checkpoint duration
- heliosdb_job_checkpoint_failures_total (counter): Checkpoint failures
- heliosdb_job_backpressure_events_total (counter): Backpressure events
- heliosdb_job_restarts_total (counter): Job restarts
- heliosdb_job_errors_total (counter): Job errors
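These metrics are scraped in the standard Prometheus text exposition format, where each sample is a line of the form `metric_name{labels} value`. A small sketch of rendering one gauge sample by hand, for illustration only; in practice an exporter would use a Prometheus client library:

```rust
/// Render a single gauge sample in the Prometheus text exposition
/// format: `name{job_id="..."} value`. Illustrative only; a real
/// exporter would use a Prometheus client library.
fn render_gauge(name: &str, job_id: &str, value: f64) -> String {
    format!("{}{{job_id=\"{}\"}} {}", name, job_id, value)
}

fn main() {
    println!("{}", render_gauge("heliosdb_job_throughput", "job-123", 423.0));
}
```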
Examples¶
Example 1: Submit and Monitor Job¶
use heliosdb_streaming::job::*;
#[tokio::main]
async fn main() -> Result<()> {
// Create job manager
let job_manager = JobManager::new(16)?;
// Configure job
let config = JobConfig {
name: "realtime_analytics".to_string(),
parallelism: 4,
checkpoint_interval: Duration::from_secs(60),
state_backend: StateBackendType::RocksDb,
restart_strategy: RestartStrategy::FixedDelay {
attempts: 3,
delay: Duration::from_secs(10),
},
sources: vec![
SourceConfig::kafka(...),
],
transformations: vec![
TransformationConfig::filter(...),
TransformationConfig::window(...),
],
sinks: vec![
SinkConfig::database(...),
],
};
// Submit job
let job_id = job_manager.submit_job(config).await?;
println!("Job submitted: {}", job_id);
// Monitor job
loop {
let metrics = job_manager.get_job_metrics(job_id, MetricsWindow::Minutes(1)).await?;
println!("Throughput: {} events/sec", metrics.throughput_per_sec);
println!("Latency p99: {}ms", metrics.latency_p99_ms);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
Example 2: Savepoint and Restart¶
// Create savepoint before maintenance
let savepoint_id = job_manager.create_savepoint(
job_id,
Some("Pre-maintenance savepoint".to_string())
).await?;
// Cancel job
job_manager.cancel_job(job_id).await?;
// Perform maintenance...
// Restart job from savepoint
job_manager.restart_job(
job_id,
Some(RestartOptions {
savepoint_id: Some(savepoint_id),
config_overrides: Some(JobConfigOverrides {
parallelism: Some(8), // Increase parallelism
}),
})
).await?;
Testing¶
Integration Tests (10 tests)¶
- Submit job and verify status
- Cancel job
- Create savepoint
- Restart from savepoint
- Get job metrics
- Get job logs
- List jobs with filters
- Job failure and automatic restart
- Concurrent job management
- Resource cleanup after job completion
Acceptance Criteria¶
- [ ] All 10 integration tests passing
- [ ] HTTP API endpoints functional
- [ ] Rust API documented
- [ ] Prometheus metrics exported
- [ ] Grafana dashboards created
- [ ] Performance: API latency <100ms
- [ ] Documentation complete