Skip to content

Commit a65b9ee

Browse files
apollo_network_benchmark: added running tasks infrastructure
1 parent cf44021 commit a65b9ee

File tree

5 files changed

+79
-1
lines changed

5 files changed

+79
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_network_benchmark/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ testing = []
1010

1111
[dependencies]
1212
clap = { workspace = true, features = ["derive", "env"] }
13+
futures.workspace = true
1314
lazy_static.workspace = true
1415
metrics-exporter-prometheus.workspace = true
1516
tokio = { workspace = true, features = ["full", "sync"] }

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ use tracing::Level;
1111
mod message_test;
1212

1313
mod message;
14+
mod stress_test_node;
1415

1516
use apollo_network_benchmark::node_args::NodeArgs;
17+
use stress_test_node::BroadcastNetworkStressTestNode;
1618

1719
#[tokio::main]
1820
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -48,5 +50,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4850
.describe_and_run(),
4951
);
5052

51-
Ok(())
53+
// Create and run the stress test node
54+
let stress_test_node = BroadcastNetworkStressTestNode::new(args).await;
55+
stress_test_node.run().await
5256
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use std::time::Duration;
2+
3+
use apollo_network_benchmark::node_args::NodeArgs;
4+
use futures::future::{select_all, BoxFuture};
5+
use tokio::task::JoinHandle;
6+
use tracing::{info, warn};
7+
8+
/// The main stress test node that manages network communication and monitoring
9+
pub struct BroadcastNetworkStressTestNode {
10+
args: NodeArgs,
11+
}
12+
13+
impl BroadcastNetworkStressTestNode {
14+
/// Creates a new BroadcastNetworkStressTestNode instance
15+
pub async fn new(args: NodeArgs) -> Self {
16+
Self { args }
17+
}
18+
19+
/// Gets all the tasks that need to be run
20+
async fn get_tasks(&mut self) -> Vec<BoxFuture<'static, ()>> {
21+
Vec::new()
22+
}
23+
24+
/// Unified run function that handles both simple and network reset modes
25+
pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
26+
let test_timeout = Duration::from_secs(self.args.user.timeout);
27+
let start_time = tokio::time::Instant::now();
28+
// Main loop - restart if network reset is enabled, otherwise run once
29+
30+
info!("Starting/restarting all tasks");
31+
32+
// Start all common tasks
33+
let tasks = self.get_tasks().await;
34+
35+
// Wait for either timeout or any task completion
36+
let remaining_time = test_timeout.saturating_sub(start_time.elapsed());
37+
let spawned_tasks: Vec<_> = tasks.into_iter().map(|task| tokio::spawn(task)).collect();
38+
let task_completed =
39+
tokio::time::timeout(remaining_time, race_and_kill_tasks(spawned_tasks)).await.is_ok();
40+
41+
if !task_completed {
42+
info!("Test timeout reached");
43+
return Err("Test timeout".into());
44+
}
45+
46+
Err("Tasks should never end".into())
47+
}
48+
}
49+
50+
pub async fn race_and_kill_tasks(spawned_tasks: Vec<JoinHandle<()>>) {
51+
if spawned_tasks.is_empty() {
52+
return;
53+
}
54+
55+
// Wait for any task to complete
56+
let (result, _index, remaining_tasks) = select_all(spawned_tasks).await;
57+
58+
// Log the result of the completed task
59+
if let Err(e) = result {
60+
warn!("Task completed with error: {:?}", e);
61+
}
62+
63+
// Abort all remaining tasks
64+
for task in remaining_tasks {
65+
task.abort();
66+
}
67+
}

crates/apollo_network_benchmark/src/node_args.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ pub struct UserArgs {
1616
/// Set the verbosity level of the logger, the higher the more verbose
1717
#[arg(short, long, env, default_value = "2")]
1818
pub verbosity: u8,
19+
20+
/// The timeout in seconds for the node.
21+
/// When the node runs for longer than this, it will be killed.
22+
#[arg(long, env, default_value = "4000")]
23+
pub timeout: u64,
1924
}
2025

2126
#[derive(Parser, Debug, Clone)]

0 commit comments

Comments
 (0)