feat: add support for priority queue #888
Conversation
Mohiiit
left a comment
Code Review Summary
This PR introduces a priority queue feature. I've identified several issues that need attention:
🚨 Critical: Worker starvation bug that must be fixed
🔍 Code Quality: Several DRY violations and error handling improvements needed
🐛 Additional Issues: Documentation, metrics, and type safety improvements
See inline comments for details.
// Process new messages (with backpressure)
// 3. NEW: Check priority queue BEFORE normal queue
priority_result = self.get_priority_message(),
🚨 Critical: Worker Starvation Bug
The tokio::select! loop with biased mode creates a critical starvation bug that prevents normal queue processing:
- get_priority_message() returns immediately (non-blocking) with Ok(None) when the priority queue is empty
- get_message() is a blocking call that waits for messages
- In a biased select, if the first branch (priority_result) is ready (even with None), it is selected
- The get_message() future is dropped/cancelled
- The loop repeats, and get_priority_message() returns None instantly again
- Result: the worker busy-loops, consuming 100% CPU, and never waits for the normal queue
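For reference, the problematic shape looks roughly like this, reconstructed from the diff context above; the branch bodies are placeholders, not the PR's actual handling code:
loop {
    tokio::select! {
        biased;

        // Resolves immediately with Ok(None) whenever the priority queue is empty,
        // so this branch wins on every iteration of the biased select...
        priority_result = self.get_priority_message() => {
            // ...handle the priority message (or the empty Ok(None) case) here
        }

        // ...which drops this pending receive before it can resolve, so the loop
        // never actually waits on the normal queue and spins at 100% CPU.
        message_result = self.get_message() => {
            // ...handle the normal queue message here
        }
    }
}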
Solution
Move the priority check outside the tokio::select! loop to prevent starvation:
loop {
// Check if shutdown was requested
if self.is_shutdown_requested() {
info!("Shutdown requested, stopping message processing");
break;
}
// 1. Check priority queue first (non-blocking)
if tasks.len() < max_concurrent_tasks && self.should_check_priority_queue() {
match self.get_priority_message().await {
Ok(Some((delivery, priority_msg))) => {
// Convert priority message to JobQueueMessage for processing
let job_queue_msg = JobQueueMessage { id: priority_msg.id };
let parsed_message = ParsedMessage::JobQueue(Box::new(job_queue_msg));
let worker = self.clone();
tasks.spawn(async move {
worker.process_message(delivery, parsed_message).await
});
debug!("Spawned PRIORITY task, active: {}", tasks.len());
continue; // Process next iteration
}
Ok(None) => {
// No priority messages, fall through to normal queue
}
Err(e) => {
error!("Error processing priority queue: {:?}", e);
sleep(Duration::from_millis(100)).await;
continue;
}
}
}
// 2. Block on normal queue or shutdown (only when no priority messages)
tokio::select! {
biased;
Some(result) = tasks.join_next(), if !tasks.is_empty() => {
Self::handle_task_result(result);
// ... rest of handling
}
_ = self.cancellation_token.cancelled() => {
info!("Shutdown signal received, breaking from main loop");
break;
}
message_result = self.get_message(), if tasks.len() < max_concurrent_tasks => {
// ... process normal queue
}
}
}

This ensures the normal queue is always checked when the priority queue is empty, preventing the busy-loop.
use uuid::Uuid;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PriorityQueueMessage {
⚠️ Data Consistency Issue
The PriorityQueueMessage struct duplicates job_type:
pub struct PriorityQueueMessage {
pub id: Uuid,
pub job_type: JobType, // <--- Redundant?
pub action: JobAction,
}

However, the worker later fetches the job from the DB (JobService::get_job), which contains the authoritative job_type. This creates a potential inconsistency:
- Risk: If a job's state changes in the DB between enqueue and processing, the worker might filter based on stale message data but process based on fresh DB data
- The message's job_type is used for routing (in message_matches_worker), but the actual processing uses the DB's job_type
Solution Options
Option 1: Simplify the message to just { id, action } (if action is needed for routing):
pub struct PriorityQueueMessage {
pub id: Uuid,
pub action: JobAction, // Keep action for routing
}

Then in message_matches_worker, fetch the job from the DB first to check if it matches.
Option 2: Keep current structure but document that job_type is for routing optimization only, and the DB is the source of truth. Add validation in message_matches_worker to verify consistency.
Recommendation: Option 1 is cleaner, but Option 2 is acceptable if you want to avoid DB hits for wrong workers (performance trade-off).
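If Option 2 is chosen, the consistency check could be a small helper along these lines (a sketch only: it assumes JobService::get_job takes the job id plus a config handle and returns a record exposing job_type; adjust to the real signatures):
/// Sketch: after routing on the message's job_type, re-check it against the DB record
/// so stale routing data never changes what actually gets processed.
async fn verify_job_type_consistency(&self, msg: &PriorityQueueMessage) -> Result<bool, JobError> {
    // The DB is the source of truth; the message's job_type is only a routing hint.
    let job = JobService::get_job(msg.id, self.config.clone()).await?;
    if job.job_type != msg.job_type {
        warn!(
            "Priority message for job {} carries stale job_type {:?} (DB has {:?}); skipping",
            msg.id, msg.job_type, job.job_type
        );
        return Ok(false);
    }
    Ok(true)
}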
I don't think this is an issue. Fetching the job type from the DB would add additional time.
pub enum ParsedMessage {
    WorkerTrigger(Box<WorkerTriggerMessage>),
    JobQueue(Box<JobQueueMessage>),
    PriorityJobQueue(Box<PriorityQueueMessage>),
🔍 Unnecessary Enum Variant
ParsedMessage::PriorityJobQueue is defined but effectively unused. The code immediately converts it to JobQueueMessage inside the worker loop:
// In get_priority_message():
let parsed_msg = PriorityQueueMessage::parse_message(&delivery)?;
// Then immediately converted:
let job_queue_msg = JobQueueMessage { id: priority_msg.id };
let parsed_message = ParsedMessage::JobQueue(Box::new(job_queue_msg));

The PriorityJobQueue variant only appears in error-handling spans, which is unnecessary complexity.
Solution
Remove the PriorityJobQueue variant entirely. Parse the priority message, extract the ID, and directly construct a JobQueueMessage:
// In get_priority_message(), return:
Ok(Some((delivery, priority_msg.id))) // Just return the ID
// In the run loop:
match self.get_priority_message().await {
Ok(Some((delivery, job_id))) => {
let job_queue_msg = JobQueueMessage { id: job_id };
let parsed_message = ParsedMessage::JobQueue(Box::new(job_queue_msg));
// ... rest of processing
}
// ...
}

This simplifies the code and removes dead code.
This enum variant is being used in process_message, post_processing, etc. I don't think it's redundant, unless I am missing something 🤔?
}

/// Checks if a priority message matches this worker's responsibility
fn message_matches_worker(&self, msg: &PriorityQueueMessage) -> bool {
🔍 DRY Violation: Repetitive Matching Logic
The message_matches_worker function is a long, repetitive match statement (60+ lines) that repeats the same pattern:
QueueType::SnosJobProcessing => msg.job_type == JobType::SnosRun && msg.action == JobAction::Process,
QueueType::SnosJobVerification => msg.job_type == JobType::SnosRun && msg.action == JobAction::Verify,
// ... repeated 12 times

This violates DRY (Don't Repeat Yourself) and makes the code harder to maintain.
Solution
Implement helper methods on QueueType to extract the target job type and action:
// In types/queue.rs
impl QueueType {
/// Returns the job type this queue processes, if applicable
pub fn target_job_type(&self) -> Option<JobType> {
match self {
Self::SnosJobProcessing | Self::SnosJobVerification => Some(JobType::SnosRun),
Self::ProvingJobProcessing | Self::ProvingJobVerification => Some(JobType::ProofCreation),
Self::ProofRegistrationJobProcessing | Self::ProofRegistrationJobVerification => {
Some(JobType::ProofRegistration)
}
Self::DataSubmissionJobProcessing | Self::DataSubmissionJobVerification => {
Some(JobType::DataSubmission)
}
Self::UpdateStateJobProcessing | Self::UpdateStateJobVerification => {
Some(JobType::StateTransition)
}
Self::AggregatorJobProcessing | Self::AggregatorJobVerification => Some(JobType::Aggregator),
_ => None,
}
}
/// Returns the action this queue performs, if applicable
pub fn target_action(&self) -> Option<JobAction> {
match self {
Self::SnosJobProcessing
| Self::ProvingJobProcessing
| Self::ProofRegistrationJobProcessing
| Self::DataSubmissionJobProcessing
| Self::UpdateStateJobProcessing
| Self::AggregatorJobProcessing => Some(JobAction::Process),
Self::SnosJobVerification
| Self::ProvingJobVerification
| Self::ProofRegistrationJobVerification
| Self::DataSubmissionJobVerification
| Self::UpdateStateJobVerification
| Self::AggregatorJobVerification => Some(JobAction::Verify),
_ => None,
}
}
}
// In worker
fn message_matches_worker(&self, msg: &PriorityQueueMessage) -> bool {
let Some(target_type) = self.queue_type.target_job_type() else { return false; };
let Some(target_action) = self.queue_type.target_action() else { return false; };
msg.job_type == target_type && msg.action == target_action
}

This reduces the function from 60+ lines to 4 lines and makes it easier to maintain.
],
);
// Map PriorityQueueFull to QueueCapacityExceeded for proper HTTP status code
match e {
🔍 Error Handling: Manual Error Mapping
The manual mapping of JobError::PriorityQueueFull to JobRouteError inside the route handler is repetitive and appears in both handle_process_job_request and handle_verify_job_request:
match e {
crate::error::job::JobError::PriorityQueueFull { current_size, max_size } => {
Err(JobRouteError::QueueCapacityExceeded(format!(
"Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
current_size, max_size
)))
}
_ => Err(JobRouteError::ProcessingError(e.to_string())),
}

Solution
Implement From<JobError> for JobRouteError to centralize this conversion:
// In orchestrator/src/server/error.rs
impl From<JobError> for JobRouteError {
fn from(err: JobError) -> Self {
match err {
JobError::PriorityQueueFull { current_size, max_size } => {
JobRouteError::QueueCapacityExceeded(format!(
"Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
current_size, max_size
))
}
_ => JobRouteError::ProcessingError(err.to_string()),
}
}
}
// Then in route handlers, simplify to:
match JobService::queue_job_for_processing(job_id, config.clone(), query.priority).await {
Ok(_) => { /* success */ }
Err(e) => Err(e.into()), // Automatic conversion
}

This follows Rust idioms and reduces duplication.
// No matching priority messages, will check normal queue
}
Err(e) => {
    error!("Error processing priority queue: {:?}", e);
🐛 Priority Queue Check Error Swallows Real Issues
When get_priority_message() returns an error (not NoData), the code logs an error message and sleeps 100ms, but the error is swallowed:
Err(e) => {
error!("Error processing priority queue: {:?}", e);
sleep(Duration::from_millis(100)).await;
}

Impact
- Transient errors (network hiccups) will retry, which is fine
- Persistent errors (permissions, queue misconfiguration, queue doesn't exist) will cause constant error logging without surfacing as a critical failure
- No way to distinguish between transient and persistent failures
- Could mask configuration issues in production
Solution
Track consecutive errors and escalate after a threshold:
let mut consecutive_priority_errors = 0;
const MAX_CONSECUTIVE_ERRORS: usize = 10;
// In the loop:
Err(e) => {
consecutive_priority_errors += 1;
error!(
"Error processing priority queue (consecutive: {}): {:?}",
consecutive_priority_errors, e
);
if consecutive_priority_errors >= MAX_CONSECUTIVE_ERRORS {
error!(
"CRITICAL: Priority queue has failed {} consecutive times. This may indicate a configuration issue.",
MAX_CONSECUTIVE_ERRORS
);
// Consider: Return error to stop worker, or send alert
}
sleep(Duration::from_millis(100)).await;
}
// Reset counter on success:
Ok(Some(_)) | Ok(None) => {
consecutive_priority_errors = 0;
}

This helps distinguish between transient issues and persistent configuration problems.
Not sure if this is relevant; we are following the same pattern in other queues as well!
/// Determines if this worker should check the priority queue
/// Only job processing and verification workers check priority queue
/// WorkerTrigger and JobHandleFailure workers do not
fn should_check_priority_queue(&self) -> bool {
🐛 Missing Documentation
The function excludes PriorityJobQueue itself from checking the priority queue, but the reasoning isn't documented:
fn should_check_priority_queue(&self) -> bool {
!matches!(self.queue_type, QueueType::WorkerTrigger | QueueType::JobHandleFailure | QueueType::PriorityJobQueue)
}

This makes it unclear if it's preventing infinite recursion or if there's another reason.
Solution
Add a doc comment explaining the reasoning:
/// Determines if this worker should check the priority queue.
///
/// Only job processing and verification workers check the priority queue.
/// WorkerTrigger and JobHandleFailure workers do not, as they handle special
/// system-level operations.
///
/// PriorityJobQueue workers are excluded to prevent circular consumption:
/// a PriorityJobQueue worker checking the PriorityJobQueue would create
/// an infinite loop where priority messages are consumed by the wrong worker type.
fn should_check_priority_queue(&self) -> bool {
!matches!(self.queue_type, QueueType::WorkerTrigger | QueueType::JobHandleFailure | QueueType::PriorityJobQueue)
}
I think it's self-explanatory that we don't want to move DLQ and worker trigger messages to priority queues.
config: Arc<Config>,
) -> Result<(), JobError> {
    // Check priority queue depth before adding
    let queue_depth = config.queue().get_queue_depth(QueueType::PriorityJobQueue).await?;
🐛 Inconsistent Error Propagation
If get_queue_depth fails, it propagates as a generic error, making it hard to distinguish between:
- "Queue depth exceeded" (expected, user-facing error)
- "Failed to check queue depth" (system error, should be logged differently)
Solution
Add explicit error handling to distinguish between these cases:
let queue_depth = match config.queue().get_queue_depth(QueueType::PriorityJobQueue).await {
Ok(depth) => depth,
Err(e) => {
error!(
"Failed to check priority queue depth: {:?}. Allowing job addition but this should be investigated.",
e
);
// Option 1: Fail fast
return Err(JobError::Other(format!("Failed to check queue depth: {}", e)));
// Option 2: Allow but log (if queue depth check is best-effort)
// 0 // Assume empty if check fails
}
};
if queue_depth >= max_size {
return Err(JobError::PriorityQueueFull { current_size: queue_depth, max_size });
}

This provides better observability and distinguishes between user-facing and system errors.
Not sure what this will achieve practically?
/// # Returns
/// * `Ok(i32)` - Approximate number of messages in the queue
/// * `Err(QueueError)` - If the operation fails
async fn get_queue_depth(&self, queue: QueueType) -> Result<i32, QueueError>;
🐛 Type Safety: i32 for Queue Depth
Queue depth is returned as i32:
async fn get_queue_depth(&self, queue: QueueType) -> Result<i32, QueueError>;

Issues:
- SQS queue depths can exceed i32::MAX (2.1 billion) in extreme cases
- i32 allows negative values, which don't make sense for queue depth
- SQS ApproximateNumberOfMessages returns a string that's parsed to i32, but could theoretically be larger
Impact
Potential overflow/underflow issues, though unlikely for a priority queue with max size 20. However, for consistency and future-proofing, this is a type safety concern.
Solution
Use u64 or usize for queue depth:
// In queue/mod.rs
async fn get_queue_depth(&self, queue: QueueType) -> Result<u64, QueueError>;
// In queue_control.rs
pub static MAX_PRIORITY_QUEUE_SIZE: LazyLock<u64> = LazyLock::new(|| {
get_env_var_or_default("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE", "20")
.parse()
.expect("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE must be a valid integer")
});
// In sqs.rs
let count = attributes
.attributes()
.and_then(|attrs| attrs.get(&QueueAttributeName::ApproximateNumberOfMessages))
.and_then(|value| value.parse::<u64>().ok()) // Changed to u64
.unwrap_or(0);

This provides better type safety and prevents negative values.
},
);
map.insert(
    QueueType::PriorityJobQueue,
🐛 Visibility Timeout Risk with Message Bouncing
The PriorityJobQueue has a visibility_timeout: 30 seconds. When a worker nacks a non-matching message:
- Message becomes visible again immediately (visibility timeout is 0 on nack)
- If no worker of the correct type is available, the message will bounce between wrong workers
- If a message type has NO active workers (e.g., all DataSubmission workers are down), the message could bounce indefinitely, consuming resources
Impact
- Resource waste: Messages bouncing between workers consume CPU and SQS API calls
- Potential message loss: If visibility timeout handling is inconsistent, messages could get stuck
- No circuit breaker: No mechanism to detect when a message type has no active workers
Solution
Option 1: Document this behavior and add monitoring:
// Add comment in queue_control.rs
// PriorityJobQueue visibility timeout is set to 30 seconds.
// Note: When messages are nacked due to non-matching worker types,
// they become visible immediately. If no matching worker is available,
// messages will bounce until a matching worker comes online or visibility
// timeout expires. Consider implementing a max-bounce counter or DLQ
// routing for messages that bounce too many times.

Option 2: Implement a max-bounce counter:
// Add to PriorityQueueMessage:
pub struct PriorityQueueMessage {
pub id: Uuid,
pub job_type: JobType,
pub action: JobAction,
pub bounce_count: Option<u32>, // Track bounces
}
// In get_priority_message, when nacking:
if bounce_count.unwrap_or(0) >= MAX_BOUNCES {
// Move to DLQ or log critical error
error!("Message {} has bounced {} times, no matching worker available", id, bounce_count);
}

Option 3: Add alerting for messages that exceed the visibility timeout multiple times.
Recommendation: Start with Option 1 (documentation + monitoring), then add Option 2 if bouncing becomes a problem.
Increased it to 300 seconds. This is the max time we have given for any job, so ideally we should be able to complete the job in that time.
prkpndy
left a comment
This type is used in process_message, post_processing, etc. for matching the message. I don't think we can remove the type. Or am I missing something?
Add Priority Queue Mechanism for Job Processing
Pull Request type
What is the current behavior?
All jobs are processed in order through their respective queues. There's no mechanism to prioritize urgent jobs.
Resolves: #NA
What is the new behavior?
Implements a priority queue mechanism that allows urgent jobs to "jump the line" while respecting per-job-type concurrency limits.
Key Changes:
- New ?priority=true query parameter:
  - GET /jobs/{id}/process?priority=true
  - GET /jobs/{id}/verify?priority=true

(an example request is sketched at the end of this description)

Files Modified:
Does this introduce a breaking change?
Yes - Internal API signature changes (backward compatible for external users)
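For illustration only, a client request that queues a job with priority could look like the following sketch (hypothetical host, port, and job id; it only assumes the two routes listed above and the reqwest/tokio crates):
// Sketch: call the new priority-enabled process endpoint for an existing job.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let job_id = "00000000-0000-0000-0000-000000000000"; // placeholder job id
    let url = format!("http://localhost:3000/jobs/{job_id}/process?priority=true");
    let response = reqwest::Client::new().get(&url).send().await?;
    println!("status: {}", response.status());
    Ok(())
}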