Skip to content

Commit 3f5a553

Browse files
committed
finito
1 parent d301efb commit 3f5a553

File tree

3 files changed

+401
-320
lines changed

3 files changed

+401
-320
lines changed
Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
use crate::cache::AllocationStrategy;
18+
use iggy_common::TcpSender;
19+
use rand::{SeedableRng, rngs::StdRng, seq::SliceRandom};
20+
use std::{
21+
cell::RefCell,
22+
collections::{HashMap, HashSet},
23+
rc::Rc,
24+
};
25+
26+
const MAX_CONNECTIONS_PER_REPLICA: usize = 8;
27+
28+
// TODO: Move to some common trait location.
29+
pub trait ShardedState {
30+
type Entry;
31+
type Delta;
32+
33+
fn apply(&mut self, delta: Self::Delta);
34+
}
35+
36+
/// Least-loaded allocation strategy for connections
37+
pub struct LeastLoadedStrategy {
38+
total_shards: usize,
39+
connections_per_shard: RefCell<Vec<(u16, usize)>>,
40+
replica_to_shards: RefCell<HashMap<u8, HashSet<u16>>>,
41+
rng_seed: u64,
42+
}
43+
44+
impl LeastLoadedStrategy {
45+
pub fn new(total_shards: usize, seed: u64) -> Self {
46+
Self {
47+
total_shards,
48+
connections_per_shard: RefCell::new((0..total_shards).map(|s| (s as u16, 0)).collect()),
49+
replica_to_shards: RefCell::new(HashMap::new()),
50+
rng_seed: seed,
51+
}
52+
}
53+
54+
fn create_shard_mappings(
55+
&self,
56+
mappings: &mut Vec<ShardAssignment>,
57+
replica: u8,
58+
mut conn_shards: Vec<u16>,
59+
) {
60+
for shard in &conn_shards {
61+
mappings.push(ShardAssignment {
62+
replica,
63+
shard: *shard,
64+
conn_shard: *shard,
65+
});
66+
}
67+
68+
let mut rng = StdRng::seed_from_u64(self.rng_seed);
69+
conn_shards.shuffle(&mut rng);
70+
71+
let mut j = 0;
72+
for shard in 0..self.total_shards {
73+
let shard = shard as u16;
74+
if conn_shards.contains(&shard) {
75+
continue;
76+
}
77+
let conn_idx = j % conn_shards.len();
78+
mappings.push(ShardAssignment {
79+
replica,
80+
shard,
81+
conn_shard: conn_shards[conn_idx],
82+
});
83+
j += 1;
84+
}
85+
}
86+
}
87+
88+
/// Identifies a connection on a specific shard
89+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
90+
pub struct ConnectionAssignment {
91+
replica: u8,
92+
shard: u16,
93+
}
94+
95+
/// Maps a source shard to the shard that owns the connection
96+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
97+
pub struct ShardAssignment {
98+
replica: u8,
99+
shard: u16,
100+
conn_shard: u16,
101+
}
102+
103+
/// Changeset for connection-based allocation
104+
#[derive(Debug, Clone)]
105+
pub enum ConnectionChanges {
106+
Allocate {
107+
connections: Vec<ConnectionAssignment>,
108+
mappings: Vec<ShardAssignment>,
109+
},
110+
Deallocate {
111+
connections: Vec<ConnectionAssignment>,
112+
mappings: Vec<ConnectionAssignment>,
113+
},
114+
}
115+
116+
type Delta = <ConnectionCache as ShardedState>::Delta;
117+
impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy {
118+
fn allocate(&self, replica: u8) -> Option<Delta> {
119+
if self.replica_to_shards.borrow().contains_key(&replica) {
120+
return None;
121+
}
122+
123+
let mut connections = Vec::new();
124+
let mut mappings = Vec::new();
125+
let connections_needed = self.total_shards.min(MAX_CONNECTIONS_PER_REPLICA);
126+
127+
let mut rng = StdRng::seed_from_u64(self.rng_seed);
128+
self.connections_per_shard.borrow_mut().shuffle(&mut rng);
129+
self.connections_per_shard
130+
.borrow_mut()
131+
.sort_by_key(|(_, count)| *count);
132+
133+
let mut assigned_shards = HashSet::with_capacity(connections_needed);
134+
135+
for i in 0..connections_needed {
136+
let mut connections_per_shard = self.connections_per_shard.borrow_mut();
137+
let (shard, count) = connections_per_shard.get_mut(i).unwrap();
138+
connections.push(ConnectionAssignment {
139+
replica,
140+
shard: *shard,
141+
});
142+
*count += 1;
143+
assigned_shards.insert(*shard);
144+
}
145+
146+
self.replica_to_shards
147+
.borrow_mut()
148+
.insert(replica, assigned_shards.clone());
149+
150+
self.create_shard_mappings(
151+
&mut mappings,
152+
replica,
153+
assigned_shards.into_iter().collect(),
154+
);
155+
156+
Some(Delta::Allocate {
157+
connections,
158+
mappings,
159+
})
160+
}
161+
162+
fn deallocate(&self, replica: u8) -> Option<Delta> {
163+
let conn_shards = self.replica_to_shards.borrow_mut().remove(&replica)?;
164+
165+
let mut connections = Vec::new();
166+
let mut mappings = Vec::new();
167+
168+
for shard in &conn_shards {
169+
if let Some((_, count)) = self
170+
.connections_per_shard
171+
.borrow_mut()
172+
.iter_mut()
173+
.find(|(s, _)| s == shard)
174+
{
175+
*count = count.saturating_sub(1);
176+
}
177+
connections.push(ConnectionAssignment {
178+
replica,
179+
shard: *shard,
180+
});
181+
}
182+
183+
for shard in 0..self.total_shards {
184+
let shard = shard as u16;
185+
mappings.push(ConnectionAssignment { replica, shard });
186+
}
187+
188+
Some(Delta::Deallocate {
189+
connections,
190+
mappings,
191+
})
192+
}
193+
}
194+
195+
/// Coordinator that wraps a strategy for a specific sharded state type
196+
pub struct Coordinator<A, SS>
197+
where
198+
SS: ShardedState,
199+
A: AllocationStrategy<SS>,
200+
{
201+
strategy: A,
202+
_ss: std::marker::PhantomData<SS>,
203+
}
204+
205+
impl<A, SS> Coordinator<A, SS>
206+
where
207+
SS: ShardedState,
208+
A: AllocationStrategy<SS>,
209+
{
210+
pub fn new(strategy: A) -> Self {
211+
Self {
212+
strategy,
213+
_ss: std::marker::PhantomData,
214+
}
215+
}
216+
217+
pub fn allocate(&self, entry: SS::Entry) -> Option<SS::Delta> {
218+
self.strategy.allocate(entry)
219+
}
220+
221+
pub fn deallocate(&self, entry: SS::Entry) -> Option<SS::Delta> {
222+
self.strategy.deallocate(entry)
223+
}
224+
}
225+
226+
pub struct ShardedConnections<A, SS>
227+
where
228+
SS: ShardedState,
229+
A: AllocationStrategy<SS>,
230+
{
231+
pub coordinator: Coordinator<A, SS>,
232+
pub state: SS,
233+
}
234+
235+
impl<A, SS> ShardedConnections<A, SS>
236+
where
237+
SS: ShardedState,
238+
A: AllocationStrategy<SS>,
239+
{
240+
pub fn allocate(&mut self, entry: SS::Entry) -> bool {
241+
if let Some(delta) = self.coordinator.allocate(entry) {
242+
// TODO: broadcast to other shards.
243+
self.state.apply(delta);
244+
true
245+
} else {
246+
false
247+
}
248+
}
249+
250+
pub fn deallocate(&mut self, entry: SS::Entry) -> bool {
251+
if let Some(delta) = self.coordinator.deallocate(entry) {
252+
// TODO: broadcast to other shards.
253+
self.state.apply(delta);
254+
true
255+
} else {
256+
false
257+
}
258+
}
259+
}
260+
261+
/// Cache for connection state per shard
262+
#[derive(Default)]
263+
pub struct ConnectionCache {
264+
pub shard_id: u16,
265+
pub connections: HashMap<u8, Option<Rc<TcpSender>>>,
266+
pub connection_map: HashMap<u8, u16>,
267+
}
268+
269+
impl ConnectionCache {
270+
pub fn get_connection(&self, replica: u8) -> Option<Rc<TcpSender>> {
271+
self.connections.get(&replica).and_then(|opt| opt.clone())
272+
}
273+
274+
pub fn get_mapped_shard(&self, replica: u8) -> Option<u16> {
275+
self.connection_map.get(&replica).copied()
276+
}
277+
}
278+
279+
impl ShardedState for ConnectionCache {
280+
type Entry = u8; // replica id
281+
type Delta = ConnectionChanges;
282+
283+
fn apply(&mut self, delta: Self::Delta) {
284+
let shard_id = self.shard_id;
285+
match delta {
286+
ConnectionChanges::Allocate {
287+
connections,
288+
mappings,
289+
} => {
290+
for conn in connections.iter().filter(|c| c.shard == shard_id) {
291+
self.connections.insert(conn.replica, None);
292+
}
293+
for mapping in &mappings {
294+
self.connection_map
295+
.insert(mapping.replica, mapping.conn_shard);
296+
}
297+
}
298+
ConnectionChanges::Deallocate {
299+
connections,
300+
mappings,
301+
} => {
302+
for conn in connections.iter().filter(|c| c.shard == shard_id) {
303+
self.connections.remove(&conn.replica);
304+
}
305+
for mapping in &mappings {
306+
self.connection_map.remove(&mapping.replica);
307+
}
308+
}
309+
}
310+
}
311+
}

core/message_bus/src/cache/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use crate::cache::connection::ShardedState;
2+
3+
// TODO: Move to some common trait location.
4+
/// Allocation strategy that produces deltas for a specific sharded state type
5+
pub trait AllocationStrategy<SS>
6+
where
7+
SS: ShardedState,
8+
{
9+
fn allocate(&self, entry: SS::Entry) -> Option<SS::Delta>;
10+
fn deallocate(&self, entry: SS::Entry) -> Option<SS::Delta>;
11+
}
12+
13+
pub(crate) mod connection;

0 commit comments

Comments
 (0)