
Conversation

@prkpndy (Contributor) commented on Dec 5, 2025

Add Priority Queue Mechanism for Job Processing

Pull Request type

  • Bugfix
  • Feature
  • Code style update (formatting, renaming)
  • Refactoring (no functional changes, no API changes)
  • Build-related changes
  • Documentation content changes
  • Testing
  • Other (please describe):

What is the current behavior?

All jobs are processed in order through their respective queues. There is no mechanism to prioritize urgent jobs.

Resolves: #NA

What is the new behavior?

Implements a priority queue mechanism that allows urgent jobs to "jump the line" while respecting per-job-type concurrency limits.

Key Changes:

  • New API Parameter: Jobs can be submitted with the ?priority=true query parameter (see the sketch after this list)
    GET /jobs/{id}/process?priority=true
    GET /jobs/{id}/verify?priority=true
    
  • Priority Injection Architecture:
    • Added a shared PriorityJobQueue
    • All 12 job workers check the priority queue before their normal queues
    • Priority messages contain {job_id, job_type, action} for routing
    • Workers filter and process only the messages that match their job type and action
  • Concurrency Limits Respected: priority jobs still respect the existing limits (SNOS max 5, others max 10)
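
A minimal sketch of the request-side shape of this flag; JobRouteQuery and the use of serde_urlencoded here are illustrative assumptions rather than the actual route types:

use serde::Deserialize;

// Hypothetical query struct: `priority` defaults to false, so existing callers
// that omit the parameter keep today's behavior.
#[derive(Debug, Deserialize, Default)]
pub struct JobRouteQuery {
    #[serde(default)]
    pub priority: bool,
}

fn main() {
    // `?priority=true` opts the job into the shared priority queue...
    let q: JobRouteQuery = serde_urlencoded::from_str("priority=true").unwrap();
    assert!(q.priority);

    // ...while omitting the parameter falls back to the normal per-type queue.
    let q: JobRouteQuery = serde_urlencoded::from_str("").unwrap();
    assert!(!q.priority);
}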

Files Modified:

  • New: orchestrator/src/worker/parser/priority_queue_message.rs
  • Modified: 8 files (queue types, event worker, job service, API routes)

Does this introduce a breaking change?

Yes - Internal API signature changes (backward compatible for external users)

@Mohiiit added the orchestrator label (This change is relevant to orchestrator) on Dec 5, 2025
@Mohiiit (Member) left a comment:

Code Review Summary

This PR introduces a priority queue feature. I've identified several issues that need attention:

🚨 Critical: Worker starvation bug that must be fixed
⚠️ Major: Data consistency concerns
🔍 Code Quality: Several DRY violations and error handling improvements needed
🐛 Additional Issues: Documentation, metrics, and type safety improvements

See inline comments for details.


// Process new messages (with backpressure)
// 3. NEW: Check priority queue BEFORE normal queue
priority_result = self.get_priority_message(),
@Mohiiit (Member) commented:

🚨 Critical: Worker Starvation Bug

The tokio::select! loop with biased mode creates a critical starvation bug that prevents normal queue processing:

  1. get_priority_message() returns immediately (non-blocking) with Ok(None) when the priority queue is empty
  2. get_message() is a blocking call that waits for messages
  3. In a biased select, if the first branch (priority_result) is ready (even with None), it is selected
  4. The get_message() future is dropped/cancelled
  5. The loop repeats, get_priority_message() returns None instantly again
  6. Result: The worker busy-loops, consuming 100% CPU, and never waits for the normal queue (see the sketch below)
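
For illustration, here is a self-contained reproduction of that pattern (the get_priority_message/get_message bodies below are hypothetical stand-ins, not the orchestrator's implementations); because the priority future is always immediately ready, the biased select! never parks on the normal-queue receive:

use std::time::Duration;

async fn get_priority_message() -> Option<&'static str> {
    None // non-blocking: completes immediately when the priority queue is empty
}

async fn get_message() -> &'static str {
    // Stands in for a long-polling queue receive that waits until a message arrives.
    tokio::time::sleep(Duration::from_secs(3600)).await;
    "normal message"
}

#[tokio::main]
async fn main() {
    let mut spins: u64 = 0;
    loop {
        tokio::select! {
            biased;
            priority = get_priority_message() => {
                // Always selected: the branch counts as "ready" even though it carries None.
                if priority.is_none() {
                    spins += 1;
                    if spins >= 1_000_000 {
                        println!("busy-looped {spins} times without awaiting the normal queue");
                        break;
                    }
                }
            }
            msg = get_message() => {
                // Never reached: this future is dropped and re-created on every iteration.
                println!("got: {msg}");
            }
        }
    }
}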

Solution

Move the priority check outside the tokio::select! loop to prevent starvation:

loop {
    // Check if shutdown was requested
    if self.is_shutdown_requested() {
        info!("Shutdown requested, stopping message processing");
        break;
    }

    // 1. Check priority queue first (non-blocking)
    if tasks.len() < max_concurrent_tasks && self.should_check_priority_queue() {
        match self.get_priority_message().await {
            Ok(Some((delivery, priority_msg))) => {
                // Convert priority message to JobQueueMessage for processing
                let job_queue_msg = JobQueueMessage { id: priority_msg.id };
                let parsed_message = ParsedMessage::JobQueue(Box::new(job_queue_msg));

                let worker = self.clone();
                tasks.spawn(async move {
                    worker.process_message(delivery, parsed_message).await
                });
                debug!("Spawned PRIORITY task, active: {}", tasks.len());
                continue; // Process next iteration
            }
            Ok(None) => {
                // No priority messages, fall through to normal queue
            }
            Err(e) => {
                error!("Error processing priority queue: {:?}", e);
                sleep(Duration::from_millis(100)).await;
                continue;
            }
        }
    }

    // 2. Block on normal queue or shutdown (only when no priority messages)
    tokio::select! {
        biased;
        Some(result) = tasks.join_next(), if !tasks.is_empty() => {
            Self::handle_task_result(result);
            // ... rest of handling
        }
        _ = self.cancellation_token.cancelled() => {
            info!("Shutdown signal received, breaking from main loop");
            break;
        }
        message_result = self.get_message(), if tasks.len() < max_concurrent_tasks => {
            // ... process normal queue
        }
    }
}

This ensures the normal queue is always checked when priority queue is empty, preventing the busy-loop.

use uuid::Uuid;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PriorityQueueMessage {
@Mohiiit (Member) commented:

⚠️ Data Consistency Issue

The PriorityQueueMessage struct duplicates job_type:

pub struct PriorityQueueMessage {
    pub id: Uuid,
    pub job_type: JobType, // <--- Redundant?
    pub action: JobAction,
}

However, the worker later fetches the job from the DB (JobService::get_job) which contains the authoritative job_type. This creates a potential inconsistency:

  • Risk: If a job's state changes in the DB between enqueue and processing, the worker might filter based on stale message data but process based on fresh DB data
  • The message's job_type is used for routing (in message_matches_worker), but the actual processing uses the DB's job_type

Solution Options

Option 1: Simplify the message to just { id, action } (if action is needed for routing):

pub struct PriorityQueueMessage {
    pub id: Uuid,
    pub action: JobAction, // Keep action for routing
}

Then in message_matches_worker, fetch the job from DB first to check if it matches.

Option 2: Keep current structure but document that job_type is for routing optimization only, and the DB is the source of truth. Add validation in message_matches_worker to verify consistency.

Recommendation: Option 1 is cleaner, but Option 2 is acceptable if you want to avoid DB hits for wrong workers (performance trade-off).
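
If Option 2 is chosen, the consistency check could look roughly like this (a sketch only: validate_priority_message is a hypothetical helper, and the JobService::get_job signature and warn! logging are assumed):

use std::sync::Arc;
use tracing::warn;

// Hypothetical helper: route on the message's job_type, but confirm it against
// the DB (the source of truth) before processing, and nack on mismatch so the
// correct worker type can pick the message up instead.
async fn validate_priority_message(msg: &PriorityQueueMessage, config: Arc<Config>) -> Result<bool, JobError> {
    let job = JobService::get_job(msg.id, config).await?; // assumed signature
    if job.job_type != msg.job_type {
        warn!(
            "Stale priority message for job {}: message says {:?}, DB says {:?}",
            msg.id, msg.job_type, job.job_type
        );
        return Ok(false);
    }
    Ok(true)
}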

@prkpndy (Contributor, Author) replied:

I don't think this is an issue; fetching the job type from the DB would add additional time.

pub enum ParsedMessage {
    WorkerTrigger(Box<WorkerTriggerMessage>),
    JobQueue(Box<JobQueueMessage>),
    PriorityJobQueue(Box<PriorityQueueMessage>),
@Mohiiit (Member) commented:

🔍 Unnecessary Enum Variant

ParsedMessage::PriorityJobQueue is defined but effectively unused. The code immediately converts it to JobQueueMessage inside the worker loop:

// In get_priority_message():
let parsed_msg = PriorityQueueMessage::parse_message(&delivery)?;

// Then immediately converted:
let job_queue_msg = JobQueueMessage { id: priority_msg.id };
let parsed_message = ParsedMessage::JobQueue(Box::new(job_queue_msg));

The PriorityJobQueue variant only appears in error handling spans, which is unnecessary complexity.

Solution

Remove the PriorityJobQueue variant entirely. Parse the priority message, extract the ID, and directly construct a JobQueueMessage:

// In get_priority_message(), return:
Ok(Some((delivery, priority_msg.id))) // Just return the ID

// In the run loop:
match self.get_priority_message().await {
    Ok(Some((delivery, job_id))) => {
        let job_queue_msg = JobQueueMessage { id: job_id };
        let parsed_message = ParsedMessage::JobQueue(Box::new(job_queue_msg));
        // ... rest of processing
    }
    // ...
}

This simplifies the code and removes dead code.

@prkpndy (Contributor, Author) replied:

This enum variant is used in process_message, post_processing, etc., so I don't think it's redundant. Unless I am missing something 🤔?

}

/// Checks if a priority message matches this worker's responsibility
fn message_matches_worker(&self, msg: &PriorityQueueMessage) -> bool {
@Mohiiit (Member) commented:

🔍 DRY Violation: Repetitive Matching Logic

The message_matches_worker function is a long, repetitive match statement (60+ lines) that repeats the same pattern:

QueueType::SnosJobProcessing => msg.job_type == JobType::SnosRun && msg.action == JobAction::Process,
QueueType::SnosJobVerification => msg.job_type == JobType::SnosRun && msg.action == JobAction::Verify,
// ... repeated 12 times

This violates DRY (Don't Repeat Yourself) and makes the code harder to maintain.

Solution

Implement helper methods on QueueType to extract the target job type and action:

// In types/queue.rs
impl QueueType {
    /// Returns the job type this queue processes, if applicable
    pub fn target_job_type(&self) -> Option<JobType> {
        match self {
            Self::SnosJobProcessing | Self::SnosJobVerification => Some(JobType::SnosRun),
            Self::ProvingJobProcessing | Self::ProvingJobVerification => Some(JobType::ProofCreation),
            Self::ProofRegistrationJobProcessing | Self::ProofRegistrationJobVerification => {
                Some(JobType::ProofRegistration)
            }
            Self::DataSubmissionJobProcessing | Self::DataSubmissionJobVerification => {
                Some(JobType::DataSubmission)
            }
            Self::UpdateStateJobProcessing | Self::UpdateStateJobVerification => {
                Some(JobType::StateTransition)
            }
            Self::AggregatorJobProcessing | Self::AggregatorJobVerification => Some(JobType::Aggregator),
            _ => None,
        }
    }

    /// Returns the action this queue performs, if applicable
    pub fn target_action(&self) -> Option<JobAction> {
        match self {
            Self::SnosJobProcessing
            | Self::ProvingJobProcessing
            | Self::ProofRegistrationJobProcessing
            | Self::DataSubmissionJobProcessing
            | Self::UpdateStateJobProcessing
            | Self::AggregatorJobProcessing => Some(JobAction::Process),
            Self::SnosJobVerification
            | Self::ProvingJobVerification
            | Self::ProofRegistrationJobVerification
            | Self::DataSubmissionJobVerification
            | Self::UpdateStateJobVerification
            | Self::AggregatorJobVerification => Some(JobAction::Verify),
            _ => None,
        }
    }
}

// In worker
fn message_matches_worker(&self, msg: &PriorityQueueMessage) -> bool {
    let Some(target_type) = self.queue_type.target_job_type() else { return false; };
    let Some(target_action) = self.queue_type.target_action() else { return false; };
    
    msg.job_type == target_type && msg.action == target_action
}

This reduces the function from 60+ lines to 4 lines and makes it easier to maintain.

],
);
// Map PriorityQueueFull to QueueCapacityExceeded for proper HTTP status code
match e {
@Mohiiit (Member) commented:

🔍 Error Handling: Manual Error Mapping

The manual mapping of JobError::PriorityQueueFull to JobRouteError inside the route handler is repetitive and appears in both handle_process_job_request and handle_verify_job_request:

match e {
    crate::error::job::JobError::PriorityQueueFull { current_size, max_size } => {
        Err(JobRouteError::QueueCapacityExceeded(format!(
            "Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
            current_size, max_size
        )))
    }
    _ => Err(JobRouteError::ProcessingError(e.to_string())),
}

Solution

Implement From<JobError> for JobRouteError to centralize this conversion:

// In orchestrator/src/server/error.rs
impl From<JobError> for JobRouteError {
    fn from(err: JobError) -> Self {
        match err {
            JobError::PriorityQueueFull { current_size, max_size } => {
                JobRouteError::QueueCapacityExceeded(format!(
                    "Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
                    current_size, max_size
                ))
            }
            _ => JobRouteError::ProcessingError(err.to_string()),
        }
    }
}

// Then in route handlers, simplify to:
JobService::queue_job_for_processing(job_id, config.clone(), query.priority)
    .await
    .map_err(JobRouteError::from)?; // conversion to JobRouteError now happens automatically

This follows Rust idioms and reduces duplication.

// No matching priority messages, will check normal queue
}
Err(e) => {
error!("Error processing priority queue: {:?}", e);
@Mohiiit (Member) commented:

🐛 Priority Queue Check Error Swallows Real Issues

When get_priority_message() returns an error (not NoData), the code logs an error message and sleeps 100ms, but the error is swallowed:

Err(e) => {
    error!("Error processing priority queue: {:?}", e);
    sleep(Duration::from_millis(100)).await;
}

Impact

  • Transient errors (network hiccups) will retry, which is fine
  • Persistent errors (permissions, queue misconfiguration, queue doesn't exist) will cause constant error logging without surfacing as a critical failure
  • No way to distinguish between transient and persistent failures
  • Could mask configuration issues in production

Solution

Track consecutive errors and escalate after a threshold:

let mut consecutive_priority_errors = 0;
const MAX_CONSECUTIVE_ERRORS: usize = 10;

// In the loop:
Err(e) => {
    consecutive_priority_errors += 1;
    error!(
        "Error processing priority queue (consecutive: {}): {:?}",
        consecutive_priority_errors, e
    );
    
    if consecutive_priority_errors >= MAX_CONSECUTIVE_ERRORS {
        error!(
            "CRITICAL: Priority queue has failed {} consecutive times. This may indicate a configuration issue.",
            MAX_CONSECUTIVE_ERRORS
        );
        // Consider: Return error to stop worker, or send alert
    }
    
    sleep(Duration::from_millis(100)).await;
}

// Reset counter on success:
Ok(Some(_)) | Ok(None) => {
    consecutive_priority_errors = 0;
}

This helps distinguish between transient issues and persistent configuration problems.

@prkpndy (Contributor, Author) replied:

Not sure this is relevant; we follow the same pattern for the other queues as well!

/// Determines if this worker should check the priority queue
/// Only job processing and verification workers check priority queue
/// WorkerTrigger and JobHandleFailure workers do not
fn should_check_priority_queue(&self) -> bool {
@Mohiiit (Member) commented:

🐛 Missing Documentation

The function excludes PriorityJobQueue itself from checking the priority queue, but the reasoning isn't documented:

fn should_check_priority_queue(&self) -> bool {
    !matches!(self.queue_type, QueueType::WorkerTrigger | QueueType::JobHandleFailure | QueueType::PriorityJobQueue)
}

This makes it unclear if it's preventing infinite recursion or if there's another reason.

Solution

Add a doc comment explaining the reasoning:

/// Determines if this worker should check the priority queue.
///
/// Only job processing and verification workers check the priority queue.
/// WorkerTrigger and JobHandleFailure workers do not, as they handle special
/// system-level operations.
///
/// PriorityJobQueue workers are excluded to prevent circular consumption:
/// a PriorityJobQueue worker checking the PriorityJobQueue would create
/// an infinite loop where priority messages are consumed by the wrong worker type.
fn should_check_priority_queue(&self) -> bool {
    !matches!(self.queue_type, QueueType::WorkerTrigger | QueueType::JobHandleFailure | QueueType::PriorityJobQueue)
}

@prkpndy (Contributor, Author) replied:

I think it's self-explanatory that we don't want to move DLQ and worker-trigger messages to the priority queue.

config: Arc<Config>,
) -> Result<(), JobError> {
// Check priority queue depth before adding
let queue_depth = config.queue().get_queue_depth(QueueType::PriorityJobQueue).await?;
@Mohiiit (Member) commented:

🐛 Inconsistent Error Propagation

If get_queue_depth fails, it propagates as a generic error, making it hard to distinguish between:

  • "Queue depth exceeded" (expected, user-facing error)
  • "Failed to check queue depth" (system error, should be logged differently)

Solution

Add explicit error handling to distinguish between these cases:

let queue_depth = match config.queue().get_queue_depth(QueueType::PriorityJobQueue).await {
    Ok(depth) => depth,
    Err(e) => {
        error!(
            "Failed to check priority queue depth: {:?}. Allowing job addition but this should be investigated.",
            e
        );
        // Option 1: Fail fast
        return Err(JobError::Other(format!("Failed to check queue depth: {}", e)));
        // Option 2: Allow but log (if queue depth check is best-effort)
        // 0 // Assume empty if check fails
    }
};

if queue_depth >= max_size {
    return Err(JobError::PriorityQueueFull { current_size: queue_depth, max_size });
}

This provides better observability and distinguishes between user-facing and system errors.

@prkpndy (Contributor, Author) replied on Dec 9, 2025:

Not sure what this will achieve practically?

/// # Returns
/// * `Ok(i32)` - Approximate number of messages in the queue
/// * `Err(QueueError)` - If the operation fails
async fn get_queue_depth(&self, queue: QueueType) -> Result<i32, QueueError>;
@Mohiiit (Member) commented:

🐛 Type Safety: i32 for Queue Depth

Queue depth is returned as i32:

async fn get_queue_depth(&self, queue: QueueType) -> Result<i32, QueueError>;

Issues:

  • SQS queue depths can exceed i32::MAX (2.1 billion) in extreme cases
  • i32 allows negative values, which don't make sense for queue depth
  • SQS ApproximateNumberOfMessages returns a string that's parsed to i32, but could theoretically be larger

Impact

Potential overflow/underflow issues, though unlikely for a priority queue with max size 20. However, for consistency and future-proofing, this is a type safety concern.

Solution

Use u64 or usize for queue depth:

// In queue/mod.rs
async fn get_queue_depth(&self, queue: QueueType) -> Result<u64, QueueError>;

// In queue_control.rs
pub static MAX_PRIORITY_QUEUE_SIZE: LazyLock<u64> = LazyLock::new(|| {
    get_env_var_or_default("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE", "20")
        .parse()
        .expect("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE must be a valid integer")
});

// In sqs.rs
let count = attributes
    .attributes()
    .and_then(|attrs| attrs.get(&QueueAttributeName::ApproximateNumberOfMessages))
    .and_then(|value| value.parse::<u64>().ok()) // Changed to u64
    .unwrap_or(0);

This provides better type safety and prevents negative values.

},
);
map.insert(
QueueType::PriorityJobQueue,
@Mohiiit (Member) commented:

🐛 Visibility Timeout Risk with Message Bouncing

The PriorityJobQueue has a visibility_timeout: 30 seconds. When a worker nacks a non-matching message:

  1. Message becomes visible again immediately (visibility timeout is 0 on nack)
  2. If no worker of the correct type is available, the message will bounce between wrong workers
  3. If a message type has NO active workers (e.g., all DataSubmission workers are down), the message could bounce indefinitely consuming resources

Impact

  • Resource waste: Messages bouncing between workers consume CPU and SQS API calls
  • Potential message loss: If visibility timeout handling is inconsistent, messages could get stuck
  • No circuit breaker: No mechanism to detect when a message type has no active workers

Solution

Option 1: Document this behavior and add monitoring:

// Add comment in queue_control.rs
// PriorityJobQueue visibility timeout is set to 30 seconds.
// Note: When messages are nacked due to non-matching worker types,
// they become visible immediately. If no matching worker is available,
// messages will bounce until a matching worker comes online or visibility
// timeout expires. Consider implementing a max-bounce counter or DLQ
// routing for messages that bounce too many times.

Option 2: Implement max-bounce counter:

// Add to PriorityQueueMessage:
pub struct PriorityQueueMessage {
    pub id: Uuid,
    pub job_type: JobType,
    pub action: JobAction,
    pub bounce_count: Option<u32>, // Track bounces
}

// In get_priority_message, when nacking:
if bounce_count.unwrap_or(0) >= MAX_BOUNCES {
    // Move to DLQ or log critical error
    error!("Message {} has bounced {} times, no matching worker available", id, bounce_count);
}

Option 3: Add alerting for messages that exceed visibility timeout multiple times.

Recommendation: Start with Option 1 (documentation + monitoring), then add Option 2 if bouncing becomes a problem.

@prkpndy (Contributor, Author) replied:

Increased it to 300 seconds. This is the maximum time we allow for any job, so ideally we should be able to complete the job within that window.
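
For reference, a sketch of what the updated entry in queue_control.rs could look like; the config struct's name and remaining fields are assumptions here, only the visibility_timeout value and the map.insert call are taken from the snippets above:

map.insert(
    QueueType::PriorityJobQueue,
    QueueControlConfig {
        // Hypothetical struct/field layout. Raised from 30s to 300s so a claimed
        // priority message stays invisible for the full per-job time budget
        // instead of reappearing mid-run.
        visibility_timeout: 300,
        ..Default::default()
    },
);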

@prkpndy (Contributor, Author) left a comment:

This type is used in process_message, post_processing, etc. for matching the message, so I don't think we can remove it. Or am I missing something?
