Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ anyhow = "1.0.79"
arc-swap = "1.6.0"
async-trait = "0.1"
axum = "0.8"
axum-client-ip = "1.3.1"
backon = "1.5.1"
base64 = "0.22.0"
bitflags = "2.6"
Expand Down
2 changes: 2 additions & 0 deletions core/src/blockchain_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use self::client::{
#[cfg(feature = "s3")]
pub use self::providers::S3RpcDataProvider;
pub use self::providers::{IntoRpcDataProvider, StorageRpcDataProvider};
pub use self::rate_limits::{BlockchainRpcRateLimitsConfig, BlockchainRpcTrafficLimits};
#[cfg(feature = "s3")]
pub use self::service::S3ProxyConfig;
pub use self::service::{
Expand All @@ -18,6 +19,7 @@ pub use self::service::{
mod broadcast_listener;
mod client;
mod providers;
mod rate_limits;
mod service;

pub const BAD_REQUEST_ERROR_CODE: u32 = 1;
Expand Down
129 changes: 129 additions & 0 deletions core/src/blockchain_rpc/rate_limits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use std::net::IpAddr;
use std::num::NonZeroU32;

use serde::{Deserialize, Serialize};
use tycho_network::{
OverlayIngressPolicyDecision, PublicOverlayRateLimitPolicy, PublicOverlayRateLimiter,
ServiceRequest, try_handle_prefix,
};
use tycho_util::FastHashSet;
use tycho_util::rate_limit::{RateLimitConfig, RateLimitPolicy, TrafficLimit};

use crate::proto::blockchain::rpc;
use crate::proto::overlay;

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct BlockchainRpcRateLimitsConfig {
pub limiter: RateLimitConfig,
pub whitelist: Vec<IpAddr>,
pub traffic: BlockchainRpcTrafficLimits,
}

impl From<BlockchainRpcRateLimitsConfig> for PublicOverlayRateLimiter {
fn from(config: BlockchainRpcRateLimitsConfig) -> Self {
PublicOverlayRateLimiter::new(config.limiter, BlockchainRpcRateLimitPolicy {
traffic: config.traffic,
whitelist: config.whitelist.into_iter().collect(),
})
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct BlockchainRpcTrafficLimits {
pub light_queries: TrafficLimit,
pub heavy_queries: TrafficLimit,
pub broadcasts: TrafficLimit,
}

impl Default for BlockchainRpcTrafficLimits {
fn default() -> Self {
Self {
light_queries: TrafficLimit::new(
NonZeroU32::new(20).unwrap(),
NonZeroU32::new(20).unwrap(),
),
heavy_queries: TrafficLimit::new(
NonZeroU32::new(10).unwrap(),
NonZeroU32::new(10).unwrap(),
),
broadcasts: TrafficLimit::new(
NonZeroU32::new(20).unwrap(),
NonZeroU32::new(20).unwrap(),
),
}
}
}

impl BlockchainRpcTrafficLimits {
fn policy(
&self,
class: BlockchainRpcTrafficClass,
) -> RateLimitPolicy<BlockchainRpcTrafficClass> {
let limit = match class {
BlockchainRpcTrafficClass::LightQuery => self.light_queries,
BlockchainRpcTrafficClass::HeavyQuery => self.heavy_queries,
BlockchainRpcTrafficClass::Broadcast => self.broadcasts,
};

RateLimitPolicy { class, limit }
}
}

struct BlockchainRpcRateLimitPolicy {
traffic: BlockchainRpcTrafficLimits,
whitelist: FastHashSet<IpAddr>,
}

impl BlockchainRpcRateLimitPolicy {
fn classify(constructor: u32) -> BlockchainRpcTrafficClass {
match constructor {
overlay::Ping::TL_ID
| rpc::GetArchiveInfo::TL_ID
| rpc::GetPersistentShardStateInfo::TL_ID
| rpc::GetPersistentQueueStateInfo::TL_ID
| rpc::GetArchiveChunk::TL_ID
| rpc::GetBlockDataChunk::TL_ID => BlockchainRpcTrafficClass::LightQuery,
_ => BlockchainRpcTrafficClass::HeavyQuery,
}
}
}

impl PublicOverlayRateLimitPolicy for BlockchainRpcRateLimitPolicy {
type Class = BlockchainRpcTrafficClass;

fn classify_query(&self, req: &ServiceRequest) -> OverlayIngressPolicyDecision<Self::Class> {
if self.whitelist.contains(&req.metadata.remote_address.ip()) {
return OverlayIngressPolicyDecision::Bypass;
}

let constructor = match try_handle_prefix(req) {
Ok((constructor, _)) => constructor,
Err(e) => {
tracing::debug!("failed to deserialize query: {e}");
return OverlayIngressPolicyDecision::Drop;
}
};

let class = BlockchainRpcRateLimitPolicy::classify(constructor);
OverlayIngressPolicyDecision::Allow(self.traffic.policy(class))
}

fn classify_message(&self, req: &ServiceRequest) -> OverlayIngressPolicyDecision<Self::Class> {
if self.whitelist.contains(&req.metadata.remote_address.ip()) {
OverlayIngressPolicyDecision::Bypass
} else {
OverlayIngressPolicyDecision::Allow(
self.traffic.policy(BlockchainRpcTrafficClass::Broadcast),
)
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum BlockchainRpcTrafficClass {
LightQuery,
HeavyQuery,
Broadcast,
}
7 changes: 7 additions & 0 deletions core/src/blockchain_rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tycho_util::metrics::HistogramGuard;

use crate::blockchain_rpc::broadcast_listener::{BroadcastListener, NoopBroadcastListener};
use crate::blockchain_rpc::providers::{IntoRpcDataProvider, RpcDataProvider};
use crate::blockchain_rpc::rate_limits::BlockchainRpcRateLimitsConfig;
use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
use crate::proto::blockchain::*;
use crate::proto::overlay;
Expand Down Expand Up @@ -64,6 +65,11 @@ pub struct BlockchainRpcServiceConfig {
/// Default: yes.
pub serve_persistent_states: bool,

/// Rate limits for inbound blockchain-rpc traffic.
///
/// Default: disabled.
pub rate_limits: Option<BlockchainRpcRateLimitsConfig>,

/// S3 proxy configuration.
///
/// Default: enabled.
Expand All @@ -76,6 +82,7 @@ impl Default for BlockchainRpcServiceConfig {
Self {
max_key_blocks_list_len: 8,
serve_persistent_states: true,
rate_limits: None,
#[cfg(feature = "s3")]
s3_proxy: Some(S3ProxyConfig::default()),
}
Expand Down
8 changes: 8 additions & 0 deletions core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,15 @@ impl ConfiguredNetwork {
let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
.named("blockchain_rpc")
.with_peer_resolver(self.peer_resolver.clone())
.with_rate_limiter(
base_config
.blockchain_rpc_service
.rate_limits
.clone()
.map(|rate_limit| rate_limit.into()),
)
.build(blockchain_rpc_service.clone());

self.overlay_service.add_public_overlay(&public_overlay);

let blockchain_rpc_client = BlockchainRpcClient::builder()
Expand Down
11 changes: 6 additions & 5 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ pub use types::{

pub use self::overlay::{
ChooseMultiplePrivateOverlayEntries, ChooseMultiplePublicOverlayEntries, OverlayConfig,
OverlayId, OverlayService, OverlayServiceBackgroundTasks, OverlayServiceBuilder,
PrivateOverlay, PrivateOverlayBuilder, PrivateOverlayEntries, PrivateOverlayEntriesEvent,
PrivateOverlayEntriesReadGuard, PrivateOverlayEntriesWriteGuard, PrivateOverlayEntryData,
PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries, PublicOverlayEntriesReadGuard,
PublicOverlayEntryData, UnknownPeersQueue,
OverlayId, OverlayIngressPolicyDecision, OverlayService, OverlayServiceBackgroundTasks,
OverlayServiceBuilder, PrivateOverlay, PrivateOverlayBuilder, PrivateOverlayEntries,
PrivateOverlayEntriesEvent, PrivateOverlayEntriesReadGuard, PrivateOverlayEntriesWriteGuard,
PrivateOverlayEntryData, PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries,
PublicOverlayEntriesReadGuard, PublicOverlayEntryData, PublicOverlayRateLimitPolicy,
PublicOverlayRateLimiter, UnknownPeersQueue,
};
pub use self::util::{
NetworkExt, Routable, Router, RouterBuilder, UnknownPeerError, check_peer_signature,
Expand Down
4 changes: 4 additions & 0 deletions network/src/overlay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub use self::public_overlay::{
ChooseMultiplePublicOverlayEntries, PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries,
PublicOverlayEntriesReadGuard, PublicOverlayEntryData, UnknownPeersQueue,
};
pub use self::rate_limits::{
OverlayIngressPolicyDecision, PublicOverlayRateLimitPolicy, PublicOverlayRateLimiter,
};
use crate::dht::DhtService;
use crate::network::Network;
use crate::proto::overlay::{PublicEntriesResponse, PublicEntry, PublicEntryResponse, rpc};
Expand All @@ -32,6 +35,7 @@ mod metrics;
mod overlay_id;
mod private_overlay;
mod public_overlay;
mod rate_limits;
mod tasks_stream;

pub struct OverlayServiceBackgroundTasks {
Expand Down
28 changes: 26 additions & 2 deletions network/src/overlay/public_overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::dht::{PeerResolver, PeerResolverHandle};
use crate::network::Network;
use crate::overlay::OverlayId;
use crate::overlay::metrics::Metrics;
use crate::overlay::rate_limits::PublicOverlayRateLimiter;
use crate::proto::overlay::{PublicEntry, PublicEntryToSign, rpc};
use crate::types::{BoxService, PeerId, Request, Response, Service, ServiceExt, ServiceRequest};
use crate::util::NetworkExt;
Expand All @@ -27,6 +28,7 @@ pub struct PublicOverlayBuilder {
entry_ttl: Duration,
banned_peer_ids: FastDashSet<PeerId>,
peer_resolver: Option<PeerResolver>,
rate_limiter: Option<PublicOverlayRateLimiter>,
name: Option<&'static str>,
}

Expand Down Expand Up @@ -68,6 +70,11 @@ impl PublicOverlayBuilder {
self
}

pub fn with_rate_limiter(mut self, rate_limiter: Option<PublicOverlayRateLimiter>) -> Self {
self.rate_limiter = rate_limiter;
self
}

/// Name of the overlay used in metrics.
pub fn named(mut self, name: &'static str) -> Self {
self.name = Some(name);
Expand Down Expand Up @@ -97,6 +104,7 @@ impl PublicOverlayBuilder {
min_capacity: self.min_capacity,
entry_ttl_sec,
peer_resolver: self.peer_resolver,
rate_limiter: self.rate_limiter,
entries: RwLock::new(entries),
entries_added: Notify::new(),
entries_changed: Notify::new(),
Expand Down Expand Up @@ -130,6 +138,7 @@ impl PublicOverlay {
entry_ttl: Duration::from_secs(3600),
banned_peer_ids: Default::default(),
peer_resolver: None,
rate_limiter: None,
name: None,
}
}
Expand Down Expand Up @@ -218,7 +227,7 @@ impl PublicOverlay {

pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop<Option<Response>> {
self.inner.metrics.record_rx(req.body.len());
if self.check_peer_id(&req.metadata.peer_id) {
if self.check_peer_id(&req.metadata.peer_id) && self.allow_query(&req) {
BoxFutureOrNoop::future(self.inner.service.on_query(req))
} else {
BoxFutureOrNoop::Noop
Expand All @@ -227,13 +236,27 @@ impl PublicOverlay {

pub(crate) fn handle_message(&self, req: ServiceRequest) -> BoxFutureOrNoop<()> {
self.inner.metrics.record_rx(req.body.len());
if self.check_peer_id(&req.metadata.peer_id) {
if self.check_peer_id(&req.metadata.peer_id) && self.allow_message(&req) {
BoxFutureOrNoop::future(self.inner.service.on_message(req))
} else {
BoxFutureOrNoop::Noop
}
}

fn allow_query(&self, req: &ServiceRequest) -> bool {
self.inner
.rate_limiter
.as_ref()
.is_none_or(|rate_limiter| rate_limiter.allow_query(req))
}

fn allow_message(&self, req: &ServiceRequest) -> bool {
self.inner
.rate_limiter
.as_ref()
.is_none_or(|rate_limiter| rate_limiter.allow_message(req))
}

fn check_peer_id(&self, peer_id: &PeerId) -> bool {
// TODO: Merge `banned_peer_ids` with `entires`?
if self.inner.banned_peer_ids.contains(peer_id) {
Expand Down Expand Up @@ -416,6 +439,7 @@ struct Inner {
min_capacity: usize,
entry_ttl_sec: u32,
peer_resolver: Option<PeerResolver>,
rate_limiter: Option<PublicOverlayRateLimiter>,
entries: RwLock<PublicOverlayEntries>,
entry_count: AtomicUsize,
entries_added: Notify,
Expand Down
Loading
Loading