Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions collator/src/collator/do_collate/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::finalize::FinalizeState;
use super::phase::{Phase, PhaseState};
use super::work_units::PrepareMsgGroupsWu;
use crate::collator::do_collate::work_units::ExecuteWu;
use crate::collator::error::{CollationCancelReason, CollatorError};
use crate::collator::error::CollatorError;
use crate::collator::messages_reader::{
GetNextMessageGroupMode, MessagesReader, MessagesReaderMetrics,
};
Expand Down Expand Up @@ -119,9 +119,7 @@ impl<'a, 'b> Phase<ExecuteState<'a, 'b>> {

// exit collation if cancelled
if self.state.collation_is_cancelled.check() {
return Err(CollatorError::Cancelled(
CollationCancelReason::ExternalCancel,
));
return Err(CollatorError::Cancelled);
}

let timer = std::time::Instant::now();
Expand Down
4 changes: 2 additions & 2 deletions collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::collator::do_collate::finalize::vset_update_start::{
KbNextSessionStart, KbNextSessionUpdate,
};
use crate::collator::do_collate::work_units::FinalizeWu;
use crate::collator::error::{CollationCancelReason, CollatorError};
use crate::collator::error::{CollationAbortReason, CollatorError};
use crate::collator::execution_manager::MessagesExecutor;
use crate::collator::max_anchors_processing_lag_rounds;
use crate::collator::messages_reader::{FinalizedMessagesReader, MessagesReader};
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Phase<FinalizeState> {
"finalize_messages_reader: cannot get diff with stats from queue for block {}",
top_block_id.as_short_id(),
);
CollatorError::Cancelled(CollationCancelReason::DiffNotFoundInQueue(
CollatorError::Aborted(CollationAbortReason::DiffNotFoundInQueue(
top_block_id.as_short_id(),
))
})?;
Expand Down
28 changes: 18 additions & 10 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use self::phase::{ActualState, Phase};
use self::prepare::PrepareState;
use self::work_units::{DoCollateWu, WuEvent, WuEventData};
use crate::collator::anchors_cache::AnchorsCacheTransaction;
use crate::collator::error::{CollationCancelReason, CollatorError};
use crate::collator::error::CollatorError;
use crate::collator::messages_reader::state::ReaderState;
use crate::collator::types::{
AnchorsCache, BlockCollationData, BlockCollationDataBuilder, BlockSerializerCache,
Expand Down Expand Up @@ -232,9 +232,7 @@ impl CollatorStdImpl {
.and_then(|(collation_result, pending_queue_diff_tx)| {
// exit collation if cancelled before writing queue diff
if collation_is_cancelled.check() {
return Err(CollatorError::Cancelled(
CollationCancelReason::ExternalCancel,
));
return Err(CollatorError::Cancelled);
}
Ok((collation_result, pending_queue_diff_tx))
});
Expand Down Expand Up @@ -314,7 +312,7 @@ impl CollatorStdImpl {
_ = async move {
collation_cancelled.await;
tracing::info!(target: tracing_targets::COLLATOR,
"collation was cancelled by manager on do_collate",
"collation cancel requested on do_collate",
);
collation_is_cancelled.cancel();
std::future::pending::<()>().await;
Expand All @@ -333,9 +331,21 @@ impl CollatorStdImpl {
execute_result,
final_result,
} = match do_collate_res {
Err(CollatorError::Cancelled(reason)) => {
Err(CollatorError::Cancelled) => {
tracing::info!(target: tracing_targets::COLLATOR,
"collation was cancelled by manager on do_collate",
);

// cancel collation
return Ok(CollatorResult::cancelled(
return Ok(CollatorResult::cancelled(mc_block_id, next_block_id_short));
}
Err(CollatorError::Aborted(reason)) => {
tracing::info!(target: tracing_targets::COLLATOR,
"collation was aborted on do_collate",
);

// abort collation
return Ok(CollatorResult::aborted(
mc_block_id,
next_block_id_short,
reason,
Expand Down Expand Up @@ -640,9 +650,7 @@ impl CollatorStdImpl {

// exit collation if cancelled
if collation_is_cancelled.check() {
return Err(CollatorError::Cancelled(
CollationCancelReason::ExternalCancel,
));
return Err(CollatorError::Cancelled);
}

let diff_tail_len =
Expand Down
5 changes: 1 addition & 4 deletions collator/src/collator/do_collate/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use tycho_types::models::GlobalCapability;
use super::execute::ExecuteState;
use super::execution_wrapper::ExecutorWrapper;
use super::phase::{Phase, PhaseState};
use crate::collator::CollationCancelReason;
use crate::collator::anchors_cache::AnchorsCacheTransaction;
use crate::collator::do_collate::phase::ActualState;
use crate::collator::error::CollatorError;
Expand Down Expand Up @@ -176,9 +175,7 @@ impl<'a, 'b> Phase<PrepareState<'a, 'b>> {

// exit collation if cancelled
if collation_is_cancelled_debounce.check() {
return Err(CollatorError::Cancelled(
CollationCancelReason::ExternalCancel,
));
return Err(CollatorError::Cancelled);
}
}

Expand Down
9 changes: 5 additions & 4 deletions collator/src/collator/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ use tycho_types::models::BlockIdShort;
use crate::mempool::MempoolAnchorId;

#[derive(Debug)]
pub enum CollationCancelReason {
pub enum CollationAbortReason {
AnchorNotFound(MempoolAnchorId),
NextAnchorNotFound(MempoolAnchorId),
ExternalCancel,
DiffNotFoundInQueue(BlockIdShort),
}

#[derive(thiserror::Error, Debug)]
pub enum CollatorError {
#[error("Cancelled(reason: {0:?})")]
Cancelled(CollationCancelReason),
#[error("Aborted(reason: {0:?})")]
Aborted(CollationAbortReason),
#[error("Cancelled externally")]
Cancelled,
#[error(transparent)]
Anyhow(#[from] anyhow::Error),
}
Expand Down
101 changes: 70 additions & 31 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use self::error::CollatorError;
use self::messages_reader::state::ReaderState;
use self::messages_reader::{MessagesReader, MessagesReaderContext};
use self::types::{
BlockSerializerCache, CollatorStats, MsgsExecutionParamsStuff, NextCollationFlowStep, PrevData,
WorkingState,
AbortedContext, BlockSerializerCache, CollatorStats, MsgsExecutionParamsStuff,
NextCollationFlowStep, PrevData, WorkingState,
};
use crate::internal_queue::types::message::EnqueuedMessage;
use crate::mempool::{GetAnchorResult, MempoolAdapter, MempoolAnchorId};
Expand All @@ -50,7 +50,7 @@ mod statistics;
mod types;

pub use do_collate::{is_first_block_after_prev_master, work_units};
pub use error::CollationCancelReason;
pub use error::CollationAbortReason;
pub use types::{
CancelledContext, CollatorResult, DebugCollatorResult, ForceMasterCollation,
ShardDescriptionExt,
Expand Down Expand Up @@ -116,7 +116,9 @@ where

#[async_trait]
pub trait Collator: Send + Sync + 'static {
fn next_block_id_short(&self) -> &BlockIdShort;
fn shard_id(&self) -> &ShardIdent;

/// Init collator and try to collate next block
async fn init(
self: Box<Self>,
Expand Down Expand Up @@ -194,6 +196,10 @@ pub struct CollatorStdImpl {

#[async_trait]
impl Collator for CollatorStdImpl {
fn next_block_id_short(&self) -> &BlockIdShort {
&self.next_block_info
}

fn shard_id(&self) -> &ShardIdent {
&self.shard_id
}
Expand Down Expand Up @@ -340,12 +346,27 @@ impl CollatorStdImpl {
} = self.handle_mempool_genesis(&mut working_state).await?;

// try import init anchors
if let NextCollationFlowStep::Cancel(cancel_ctx) = self
let try_import_res = self
.try_import_init_anchors(&mut working_state, anchors_proc_info_opt, &genesis_info)
.await?
{
self.delayed_working_state.delay(working_state);
return Ok(CollatorResult::Cancelled(cancel_ctx));
.await?;
match try_import_res {
NextCollationFlowStep::Cancel(cancel_ctx) => {
tracing::info!(target: tracing_targets::COLLATOR,
"collation was cancelled by manager on init_collator",
);

self.delayed_working_state.delay(working_state);
return Ok(CollatorResult::Cancelled(cancel_ctx));
}
NextCollationFlowStep::Abort(abort_ctx) => {
tracing::info!(target: tracing_targets::COLLATOR,
"collation was aborted on init_collator",
);

self.delayed_working_state.delay(working_state);
return Ok(CollatorResult::Aborted(abort_ctx));
}
NextCollationFlowStep::Continue => {}
}

self.timer = std::time::Instant::now();
Expand Down Expand Up @@ -483,8 +504,8 @@ impl CollatorStdImpl {
// if last processed_to anchor is before the start round for master,
// then cancel collation because we need to receive more blocks from bc
else if self.shard_id.is_masterchain() {
return Err(CollatorError::Cancelled(
CollationCancelReason::AnchorNotFound(anchors_proc_info.processed_to_anchor_id),
return Err(CollatorError::Aborted(
CollationAbortReason::AnchorNotFound(anchors_proc_info.processed_to_anchor_id),
));
}
// last processed_to anchor in shard can be before last processed in master
Expand Down Expand Up @@ -557,23 +578,29 @@ impl CollatorStdImpl {
res = import_fut => res,
_ = cancel_collation.notified() => {
tracing::info!(target: tracing_targets::COLLATOR,
"collation was cancelled by manager",
"collation was cancelled by manager on try_import_init_anchors",
);
let labels = [("workchain", self.shard_id.workchain().to_string())];
metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels[..]).increment(1);
Err(CollatorError::Cancelled(CollationCancelReason::ExternalCancel))
Err(CollatorError::Cancelled)
}
};

let ImportInitAnchorsResult {
anchors_info,
mut anchors_count_above_last_imported_in_current_shard,
} = match import_res {
Err(CollatorError::Cancelled(cancel_reason)) => {
Err(CollatorError::Cancelled) => {
return Ok(NextCollationFlowStep::Cancel(CancelledContext {
prev_mc_block_id: working_state.mc_data.block_id,
next_block_id_short: working_state.next_block_id_short,
cancel_reason,
}));
}
Err(CollatorError::Aborted(reason)) => {
return Ok(NextCollationFlowStep::Abort(AbortedContext {
prev_mc_block_id: working_state.mc_data.block_id,
next_block_id_short: working_state.next_block_id_short,
reason,
}));
}
res => res?,
Expand Down Expand Up @@ -605,7 +632,7 @@ impl CollatorStdImpl {
Ok(NextCollationFlowStep::Continue)
}

#[tracing::instrument("resume_collation", skip_all, fields(next_block_id = %self.next_block_info))]
#[tracing::instrument(name = "resume_collation", skip_all, fields(next_block_id = %self.next_block_info))]
async fn resume_collation_impl(
&mut self,
mc_data: Arc<McData>,
Expand Down Expand Up @@ -698,12 +725,27 @@ impl CollatorStdImpl {
genesis_was_updated = genesis_updated;

// try import init anchors
if let NextCollationFlowStep::Cancel(cancel_ctx) = self
let try_import_res = self
.try_import_init_anchors(&mut working_state, anchors_proc_info_opt, &genesis_info)
.await?
{
self.delayed_working_state.delay(working_state);
return Ok(CollatorResult::Cancelled(cancel_ctx));
.await?;
match try_import_res {
NextCollationFlowStep::Cancel(cancel_ctx) => {
tracing::info!(target: tracing_targets::COLLATOR,
"collation was cancelled by manager on resume_collation",
);

self.delayed_working_state.delay(working_state);
return Ok(CollatorResult::Cancelled(cancel_ctx));
}
NextCollationFlowStep::Abort(abort_ctx) => {
tracing::info!(target: tracing_targets::COLLATOR,
"collation was aborted on resume_collation",
);

self.delayed_working_state.delay(working_state);
return Ok(CollatorResult::Aborted(abort_ctx));
}
NextCollationFlowStep::Continue => {}
}

working_state
Expand Down Expand Up @@ -1237,8 +1279,8 @@ impl CollatorStdImpl {
.get_anchor_by_id(processed_to_anchor_id)
.await?
else {
return Err(CollatorError::Cancelled(
CollationCancelReason::AnchorNotFound(processed_to_anchor_id),
return Err(CollatorError::Aborted(
CollationAbortReason::AnchorNotFound(processed_to_anchor_id),
));
};
prev_anchor_id = anchor.id;
Expand Down Expand Up @@ -1290,8 +1332,8 @@ impl CollatorStdImpl {
let GetAnchorResult::Exist(anchor) =
mpool_adapter.get_next_anchor(prev_anchor_id).await?
else {
return Err(CollatorError::Cancelled(
CollationCancelReason::NextAnchorNotFound(prev_anchor_id),
return Err(CollatorError::Aborted(
CollationAbortReason::NextAnchorNotFound(prev_anchor_id),
));
};
prev_anchor_id = anchor.id;
Expand Down Expand Up @@ -1477,7 +1519,6 @@ impl CollatorStdImpl {
let res = CollatorResult::cancelled(
working_state.mc_data.block_id,
working_state.next_block_id_short,
CollationCancelReason::ExternalCancel,
);
self.delayed_working_state.delay(working_state);

Expand Down Expand Up @@ -1663,7 +1704,6 @@ impl CollatorStdImpl {
let res = CollatorResult::cancelled(
working_state.mc_data.block_id,
working_state.next_block_id_short,
CollationCancelReason::ExternalCancel,
);
self.delayed_working_state.delay(working_state);

Expand All @@ -1686,10 +1726,10 @@ impl CollatorStdImpl {
"next anchor not exist, cancel collation attempts",
);

let res = CollatorResult::cancelled(
let res = CollatorResult::aborted(
working_state.mc_data.block_id,
working_state.next_block_id_short,
CollationCancelReason::NextAnchorNotFound(prev_anchor_id),
CollationAbortReason::NextAnchorNotFound(prev_anchor_id),
);
self.delayed_working_state.delay(working_state);

Expand Down Expand Up @@ -1944,7 +1984,6 @@ impl CollatorStdImpl {
let res = CollatorResult::cancelled(
working_state.mc_data.block_id,
working_state.next_block_id_short,
CollationCancelReason::ExternalCancel,
);
self.delayed_working_state.delay(working_state);

Expand All @@ -1968,10 +2007,10 @@ impl CollatorStdImpl {
"next anchor not exist, cancel collation attempts",
);

let res = CollatorResult::cancelled(
let res = CollatorResult::aborted(
working_state.mc_data.block_id,
working_state.next_block_id_short,
CollationCancelReason::NextAnchorNotFound(prev_anchor_id),
CollationAbortReason::NextAnchorNotFound(prev_anchor_id),
);
self.delayed_working_state.delay(working_state);

Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/tests/messages_reader_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ impl<V: InternalMessageValue> TestCollator<V> {
}

#[allow(clippy::too_many_arguments)]
#[tracing::instrument("test_collate", skip_all, fields(block_id = %BlockIdShort { shard: self.shard_id, seqno: self.block_seqno + 1 }))]
#[tracing::instrument(name = "test_collate", skip_all, fields(block_id = %BlockIdShort { shard: self.shard_id, seqno: self.block_seqno + 1 }))]
fn test_collate_block_and_check_refill<F>(
&mut self,
block_tx_limit: usize,
Expand Down
Loading
Loading