Skip to content

Commit e19eb5a

Browse files
apollo_network: added network manager register_broadcast_topic docs (#8970)
* apollo_network: added misconduct score AddAssign docs * apollo_network: added network manager NetworkError docs * apollo_network: added network manager GenericNetworkManager docs * apollo_network: added network manager GenericNetworkManager::run docs * apollo_network: added network manager register_sqmr_protocol_server docs * apollo_network: added network manager register_sqmr_protocol_client docs * apollo_network: added network manager register_broadcast_topic docs
1 parent f18b7fa commit e19eb5a

File tree

1 file changed

+114
-4
lines changed
  • crates/apollo_network/src/network_manager

1 file changed

+114
-4
lines changed

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,120 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
483483
SqmrClientSender::new(Box::new(payload_sender), buffer_size)
484484
}
485485

486-
/// Register a new subscriber for broadcasting and receiving broadcasts for a given topic.
487-
/// Panics if this topic is already subscribed.
488-
// TODO(Shahak): consider splitting into register_broadcast_topic_client and
489-
// register_broadcast_topic_server
486+
/// Registers for broadcasting and receiving messages on a GossipSub topic.
487+
///
488+
/// This method sets up bidirectional communication for a specific topic using the
489+
/// GossipSub protocol. The node can both broadcast messages to the network and
490+
/// receive messages broadcast by other peers.
491+
///
492+
/// # Type Parameters
493+
///
494+
/// * `T` - The message type for this topic (must implement serialization traits)
495+
///
496+
/// # Arguments
497+
///
498+
/// * `topic` - The GossipSub topic to subscribe to
499+
/// * `buffer_size` - Size of the internal buffers for messages
500+
///
501+
/// # Returns
502+
///
503+
/// * `Ok(BroadcastTopicChannels<T>)` - Channels for sending and receiving messages
504+
/// * `Err(SubscriptionError)` - If subscription to the topic fails
505+
///
506+
/// # Panics
507+
///
508+
/// Panics if this topic has already been registered.
509+
///
510+
/// # Examples
511+
///
512+
/// ```rust,no_run
513+
/// use apollo_network::gossipsub_impl::Topic;
514+
/// use apollo_network::network_manager::{BroadcastTopicClientTrait, NetworkManager};
515+
/// use apollo_network::NetworkConfig;
516+
/// use futures::StreamExt;
517+
/// use serde::{Deserialize, Serialize};
518+
///
519+
/// // Example transaction type for demonstration
520+
/// #[derive(Serialize, Deserialize, Clone)]
521+
/// struct Transaction {
522+
/// hash: String,
523+
/// amount: u64,
524+
/// }
525+
///
526+
/// impl TryFrom<Vec<u8>> for Transaction {
527+
/// type Error = Box<dyn std::error::Error + Send + Sync>;
528+
/// fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
529+
/// Ok(Transaction { hash: String::from_utf8(bytes)?, amount: 100 })
530+
/// }
531+
/// }
532+
/// impl From<Transaction> for Vec<u8> {
533+
/// fn from(tx: Transaction) -> Vec<u8> {
534+
/// tx.hash.into_bytes()
535+
/// }
536+
/// }
537+
///
538+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
539+
/// let mut network_manager = NetworkManager::new(NetworkConfig::default(), None, None);
540+
///
541+
/// // Register for transaction broadcasting
542+
/// let topic = Topic::new("transactions");
543+
/// let mut channels = network_manager.register_broadcast_topic::<Transaction>(
544+
/// topic, 1000, // buffer size
545+
/// )?;
546+
///
547+
/// // Broadcast a transaction
548+
/// let transaction = Transaction { hash: "tx123".to_string(), amount: 100 };
549+
/// channels.broadcast_topic_client.broadcast_message(transaction).await?;
550+
///
551+
/// // Helper functions for the example
552+
/// fn validate_transaction(tx: &Transaction) -> bool {
553+
/// !tx.hash.is_empty()
554+
/// }
555+
/// fn process_transaction(tx: Transaction) {
556+
/// println!("Processing {}", tx.hash);
557+
/// }
558+
///
559+
/// // Receive and process broadcasted transactions
560+
/// while let Some((result, metadata)) = channels.broadcasted_messages_receiver.next().await {
561+
/// match result {
562+
/// Ok(transaction) => {
563+
/// if validate_transaction(&transaction) {
564+
/// // Valid transaction - continue propagation
565+
/// channels.broadcast_topic_client.continue_propagation(&metadata).await?;
566+
/// process_transaction(transaction);
567+
/// } else {
568+
/// // Invalid transaction - report the originator
569+
/// channels.broadcast_topic_client.report_peer(metadata).await?;
570+
/// }
571+
/// }
572+
/// Err(e) => {
573+
/// // Malformed message - report the originator
574+
/// eprintln!("Failed to deserialize transaction: {}", e);
575+
/// channels.broadcast_topic_client.report_peer(metadata).await?;
576+
/// }
577+
/// }
578+
/// }
579+
/// # Ok(())
580+
/// # }
581+
/// ```
582+
///
583+
/// # Topic Subscription
584+
///
585+
/// Once registered, the node joins the GossipSub mesh for the topic and will:
586+
/// - Receive all messages broadcast on this topic by other peers
587+
/// - Participate in message propagation according to GossipSub rules
588+
/// - Maintain mesh connections with other peers interested in this topic
589+
///
590+
/// # Message Validation
591+
///
592+
/// Received messages should be validated before propagation. Use:
593+
/// - [`BroadcastTopicClient::continue_propagation`] for valid messages
594+
/// - [`BroadcastTopicClient::report_peer`] for invalid messages
595+
///
596+
/// # Buffer Management
597+
///
598+
/// The `buffer_size` parameter controls buffering for both outbound and inbound
599+
/// messages. Larger buffers can handle traffic bursts but use more memory.
490600
pub fn register_broadcast_topic<T>(
491601
&mut self,
492602
topic: Topic,

0 commit comments

Comments
 (0)