Skip to content

Commit 3e5cc3a

Browse files
apollo_network_benchmark: added message broadcasting task
1 parent 901885c commit 3e5cc3a

File tree

4 files changed

+80
-7
lines changed

4 files changed

+80
-7
lines changed

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/handlers.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,48 @@
1-
use std::time::SystemTime;
1+
use std::time::{Duration, SystemTime};
22

33
use apollo_metrics::metrics::LossyIntoF64;
4+
use apollo_network_benchmark::node_args::NodeArgs;
45
use libp2p::PeerId;
56

6-
use crate::message::StressTestMessage;
7+
use crate::message::{StressTestMessage, METADATA_SIZE};
78
use crate::metrics::{
89
RECEIVE_MESSAGE_BYTES,
910
RECEIVE_MESSAGE_BYTES_SUM,
1011
RECEIVE_MESSAGE_COUNT,
1112
RECEIVE_MESSAGE_DELAY_SECONDS,
1213
RECEIVE_MESSAGE_NEGATIVE_DELAY_SECONDS,
1314
};
15+
use crate::protocol::MessageSender;
16+
17+
fn get_message(id: u64, size_bytes: usize) -> StressTestMessage {
18+
let message = StressTestMessage::new(id, 0, vec![0; size_bytes - *METADATA_SIZE]);
19+
assert_eq!(Vec::<u8>::from(message.clone()).len(), size_bytes);
20+
message
21+
}
22+
23+
/// Unified implementation for sending stress test messages via any protocol
24+
pub async fn send_stress_test_messages(
25+
mut message_sender: MessageSender,
26+
args: &NodeArgs,
27+
peers: Vec<PeerId>,
28+
) {
29+
let size_bytes = args.user.message_size_bytes;
30+
let heartbeat = Duration::from_millis(args.user.heartbeat_millis);
31+
32+
let mut message_index = 0;
33+
let mut message = get_message(args.runner.id, size_bytes).clone();
34+
35+
let mut interval = tokio::time::interval(heartbeat);
36+
loop {
37+
interval.tick().await;
38+
39+
message.metadata.time = SystemTime::now();
40+
message.metadata.message_index = message_index;
41+
let message_clone = message.clone().into();
42+
message_sender.send_message(&peers, message_clone).await;
43+
message_index += 1;
44+
}
45+
}
1446

1547
pub fn receive_stress_test_message(received_message: Vec<u8>, _sender_peer_id: Option<PeerId>) {
1648
let end_time = SystemTime::now();

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
33
use std::time::Duration;
44

55
use clap::Parser;
6+
use message::METADATA_SIZE;
67
use metrics_exporter_prometheus::PrometheusBuilder;
78
use tokio_metrics::RuntimeMetricsReporterBuilder;
89
use tracing::Level;
@@ -40,6 +41,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4041

4142
println!("Starting network stress test with args:\n{args:?}");
4243

44+
assert!(
45+
args.user.message_size_bytes >= *METADATA_SIZE,
46+
"Message size must be at least {} bytes",
47+
*METADATA_SIZE
48+
);
49+
4350
// Set up metrics
4451
let builder = PrometheusBuilder::new().with_http_listener(SocketAddr::V4(SocketAddrV4::new(
4552
Ipv4Addr::UNSPECIFIED,

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,19 @@ use apollo_network::NetworkConfig;
66
use apollo_network_benchmark::node_args::NodeArgs;
77
use futures::future::{select_all, BoxFuture};
88
use futures::FutureExt;
9-
use libp2p::Multiaddr;
9+
use libp2p::swarm::dial_opts::DialOpts;
10+
use libp2p::{Multiaddr, PeerId};
1011
use tokio::task::JoinHandle;
1112
use tracing::{info, warn};
1213

13-
use crate::handlers::receive_stress_test_message;
14+
use crate::handlers::{receive_stress_test_message, send_stress_test_messages};
1415
use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender};
1516

1617
/// The main stress test node that manages network communication and monitoring
1718
pub struct BroadcastNetworkStressTestNode {
1819
args: NodeArgs,
20+
network_config: NetworkConfig,
1921
network_manager: Option<NetworkManager>,
20-
// TODO(AndrewL): Remove this once they are used
21-
#[allow(dead_code)]
2222
message_sender: Option<MessageSender>,
2323
message_receiver: Option<MessageReceiver>,
2424
}
@@ -58,7 +58,7 @@ impl BroadcastNetworkStressTestNode {
5858
let network_config = Self::create_network_config(&args);
5959

6060
// Create network manager
61-
let mut network_manager = NetworkManager::new(network_config, None, None);
61+
let mut network_manager = NetworkManager::new(network_config.clone(), None, None);
6262

6363
// Register protocol channels
6464
let (message_sender, message_receiver) = register_protocol_channels(
@@ -68,6 +68,7 @@ impl BroadcastNetworkStressTestNode {
6868
);
6969
Self {
7070
args,
71+
network_config,
7172
network_manager: Some(network_manager),
7273
message_sender: Some(message_sender),
7374
message_receiver: Some(message_receiver),
@@ -84,6 +85,30 @@ impl BroadcastNetworkStressTestNode {
8485
.boxed()
8586
}
8687

88+
fn get_peers(&self) -> Vec<PeerId> {
89+
self.network_config
90+
.bootstrap_peer_multiaddr
91+
.as_ref()
92+
.map(|peers| {
93+
peers.iter().map(|m| DialOpts::from(m.clone()).get_peer_id().unwrap()).collect()
94+
})
95+
.unwrap_or_default()
96+
}
97+
98+
/// Starts the message sending task if this node should broadcast
99+
pub async fn start_message_sender(&mut self) -> BoxFuture<'static, ()> {
100+
let message_sender =
101+
self.message_sender.take().expect("message_sender should be available");
102+
103+
let args_clone = self.args.clone();
104+
let peers = self.get_peers();
105+
106+
async move {
107+
send_stress_test_messages(message_sender, &args_clone, peers).await;
108+
}
109+
.boxed()
110+
}
111+
87112
/// Starts the message receiving task
88113
pub async fn make_message_receiver_task(&mut self) -> BoxFuture<'static, ()> {
89114
let message_receiver =
@@ -102,6 +127,7 @@ impl BroadcastNetworkStressTestNode {
102127
let mut tasks = Vec::new();
103128
tasks.push(self.start_network_manager().await);
104129
tasks.push(self.make_message_receiver_task().await);
130+
tasks.push(self.start_message_sender().await);
105131

106132
tasks
107133
}

crates/apollo_network_benchmark/src/node_args.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ pub struct UserArgs {
5353
#[arg(long, env, default_value = "gossipsub")]
5454
pub network_protocol: NetworkProtocol,
5555

56+
/// Size of StressTestMessage
57+
#[arg(long, env, default_value = "1024")]
58+
pub message_size_bytes: usize,
59+
60+
/// The time to sleep between broadcasts of StressTestMessage in milliseconds
61+
#[arg(long, env, default_value = "1000")]
62+
pub heartbeat_millis: u64,
63+
5664
/// The timeout in seconds for the node.
5765
/// When the node runs for longer than this, it will be killed.
5866
#[arg(long, env, default_value = "4000")]

0 commit comments

Comments
 (0)