|
| 1 | +use std::str::FromStr; |
1 | 2 | use std::time::Duration; |
2 | 3 |
|
| 4 | +use apollo_network::network_manager::NetworkManager; |
| 5 | +use apollo_network::NetworkConfig; |
3 | 6 | use apollo_network_benchmark::node_args::NodeArgs; |
4 | 7 | use futures::future::{select_all, BoxFuture}; |
| 8 | +use futures::FutureExt; |
| 9 | +use libp2p::Multiaddr; |
5 | 10 | use tokio::task::JoinHandle; |
6 | 11 | use tracing::{info, warn}; |
7 | 12 |
|
8 | 13 | /// The main stress test node that manages network communication and monitoring |
9 | 14 | pub struct BroadcastNetworkStressTestNode { |
10 | 15 | args: NodeArgs, |
| 16 | + network_manager: Option<NetworkManager>, |
11 | 17 | } |
12 | 18 |
|
13 | 19 | impl BroadcastNetworkStressTestNode { |
| 20 | + /// Creates network configuration from arguments |
| 21 | + fn create_network_config(args: &NodeArgs) -> NetworkConfig { |
| 22 | + let peer_private_key = create_peer_private_key(args.runner.id); |
| 23 | + let peer_private_key_hex = |
| 24 | + peer_private_key.iter().map(|byte| format!("{byte:02x}")).collect::<String>(); |
| 25 | + info!("Secret Key: {peer_private_key_hex:#?}"); |
| 26 | + |
| 27 | + let mut network_config = NetworkConfig { |
| 28 | + port: args.runner.p2p_port, |
| 29 | + secret_key: Some(peer_private_key.to_vec().into()), |
| 30 | + ..Default::default() |
| 31 | + }; |
| 32 | + |
| 33 | + network_config.discovery_config.heartbeat_interval = Duration::from_secs(99999999); |
| 34 | + |
| 35 | + if !args.runner.bootstrap.is_empty() { |
| 36 | + let bootstrap_peers: Vec<Multiaddr> = args |
| 37 | + .runner |
| 38 | + .bootstrap |
| 39 | + .iter() |
| 40 | + .map(|s| Multiaddr::from_str(s.trim()).unwrap()) |
| 41 | + .collect(); |
| 42 | + network_config.bootstrap_peer_multiaddr = Some(bootstrap_peers); |
| 43 | + } |
| 44 | + |
| 45 | + network_config |
| 46 | + } |
| 47 | + |
14 | 48 | /// Creates a new BroadcastNetworkStressTestNode instance |
15 | 49 | pub async fn new(args: NodeArgs) -> Self { |
16 | | - Self { args } |
| 50 | + // Create network configuration |
| 51 | + let network_config = Self::create_network_config(&args); |
| 52 | + // Create network manager |
| 53 | + let network_manager = NetworkManager::new(network_config, None, None); |
| 54 | + Self { args, network_manager: Some(network_manager) } |
| 55 | + } |
| 56 | + |
| 57 | + /// Starts the network manager in the background |
| 58 | + pub async fn start_network_manager(&mut self) -> BoxFuture<'static, ()> { |
| 59 | + let network_manager = |
| 60 | + self.network_manager.take().expect("Network manager should be available"); |
| 61 | + async move { |
| 62 | + let _ = network_manager.run().await; |
| 63 | + } |
| 64 | + .boxed() |
17 | 65 | } |
18 | 66 |
|
19 | 67 | /// Gets all the tasks that need to be run |
20 | 68 | async fn get_tasks(&mut self) -> Vec<BoxFuture<'static, ()>> { |
21 | | - Vec::new() |
| 69 | + let mut tasks = Vec::new(); |
| 70 | + tasks.push(self.start_network_manager().await); |
| 71 | + |
| 72 | + tasks |
22 | 73 | } |
23 | 74 |
|
24 | 75 | /// Unified run function that handles both simple and network reset modes |
@@ -65,3 +116,11 @@ pub async fn race_and_kill_tasks(spawned_tasks: Vec<JoinHandle<()>>) { |
65 | 116 | task.abort(); |
66 | 117 | } |
67 | 118 | } |
| 119 | + |
| 120 | +fn create_peer_private_key(peer_index: u64) -> [u8; 32] { |
| 121 | + let array = peer_index.to_le_bytes(); |
| 122 | + assert_eq!(array.len(), 8); |
| 123 | + let mut private_key = [0u8; 32]; |
| 124 | + private_key[0..8].copy_from_slice(&array); |
| 125 | + private_key |
| 126 | +} |
0 commit comments