-
Notifications
You must be signed in to change notification settings - Fork 76
feat: add support for priority queue #888
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 5 commits
e9fe744
e516805
5bc2800
679a4cd
7fc39a0
df119a3
0f33e9a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,7 +10,7 @@ use uuid::Uuid; | |
|
|
||
| use super::super::error::JobRouteError; | ||
| use super::super::types::{ | ||
| ApiResponse, JobId, JobRouteResult, JobStatusQuery, JobStatusResponse, JobStatusResponseItem, | ||
| ApiResponse, JobId, JobRouteResult, JobStatusQuery, JobStatusResponse, JobStatusResponseItem, PriorityQuery, | ||
| }; | ||
| use crate::core::config::Config; | ||
| use crate::utils::metrics::ORCHESTRATOR_METRICS; | ||
|
|
@@ -22,12 +22,13 @@ use crate::worker::service::JobService; | |
| /// This endpoint initiates the processing of a job identified by its UUID. It performs the | ||
| /// following: | ||
| /// 1. Validates and parses the job ID from the URL path parameter | ||
| /// 2. Calls the job processing logic | ||
| /// 2. Calls the job processing logic (optionally via priority queue) | ||
| /// 3. Records metrics for successful/failed operations | ||
| /// 4. Returns an appropriate API response | ||
| /// | ||
| /// # Arguments | ||
| /// * `Path(JobId { id })` - The job ID extracted from the URL path | ||
| /// * `Query(query)` - Query parameters including optional priority flag | ||
| /// * `State(config)` - Shared application configuration | ||
| /// | ||
| /// # Returns | ||
|
|
@@ -38,25 +39,49 @@ use crate::worker::service::JobService; | |
| /// * `JobRouteError::ProcessingError` - If job processing fails | ||
| async fn handle_process_job_request( | ||
| Path(JobId { id }): Path<JobId>, | ||
| Query(query): Query<PriorityQuery>, | ||
| State(config): State<Arc<Config>>, | ||
| ) -> JobRouteResult { | ||
| let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?; | ||
| // Record job_id in the current request span for consistent logging | ||
| Span::current().record("job_id", tracing::field::display(job_id)); | ||
|
|
||
| match JobService::queue_job_for_processing(job_id, config.clone()).await { | ||
| match JobService::queue_job_for_processing(job_id, config.clone(), query.priority).await { | ||
| Ok(_) => { | ||
| info!("Job queued for processing successfully"); | ||
| ORCHESTRATOR_METRICS | ||
| .successful_job_operations | ||
| .add(1.0, &[KeyValue::new("operation_type", "queue_process")]); | ||
| Ok(Json(ApiResponse::<()>::success(Some(format!("Job with id {} queued for processing", id)))) | ||
| .into_response()) | ||
| let queue_type = if query.priority { "PRIORITY" } else { "normal" }; | ||
| info!("Job queued for {} processing successfully", queue_type); | ||
| ORCHESTRATOR_METRICS.successful_job_operations.add( | ||
| 1.0, | ||
| &[ | ||
| KeyValue::new("operation_type", "queue_process"), | ||
| KeyValue::new("priority", if query.priority { "true" } else { "false" }), | ||
| ], | ||
| ); | ||
| Ok(Json(ApiResponse::<()>::success(Some(format!( | ||
| "Job with id {} queued for {} processing", | ||
| id, queue_type | ||
| )))) | ||
| .into_response()) | ||
| } | ||
| Err(e) => { | ||
| error!(error = %e, "Failed to queue job for processing"); | ||
| ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_process")]); | ||
| Err(JobRouteError::ProcessingError(e.to_string())) | ||
| ORCHESTRATOR_METRICS.failed_job_operations.add( | ||
| 1.0, | ||
| &[ | ||
| KeyValue::new("operation_type", "queue_process"), | ||
| KeyValue::new("priority", if query.priority { "true" } else { "false" }), | ||
| ], | ||
| ); | ||
| // Map PriorityQueueFull to QueueCapacityExceeded for proper HTTP status code | ||
| match e { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

🔍 Error Handling: Manual Error Mapping

The manual mapping of `JobError` variants to `JobRouteError` is duplicated in each route handler: match e {
crate::error::job::JobError::PriorityQueueFull { current_size, max_size } => {
Err(JobRouteError::QueueCapacityExceeded(format!(
"Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
current_size, max_size
)))
}
_ => Err(JobRouteError::ProcessingError(e.to_string())),
}

Solution: Implement `From<JobError> for JobRouteError` so the conversion is defined in one place:

// In orchestrator/src/server/error.rs
impl From<JobError> for JobRouteError {
fn from(err: JobError) -> Self {
match err {
JobError::PriorityQueueFull { current_size, max_size } => {
JobRouteError::QueueCapacityExceeded(format!(
"Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.",
current_size, max_size
))
}
_ => JobRouteError::ProcessingError(err.to_string()),
}
}
}
// Then in route handlers, simplify to:
match JobService::queue_job_for_processing(job_id, config.clone(), query.priority).await {
Ok(_) => { /* success */ }
Err(e) => Err(e.into()), // Automatic conversion
}

This follows Rust idioms and reduces duplication.
||
| crate::error::job::JobError::PriorityQueueFull { current_size, max_size } => { | ||
| Err(JobRouteError::QueueCapacityExceeded(format!( | ||
| "Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.", | ||
| current_size, max_size | ||
| ))) | ||
| } | ||
| _ => Err(JobRouteError::ProcessingError(e.to_string())), | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -65,13 +90,14 @@ async fn handle_process_job_request( | |
| /// | ||
| /// This endpoint queues the job for verification by: | ||
| /// 1. Validates and parses the job ID | ||
| /// 2. Adds the job to the verification queue | ||
| /// 2. Adds the job to the verification queue (optionally via priority queue) | ||
| /// 3. Resets verification attempt counter | ||
| /// 4. Records metrics for the queue operation | ||
| /// 5. Returns immediate response | ||
| /// | ||
| /// # Arguments | ||
| /// * `Path(JobId { id })` - The job ID extracted from the URL path | ||
| /// * `Query(query)` - Query parameters including optional priority flag | ||
| /// * `State(config)` - Shared application configuration | ||
| /// | ||
| /// # Returns | ||
|
|
@@ -82,23 +108,49 @@ async fn handle_process_job_request( | |
| /// * `JobRouteError::ProcessingError` - If queueing for verification fails | ||
| async fn handle_verify_job_request( | ||
| Path(JobId { id }): Path<JobId>, | ||
| Query(query): Query<PriorityQuery>, | ||
| State(config): State<Arc<Config>>, | ||
| ) -> JobRouteResult { | ||
| let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?; | ||
| // Record job_id in the current request span for consistent logging | ||
| Span::current().record("job_id", tracing::field::display(job_id)); | ||
|
|
||
| match JobService::queue_job_for_verification(job_id, config.clone()).await { | ||
| match JobService::queue_job_for_verification(job_id, config.clone(), query.priority).await { | ||
| Ok(_) => { | ||
| info!("Job queued for verification successfully"); | ||
| ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_verify")]); | ||
| Ok(Json(ApiResponse::<()>::success(Some(format!("Job with id {} queued for verification", id)))) | ||
| .into_response()) | ||
| let queue_type = if query.priority { "PRIORITY" } else { "normal" }; | ||
| info!("Job queued for {} verification successfully", queue_type); | ||
| ORCHESTRATOR_METRICS.successful_job_operations.add( | ||
| 1.0, | ||
| &[ | ||
| KeyValue::new("operation_type", "queue_verify"), | ||
| KeyValue::new("priority", if query.priority { "true" } else { "false" }), | ||
| ], | ||
| ); | ||
| Ok(Json(ApiResponse::<()>::success(Some(format!( | ||
| "Job with id {} queued for {} verification", | ||
| id, queue_type | ||
| )))) | ||
| .into_response()) | ||
| } | ||
| Err(e) => { | ||
| error!(error = %e, "Failed to queue job for verification"); | ||
| ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_verify")]); | ||
| Err(JobRouteError::ProcessingError(e.to_string())) | ||
| ORCHESTRATOR_METRICS.failed_job_operations.add( | ||
| 1.0, | ||
| &[ | ||
| KeyValue::new("operation_type", "queue_verify"), | ||
| KeyValue::new("priority", if query.priority { "true" } else { "false" }), | ||
| ], | ||
| ); | ||
| // Map PriorityQueueFull to QueueCapacityExceeded for proper HTTP status code | ||
| match e { | ||
| crate::error::job::JobError::PriorityQueueFull { current_size, max_size } => { | ||
| Err(JobRouteError::QueueCapacityExceeded(format!( | ||
| "Priority queue is full (current: {}, max: {}). Please try again later or use normal queue.", | ||
| current_size, max_size | ||
| ))) | ||
| } | ||
| _ => Err(JobRouteError::ProcessingError(e.to_string())), | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,14 @@ use orchestrator_utils::env_utils::get_env_var_or_default; | |
| use std::collections::HashMap; | ||
| use std::sync::LazyLock; | ||
|
|
||
| /// Maximum number of messages allowed in the priority queue | ||
| /// Can be configured via MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE environment variable | ||
| pub static MAX_PRIORITY_QUEUE_SIZE: LazyLock<i32> = LazyLock::new(|| { | ||
| get_env_var_or_default("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE", "20") | ||
| .parse() | ||
| .expect("MADARA_ORCHESTRATOR_MAX_PRIORITY_QUEUE_SIZE must be a valid integer") | ||
| }); | ||
|
|
||
| #[derive(Clone)] | ||
| pub struct DlqConfig { | ||
| pub max_receive_count: u32, | ||
|
|
@@ -173,5 +181,14 @@ pub static QUEUES: LazyLock<HashMap<QueueType, QueueConfig>> = LazyLock::new(|| | |
| supported_layers: vec![Layer::L2], | ||
| }, | ||
| ); | ||
| map.insert( | ||
| QueueType::PriorityJobQueue, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

🐛 Visibility Timeout Risk with Message Bouncing

The PriorityJobQueue has a 30-second visibility timeout, but messages that are nacked due to non-matching worker types become visible again immediately, so they can bounce repeatedly until a matching worker is available.
Impact
Solution

Option 1: Document this behavior and add monitoring:

// Add comment in queue_control.rs
// PriorityJobQueue visibility timeout is set to 30 seconds.
// Note: When messages are nacked due to non-matching worker types,
// they become visible immediately. If no matching worker is available,
// messages will bounce until a matching worker comes online or visibility
// timeout expires. Consider implementing a max-bounce counter or DLQ
// routing for messages that bounce too many times.Option 2: Implement max-bounce counter: // Add to PriorityQueueMessage:
pub struct PriorityQueueMessage {
pub id: Uuid,
pub job_type: JobType,
pub action: JobAction,
pub bounce_count: Option<u32>, // Track bounces
}
// In get_priority_message, when nacking:
if bounce_count.unwrap_or(0) >= MAX_BOUNCES {
// Move to DLQ or log critical error
error!("Message {} has bounced {} times, no matching worker available", id, bounce_count);
}

Option 3: Add alerting for messages that exceed the visibility timeout multiple times.

Recommendation: Start with Option 1 (documentation + monitoring), then add Option 2 if bouncing becomes a problem.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Increased it to 300 seconds. This is the maximum time we have given for any job, so ideally we should be able to complete the job within that time.
||
| QueueConfig { | ||
| visibility_timeout: 30, | ||
| dlq_config: None, // No DLQ - failed priority jobs go through normal error handling | ||
| queue_control: QueueControlConfig::new(10), // Not used directly, but set for consistency | ||
| supported_layers: vec![Layer::L2, Layer::L3], | ||
| }, | ||
| ); | ||
| map | ||
| }); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐛 Type Safety: i32 for Queue Depth

Queue depth is returned as `i32`. Issues:

- The depth could exceed `i32::MAX` (about 2.1 billion) in extreme cases.
- `i32` allows negative values, which don't make sense for a queue depth.
- `ApproximateNumberOfMessages` returns a string that is parsed into `i32`, but the underlying value could theoretically be larger.

Impact

Potential overflow/underflow issues, though unlikely for a priority queue with max size 20. However, for consistency and future-proofing, this is a type-safety concern.

Solution

Use `u64` or `usize` for queue depth. This provides better type safety and prevents negative values.