7 changes: 7 additions & 0 deletions orchestrator/src/core/client/queue/mod.rs
@@ -26,4 +26,11 @@ pub trait QueueClient: Send + Sync {
/// * `Ok(())` - If the queue service is healthy and accessible
/// * `Err(QueueError)` - If the health check fails
async fn health_check(&self) -> Result<(), QueueError>;

/// Get the approximate number of messages in a queue
///
/// # Returns
/// * `Ok(i32)` - Approximate number of messages in the queue
/// * `Err(QueueError)` - If the operation fails
async fn get_queue_depth(&self, queue: QueueType) -> Result<i32, QueueError>;

Member commented:

🐛 Type Safety: i32 for Queue Depth

Queue depth is returned as i32:

async fn get_queue_depth(&self, queue: QueueType) -> Result<i32, QueueError>;

Issues:

  • SQS queue depths can exceed i32::MAX (2.1 billion) in extreme cases
  • i32 allows negative values, which don't make sense for queue depth
  • SQS ApproximateNumberOfMessages returns a string that's parsed to i32, but could theoretically be larger

Impact

Overflow or underflow is unlikely for a priority queue capped at 20 messages, but for consistency and future-proofing this remains a type-safety concern.

Solution

Use u64 or usize for queue depth:

// In queue/mod.rs
async fn get_queue_depth(&self, queue: QueueType) -> Result<u64, QueueError>;

// In queue_control.rs
pub static MAX_PRIORITY_QUEUE_SIZE: LazyLock<u64> = LazyLock::new(|| {
    get_env_var_or_default("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE", "20")
        .parse()
        .expect("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE must be a valid integer")
});

// In sqs.rs
let count = attributes
    .attributes()
    .and_then(|attrs| attrs.get(&QueueAttributeName::ApproximateNumberOfMessages))
    .and_then(|value| value.parse::<u64>().ok()) // Changed to u64
    .unwrap_or(0);

This provides better type safety and prevents negative values.

}
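
For orientation, the depth exposed by this new trait method presumably feeds the capacity check that produces the `JobError::PriorityQueueFull` variant added later in this PR. Below is a minimal sketch of such a check; the helper name and the fail-open handling of a failed depth lookup are assumptions, not the PR's actual wiring.

```rust
use crate::core::client::queue::QueueClient;
use crate::error::job::JobError;
use crate::types::queue::QueueType;
use crate::types::queue_control::MAX_PRIORITY_QUEUE_SIZE;

/// Hypothetical helper: reject a new priority submission once the queue is full.
async fn ensure_priority_queue_capacity(queue: &impl QueueClient) -> Result<(), JobError> {
    // If the depth lookup itself fails we fall back to 0 (fail-open); the real
    // implementation may prefer to propagate the error instead.
    let current_size = queue.get_queue_depth(QueueType::PriorityJobQueue).await.unwrap_or(0);

    // Compare against the configured cap (default 20) and reject when reached.
    if current_size >= *MAX_PRIORITY_QUEUE_SIZE {
        return Err(JobError::PriorityQueueFull { current_size, max_size: *MAX_PRIORITY_QUEUE_SIZE });
    }
    Ok(())
}
```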
22 changes: 22 additions & 0 deletions orchestrator/src/core/client/queue/sqs.rs
@@ -445,4 +445,26 @@ impl QueueClient for SQS {

Ok(())
}

async fn get_queue_depth(&self, queue: QueueType) -> Result<i32, QueueError> {
let queue_name = self.get_queue_name(&queue)?;
let queue_url = self.inner.get_queue_url_from_client(queue_name.as_str()).await?;

let attributes = self
.inner
.client()
.get_queue_attributes()
.queue_url(&queue_url)
.attribute_names(QueueAttributeName::ApproximateNumberOfMessages)
.send()
.await?;

let count = attributes
.attributes()
.and_then(|attrs| attrs.get(&QueueAttributeName::ApproximateNumberOfMessages))
.and_then(|value| value.parse::<i32>().ok())
.unwrap_or(0);

Ok(count)
}
}
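
Worth noting: SQS's `ApproximateNumberOfMessages` attribute is, as the name says, approximate and eventually consistent, so any capacity check built on this depth is best-effort rather than a hard limit; two concurrent submissions can both observe a depth just below the cap.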
4 changes: 4 additions & 0 deletions orchestrator/src/error/job/mod.rs
@@ -98,6 +98,10 @@ pub enum JobError {
#[error("Max Capacity Reached, Already processing")]
MaxCapacityReached,

/// Indicates that the priority queue has reached its maximum capacity
#[error("Priority queue is full. Current size: {current_size}, Max size: {max_size}. Please try again later or use normal queue.")]
PriorityQueueFull { current_size: i32, max_size: i32 },

/// Indicates an error occurred while extracting the processing lock
#[error("Error extracting processing lock: {0}")]
LockError(String),
7 changes: 7 additions & 0 deletions orchestrator/src/server/error.rs
@@ -58,6 +58,10 @@ pub enum JobRouteError {
/// Contains both the job ID and the current status
#[error("Invalid status: {id}: {job_status}")]
InvalidStatus { id: String, job_status: String },

/// Indicates that the priority queue has reached capacity
#[error("Priority queue capacity exceeded: {0}")]
QueueCapacityExceeded(String),
}

/// Implementation of axum's `IntoResponse` trait for converting errors into HTTP responses.
@@ -106,6 +110,9 @@ impl IntoResponse for JobRouteError {
Json(ApiResponse::error(format!("Cannot retry job {id}: invalid status {job_status}"))),
)
.into_response(),
JobRouteError::QueueCapacityExceeded(msg) => {
(StatusCode::TOO_MANY_REQUESTS, Json(ApiResponse::error(msg))).into_response()
}
}
}
}
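
A small regression test could lock in the 429 mapping for the new variant; this is only a sketch and assumes the variant and the `IntoResponse` impl shown above.

```rust
#[cfg(test)]
mod tests {
    use axum::http::StatusCode;
    use axum::response::IntoResponse;

    use super::JobRouteError;

    // Sketch: the new variant should surface as HTTP 429 Too Many Requests.
    #[test]
    fn queue_capacity_exceeded_maps_to_429() {
        let resp = JobRouteError::QueueCapacityExceeded("priority queue is full".to_string()).into_response();
        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
    }
}
```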
90 changes: 71 additions & 19 deletions orchestrator/src/server/route/jobs.rs
@@ -10,7 +10,7 @@ use uuid::Uuid;

use super::super::error::JobRouteError;
use super::super::types::{
ApiResponse, JobId, JobRouteResult, JobStatusQuery, JobStatusResponse, JobStatusResponseItem,
ApiResponse, JobId, JobRouteResult, JobStatusQuery, JobStatusResponse, JobStatusResponseItem, PriorityQuery,
};
use crate::core::config::Config;
use crate::utils::metrics::ORCHESTRATOR_METRICS;
@@ -22,12 +22,13 @@ use crate::worker::service::JobService;
/// This endpoint initiates the processing of a job identified by its UUID. It performs the
/// following:
/// 1. Validates and parses the job ID from the URL path parameter
/// 2. Calls the job processing logic
/// 2. Calls the job processing logic (optionally via priority queue)
/// 3. Records metrics for successful/failed operations
/// 4. Returns an appropriate API response
///
/// # Arguments
/// * `Path(JobId { id })` - The job ID extracted from the URL path
/// * `Query(query)` - Query parameters including optional priority flag
/// * `State(config)` - Shared application configuration
///
/// # Returns
@@ -38,25 +39,49 @@ use crate::worker::service::JobService;
/// * `JobRouteError::ProcessingError` - If job processing fails
async fn handle_process_job_request(
Path(JobId { id }): Path<JobId>,
Query(query): Query<PriorityQuery>,
State(config): State<Arc<Config>>,
) -> JobRouteResult {
let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?;
// Record job_id in the current request span for consistent logging
Span::current().record("job_id", tracing::field::display(job_id));

match JobService::queue_job_for_processing(job_id, config.clone()).await {
match JobService::queue_job_for_processing(job_id, config.clone(), query.priority).await {
Ok(_) => {
info!("Job queued for processing successfully");
ORCHESTRATOR_METRICS
.successful_job_operations
.add(1.0, &[KeyValue::new("operation_type", "queue_process")]);
Ok(Json(ApiResponse::<()>::success(Some(format!("Job with id {} queued for processing", id))))
.into_response())
let queue_type = if query.priority { "PRIORITY" } else { "normal" };
info!("Job queued for {} processing successfully", queue_type);
ORCHESTRATOR_METRICS.successful_job_operations.add(
1.0,
&[
KeyValue::new("operation_type", "queue_process"),
KeyValue::new("priority", if query.priority { "true" } else { "false" }),
],
);
Ok(Json(ApiResponse::<()>::success(Some(format!(
"Job with id {} queued for {} processing",
id, queue_type
))))
.into_response())
}
Err(e) => {
error!(error = %e, "Failed to queue job for processing");
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_process")]);
Err(JobRouteError::ProcessingError(e.to_string()))
ORCHESTRATOR_METRICS.failed_job_operations.add(
1.0,
&[
KeyValue::new("operation_type", "queue_process"),
KeyValue::new("priority", if query.priority { "true" } else { "false" }),
],
);
// Map PriorityQueueFull to QueueCapacityExceeded for proper HTTP status code
match e {

Member commented:

🔍 Error Handling: Manual Error Mapping

The manual mapping of JobError::PriorityQueueFull to JobRouteError inside the route handler is repetitive and appears in both handle_process_job_request and handle_verify_job_request:

match e {
    crate::error::job::JobError::PriorityQueueFull { current_size, max_size } => {
        Err(JobRouteError::QueueCapacityExceeded(format!(
            "Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
            current_size, max_size
        )))
    }
    _ => Err(JobRouteError::ProcessingError(e.to_string())),
}

Solution

Implement From<JobError> for JobRouteError to centralize this conversion:

// In orchestrator/src/server/error.rs
impl From<JobError> for JobRouteError {
    fn from(err: JobError) -> Self {
        match err {
            JobError::PriorityQueueFull { current_size, max_size } => {
                JobRouteError::QueueCapacityExceeded(format!(
                    "Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
                    current_size, max_size
                ))
            }
            _ => JobRouteError::ProcessingError(err.to_string()),
        }
    }
}

// Then in route handlers, simplify to:
match JobService::queue_job_for_processing(job_id, config.clone(), query.priority).await {
    Ok(_) => { /* success */ }
    Err(e) => Err(e.into()), // Automatic conversion
}

This follows Rust idioms and reduces duplication.

crate::error::job::JobError::PriorityQueueFull { current_size, max_size } => {
Err(JobRouteError::QueueCapacityExceeded(format!(
"Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
current_size, max_size
)))
}
_ => Err(JobRouteError::ProcessingError(e.to_string())),
}
}
}
}
@@ -65,13 +90,14 @@ async fn handle_process_job_request(
///
/// This endpoint queues the job for verification by:
/// 1. Validates and parses the job ID
/// 2. Adds the job to the verification queue
/// 2. Adds the job to the verification queue (optionally via priority queue)
/// 3. Resets verification attempt counter
/// 4. Records metrics for the queue operation
/// 5. Returns immediate response
///
/// # Arguments
/// * `Path(JobId { id })` - The job ID extracted from the URL path
/// * `Query(query)` - Query parameters including optional priority flag
/// * `State(config)` - Shared application configuration
///
/// # Returns
@@ -82,23 +108,49 @@ async fn handle_process_job_request(
/// * `JobRouteError::ProcessingError` - If queueing for verification fails
async fn handle_verify_job_request(
Path(JobId { id }): Path<JobId>,
Query(query): Query<PriorityQuery>,
State(config): State<Arc<Config>>,
) -> JobRouteResult {
let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?;
// Record job_id in the current request span for consistent logging
Span::current().record("job_id", tracing::field::display(job_id));

match JobService::queue_job_for_verification(job_id, config.clone()).await {
match JobService::queue_job_for_verification(job_id, config.clone(), query.priority).await {
Ok(_) => {
info!("Job queued for verification successfully");
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_verify")]);
Ok(Json(ApiResponse::<()>::success(Some(format!("Job with id {} queued for verification", id))))
.into_response())
let queue_type = if query.priority { "PRIORITY" } else { "normal" };
info!("Job queued for {} verification successfully", queue_type);
ORCHESTRATOR_METRICS.successful_job_operations.add(
1.0,
&[
KeyValue::new("operation_type", "queue_verify"),
KeyValue::new("priority", if query.priority { "true" } else { "false" }),
],
);
Ok(Json(ApiResponse::<()>::success(Some(format!(
"Job with id {} queued for {} verification",
id, queue_type
))))
.into_response())
}
Err(e) => {
error!(error = %e, "Failed to queue job for verification");
ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_verify")]);
Err(JobRouteError::ProcessingError(e.to_string()))
ORCHESTRATOR_METRICS.failed_job_operations.add(
1.0,
&[
KeyValue::new("operation_type", "queue_verify"),
KeyValue::new("priority", if query.priority { "true" } else { "false" }),
],
);
// Map PriorityQueueFull to QueueCapacityExceeded for proper HTTP status code
match e {
crate::error::job::JobError::PriorityQueueFull { current_size, max_size } => {
Err(JobRouteError::QueueCapacityExceeded(format!(
"Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
current_size, max_size
)))
}
_ => Err(JobRouteError::ProcessingError(e.to_string())),
}
}
}
}
8 changes: 8 additions & 0 deletions orchestrator/src/server/types.rs
@@ -27,6 +27,14 @@ pub struct JobStatusQuery {
pub status: JobStatus,
}

/// Represents query parameters for priority queue selection.
#[derive(Deserialize)]
pub struct PriorityQuery {
/// Whether to use the priority queue (defaults to false)
#[serde(default)]
pub priority: bool,
}

/// Represents a standardized API response structure.
///
/// This struct provides a consistent format for all API responses, including
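
The `#[serde(default)]` means a request without the parameter keeps today's behaviour (normal queue), while `?priority=true` opts into the priority path. A quick illustrative check of that decoding, assuming `serde_urlencoded` (which performs the same kind of decoding axum's `Query` extractor does) is available as a dev dependency:

```rust
#[cfg(test)]
mod tests {
    use super::PriorityQuery;

    // Illustrative only: axum's Query extractor performs equivalent decoding.
    #[test]
    fn priority_flag_defaults_to_false() {
        // No parameter: falls back to the serde default (false).
        let q: PriorityQuery = serde_urlencoded::from_str("").unwrap();
        assert!(!q.priority);

        // Explicit opt-in via ?priority=true.
        let q: PriorityQuery = serde_urlencoded::from_str("priority=true").unwrap();
        assert!(q.priority);
    }
}
```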
10 changes: 10 additions & 0 deletions orchestrator/src/types/queue.rs
@@ -1,5 +1,6 @@
use crate::error::event::EventSystemError;
use crate::types::jobs::types::JobType;
use serde::{Deserialize, Serialize};
use strum_macros::{Display, EnumIter};

#[derive(Display, Debug, Clone, PartialEq, Eq, EnumIter, Hash)]
@@ -8,6 +9,12 @@ pub enum JobState {
Verification,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum JobAction {
Process,
Verify,
}

#[derive(Display, Debug, Clone, PartialEq, Eq, EnumIter, Hash)]
pub enum QueueType {
#[strum(serialize = "snos_job_processing")]
@@ -38,6 +45,8 @@ pub enum QueueType {
JobHandleFailure,
#[strum(serialize = "worker_trigger")]
WorkerTrigger,
#[strum(serialize = "priority_job_queue")]
PriorityJobQueue,
}

impl TryFrom<QueueType> for JobState {
@@ -58,6 +67,7 @@ impl TryFrom<QueueType> for JobState {
QueueType::AggregatorJobVerification => JobState::Verification,
QueueType::JobHandleFailure => Err(Self::Error::InvalidJobType(QueueType::JobHandleFailure.to_string()))?,
QueueType::WorkerTrigger => Err(Self::Error::InvalidJobType(QueueType::WorkerTrigger.to_string()))?,
QueueType::PriorityJobQueue => Err(Self::Error::InvalidJobType(QueueType::PriorityJobQueue.to_string()))?,
};
Ok(state)
}
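
The reviewer comment further down references a `PriorityQueueMessage` carrying the job id, its `JobType`, and the new `JobAction`; that struct is not shown in this diff, so the shape below is assumed. It illustrates why `JobAction` derives `Serialize`/`Deserialize` while the other queue enums here do not.

```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::types::jobs::types::JobType;
use crate::types::queue::JobAction;

/// Assumed message shape; the actual PriorityQueueMessage in the PR may differ.
/// Assumes JobType and Uuid already have serde support enabled in this crate.
#[derive(Serialize, Deserialize)]
pub struct PriorityQueueMessage {
    pub id: Uuid,
    pub job_type: JobType,
    pub action: JobAction,
}

/// The consumer can match on `action` to choose the processing or verification path.
fn describe(msg: &PriorityQueueMessage) -> &'static str {
    match msg.action {
        JobAction::Process => "queue for processing",
        JobAction::Verify => "queue for verification",
    }
}
```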
17 changes: 17 additions & 0 deletions orchestrator/src/types/queue_control.rs
@@ -4,6 +4,14 @@ use orchestrator_utils::env_utils::get_env_var_or_default;
use std::collections::HashMap;
use std::sync::LazyLock;

/// Maximum number of messages allowed in the priority queue
/// Can be configured via MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE environment variable
pub static MAX_PRIORITY_QUEUE_SIZE: LazyLock<i32> = LazyLock::new(|| {
get_env_var_or_default("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE", "20")
.parse()
.expect("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE must be a valid integer")
});

#[derive(Clone)]
pub struct DlqConfig {
pub max_receive_count: u32,
@@ -173,5 +181,14 @@ pub static QUEUES: LazyLock<HashMap<QueueType, QueueConfig>> = LazyLock::new(||
supported_layers: vec![Layer::L2],
},
);
map.insert(
QueueType::PriorityJobQueue,

Member commented:

🐛 Visibility Timeout Risk with Message Bouncing

The PriorityJobQueue is configured with visibility_timeout: 30 (seconds). When a worker nacks a non-matching message:

  1. Message becomes visible again immediately (visibility timeout is 0 on nack)
  2. If no worker of the correct type is available, the message will bounce between wrong workers
  3. If a message type has NO active workers (e.g., all DataSubmission workers are down), the message could bounce indefinitely consuming resources

Impact

  • Resource waste: Messages bouncing between workers consume CPU and SQS API calls
  • Potential message loss: If visibility timeout handling is inconsistent, messages could get stuck
  • No circuit breaker: No mechanism to detect when a message type has no active workers

Solution

Option 1: Document this behavior and add monitoring:

// Add comment in queue_control.rs
// PriorityJobQueue visibility timeout is set to 30 seconds.
// Note: When messages are nacked due to non-matching worker types,
// they become visible immediately. If no matching worker is available,
// messages will bounce until a matching worker comes online or visibility
// timeout expires. Consider implementing a max-bounce counter or DLQ
// routing for messages that bounce too many times.

Option 2: Implement max-bounce counter:

// Add to PriorityQueueMessage:
pub struct PriorityQueueMessage {
    pub id: Uuid,
    pub job_type: JobType,
    pub action: JobAction,
    pub bounce_count: Option<u32>, // Track bounces
}

// In get_priority_message, when nacking:
if bounce_count.unwrap_or(0) >= MAX_BOUNCES {
    // Move to DLQ or log critical error
    error!("Message {} has bounced {} times, no matching worker available", id, bounce_count);
}

Option 3: Add alerting for messages that exceed visibility timeout multiple times.

Recommendation: Start with Option 1 (documentation + monitoring), then add Option 2 if bouncing becomes a problem.


Contributor (author) replied:

Increased it to 300 seconds. This is the maximum time we allow for any job, so ideally the job should complete within that window.

QueueConfig {
visibility_timeout: 30,
dlq_config: None, // No DLQ - failed priority jobs go through normal error handling
queue_control: QueueControlConfig::new(10), // Not used directly, but set for consistency
supported_layers: vec![Layer::L2, Layer::L3],
},
);
map
});