Skip to content

Commit f5dc4fa

Browse files
committed
feat(mgmt,dataplane): define MgmtParams
Define struct MgmtParams to pass all of the parameters that the mgmt thread requires. This makes it much easier to pass new stuff, without the need to change several function signatures every time. Signed-off-by: Fredi Raspall <[email protected]>
1 parent 1e2c2da commit f5dc4fa

File tree

8 files changed

+105
-114
lines changed

8 files changed

+105
-114
lines changed

dataplane/src/main.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ use crate::statistics::MetricsServer;
1414
use args::{CmdArgs, Parser};
1515

1616
use drivers::kernel::DriverKernel;
17-
18-
use mgmt::processor::launch::start_mgmt;
17+
use mgmt::{ConfigProcessorParams, MgmtParams, start_mgmt};
1918

2019
use pyroscope::PyroscopeAgent;
2120
use pyroscope_pprofrs::{PprofConfig, pprof_backend};
@@ -127,19 +126,21 @@ fn main() {
127126

128127
MetricsServer::new(args.metrics_address(), setup.stats);
129128

130-
/* pipeline builder */
129+
// pipeline builder
131130
let pipeline_factory = setup.pipeline;
132131

133132
/* start management */
134-
start_mgmt(
133+
start_mgmt(MgmtParams {
135134
grpc_addr,
136-
setup.router.get_ctl_tx(),
137-
setup.nattablew,
138-
setup.natallocatorw,
139-
setup.vpcdtablesw,
140-
setup.vpcmapw,
141-
setup.vpc_stats_store,
142-
)
135+
processor_params: ConfigProcessorParams {
136+
router_ctl: setup.router.get_ctl_tx(),
137+
vpcmapw: setup.vpcmapw,
138+
nattablesw: setup.nattablesw,
139+
natallocatorw: setup.natallocatorw,
140+
vpcdtablesw: setup.vpcdtablesw,
141+
vpc_stats_store: setup.vpc_stats_store,
142+
},
143+
})
143144
.expect("Failed to start gRPC server");
144145

145146
/* start driver with the provided pipeline builder */

dataplane/src/packet_processor/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ where
3636
pub router: Router,
3737
pub pipeline: Arc<dyn Send + Sync + Fn() -> DynPipeline<Buf>>,
3838
pub vpcmapw: VpcMapWriter<VpcMapName>,
39-
pub nattablew: NatTablesWriter,
39+
pub nattablesw: NatTablesWriter,
4040
pub natallocatorw: NatAllocatorWriter,
4141
pub vpcdtablesw: VpcDiscTablesWriter,
4242
pub stats: StatsCollector,
@@ -47,7 +47,7 @@ where
4747
pub(crate) fn start_router<Buf: PacketBufferMut>(
4848
params: RouterParams,
4949
) -> Result<InternalSetup<Buf>, RouterError> {
50-
let nattablew = NatTablesWriter::new();
50+
let nattablesw = NatTablesWriter::new();
5151
let natallocatorw = NatAllocatorWriter::new();
5252
let vpcdtablesw = VpcDiscTablesWriter::new();
5353
let router = Router::new(params)?;
@@ -67,7 +67,7 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
6767
let fibtr_factory = router.get_fibtr_factory();
6868
let vpcdtablesr_factory = vpcdtablesw.get_reader_factory();
6969
let atabler_factory = router.get_atabler_factory();
70-
let nattabler_factory = nattablew.get_reader_factory();
70+
let nattabler_factory = nattablesw.get_reader_factory();
7171
let natallocator_factory = natallocatorw.get_reader_factory();
7272

7373
let pipeline_builder = move || {
@@ -108,7 +108,7 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
108108
router,
109109
pipeline: Arc::new(pipeline_builder),
110110
vpcmapw,
111-
nattablew,
111+
nattablesw,
112112
natallocatorw,
113113
vpcdtablesw,
114114
stats,

mgmt/src/lib.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,13 @@
33

44
//! Dataplane management module
55
6-
/* gRPC entry point */
7-
pub mod grpc;
8-
9-
/* Configuration processor */
10-
pub mod processor;
11-
12-
/* VPC manager */
6+
mod grpc;
7+
mod processor;
8+
mod tests;
139
pub mod vpc_manager;
1410

15-
#[cfg(test)]
16-
mod tests;
11+
pub use processor::launch::{MgmtParams, start_mgmt};
12+
pub use processor::proc::ConfigProcessorParams;
1713

1814
use tracectl::trace_target;
1915
trace_target!("mgmt", LevelFilter::DEBUG, &["management"]);

mgmt/src/processor/gwconfigdb.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tracing::{debug, error, info};
99

1010
/// Configuration database, keeps a set of [`GwConfig`]s keyed by generation id [`GenId`]
1111
#[derive(Default)]
12-
pub struct GwConfigDatabase {
12+
pub(crate) struct GwConfigDatabase {
1313
configs: BTreeMap<GenId, GwConfig>, /* collection of configs */
1414
current: Option<GenId>, /* [`GenId`] of currently applied config */
1515
}
@@ -52,7 +52,7 @@ impl GwConfigDatabase {
5252
pub fn get_mut(&mut self, generation: GenId) -> Option<&mut GwConfig> {
5353
self.configs.get_mut(&generation)
5454
}
55-
55+
#[allow(unused)]
5656
pub fn remove(&mut self, genid: GenId) -> ConfigResult {
5757
if genid == ExternalConfig::BLANK_GENID {
5858
debug!("Will not remove config {genid} as it is protected");

mgmt/src/processor/launch.rs

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use crate::processor::proc::ConfigChannelRequest;
55
use crate::processor::proc::ConfigProcessor;
6-
6+
use args::GrpcAddress;
77
use std::fmt::Display;
88
use std::io::Error;
99
use std::net::SocketAddr;
@@ -16,17 +16,10 @@ use tokio::net::UnixListener;
1616
use tokio::sync::mpsc::Sender;
1717
use tokio_stream::Stream;
1818

19-
use nat::stateful::NatAllocatorWriter;
20-
use nat::stateless::NatTablesWriter;
21-
use pkt_meta::dst_vpcd_lookup::VpcDiscTablesWriter;
22-
use routing::RouterCtlSender;
23-
2419
use crate::grpc::server::create_config_service;
20+
use crate::processor::proc::ConfigProcessorParams;
2521
use tonic::transport::Server;
26-
27-
use stats::VpcMapName;
2822
use tracing::{debug, error, info, warn};
29-
use vpcmap::map::VpcMapWriter;
3023

3124
/// Start the gRPC server on TCP
3225
async fn start_grpc_server_tcp(
@@ -161,20 +154,17 @@ impl Display for ServerAddress {
161154
}
162155
}
163156

157+
pub struct MgmtParams {
158+
pub grpc_addr: GrpcAddress,
159+
pub processor_params: ConfigProcessorParams,
160+
}
161+
164162
/// Start the mgmt service with either type of socket
165-
pub fn start_mgmt(
166-
grpc_addr: args::GrpcAddress,
167-
router_ctl: RouterCtlSender,
168-
nattablew: NatTablesWriter,
169-
natallocatorw: NatAllocatorWriter,
170-
vpcdtablesw: VpcDiscTablesWriter,
171-
vpcmapw: VpcMapWriter<VpcMapName>,
172-
vps_stats_store: std::sync::Arc<stats::VpcStatsStore>,
173-
) -> Result<std::thread::JoinHandle<()>, Error> {
163+
pub fn start_mgmt(params: MgmtParams) -> Result<std::thread::JoinHandle<()>, Error> {
174164
/* build server address from provided grpc address */
175-
let server_address = match grpc_addr {
176-
args::GrpcAddress::Tcp(addr) => ServerAddress::Tcp(addr),
177-
args::GrpcAddress::UnixSocket(path) => ServerAddress::Unix(path.into()),
165+
let server_address = match params.grpc_addr {
166+
GrpcAddress::Tcp(addr) => ServerAddress::Tcp(addr),
167+
GrpcAddress::UnixSocket(path) => ServerAddress::Unix(path.into()),
178168
};
179169
debug!("Will start gRPC listening on {server_address}");
180170

@@ -192,15 +182,8 @@ pub fn start_mgmt(
192182

193183
/* block thread to run gRPC and configuration processor */
194184
rt.block_on(async {
195-
let (processor, tx) = ConfigProcessor::new(
196-
router_ctl,
197-
vpcmapw,
198-
nattablew,
199-
natallocatorw,
200-
vpcdtablesw,
201-
vps_stats_store,
202-
);
203-
tokio::task::spawn(async { processor.run().await });
185+
let (processor, tx) = ConfigProcessor::new(params.processor_params);
186+
tokio::spawn(async { processor.run().await });
204187

205188
// Start the appropriate server based on address type
206189
let result = match server_address {

mgmt/src/processor/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
//! Dataplane configuration processor.
55
//! This module implements the core logic to determine and build internal configurations.
66
7-
pub mod confbuild;
7+
pub(crate) mod confbuild;
88
mod display;
9-
pub mod gwconfigdb;
10-
pub mod launch;
11-
pub mod proc;
9+
pub(crate) mod gwconfigdb;
10+
pub(crate) mod launch;
11+
pub(crate) mod proc;

mgmt/src/processor/proc.rs

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -79,20 +79,6 @@ impl ConfigChannelRequest {
7979
}
8080
}
8181

82-
/// A configuration processor entity. This is the RPC-independent entity responsible for
83-
/// accepting/rejecting configurations, storing them in the configuration database and
84-
/// applying them.
85-
pub(crate) struct ConfigProcessor {
86-
config_db: GwConfigDatabase,
87-
rx: mpsc::Receiver<ConfigChannelRequest>,
88-
router_ctl: RouterCtlSender,
89-
vpc_mgr: VpcManager<RequiredInformationBase>,
90-
vpcmapw: VpcMapWriter<VpcMapName>,
91-
nattablew: NatTablesWriter,
92-
natallocatorw: NatAllocatorWriter,
93-
vnitablesw: VpcDiscTablesWriter,
94-
vpc_stats_store: Arc<VpcStatsStore>,
95-
}
9682
/// Populate FRR status into the dataplane status structure
9783
pub async fn populate_status_with_frr(
9884
status: &mut DataplaneStatus,
@@ -103,25 +89,47 @@ pub async fn populate_status_with_frr(
10389
if let Ok(Some(FrrAppliedConfig { genid, .. })) = router_ctl.get_frr_applied_config().await {
10490
frr = frr.set_applied_config_gen(genid);
10591
}
106-
10792
status.set_frr_status(frr);
10893
}
10994

95+
/// A configuration processor entity. This is the RPC-independent entity responsible for
96+
/// accepting/rejecting configurations, storing them in the configuration database and
97+
/// applying them.
98+
pub(crate) struct ConfigProcessor {
99+
config_db: GwConfigDatabase,
100+
rx: mpsc::Receiver<ConfigChannelRequest>,
101+
vpc_mgr: VpcManager<RequiredInformationBase>,
102+
proc_params: ConfigProcessorParams,
103+
}
104+
105+
pub struct ConfigProcessorParams {
106+
// channel to router
107+
pub router_ctl: RouterCtlSender,
108+
109+
// writer for vpc mapping table
110+
pub vpcmapw: VpcMapWriter<VpcMapName>,
111+
112+
// writer for stateless NAT tables
113+
pub nattablesw: NatTablesWriter,
114+
115+
// writer for stateful NAT allocator
116+
pub natallocatorw: NatAllocatorWriter,
117+
118+
// writer for VPC routing table
119+
pub vpcdtablesw: VpcDiscTablesWriter,
120+
121+
// store for vpc stats
122+
pub vpc_stats_store: Arc<VpcStatsStore>,
123+
}
124+
110125
impl ConfigProcessor {
111126
const CHANNEL_SIZE: usize = 1; // This should not be changed
112127

113128
/////////////////////////////////////////////////////////////////////////////////
114129
/// Create a [`ConfigProcessor`]
115130
/////////////////////////////////////////////////////////////////////////////////
116131
#[must_use]
117-
pub(crate) fn new(
118-
router_ctl: RouterCtlSender,
119-
vpcmapw: VpcMapWriter<VpcMapName>,
120-
nattablew: NatTablesWriter,
121-
natallocatorw: NatAllocatorWriter,
122-
vnitablesw: VpcDiscTablesWriter,
123-
vpc_stats_store: Arc<stats::VpcStatsStore>,
124-
) -> (Self, Sender<ConfigChannelRequest>) {
132+
pub(crate) fn new(proc_params: ConfigProcessorParams) -> (Self, Sender<ConfigChannelRequest>) {
125133
debug!("Creating config processor...");
126134
let (tx, rx) = mpsc::channel(Self::CHANNEL_SIZE);
127135

@@ -133,16 +141,11 @@ impl ConfigProcessor {
133141
let netlink = Arc::new(netlink);
134142
let vpc_mgr = VpcManager::<RequiredInformationBase>::new(netlink);
135143

136-
let processor = Self {
144+
let processor = ConfigProcessor {
137145
config_db: GwConfigDatabase::new(),
138146
rx,
139-
router_ctl,
140147
vpc_mgr,
141-
vpcmapw,
142-
nattablew,
143-
natallocatorw,
144-
vnitablesw,
145-
vpc_stats_store,
148+
proc_params,
146149
};
147150
(processor, tx)
148151
}
@@ -190,15 +193,16 @@ impl ConfigProcessor {
190193
debug!("The current config is {}", current.genid());
191194
}
192195

196+
// FIXME(fredi): pass &mut self.params
193197
apply_gw_config(
194198
&self.vpc_mgr,
195199
&mut config,
196200
current.as_deref(),
197-
&mut self.router_ctl,
198-
&mut self.vpcmapw,
199-
&mut self.nattablew,
200-
&mut self.natallocatorw,
201-
&mut self.vnitablesw,
201+
&mut self.proc_params.router_ctl,
202+
&mut self.proc_params.vpcmapw,
203+
&mut self.proc_params.nattablesw,
204+
&mut self.proc_params.natallocatorw,
205+
&mut self.proc_params.vpcdtablesw,
202206
)
203207
.await?;
204208

@@ -219,15 +223,16 @@ impl ConfigProcessor {
219223
let rollback_cfg = current.unwrap_or(ExternalConfig::BLANK_GENID);
220224
info!("Rolling back to config '{rollback_cfg}'...");
221225
if let Some(prior) = self.config_db.get_mut(rollback_cfg) {
226+
// FIXME(fredi): pass &mut self.params
222227
let _ = apply_gw_config(
223228
&self.vpc_mgr,
224229
prior,
225230
None,
226-
&mut self.router_ctl,
227-
&mut self.vpcmapw,
228-
&mut self.nattablew,
229-
&mut self.natallocatorw,
230-
&mut self.vnitablesw,
231+
&mut self.proc_params.router_ctl,
232+
&mut self.proc_params.vpcmapw,
233+
&mut self.proc_params.nattablesw,
234+
&mut self.proc_params.natallocatorw,
235+
&mut self.proc_params.vpcdtablesw,
231236
)
232237
.await;
233238
}
@@ -262,9 +267,11 @@ impl ConfigProcessor {
262267
async fn handle_get_dataplane_status(&mut self) -> ConfigResponse {
263268
let mut status = DataplaneStatus::new();
264269

265-
let names = self.vpc_stats_store.snapshot_names().await;
266-
let pair_snap = self.vpc_stats_store.snapshot_pairs().await;
267-
let vpc_snap = self.vpc_stats_store.snapshot_vpcs().await;
270+
let stats_store = &self.proc_params.vpc_stats_store;
271+
272+
let names = stats_store.snapshot_names().await;
273+
let pair_snap = stats_store.snapshot_pairs().await;
274+
let vpc_snap = stats_store.snapshot_vpcs().await;
268275

269276
// Helper to check if a flow stats has any traffic
270277
#[inline]
@@ -375,7 +382,7 @@ impl ConfigProcessor {
375382
}
376383

377384
// FRR minimal info
378-
populate_status_with_frr(&mut status, &mut self.router_ctl).await;
385+
populate_status_with_frr(&mut status, &mut self.proc_params.router_ctl).await;
379386

380387
ConfigResponse::GetDataplaneStatus(Box::new(status))
381388
}
@@ -587,6 +594,7 @@ fn apply_device_config(device: &DeviceConfig) -> ConfigResult {
587594

588595
#[allow(clippy::too_many_arguments)]
589596
/// Main function to apply a config
597+
// FIXME(fredi): receive &mut self.params
590598
async fn apply_gw_config(
591599
vpc_mgr: &VpcManager<RequiredInformationBase>,
592600
config: &mut GwConfig,

0 commit comments

Comments
 (0)