Skip to content

Commit ee54a68

Browse files
apollo_network_benchmark: added infrastructure for multiple network protocols
1 parent 54c2611 commit ee54a68

File tree

6 files changed

+143
-3
lines changed

6 files changed

+143
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_network_benchmark/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ futures.workspace = true
1515
lazy_static.workspace = true
1616
libp2p = { workspace = true, features = ["identify"] }
1717
metrics-exporter-prometheus.workspace = true
18+
serde.workspace = true
1819
tokio = { workspace = true, features = ["full", "sync"] }
1920
tokio-metrics = { workspace = true, features = ["metrics-rs-integration", "rt"] }
2021
tracing.workspace = true

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use tracing::Level;
1111
mod message_test;
1212

1313
mod message;
14+
mod protocol;
1415
mod stress_test_node;
1516

1617
use apollo_network_benchmark::node_args::NodeArgs;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// TODO(AndrewL): Remove this once the sender and receiver are used
2+
#![allow(dead_code)]
3+
4+
use apollo_network::network_manager::{
5+
BroadcastTopicChannels,
6+
BroadcastTopicClient,
7+
BroadcastTopicClientTrait,
8+
BroadcastTopicServer,
9+
NetworkManager,
10+
};
11+
use apollo_network_benchmark::node_args::NetworkProtocol;
12+
use futures::StreamExt;
13+
use libp2p::gossipsub::{Sha256Topic, Topic};
14+
use libp2p::PeerId;
15+
16+
// ================================
17+
// Types and Constants
18+
// ================================
19+
20+
lazy_static::lazy_static! {
21+
pub static ref TOPIC: Sha256Topic = Topic::new("stress_test_topic".to_string());
22+
}
23+
24+
pub type TopicType = Vec<u8>;
25+
26+
/// Registers protocol channels on an existing network manager.
27+
/// Returns a sender and receiver for the configured protocol.
28+
pub fn register_protocol_channels(
29+
network_manager: &mut NetworkManager,
30+
buffer_size: usize,
31+
protocol: &NetworkProtocol,
32+
) -> (MessageSender, MessageReceiver) {
33+
match protocol {
34+
NetworkProtocol::Gossipsub => {
35+
let channels = network_manager
36+
.register_broadcast_topic::<TopicType>(TOPIC.clone(), buffer_size)
37+
.expect("Failed to register broadcast topic");
38+
let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } =
39+
channels;
40+
41+
(
42+
MessageSender::Gossipsub(broadcast_topic_client),
43+
MessageReceiver::Gossipsub(broadcasted_messages_receiver),
44+
)
45+
}
46+
}
47+
}
48+
49+
// ================================
50+
// MessageSender
51+
// ================================
52+
53+
/// Message sender abstraction for different protocols
54+
pub enum MessageSender {
55+
Gossipsub(BroadcastTopicClient<TopicType>),
56+
}
57+
58+
impl MessageSender {
59+
pub async fn send_message(&mut self, _peers: &[PeerId], message: TopicType) {
60+
match self {
61+
MessageSender::Gossipsub(client) => {
62+
client.broadcast_message(message).await.unwrap();
63+
}
64+
}
65+
}
66+
}
67+
68+
// ================================
69+
// MessageReceiver
70+
// ================================
71+
72+
pub enum MessageReceiver {
73+
Gossipsub(BroadcastTopicServer<TopicType>),
74+
}
75+
76+
impl MessageReceiver {
77+
pub async fn for_each<F>(self, mut f: F)
78+
where
79+
F: FnMut(TopicType, Option<PeerId>) + Copy,
80+
{
81+
match self {
82+
MessageReceiver::Gossipsub(receiver) => {
83+
receiver
84+
.for_each(|message| async move {
85+
let peer_id = message.1.originator_id.private_get_peer_id();
86+
f(message.0.unwrap(), Some(peer_id));
87+
})
88+
.await
89+
}
90+
}
91+
}
92+
}

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,18 @@ use libp2p::Multiaddr;
1010
use tokio::task::JoinHandle;
1111
use tracing::{info, warn};
1212

13+
use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender};
14+
1315
/// The main stress test node that manages network communication and monitoring
1416
pub struct BroadcastNetworkStressTestNode {
1517
args: NodeArgs,
1618
network_manager: Option<NetworkManager>,
19+
// TODO(AndrewL): Remove this once they are used
20+
#[allow(dead_code)]
21+
message_sender: Option<MessageSender>,
22+
// TODO(AndrewL): Remove this once they are used
23+
#[allow(dead_code)]
24+
message_receiver: Option<MessageReceiver>,
1725
}
1826

1927
impl BroadcastNetworkStressTestNode {
@@ -49,9 +57,22 @@ impl BroadcastNetworkStressTestNode {
4957
pub async fn new(args: NodeArgs) -> Self {
5058
// Create network configuration
5159
let network_config = Self::create_network_config(&args);
60+
5261
// Create network manager
53-
let network_manager = NetworkManager::new(network_config, None, None);
54-
Self { args, network_manager: Some(network_manager) }
62+
let mut network_manager = NetworkManager::new(network_config, None, None);
63+
64+
// Register protocol channels
65+
let (message_sender, message_receiver) = register_protocol_channels(
66+
&mut network_manager,
67+
args.user.buffer_size,
68+
&args.user.network_protocol,
69+
);
70+
Self {
71+
args,
72+
network_manager: Some(network_manager),
73+
message_sender: Some(message_sender),
74+
message_receiver: Some(message_receiver),
75+
}
5576
}
5677

5778
/// Starts the network manager in the background

crates/apollo_network_benchmark/src/node_args.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,20 @@
1-
use clap::Parser;
1+
use std::fmt::Display;
2+
3+
use clap::{Parser, ValueEnum};
4+
use serde::{Deserialize, Serialize};
5+
6+
#[derive(Debug, Clone, ValueEnum, PartialEq, Eq, Serialize, Deserialize)]
7+
pub enum NetworkProtocol {
8+
/// Use gossipsub for broadcasting (default)
9+
#[value(name = "gossipsub")]
10+
Gossipsub,
11+
}
12+
13+
impl Display for NetworkProtocol {
14+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15+
write!(f, "{}", self.to_possible_value().unwrap().get_name())
16+
}
17+
}
218

319
#[derive(Parser, Debug, Clone)]
420
#[command(version, about, long_about = None)]
@@ -29,6 +45,14 @@ pub struct UserArgs {
2945
#[arg(short, long, env, default_value = "2")]
3046
pub verbosity: u8,
3147

48+
/// Buffer size for the broadcast topic
49+
#[arg(long, env, default_value = "100000")]
50+
pub buffer_size: usize,
51+
52+
/// The network protocol to use for communication (default: gossipsub)
53+
#[arg(long, env, default_value = "gossipsub")]
54+
pub network_protocol: NetworkProtocol,
55+
3256
/// The timeout in seconds for the node.
3357
/// When the node runs for longer than this, it will be killed.
3458
#[arg(long, env, default_value = "4000")]

0 commit comments

Comments
 (0)