Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 100 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ statistical = "1.0.0"
strum = "0.25.0"
strum_macros = "0.25.2"
syn = "2.0.39"
sysinfo = "0.37.2"
tempfile = "3.7.0"
test-case = "3.2.1"
test-log = "0.2.14"
Expand Down
1 change: 1 addition & 0 deletions crates/apollo_network_benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ lazy_static.workspace = true
libp2p = { workspace = true, features = ["identify"] }
metrics-exporter-prometheus.workspace = true
serde.workspace = true
sysinfo.workspace = true
tokio = { workspace = true, features = ["full", "sync"] }
tokio-metrics = { workspace = true, features = ["metrics-rs-integration", "rt"] }
tracing.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ mod message;
pub mod metrics;
mod protocol;
mod stress_test_node;
mod system_metrics;

use apollo_network_benchmark::node_args::NodeArgs;
use stress_test_node::BroadcastNetworkStressTestNode;
use system_metrics::monitor_process_metrics;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -53,6 +55,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.describe_and_run(),
);

// Start the process metrics monitoring task
tokio::spawn(async {
monitor_process_metrics(1).await;
});

// Create and run the stress test node
let stress_test_node = BroadcastNetworkStressTestNode::new(args).await;
stress_test_node.run().await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,16 @@ define_metrics!(
MetricCounter { RECEIVE_MESSAGE_BYTES_SUM, "receive_message_bytes_sum", "Sum of the stress test messages received via broadcast", init = 0 },
MetricHistogram { RECEIVE_MESSAGE_DELAY_SECONDS, "receive_message_delay_seconds", "Message delay in seconds" },
MetricHistogram { RECEIVE_MESSAGE_NEGATIVE_DELAY_SECONDS, "receive_message_negative_delay_seconds", "Negative message delay in seconds" },

// system metrics for the node
MetricGauge { SYSTEM_TOTAL_MEMORY_BYTES, "system_total_memory_bytes", "Total system memory in bytes" },
MetricGauge { SYSTEM_AVAILABLE_MEMORY_BYTES, "system_available_memory_bytes", "Available system memory in bytes" },
MetricGauge { SYSTEM_USED_MEMORY_BYTES, "system_used_memory_bytes", "Used system memory in bytes" },
MetricGauge { SYSTEM_CPU_COUNT, "system_cpu_count", "Number of logical CPU cores in the system" },

// system metrics for the process
MetricGauge { SYSTEM_PROCESS_CPU_USAGE_PERCENT, "system_process_cpu_usage_percent", "CPU usage percentage of the current process" },
MetricGauge { SYSTEM_PROCESS_MEMORY_USAGE_BYTES, "system_process_memory_usage_bytes", "Memory usage in bytes of the current process" },
MetricGauge { SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES, "system_process_virtual_memory_usage_bytes", "Virtual memory usage in bytes of the current process" },
},
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::time::Duration;

use apollo_metrics::metrics::LossyIntoF64;
use sysinfo::{Pid, System};
use tokio::time::interval;
use tracing::warn;

use crate::metrics::{
SYSTEM_AVAILABLE_MEMORY_BYTES,
SYSTEM_CPU_COUNT,
SYSTEM_PROCESS_CPU_USAGE_PERCENT,
SYSTEM_PROCESS_MEMORY_USAGE_BYTES,
SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES,
SYSTEM_TOTAL_MEMORY_BYTES,
SYSTEM_USED_MEMORY_BYTES,
};

/// Collects system-wide and process-specific metrics (CPU, memory)
fn collect_system_and_process_metrics(system: &mut System, current_pid: Pid) {
system.refresh_all();
let total_memory: f64 = system.total_memory().into_f64();
let available_memory: f64 = system.available_memory().into_f64();
let used_memory: f64 = system.used_memory().into_f64();
let cpu_count: f64 = system.cpus().len().into_f64();

SYSTEM_TOTAL_MEMORY_BYTES.set(total_memory);
SYSTEM_AVAILABLE_MEMORY_BYTES.set(available_memory);
SYSTEM_USED_MEMORY_BYTES.set(used_memory);
SYSTEM_CPU_COUNT.set(cpu_count);

if let Some(process) = system.process(current_pid) {
let cpu_usage: f64 = process.cpu_usage().into();
let memory_usage: f64 = process.memory().into_f64();
let virtual_memory_usage: f64 = process.virtual_memory().into_f64();

SYSTEM_PROCESS_CPU_USAGE_PERCENT.set(cpu_usage);
SYSTEM_PROCESS_MEMORY_USAGE_BYTES.set(memory_usage);
SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES.set(virtual_memory_usage);
} else {
warn!("Could not find process information for PID: {}", current_pid);
}
}

pub async fn monitor_process_metrics(interval_seconds: u64) {
let mut interval = interval(Duration::from_secs(interval_seconds));
let current_pid = sysinfo::get_current_pid().expect("Failed to get current process PID");

struct State {
system: System,
}

let mut state = Some(State { system: System::new_all() });

loop {
interval.tick().await;

let mut passed_state = state.take().unwrap();
// the metrics update need to be done in a blocking context to avoid slowing down tokio
// threads
state = tokio::task::spawn_blocking(move || {
collect_system_and_process_metrics(&mut passed_state.system, current_pid);
Some(passed_state)
})
.await
.unwrap();
}
}
Loading