diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/handlers.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/handlers.rs new file mode 100644 index 00000000000..3bee799ddc8 --- /dev/null +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/handlers.rs @@ -0,0 +1,23 @@ +use std::time::SystemTime; + +use libp2p::PeerId; +use tracing::trace; + +use crate::message::StressTestMessage; + +pub fn receive_stress_test_message(received_message: Vec, sender_peer_id: Option) { + let end_time = SystemTime::now(); + + let received_message: StressTestMessage = received_message.into(); + let start_time = received_message.metadata.time; + let delay_seconds = match end_time.duration_since(start_time) { + Ok(duration) => duration.as_secs_f64(), + Err(_) => { + let negative_duration = start_time.duration_since(end_time).unwrap(); + -negative_duration.as_secs_f64() + } + }; + + // TODO(AndrewL): Replace this with metric updates + trace!("Received stress test message from {sender_peer_id:?} in {delay_seconds} seconds"); +} diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs index 91846d89c9a..e90ccd282da 100644 --- a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs @@ -10,6 +10,7 @@ use tracing::Level; #[cfg(test)] mod message_test; +mod handlers; mod message; mod protocol; mod stress_test_node; diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs index d67c23244c0..c77effdab47 100644 --- a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs @@ -10,6 +10,7 @@ use libp2p::Multiaddr; use tokio::task::JoinHandle; use tracing::{info, warn}; +use crate::handlers::receive_stress_test_message; use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender}; /// The main stress test node that manages network communication and monitoring @@ -19,8 +20,6 @@ pub struct BroadcastNetworkStressTestNode { // TODO(AndrewL): Remove this once they are used #[allow(dead_code)] message_sender: Option, - // TODO(AndrewL): Remove this once they are used - #[allow(dead_code)] message_receiver: Option, } @@ -85,10 +84,24 @@ impl BroadcastNetworkStressTestNode { .boxed() } + /// Starts the message receiving task + pub async fn make_message_receiver_task(&mut self) -> BoxFuture<'static, ()> { + let message_receiver = + self.message_receiver.take().expect("message_receiver should be available"); + + async move { + info!("Starting message receiver"); + message_receiver.for_each(receive_stress_test_message).await; + info!("Message receiver task ended"); + } + .boxed() + } + /// Gets all the tasks that need to be run async fn get_tasks(&mut self) -> Vec> { let mut tasks = Vec::new(); tasks.push(self.start_network_manager().await); + tasks.push(self.make_message_receiver_task().await); tasks }