160 changes: 156 additions & 4 deletions crates/actors/src/block_discovery.rs
@@ -24,10 +24,11 @@ use irys_types::{
use irys_vdf::state::VdfStateReadonly;
use reth::tasks::shutdown::Shutdown;
use reth_db::Database as _;
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{
mpsc::{self, error::SendError, UnboundedSender},
oneshot::{self, error::RecvError},
Semaphore,
};
use tracing::{debug, error, info, trace, warn, Instrument as _};

@@ -147,14 +148,16 @@ pub struct BlockDiscoveryServiceInner {
pub vdf_steps_guard: VdfStateReadonly,
/// Service Senders
pub service_senders: ServiceSenders,
/// Semaphore to limit concurrent block discovery tasks
pub message_handler_semaphore: Arc<Semaphore>,
}

#[derive(Debug)]
pub struct BlockDiscoveryService {
shutdown: Shutdown,
msg_rx: mpsc::UnboundedReceiver<BlockDiscoveryMessage>,
inner: Arc<BlockDiscoveryServiceInner>,
runtime_handle: tokio::runtime::Handle,
}

impl BlockDiscoveryService {
@@ -168,12 +171,14 @@ impl BlockDiscoveryService {

let (shutdown_tx, shutdown_rx) = reth::tasks::shutdown::signal();

let runtime_handle_clone = runtime_handle.clone();
let handle = runtime_handle.spawn(
async move {
let service = Self {
shutdown: shutdown_rx,
msg_rx: rx,
inner,
runtime_handle: runtime_handle_clone,
};
service
.start()
@@ -206,8 +211,21 @@
// Handle commands
cmd = self.msg_rx.recv() => {
match cmd {
Some(cmd) => {
self.handle_message(cmd).await?;
Some(msg) => {
let semaphore = self.inner.message_handler_semaphore.clone();
match semaphore.clone().try_acquire_owned() {
Ok(permit) => {
self.spawn_validation_task(permit, msg);
}
Err(tokio::sync::TryAcquireError::NoPermits) => {
warn!("Block discovery semaphore at capacity, waiting for permit");
self.spawn_with_timeout(semaphore, msg).await;
}
Err(tokio::sync::TryAcquireError::Closed) => {
error!("Block discovery semaphore closed");
break;
}
}
}
None => {
warn!("Command channel closed unexpectedly");
@@ -222,6 +240,50 @@
Ok(())
}

/// Spawn a validation task with the acquired permit
fn spawn_validation_task(
&self,
permit: tokio::sync::OwnedSemaphorePermit,
msg: BlockDiscoveryMessage,
) {
let inner = Arc::clone(&self.inner);
let runtime_handle = self.runtime_handle.clone();

runtime_handle.spawn(
async move {
let _permit = permit; // Hold until completion

match msg {
BlockDiscoveryMessage::BlockDiscovered(block, skip_vdf, response_tx) => {
let result = inner
.validate_and_forward_with_parent_wait(block, skip_vdf)
.await;

if let Some(tx) = response_tx {
let _ = tx.send(result);
}
}
}
}
.in_current_span(),
);
}

/// Wait with timeout when semaphore is at capacity
async fn spawn_with_timeout(&self, semaphore: Arc<Semaphore>, msg: BlockDiscoveryMessage) {
match tokio::time::timeout(Duration::from_secs(60), semaphore.acquire_owned()).await {
Ok(Ok(permit)) => {
self.spawn_validation_task(permit, msg);
}
Ok(Err(e)) => {
error!("Failed to acquire block discovery permit: {:?}", e);
}
Err(_) => {
error!("Timed out waiting for block discovery permit");
}
}
}
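The two helpers above give the service a fast path and a slow path for the same budget: `try_acquire_owned` dispatches immediately while permits remain, and `spawn_with_timeout` falls back to a bounded wait so a burst of incoming blocks produces backpressure rather than unbounded task growth. A minimal, self-contained sketch of that pattern follows; the names, the 8-permit cap, and the 60-second bound are illustrative stand-ins mirroring the values used in this diff, not an excerpt from the crate:

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Cap mirrors default_max_concurrent_block_discovery_tasks (8).
    let semaphore = Arc::new(Semaphore::new(8));
    let mut handles = Vec::new();

    for task_id in 0..32_u32 {
        // Fast path: take a permit without waiting if one is free.
        let permit = match semaphore.clone().try_acquire_owned() {
            Ok(permit) => permit,
            // Slow path: wait for a slot, but give up after a bound.
            // (A Closed semaphore would also surface here, via the second expect.)
            Err(_) => tokio::time::timeout(
                Duration::from_secs(60),
                semaphore.clone().acquire_owned(),
            )
            .await
            .expect("timed out waiting for a permit")
            .expect("semaphore closed"),
        };

        handles.push(tokio::spawn(async move {
            let _permit = permit; // held for the task's whole lifetime
            tokio::time::sleep(Duration::from_millis(50)).await; // stand-in for validation work
            println!("task {task_id} finished");
        }));
    }

    for handle in handles {
        handle.await.expect("task panicked");
    }
}
```

Moving the `OwnedSemaphorePermit` into the spawned task means the slot is returned on every exit path, including panics, because the permit is dropped with the task rather than in the dispatch loop.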

#[tracing::instrument(level = "trace", skip_all)]
async fn handle_message(&self, msg: BlockDiscoveryMessage) -> eyre::Result<()> {
match msg {
@@ -254,6 +316,96 @@ pub enum BlockDiscoveryMessage {
}

impl BlockDiscoveryServiceInner {
/// Validate block and wait for parent to be in tree before forwarding
pub async fn validate_and_forward_with_parent_wait(
self: Arc<Self>,
block: Arc<IrysBlockHeader>,
skip_vdf: bool,
) -> Result<(), BlockDiscoveryError> {
let parent_hash = block.previous_block_hash;

debug!(
block.hash = %block.block_hash,
block.height = block.height,
block.parent_hash = %parent_hash,
"Waiting for parent block to be in tree before validation"
);

// Step 1: Wait for parent to be in block tree (with 5 second timeout)
let parent_ready = self
.clone()
.wait_for_parent_in_tree(parent_hash, Duration::from_secs(5))
.await;

if !parent_ready {
warn!(
block.hash = %block.block_hash,
block.parent_hash = %parent_hash,
"Parent not in tree after 5s timeout, dropping block"
);
return Err(BlockDiscoveryError::InternalError(
BlockDiscoveryInternalError::BlockTreeRequestFailed(
"Parent block not in tree within timeout".to_string(),
),
));
}

debug!(
block.hash = %block.block_hash,
block.parent_hash = %parent_hash,
"Parent found in tree, proceeding with validation"
);

// Step 2: Run pre-validation and send to block tree
self.block_discovered(block.clone(), skip_vdf).await
}

/// Wait for parent block to appear in block tree
pub async fn wait_for_parent_in_tree(
self: Arc<Self>,
parent_hash: BlockHash,
timeout: Duration,
) -> bool {
// Subscribe to block state updates
let mut block_state_rx = self.service_senders.subscribe_block_state_updates();

// Create timeout future
let timeout_future = tokio::time::sleep(timeout);
tokio::pin!(timeout_future);

loop {
// Check if parent is in tree
{
let tree = self.block_tree_guard.read();
if tree.blocks.contains_key(&parent_hash) {
debug!(parent.hash = %parent_hash, "Parent found in block tree");
return true;
}
}

// Wait for either timeout or block state update
tokio::select! {
_ = &mut timeout_future => {
warn!(parent.hash = %parent_hash, "Timeout waiting for parent");
return false;
}
result = block_state_rx.recv() => {
match result {
Ok(event) if event.block_hash == parent_hash => {
// Parent state changed, check again
continue;
}
Ok(_) => continue, // Other block, keep waiting
Err(_) => {
error!("Block state channel closed");
return false;
}
}
}
}
}
}
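Note the ordering inside `wait_for_parent_in_tree`: the service subscribes to block-state updates before the first tree lookup, which closes the race where the parent lands between a check and the start of the wait. A hedged sketch of that subscribe-then-check shape, using stand-in types (a `HashSet` behind an `RwLock` and a plain `broadcast` channel instead of the crate's `block_tree_guard` and update stream):

```rust
use std::{
    collections::HashSet,
    sync::{Arc, RwLock},
    time::Duration,
};
use tokio::sync::broadcast;

/// Returns true once `key` appears in the shared set, false on timeout.
async fn wait_for_key(
    set: Arc<RwLock<HashSet<u64>>>,
    mut updates: broadcast::Receiver<u64>,
    key: u64,
    timeout: Duration,
) -> bool {
    let deadline = tokio::time::sleep(timeout);
    tokio::pin!(deadline);

    loop {
        // The receiver already exists, so an insert racing with this read
        // still leaves a message for the select below to observe.
        if set.read().unwrap().contains(&key) {
            return true;
        }
        tokio::select! {
            _ = &mut deadline => return false,
            msg = updates.recv() => match msg {
                Ok(k) if k == key => continue, // likely present now; re-check
                Ok(_) => continue,             // unrelated key; keep waiting
                Err(broadcast::error::RecvError::Lagged(_)) => continue, // missed events; re-check
                Err(broadcast::error::RecvError::Closed) => return false,
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let set = Arc::new(RwLock::new(HashSet::new()));
    // The receiver is created before the writer runs, mirroring the order above.
    let (tx, rx) = broadcast::channel(16);

    let writer = {
        let set = Arc::clone(&set);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            set.write().unwrap().insert(42);
            let _ = tx.send(42);
        })
    };

    assert!(wait_for_key(set, rx, 42, Duration::from_secs(5)).await);
    writer.await.unwrap();
}
```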

#[tracing::instrument(level = "trace", skip_all, fields(block.height = %block.height, block.hash = %block.block_hash))]
pub async fn block_discovered(
&self,
8 changes: 8 additions & 0 deletions crates/chain/src/chain.rs
@@ -1819,6 +1819,11 @@ impl IrysNode {
block_discovery_rx: UnboundedReceiver<BlockDiscoveryMessage>,
runtime_handle: Handle,
) -> TokioServiceHandle {
let max_concurrent_block_discovery_tasks = config
.node_config
.mempool
.max_concurrent_block_discovery_tasks;

let block_discovery_inner = BlockDiscoveryServiceInner {
block_index_guard: block_index_guard.clone(),
block_tree_guard: block_tree_guard.clone(),
@@ -1828,6 +1833,9 @@
vdf_steps_guard: vdf_steps_guard.clone(),
service_senders: service_senders.clone(),
reward_curve,
message_handler_semaphore: Arc::new(tokio::sync::Semaphore::new(
max_concurrent_block_discovery_tasks,
)),
};
BlockDiscoveryService::spawn_service(
Arc::new(block_discovery_inner),
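For reference, a stripped-down sketch (assumed names, not the crate's real constructor) of the wiring above: the configured cap becomes a single `Arc<Semaphore>` at construction time, so every handle to the service draws from one shared budget:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

struct ServiceInner {
    message_handler_semaphore: Arc<Semaphore>,
}

fn build_service(max_concurrent_tasks: usize) -> ServiceInner {
    ServiceInner {
        // One semaphore per service instance; clones of the Arc share it.
        message_handler_semaphore: Arc::new(Semaphore::new(max_concurrent_tasks)),
    }
}

fn main() {
    let service = build_service(8);
    assert_eq!(service.message_handler_semaphore.available_permits(), 8);
}
```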
6 changes: 0 additions & 6 deletions crates/p2p/src/block_pool.rs
@@ -781,12 +781,6 @@ where
current_block_hash
);

// TODO: validate this UNTRUSTED height against the parent block's height (as we have processed it)

self.block_status_provider
.wait_for_block_tree_can_process_height(block_header.height)
.await;

// Send cached transactions (if any) to the mempool before handling the block
let cached_txs = self
.blocks_cache
33 changes: 0 additions & 33 deletions crates/p2p/src/block_status_provider.rs
@@ -131,39 +131,6 @@ impl BlockStatusProvider {
}
}

// Waits for the block tree to be able to process this block, based on height.
// This prevents overwhelming the block tree with blocks to the point that it prunes blocks still undergoing validation.
pub async fn wait_for_block_tree_can_process_height(&self, block_height: u64) {
const ATTEMPTS_PER_SECOND: u64 = 5;
let mut attempts = 0;

loop {
attempts += 1;

if attempts % ATTEMPTS_PER_SECOND == 0 {
debug!(
"Block tree did not catch up to height {} after {} seconds, waiting...",
block_height,
attempts / ATTEMPTS_PER_SECOND
);
}

let can_process_height = {
let binding = self.block_tree_read_guard.read();
binding.can_process_height(block_height)
};

if can_process_height {
return;
}

tokio::time::sleep(tokio::time::Duration::from_millis(
1000 / ATTEMPTS_PER_SECOND,
))
.await;
}
}

pub fn is_height_in_the_index(&self, block_height: u64) -> bool {
let binding = self.block_index_read_guard.read();
let index_item = binding.get_item(block_height);
7 changes: 7 additions & 0 deletions crates/types/src/config/mod.rs
@@ -126,6 +126,9 @@ impl From<&NodeConfig> for MempoolConfig {
anchor_expiry_depth: consensus.tx_anchor_expiry_depth,
commitment_fee: consensus.commitment_fee,
max_concurrent_mempool_tasks: value.mempool.max_concurrent_mempool_tasks,
max_concurrent_block_discovery_tasks: value
.mempool
.max_concurrent_block_discovery_tasks,
}
}
}
@@ -232,6 +235,10 @@ pub struct MempoolConfig {

/// Maximum number of concurrent handlers for mempool messages
pub max_concurrent_mempool_tasks: usize,

/// Maximum number of concurrent block discovery validation tasks
/// Controls the number of blocks that can be pre-validated concurrently
pub max_concurrent_block_discovery_tasks: usize,
}

pub mod serde_utils {
11 changes: 11 additions & 0 deletions crates/types/src/config/node.rs
@@ -575,12 +575,21 @@ pub struct MempoolNodeConfig {
/// Maximum number of concurrent handlers for the mempool messages
#[serde(default = "default_max_concurrent_mempool_tasks")]
pub max_concurrent_mempool_tasks: usize,

/// Maximum number of concurrent block discovery validation tasks
/// Controls the number of blocks that can be pre-validated concurrently
#[serde(default = "default_max_concurrent_block_discovery_tasks")]
pub max_concurrent_block_discovery_tasks: usize,
}

pub fn default_max_concurrent_mempool_tasks() -> usize {
30
}

pub fn default_max_concurrent_block_discovery_tasks() -> usize {
8
}

impl NodeConfig {
pub fn consensus_config(&self) -> ConsensusConfig {
// load the consensus config
@@ -737,6 +746,7 @@ impl NodeConfig {
max_valid_commitment_addresses: 300,
max_commitments_per_address: 20,
max_concurrent_mempool_tasks: 30,
max_concurrent_block_discovery_tasks: 8,
},

vdf: VdfNodeConfig {
@@ -876,6 +886,7 @@ impl NodeConfig {
max_valid_commitment_addresses: 300,
max_commitments_per_address: 20,
max_concurrent_mempool_tasks: 30,
max_concurrent_block_discovery_tasks: 8,
},

vdf: VdfNodeConfig {
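Because the new field carries `#[serde(default = "default_max_concurrent_block_discovery_tasks")]`, existing config files that omit the key keep deserializing and pick up the value 8. A small sketch of that behavior, assuming the config is parsed as TOML via the `toml` crate (the struct here is trimmed to the one new field):

```rust
use serde::Deserialize;

fn default_max_concurrent_block_discovery_tasks() -> usize {
    8
}

#[derive(Debug, Deserialize)]
struct MempoolNodeConfig {
    #[serde(default = "default_max_concurrent_block_discovery_tasks")]
    max_concurrent_block_discovery_tasks: usize,
}

fn main() {
    // Omitted key: the serde default kicks in.
    let cfg: MempoolNodeConfig = toml::from_str("").unwrap();
    assert_eq!(cfg.max_concurrent_block_discovery_tasks, 8);

    // Explicit key: the configured value wins.
    let cfg: MempoolNodeConfig =
        toml::from_str("max_concurrent_block_discovery_tasks = 4").unwrap();
    assert_eq!(cfg.max_concurrent_block_discovery_tasks, 4);
}
```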