From 8cfa2be310bc6d8bf294b7d5b1c8f6678e50df67 Mon Sep 17 00:00:00 2001 From: Maksim Greshniakov Date: Wed, 1 Apr 2026 09:14:43 +0200 Subject: [PATCH 1/2] feature(collator): remove dispatcher --- collator/src/collator/mod.rs | 221 ++++++++-- collator/src/tracing_targets.rs | 1 - collator/src/utils/async_queued_dispatcher.rs | 277 ------------ collator/src/utils/mod.rs | 2 - collator/src/utils/task_descr.rs | 410 ------------------ 5 files changed, 181 insertions(+), 730 deletions(-) delete mode 100644 collator/src/utils/async_queued_dispatcher.rs delete mode 100644 collator/src/utils/task_descr.rs diff --git a/collator/src/collator/mod.rs b/collator/src/collator/mod.rs index f76791ee22..637f51d81f 100644 --- a/collator/src/collator/mod.rs +++ b/collator/src/collator/mod.rs @@ -8,8 +8,7 @@ use async_trait::async_trait; use error::CollatorError; use futures_util::future::Future; use messages_reader::{MessagesReader, MessagesReaderContext}; -use tokio::sync::{Notify, oneshot}; -use tokio_util::sync::CancellationToken; +use tokio::sync::{Notify, mpsc, oneshot}; use tracing::Instrument; use tycho_block_util::block::calc_next_block_id_short; use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff, choose_genesis_info}; @@ -31,15 +30,12 @@ use crate::internal_queue::types::message::EnqueuedMessage; use crate::mempool::{GetAnchorResult, MempoolAdapter, MempoolAnchorId}; use crate::queue_adapter::MessageQueueAdapter; use crate::state_node::StateNodeAdapter; +use crate::tracing_targets; use crate::types::processed_upto::ProcessedUptoInfoExtension; use crate::types::{ BlockCollationResult, CollationSessionId, CollationSessionInfo, CollatorConfig, DebugDisplay, DisplayBlockIdsIntoIter, McData, TopBlockDescription, }; -use crate::utils::async_queued_dispatcher::{ - AsyncQueuedDispatcher, STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE, -}; -use crate::{method_to_queued_async_closure, tracing_targets}; mod anchors_cache; mod debug_info; @@ -77,6 +73,8 @@ pub(crate) use messages_reader::tests::{TestInternalMessage, TestMessageFactory} use crate::collator::anchors_cache::AnchorsCacheTransaction; // FACTORY +const COLLATOR_COMMAND_QUEUE_BUFFER_SIZE: usize = 100; + pub struct CollatorContext { pub mq_adapter: Arc>, pub mpool_adapter: Arc, @@ -167,13 +165,159 @@ pub trait Collator: Send + Sync + 'static { ) -> Result<()>; } +enum CollatorCommand { + Init { + prev_blocks_ids: Vec, + mc_data: Arc, + working_state_tx: oneshot::Sender>>, + }, + ResumeCollation { + mc_data: Arc, + reset: bool, + collation_session: Arc, + prev_blocks_ids: Vec, + }, + TryCollate { + force_one_anchor_import: bool, + }, + DoCollate { + top_shard_blocks_info: Vec, + next_chain_time: u64, + }, + Stop, +} + +impl CollatorCommand { + fn descr(&self) -> &'static str { + match self { + Self::Init { .. } => "init_collator", + Self::ResumeCollation { .. } => "resume_collation", + Self::TryCollate { .. } => "try_collate", + Self::DoCollate { .. } => "do_collate", + Self::Stop => "stop_collator", + } + } +} + +#[derive(Clone)] +pub struct CollatorHandle { + commands_tx: mpsc::Sender, +} + +impl CollatorHandle { + fn new(queue_buffer_size: usize) -> (Self, mpsc::Receiver) { + let (commands_tx, commands_rx) = mpsc::channel::(queue_buffer_size); + (Self { commands_tx }, commands_rx) + } + + fn run(mut worker: CollatorStdImpl, mut commands_rx: mpsc::Receiver) { + tokio::spawn(async move { + loop { + let Some(command) = commands_rx.recv().await else { + tracing::info!( + target: tracing_targets::COLLATOR, + "Collator command channel closed", + ); + break; + }; + + let command_descr = command.descr(); + tracing::trace!( + target: tracing_targets::COLLATOR, + "Collator command ({command_descr}) received", + ); + let should_stop = matches!(&command, CollatorCommand::Stop); + + let command_result = match command { + CollatorCommand::Init { + prev_blocks_ids, + mc_data, + working_state_tx, + } => { + worker + .init_collator_wrapper(prev_blocks_ids, mc_data, working_state_tx) + .await + } + CollatorCommand::ResumeCollation { + mc_data, + reset, + collation_session, + prev_blocks_ids, + } => { + worker + .resume_collation_wrapper( + mc_data, + reset, + collation_session, + prev_blocks_ids, + ) + .await + } + CollatorCommand::TryCollate { + force_one_anchor_import, + } => { + worker + .wait_state_and_try_collate_wrapper(force_one_anchor_import) + .await + } + CollatorCommand::DoCollate { + top_shard_blocks_info, + next_chain_time, + } => { + worker + .wait_state_and_do_collate_wrapper( + top_shard_blocks_info, + next_chain_time, + ) + .await + } + CollatorCommand::Stop => worker.stop_collator().await, + }; + + if let Err(err) = command_result { + panic!("Collator command ({command_descr}) failed: {err:?}"); + } + + tracing::trace!( + target: tracing_targets::COLLATOR, + "Collator command ({command_descr}) executed", + ); + + if should_stop { + break; + } + } + + tracing::info!( + target: tracing_targets::COLLATOR, + "Collator command loop stopped", + ); + }); + } + + async fn enqueue_command(&self, command: CollatorCommand) -> Result<()> { + let command_descr = command.descr(); + self.commands_tx + .send(command) + .await + .map_err(|err| anyhow::anyhow!("collator commands receiver dropped {err:?}"))?; + + tracing::trace!( + target: tracing_targets::COLLATOR, + "Collator command ({command_descr}) enqueued", + ); + + Ok(()) + } +} + pub struct CollatorStdImplFactory { pub wu_tuner_event_sender: Option>, } #[async_trait] impl CollatorFactory for CollatorStdImplFactory { - type Collator = AsyncQueuedDispatcher; + type Collator = CollatorHandle; async fn start(&self, cx: CollatorContext) -> Result { CollatorStdImpl::start( @@ -196,11 +340,9 @@ impl CollatorFactory for CollatorStdImplFactory { } #[async_trait] -impl Collator for AsyncQueuedDispatcher { +impl Collator for CollatorHandle { async fn enqueue_stop(&self) -> Result<()> { - let cancel_token = self.cancel_token().clone(); - self.enqueue_task(method_to_queued_async_closure!(stop_collator, cancel_token)) - .await + self.enqueue_command(CollatorCommand::Stop).await } /// Enqueue update `McData` if newer, reset `PrevData` if required and run next collation attempt @@ -211,20 +353,19 @@ impl Collator for AsyncQueuedDispatcher { collation_session: Arc, prev_blocks_ids: Vec, ) -> Result<()> { - self.enqueue_task(method_to_queued_async_closure!( - resume_collation_wrapper, + self.enqueue_command(CollatorCommand::ResumeCollation { mc_data, reset, collation_session, - prev_blocks_ids - )) + prev_blocks_ids, + }) .await } async fn enqueue_try_collate(&self) -> Result<()> { - self.enqueue_task(method_to_queued_async_closure!( - wait_state_and_try_collate_wrapper, - )) + self.enqueue_command(CollatorCommand::TryCollate { + force_one_anchor_import: false, + }) .await } @@ -233,11 +374,10 @@ impl Collator for AsyncQueuedDispatcher { top_shard_blocks_info: Vec, next_chain_time: u64, ) -> Result<()> { - self.enqueue_task(method_to_queued_async_closure!( - wait_state_and_do_collate_wrapper, + self.enqueue_command(CollatorCommand::DoCollate { top_shard_blocks_info, - next_chain_time - )) + next_chain_time, + }) .await } } @@ -295,7 +435,7 @@ impl CollatorStdImpl { mempool_config_override: Option, cancel_collation: Arc, wu_tuner_event_sender: Option>, - ) -> Result> { + ) -> Result { const BLOCK_CELL_COUNT_BASELINE: usize = 100_000; let next_block_info = calc_next_block_id_short(&prev_blocks_ids); @@ -352,40 +492,38 @@ impl CollatorStdImpl { } }); - // create dispatcher for own async tasks queue - let dispatcher = - AsyncQueuedDispatcher::create(processor, STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE); + // start collator command loop (commands are executed sequentially) + let (collator, commands_rx) = CollatorHandle::new(COLLATOR_COMMAND_QUEUE_BUFFER_SIZE); + CollatorHandle::run(processor, commands_rx); tracing::trace!(target: tracing_targets::COLLATOR, - "(next_block_id={}): collator tasks queue dispatcher started", next_block_info, + "(next_block_id={}): collator command loop started", next_block_info, ); - // equeue first initialization task + // enqueue first initialization command // sending to the receiver here cannot return Error because it is guaranteed not closed or dropped - dispatcher - .enqueue_task(method_to_queued_async_closure!( - init_collator_wrapper, + collator + .enqueue_command(CollatorCommand::Init { prev_blocks_ids, mc_data, - working_state_tx - )) + working_state_tx, + }) .await - .context("task receiver had to be not closed or dropped here")?; + .context("collator commands receiver had to be not closed or dropped here")?; tracing::info!(target: tracing_targets::COLLATOR, - "(next_block_id={}): collator initialization task enqueued", next_block_info, + "(next_block_id={}): collator initialization command enqueued", next_block_info, ); tracing::info!(target: tracing_targets::COLLATOR, "(next_block_id={}): collator started", next_block_info, ); - Ok(dispatcher) + Ok(collator) } - async fn stop_collator(&mut self, dispatcher_cancel_token: CancellationToken) -> Result<()> { + async fn stop_collator(&mut self) -> Result<()> { self.listener .on_collator_stopped(self.collation_session.id()) .await?; - dispatcher_cancel_token.cancel(); Ok(()) } @@ -1681,8 +1819,11 @@ impl CollatorStdImpl { Ok(has_pending_internals) } - async fn wait_state_and_try_collate_wrapper(&mut self) -> Result<()> { - self.wait_state_and_try_collate(false) + async fn wait_state_and_try_collate_wrapper( + &mut self, + force_one_anchor_import: bool, + ) -> Result<()> { + self.wait_state_and_try_collate(force_one_anchor_import) .await .with_context(|| format!("next_block_id: {}", self.next_block_info)) } diff --git a/collator/src/tracing_targets.rs b/collator/src/tracing_targets.rs index 8e0bfe64f1..016fc9e086 100644 --- a/collator/src/tracing_targets.rs +++ b/collator/src/tracing_targets.rs @@ -7,4 +7,3 @@ pub const COLLATOR: &str = "collator"; pub const COLLATOR_READ_NEXT_EXTS: &str = "collator::read_next_externals"; pub const EXEC_MANAGER: &str = "exec_manager"; pub const VALIDATOR: &str = "validator"; -pub const ASYNC_QUEUE_DISPATCHER: &str = "async_queued_dispatcher"; diff --git a/collator/src/utils/async_queued_dispatcher.rs b/collator/src/utils/async_queued_dispatcher.rs deleted file mode 100644 index 2f0ebf5b2e..0000000000 --- a/collator/src/utils/async_queued_dispatcher.rs +++ /dev/null @@ -1,277 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::sync::atomic::Ordering; - -use anyhow::{Result, anyhow}; -use metrics::atomics::AtomicU64; -use tokio::sync::{mpsc, oneshot}; -use tokio_util::sync::CancellationToken; - -use super::task_descr::{TaskDesc, TaskResponder}; -use crate::tracing_targets; - -pub const STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE: usize = 100; - -type AsyncTaskDesc = TaskDesc< - dyn FnOnce(W) -> Pin)> + Send>> + Send, - Result, ->; - -pub struct AsyncQueuedDispatcher { - task_id_counter: Arc, - tasks_queue: mpsc::Sender>, - cancel_token: CancellationToken, -} - -impl Clone for AsyncQueuedDispatcher { - fn clone(&self) -> Self { - Self { - task_id_counter: self.task_id_counter.clone(), - tasks_queue: self.tasks_queue.clone(), - cancel_token: self.cancel_token.clone(), - } - } -} - -impl AsyncQueuedDispatcher -where - W: Send + 'static, // Send and 'static - to use inside tokio::spawn() - R: Send + 'static, -{ - pub fn new(queue_buffer_size: usize) -> (Self, mpsc::Receiver>) { - let (sender, receiver) = mpsc::channel::>(queue_buffer_size); - let dispatcher = Self { - task_id_counter: Arc::new(AtomicU64::default()), - tasks_queue: sender, - cancel_token: CancellationToken::new(), - }; - (dispatcher, receiver) - } - - pub fn run(&self, mut worker: W, mut receiver: mpsc::Receiver>) { - let cancel_token = self.cancel_token.clone(); - tokio::spawn(async move { - loop { - tokio::select! { - res = receiver.recv() => { - let Some(task) = res else { - tracing::info!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Tasks receiver channel closed", - ); - break; - }; - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): received", - task.id(), - task.get_descr(), - ); - let (task_id, task_descr) = (task.id(), task.get_descr()); - let (func, responder) = task.extract(); - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): executing...", task_id, &task_descr, - ); - let future = func(worker); - let (updated_worker, res) = future.await; - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): executed", task_id, &task_descr, - ); - worker = updated_worker; - if let Some(res) = responder.respond(res) { - // no responder, or no receiver can handle result, should panic if task result is Error - if let Err(err) = res { - panic!( - "Task #{} ({}): result error! {:?}", - task_id, &task_descr, err, - ) - } - } else { - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): result responded", task_id, &task_descr, - ); - } - - if cancel_token.is_cancelled() { - break; - } - }, - _ = cancel_token.cancelled() => { - break; - } - } - } - tracing::info!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Dispatcher stopped", - ); - }); - } - - pub fn stop(&self) { - self.cancel_token.cancel(); - } - - pub fn cancel_token(&self) -> CancellationToken { - self.cancel_token.clone() - } - - pub fn create(worker: W, queue_buffer_size: usize) -> Self { - let (dispatcher, receiver) = Self::new(queue_buffer_size); - dispatcher.run(worker, receiver); - dispatcher - } - - async fn _enqueue_task(&self, task: AsyncTaskDesc) -> Result<()> { - let (task_id, task_descr) = (task.id(), task.get_descr()); - self.tasks_queue - .send(task) - .await - .map_err(|err| anyhow!("dispatcher queue receiver dropped {err:?}"))?; - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): enqueued", task_id, task_descr, - ); - Ok(()) - } - - fn _enqueue_task_blocking(&self, task: AsyncTaskDesc) -> Result<()> { - let (task_id, task_descr) = (task.id(), task.get_descr()); - self.tasks_queue - .blocking_send(task) - .map_err(|err| anyhow!("dispatcher queue receiver dropped {err:?}"))?; - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): enqueued (blocking)", task_id, task_descr, - ); - Ok(()) - } - - pub async fn enqueue_task( - &self, - (task_descr, task_fn): ( - &str, - impl FnOnce(W) -> Pin)> + Send>> + Send + 'static, - ), - ) -> Result<()> { - let id = self.task_id_counter.fetch_add(1, Ordering::Release); - let task = AsyncTaskDesc::::create(id, task_descr, Box::new(task_fn)); - self._enqueue_task(task).await - } - - pub fn enqueue_task_blocking( - &self, - (task_descr, task_fn): ( - &str, - impl FnOnce(W) -> Pin)> + Send>> + Send + 'static, - ), - ) -> Result<()> { - let id = self.task_id_counter.fetch_add(1, Ordering::Release); - let task = AsyncTaskDesc::::create(id, task_descr, Box::new(task_fn)); - self._enqueue_task_blocking(task) - } - - pub async fn enqueue_task_with_responder( - &self, - (task_descr, task_fn): ( - &str, - impl FnOnce(W) -> Pin)> + Send>> + Send + 'static, - ), - ) -> Result>> { - let id = self.task_id_counter.fetch_add(1, Ordering::Release); - let (task, receiver) = - AsyncTaskDesc::::create_with_responder(id, task_descr, Box::new(task_fn)); - let (task_id, task_descr) = (task.id(), task.get_descr()); - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): enqueue_task_with_responder()", - task_id, - &task_descr - ); - self._enqueue_task(task).await?; - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): enqueue_task_with_responder(): receiver returned", - task_id, - task_descr, - ); - Ok(receiver) - } - - pub async fn execute_task( - &self, - (task_descr, task_fn): ( - &str, - impl FnOnce(W) -> Pin)> + Send>> + Send + 'static, - ), - ) -> Result { - let id = self.task_id_counter.fetch_add(1, Ordering::Release); - let (task, receiver) = - AsyncTaskDesc::::create_with_responder(id, task_descr, Box::new(task_fn)); - let (task_id, task_descr) = (task.id(), task.get_descr()); - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): execute_task()", task_id, &task_descr, - ); - self._enqueue_task(task).await?; - let res = receiver.await?; - tracing::trace!( - target: tracing_targets::ASYNC_QUEUE_DISPATCHER, - "Task #{} ({}): execute_task(): result received and forwarded", - task_id, - task_descr, - ); - res - } -} - -#[macro_export] -macro_rules! method_to_queued_async_closure { - ($method:ident, $($arg:expr),*) => { - (stringify!($method), - #[allow(unused_mut)] - move |mut worker| { - Box::pin(async move { - let res = worker.$method($($arg),*).await; - (worker, res) - }) - }) - }; -} - -#[cfg(test)] -#[tokio::test] -async fn test() { - struct Worker {} - impl Worker { - async fn action(&mut self, arg: &str) -> Result { - Ok(format!("action result with arg: {arg}")) - } - } - - let worker_obj = Worker {}; - - let dispatcher = AsyncQueuedDispatcher::<_, String>::create(worker_obj, 10); - - // use marco to just call a worker method - let _ = dispatcher - .enqueue_task(method_to_queued_async_closure!(action, "test1")) - .await; - - // or build a closure by yourself - let _ = dispatcher - .enqueue_task(("taskdescr", move |mut worker| { - Box::pin(async move { - // some async code block - let subres = worker.action("test2").await.unwrap(); - let res = format!("converted result from ({subres})"); - - (worker, Ok(res)) - }) - })) - .await; -} diff --git a/collator/src/utils/mod.rs b/collator/src/utils/mod.rs index 03d35d73a5..b1ffbac277 100644 --- a/collator/src/utils/mod.rs +++ b/collator/src/utils/mod.rs @@ -1,10 +1,8 @@ mod async_action; pub mod async_dispatcher; -pub mod async_queued_dispatcher; pub mod block; mod enum_try_into; pub mod shard; -pub mod task_descr; pub mod vset_cache; pub use async_action::*; diff --git a/collator/src/utils/task_descr.rs b/collator/src/utils/task_descr.rs deleted file mode 100644 index 6fa192e88b..0000000000 --- a/collator/src/utils/task_descr.rs +++ /dev/null @@ -1,410 +0,0 @@ -use std::future::Future; - -use tokio::sync::oneshot; - -pub struct TaskDesc { - id: u64, - descr: String, - closure: Box, // closure for execution - responder: Option>, -} - -impl TaskDesc { - pub fn create(id: u64, descr: &str, closure: Box) -> Self { - Self { - id, - descr: descr.into(), - closure, - responder: None, - } - } - pub fn create_with_responder( - id: u64, - descr: &str, - closure: Box, - ) -> (Self, oneshot::Receiver) { - let (sender, receiver) = oneshot::channel::(); - let task = Self { - id, - descr: descr.into(), - closure, - responder: Some(sender), - }; - (task, receiver) - } - pub fn id(&self) -> u64 { - self.id - } - pub fn descr(&self) -> &str { - &self.descr - } - pub fn get_descr(&self) -> String { - self.descr.clone() - } - pub fn extract(self) -> (Box, Option>) { - (self.closure, self.responder) - } - pub fn closure(&self) -> &F { - &self.closure - } - pub fn respond(self, res: R) -> Option { - self.responder.respond(res) - } -} - -pub trait TaskResponder { - /// Respond to receiver with result and return None. - /// Return Some(res) if no responder or receiver exist - fn respond(self, res: R) -> Option; -} -impl TaskResponder for Option> { - fn respond(self, res: R) -> Option { - if let Some(responder) = self { - match responder.send(res) { - Ok(()) => None, - Err(res) => { - tracing::warn!("response receiver dropped"); - Some(res) - } - } - } else { - Some(res) - } - } -} - -#[allow(unused)] -pub struct TaskResponseReceiver { - inner_receiver: oneshot::Receiver>, -} -#[allow(unused)] -impl TaskResponseReceiver -where - R: Send + 'static, -{ - pub fn create(receiver: oneshot::Receiver>) -> Self { - Self { - inner_receiver: receiver, - } - } - pub async fn try_recv(self) -> anyhow::Result { - // TODO: awaiting error and error in result are merged here, need to fix - self.inner_receiver.await? - } - - /// Example: - /// ```ignore - /// let dispatcher = self.dispatcher.clone(); - /// receiver.process_on_recv(|res| async move { - /// dispatcher - /// .enqueue_task(method_to_async_task_closure!( - /// refresh_collation_sessions, - /// res - /// )) - /// .await - /// }); - /// ``` - pub async fn process_on_recv( - self, - process_callback: impl FnOnce(R) -> Fut + Send + 'static, - ) where - Fut: Future> + Send, - { - tokio::spawn(async move { - match self.try_recv().await { - Ok(res) => { - if let Err(e) = process_callback(res).await { - tracing::error!("Error processing task response: {e:?}"); - // TODO: may be unwind panic? - } - } - Err(err) => tracing::error!("Error in task result or on receiving: {err:?}"), - } - }); - } -} - -#[allow(unused)] -pub struct TaskResponseReceiverWithConvert -where - T: TryFrom, -{ - _marker_t: std::marker::PhantomData, - inner_receiver: oneshot::Receiver>, -} -#[allow(unused)] -impl TaskResponseReceiverWithConvert -where - R: Send + 'static, - T: TryFrom + Send + 'static, -{ - pub fn create(receiver: oneshot::Receiver>) -> Self { - Self { - _marker_t: std::marker::PhantomData, - inner_receiver: receiver, - } - } - pub async fn try_recv(self) -> anyhow::Result { - // TODO: awaiting error and error in result are merged here, need to fix - self.inner_receiver.await?.and_then(|res| res.try_into()) - } - - /// Example: - /// ```ignore - /// let dispatcher = self.dispatcher.clone(); - /// receiver.process_on_recv(|res| async move { - /// dispatcher - /// .enqueue_task(method_to_async_task_closure!( - /// refresh_collation_sessions, - /// res - /// )) - /// .await - /// }); - /// ``` - pub async fn process_on_recv( - self, - process_callback: impl FnOnce(T) -> Fut + Send + 'static, - ) where - Fut: Future> + Send, - { - tokio::spawn(async move { - match self.try_recv().await { - Ok(res) => { - if let Err(e) = process_callback(res).await { - tracing::error!("Error processing task response: {e:?}"); - // TODO: may be unwind panic? - } - } - Err(err) => tracing::error!("Error in task result or on receiving: {err:?}"), - } - }); - } -} - -#[cfg(test)] -mod tests { - use super::TaskDesc; - - #[test] - fn void_task_without_responder() { - let task = TaskDesc::create( - 1, - "task descr", - Box::new(|| { - println!("task executed"); - }), - ); - - let task_fn = task.closure(); - - task_fn(); - - let respond_res = task.respond(()); - println!("resond_res: {respond_res:?}"); - - assert!( - respond_res.is_some(), - "task without responder should return Some(()) when call .respond()" - ); - } - - #[tokio::test] - async fn void_task_with_responder() { - let (task, receiver) = TaskDesc::create_with_responder( - 1, - "task descr", - Box::new(|| { - println!("task executed"); - }), - ); - - let task_fn = task.closure(); - - task_fn(); - - let respond_res = task.respond(()); - println!("resond_res: {respond_res:?}"); - - assert!( - respond_res.is_none(), - "task with responder should return None when call .respond()" - ); - - let received_res = receiver.await; - println!("received_res: {received_res:?}"); - - assert!(received_res.is_ok()); - } - - #[test] - fn returning_task_without_responder() { - let task = TaskDesc::create( - 1, - "task descr", - Box::new(|| { - println!("task executed"); - String::from("task result") - }), - ); - - let task_fn = task.closure(); - - let res = task_fn(); - println!("task res: {res:?}"); - - assert_eq!(&res, "task result"); - - let respond_res = task.respond(res); - println!("resond_res: {respond_res:?}"); - - assert!( - respond_res.is_some(), - "task without responder should return Some(res) when call .respond()" - ); - } - - #[tokio::test] - async fn returning_task_with_responder() { - let (task, receiver) = TaskDesc::create_with_responder( - 1, - "task descr", - Box::new(|| { - println!("task executed"); - String::from("task result") - }), - ); - - let task_fn = task.closure(); - - let res = task_fn(); - println!("task res: {res:?}"); - - assert_eq!(&res, "task result"); - - let expected_received = res.clone(); - - let respond_res = task.respond(res); - println!("resond_res: {respond_res:?}"); - - assert!( - respond_res.is_none(), - "task with responder should return None when call .respond()" - ); - - let received_res = receiver.await; - println!("received_res: {received_res:?}"); - - assert!(received_res.is_ok()); - - let received = received_res.unwrap(); - - assert_eq!(received, expected_received); - } - - #[tokio::test] - async fn returning_task_with_responder_and_dropped_receiver() { - let task = { - let (task, _receiver) = TaskDesc::create_with_responder( - 1, - "task descr", - Box::new(|| { - println!("task executed"); - String::from("task result") - }), - ); - task - }; - - let task_fn = task.closure(); - - let res = task_fn(); - println!("task res: {res:?}"); - - assert_eq!(&res, "task result"); - - let respond_res = task.respond(res); - println!("resond_res: {respond_res:?}"); - - assert!( - respond_res.is_some(), - "task with responder should return Some(res) when call .respond() when receiver is dropped" - ); - } - - async fn async_test_void_func() { - println!("async test void func executed"); - } - - async fn async_test_func() -> String { - println!("async test func executed"); - String::from("async task result") - } - - #[tokio::test] - async fn async_void_task_with_responder() { - let (task, receiver) = TaskDesc::create_with_responder( - 1, - "task descr", - Box::new(|| { - println!("task executed"); - async_test_void_func() - }), - ); - - let task_fn = task.closure(); - - let res = task_fn().await; - println!("task res: {res:?}"); - - let respond_res = task.respond(res); - println!("resond_res: {respond_res:?}"); - - assert!( - respond_res.is_none(), - "task with responder should return None when call .respond()" - ); - - let received_res = receiver.await; - println!("received_res: {received_res:?}"); - - assert!(received_res.is_ok()); - } - - #[tokio::test] - async fn returning_void_task_with_responder() { - let (task, receiver) = TaskDesc::create_with_responder( - 1, - "task descr", - Box::new(|| { - println!("task executed"); - async_test_func() - }), - ); - - let task_fn = task.closure(); - - let res = task_fn().await; - println!("task res: {res:?}"); - - assert_eq!(&res, "async task result"); - - let expected_received = res.clone(); - - let respond_res = task.respond(res); - println!("resond_res: {respond_res:?}"); - - assert!( - respond_res.is_none(), - "task with responder should return None when call .respond()" - ); - - let received_res = receiver.await; - println!("received_res: {received_res:?}"); - - assert!(received_res.is_ok()); - - let received = received_res.unwrap(); - - assert_eq!(received, expected_received); - } -} From 559e73ac297120431452c62f9dadd004b2b8e508 Mon Sep 17 00:00:00 2001 From: Maksim Greshniakov Date: Wed, 1 Apr 2026 17:47:59 +0200 Subject: [PATCH 2/2] feature(collator): cancel collation directly --- collator/src/collator/do_collate/mod.rs | 3 + collator/src/collator/mod.rs | 35 ++-- collator/src/manager/mod.rs | 243 +++++++++++++++--------- collator/src/manager/types.rs | 3 +- 4 files changed, 168 insertions(+), 116 deletions(-) diff --git a/collator/src/collator/do_collate/mod.rs b/collator/src/collator/do_collate/mod.rs index 50954478c9..4ee0a8a405 100644 --- a/collator/src/collator/do_collate/mod.rs +++ b/collator/src/collator/do_collate/mod.rs @@ -332,6 +332,9 @@ impl CollatorStdImpl { execute_result, final_result, } = match do_collate_res { + Err(CollatorError::Cancelled(CollationCancelReason::ExternalCancel)) => { + return Ok(()); + } Err(CollatorError::Cancelled(reason)) => { // cancel collation self.listener diff --git a/collator/src/collator/mod.rs b/collator/src/collator/mod.rs index 637f51d81f..82bdb9deaa 100644 --- a/collator/src/collator/mod.rs +++ b/collator/src/collator/mod.rs @@ -89,6 +89,8 @@ pub struct CollatorContext { /// For graceful collation cancellation pub cancel_collation: Arc, + /// Notified when collator completes current command + pub command_complete: Arc, pub zerostate_id: ZerostateId, } @@ -278,6 +280,10 @@ impl CollatorHandle { panic!("Collator command ({command_descr}) failed: {err:?}"); } + // Notify that the current command has completed. + // Used by the manager to await collation cancellation. + worker.command_complete.notify_one(); + tracing::trace!( target: tracing_targets::COLLATOR, "Collator command ({command_descr}) executed", @@ -333,6 +339,7 @@ impl CollatorFactory for CollatorStdImplFactory { cx.mc_data, cx.mempool_config_override, cx.cancel_collation, + cx.command_complete, self.wu_tuner_event_sender.clone(), ) .await @@ -413,6 +420,8 @@ pub struct CollatorStdImpl { /// For graceful collation cancellation cancel_collation: Arc, + /// Notified when collator completes current command + command_complete: Arc, /// Events sender for Work Units tuner service wu_tuner_event_sender: Option>, @@ -434,6 +443,7 @@ impl CollatorStdImpl { mc_data: Arc, mempool_config_override: Option, cancel_collation: Arc, + command_complete: Arc, wu_tuner_event_sender: Option>, ) -> Result { const BLOCK_CELL_COUNT_BASELINE: usize = 100_000; @@ -474,6 +484,7 @@ impl CollatorStdImpl { shard_blocks_count_from_last_anchor: 0, mempool_config_override, cancel_collation, + command_complete, wu_tuner_event_sender, zerostate_id, }; @@ -798,6 +809,9 @@ impl CollatorStdImpl { anchors_info, mut anchors_count_above_last_imported_in_current_shard, } = match import_res { + Err(CollatorError::Cancelled(CollationCancelReason::ExternalCancel)) => { + return Ok(NextCollationFlowStep::Cancel); + } Err(CollatorError::Cancelled(reason)) => { self.listener .on_cancelled( @@ -1895,13 +1909,6 @@ impl CollatorStdImpl { "collation was cancelled by manager on wait_state_and_do_collate", ); metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels).increment(1); - self.listener - .on_cancelled( - working_state.mc_data.block_id, - working_state.next_block_id_short, - CollationCancelReason::ExternalCancel, - ) - .await?; self.delayed_working_state.delay(working_state); return Ok(()); } @@ -2069,13 +2076,6 @@ impl CollatorStdImpl { "collation was cancelled by manager on try_collate_next_master_block", ); metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels).increment(1); - self.listener - .on_cancelled( - working_state.mc_data.block_id, - working_state.next_block_id_short, - CollationCancelReason::ExternalCancel, - ) - .await?; self.delayed_working_state.delay(working_state); return Ok(()); } @@ -2342,13 +2342,6 @@ impl CollatorStdImpl { "collation was cancelled by manager on try_collate_next_shard_block", ); metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels).increment(1); - self.listener - .on_cancelled( - working_state.mc_data.block_id, - working_state.next_block_id_short, - CollationCancelReason::ExternalCancel, - ) - .await?; self.delayed_working_state.delay(working_state); return Ok(()); } diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 0cebcc8108..f4e58dbef4 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use ahash::HashMapExt; use anyhow::{Context, Result, anyhow, bail}; use async_trait::async_trait; -use futures_util::TryFutureExt; +use futures_util::{FutureExt, TryFutureExt}; use parking_lot::{Mutex, RwLock}; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; @@ -643,7 +643,6 @@ where match cancel_reason { CollationCancelReason::AnchorNotFound(_) | CollationCancelReason::NextAnchorNotFound(_) - | CollationCancelReason::ExternalCancel | CollationCancelReason::DiffNotFoundInQueue(_) => { // sync cache and collator state access self.ready_to_sync.notified().await; @@ -655,9 +654,10 @@ where }); // run sync if all collators cancelled or waiting - let has_active = self.active_collators.iter().any(|ac| { - ac.state == CollatorState::Active || ac.state == CollatorState::CancelPending - }); + let has_active = self + .active_collators + .iter() + .any(|ac| ac.state == CollatorState::Active); if !has_active { tracing::info!(target: tracing_targets::COLLATION_MANAGER, "no active collators in shards and masterchain, \ @@ -677,6 +677,11 @@ where .await?; } } + CollationCancelReason::ExternalCancel => { + tracing::warn!(target: tracing_targets::COLLATION_MANAGER, + "received unexpected external cancel through dispatcher", + ); + } } Ok(()) } @@ -698,13 +703,12 @@ where self.ready_to_sync.notified().await; scopeguard::defer!(self.ready_to_sync.notify_one()); - // cancel collator if cancel was requested during active collation try - let updated_collator_state = self.set_collator_state(&next_block_id_short.shard, |ac| { - if ac.state == CollatorState::CancelPending { - ac.state = CollatorState::Cancelled; - } - }); - if updated_collator_state == Some(CollatorState::Cancelled) { + // skip if collator was already cancelled + if self + .active_collators + .get(&next_block_id_short.shard) + .is_some_and(|ac| ac.state == CollatorState::Cancelled) + { tracing::debug!(target: tracing_targets::COLLATION_MANAGER, shard_id = %next_block_id_short.shard, "collator was cancelled before", @@ -858,13 +862,11 @@ where } }; - // cancel collator now after produced block if cancel was requested during active collation - let updated_collator_state = self.set_collator_state(&block_id.shard, |ac| { - if ac.state == CollatorState::CancelPending { - ac.state = CollatorState::Cancelled; - } - }); - let collator_cancelled = updated_collator_state == Some(CollatorState::Cancelled); + // check if collator was already cancelled (e.g. by cancel_active_collators) + let collator_cancelled = self + .active_collators + .get(&block_id.shard) + .is_some_and(|ac| ac.state == CollatorState::Cancelled); let store_res = if collator_cancelled { tracing::info!(target: tracing_targets::COLLATION_MANAGER, @@ -936,19 +938,22 @@ where // when master block mismatched then should cancel shard collators as well if block_id.is_masterchain() { - for mut ac in self + let shard_collators: Vec<_> = self .active_collators - .iter_mut() - .filter(|ac| ac.key() != &block_id.shard) - { - // now we cannot cancel active collation directly - // so we mark collators to be cancelled when they finish current active collation - ac.state = match ac.state { - CollatorState::Waiting | CollatorState::Cancelled => { - CollatorState::Cancelled - } - _ => CollatorState::CancelPending, - }; + .iter() + .filter(|ac| { + ac.key() != &block_id.shard && ac.state == CollatorState::Active + }) + .map(|ac| *ac.key()) + .collect(); + if !shard_collators.is_empty() { + self.cancel_active_collators(&shard_collators).await; + } + // also mark non-active (Waiting) as Cancelled + for mut ac in self.active_collators.iter_mut().filter(|ac| { + ac.key() != &block_id.shard && ac.state != CollatorState::Cancelled + }) { + ac.state = CollatorState::Cancelled; } } @@ -1059,24 +1064,28 @@ where self.set_collator_state(&block_id.shard, |ac| ac.state = CollatorState::Cancelled); - // run sync if all collators cancelled, or waiting, or there are no collators - // and we have applied mc blocks - let has_active = self.active_collators.iter().any(|ac| { - ac.state == CollatorState::Active || ac.state == CollatorState::CancelPending - }); - if !has_active { - tracing::info!(target: tracing_targets::COLLATION_MANAGER, - "sync_to_applied_mc_block: no active collators in shards and master, \ - will run sync to last applied mc block", - ); - - // run sync if have applied mc blocks - self.sync_to_applied_mc_block_if_exist( - store_res.last_collated_mc_block_id, - store_res.applied_mc_queue_range, - ) - .await?; + // cancel all remaining active collators and wait + let active_shards: Vec<_> = self + .active_collators + .iter() + .filter(|ac| ac.state == CollatorState::Active) + .map(|ac| *ac.key()) + .collect(); + if !active_shards.is_empty() { + self.cancel_active_collators(&active_shards).await; } + + tracing::info!(target: tracing_targets::COLLATION_MANAGER, + "sync_to_applied_mc_block: all collators cancelled, \ + will run sync to last applied mc block", + ); + + // run sync if have applied mc blocks + self.sync_to_applied_mc_block_if_exist( + store_res.last_collated_mc_block_id, + store_res.applied_mc_queue_range, + ) + .await?; } } else if block_id.is_masterchain() { // when candidate is master @@ -1302,32 +1311,51 @@ where let labels = [("workchain", block_id.shard.workchain().to_string())]; metrics::counter!("tycho_collator_block_mismatch_count", &labels).increment(1); - // now we cannot cancel active collation directly - // so we mark collators to be cancelled when they finish current active collation - self.set_collator_state(&block_id.shard, |ac| { - ac.state = match ac.state { - CollatorState::Waiting | CollatorState::Cancelled => CollatorState::Cancelled, - _ => CollatorState::CancelPending, - }; - }); - - // when master block mismatched then should cancel shard collators as well - if block_id.is_masterchain() { - for mut ac in self + // cancel all active collators including this shard and wait + { + let mut shards_to_cancel: Vec<_> = vec![]; + if block_id.is_masterchain() { + // cancel all collators + shards_to_cancel.extend( + self.active_collators + .iter() + .filter(|ac| ac.state == CollatorState::Active) + .map(|ac| *ac.key()), + ); + } else { + // cancel only this shard's collator + if self + .active_collators + .get(&block_id.shard) + .is_some_and(|ac| ac.state == CollatorState::Active) + { + shards_to_cancel.push(block_id.shard); + } + } + if !shards_to_cancel.is_empty() { + self.cancel_active_collators(&shards_to_cancel).await; + } + // mark all relevant collators as Cancelled (including Waiting ones) + if block_id.is_masterchain() { + for mut ac in self + .active_collators + .iter_mut() + .filter(|ac| ac.state != CollatorState::Cancelled) + { + ac.state = CollatorState::Cancelled; + } + } else if self .active_collators - .iter_mut() - .filter(|ac| ac.key() != &block_id.shard) + .get(&block_id.shard) + .is_some_and(|ac| ac.state != CollatorState::Cancelled) { - ac.state = match ac.state { - CollatorState::Waiting | CollatorState::Cancelled => { - CollatorState::Cancelled - } - _ => CollatorState::CancelPending, - }; + self.set_collator_state(&block_id.shard, |ac| { + ac.state = CollatorState::Cancelled; + }); } } - // When received blokc mismatches with collated one + // When received block mismatches with collated one // then we should clear uncommitted queue diffs. // The queue diff from last collated master and its shard blocks // are uncommitted and will be removed because they are incorrect. @@ -1434,38 +1462,33 @@ where }; if should_sync { - // we should sync but we can run sync right now only when there are no active collators - let mut has_active = false; - for active_collator in self.active_collators.iter().filter(|ac| { - ac.state == CollatorState::Active - || ac.state == CollatorState::CancelPending - }) { - // try to gracefully cancel active collations - active_collator.cancel_collation.notify_one(); - has_active = true; - } - - if has_active { + let active_shards: Vec<_> = self + .active_collators + .iter() + .filter(|ac| ac.state == CollatorState::Active) + .map(|ac| *ac.key()) + .collect(); + + if !active_shards.is_empty() { tracing::info!(target: tracing_targets::COLLATION_MANAGER, last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()), last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()), last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()), received_is_key_block = is_key_block, - "check_should_sync: cannot sync when there are active collations, \ - try to gracefully cancel them", + "check_should_sync: cancelling active collations before sync", ); - false - } else { - tracing::info!(target: tracing_targets::COLLATION_MANAGER, - last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()), - last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()), - last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()), - received_is_key_block = is_key_block, - "check_should_sync: can sync to last applied mc block \ - when all collators were cancelled, or waiting, or there are no collators (node not in set)", - ); - true + self.cancel_active_collators(&active_shards).await; } + + tracing::info!(target: tracing_targets::COLLATION_MANAGER, + last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()), + last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()), + last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()), + received_is_key_block = is_key_block, + "check_should_sync: can sync to last applied mc block \ + when all active collations are finished", + ); + true } else { false } @@ -2526,6 +2549,7 @@ where ); let cancel_collation_notify = Arc::new(Notify::new()); + let command_complete_notify = Arc::new(Notify::new()); match self .collator_factory @@ -2542,6 +2566,7 @@ where mc_data: mc_data.clone(), mempool_config_override: self.mempool_config_override.clone(), cancel_collation: cancel_collation_notify.clone(), + command_complete: command_complete_notify.clone(), }) .await { @@ -2557,6 +2582,7 @@ where collator: Arc::new(collator), state: CollatorState::Active, cancel_collation: cancel_collation_notify, + command_complete: command_complete_notify, }); } } @@ -2684,6 +2710,35 @@ where } } + /// Cancel active collators for the given shards and wait for them to complete their current command. + /// Safe to call while holding `ready_to_sync` does not go through the dispatcher. + async fn cancel_active_collators(&self, shards: &[ShardIdent]) { + let mut cancel_futures = vec![]; + for shard in shards { + if let Some(mut ac) = self.active_collators.get_mut(shard) + && ac.state == CollatorState::Active + { + ac.state = CollatorState::Cancelled; + + let command_complete = ac.command_complete.clone(); + while command_complete.notified().now_or_never().is_some() {} + + // NOTE: This only waits for the currently executing command to finish. + // It does not prevent already queued commands from starting afterwards. + // If we need "cancel cuts off all old queued commands", add a generation/epoch + // to commands and skip stale ones in the collator command loop before + // executing them. + let notified = async move { + command_complete.notified().await; + }; + + ac.cancel_collation.notify_one(); + cancel_futures.push(notified); + } + } + futures_util::future::join_all(cancel_futures).await; + } + fn set_active_sync_info(&self, target_mc_block_seqno: BlockSeqno) -> Result { let mut guard = self.collation_sync_state.lock(); if let Some(active_sync) = &guard.active_sync_to_applied { diff --git a/collator/src/manager/types.rs b/collator/src/manager/types.rs index 12e6563888..fef9812a64 100644 --- a/collator/src/manager/types.rs +++ b/collator/src/manager/types.rs @@ -30,7 +30,6 @@ pub(super) enum CollatorState { Active, Waiting, Cancelled, - CancelPending, } pub(super) struct ActiveCollator { @@ -39,6 +38,8 @@ pub(super) struct ActiveCollator { /// For graceful collation cancellation pub cancel_collation: Arc, + /// Notified when collator completes current command + pub command_complete: Arc, } #[derive(Default)]