diff --git a/docs/designs/RFE413-priority-scheduling/FS.md b/docs/designs/RFE413-priority-scheduling/FS.md
index fbf4927c..bc2eedf0 100644
--- a/docs/designs/RFE413-priority-scheduling/FS.md
+++ b/docs/designs/RFE413-priority-scheduling/FS.md
@@ -220,11 +220,13 @@ session = flame.open_session(
 │ │ │ (NEW)             │ │                    │ │                  │ │ │
 │ │ │                   │ │                    │ │                  │ │ │
 │ │ │ setup():          │ │ setup():           │ │ setup():         │ │ │
-│ │ │  compute          │ │  compute           │ │  track batch     │ │ │
-│ │ │  max_needy_prio   │ │  deserved/         │ │  state           │ │ │
-│ │ │  among sessions   │ │  allocated         │ │                  │ │ │
-│ │ │  with pending     │ │                    │ │                  │ │ │
-│ │ │  tasks            │ │                    │ │                  │ │ │
+│ │ │  read total_slots │ │  compute           │ │  track batch     │ │ │
+│ │ │  distribute by    │ │  deserved/         │ │  state           │ │ │
+│ │ │  (priority desc,  │ │  allocated         │ │                  │ │ │
+│ │ │  creation asc)    │ │  (unchanged)       │ │                  │ │ │
+│ │ │  → ssn_desired    │ │                    │ │                  │ │ │
+│ │ │  init             │ │                    │ │                  │ │ │
+│ │ │  ssn_allocated    │ │                    │ │                  │ │ │
 │ │ │                   │ │                    │ │                  │ │ │
 │ │ │ ssn_order_fn():   │ │ ssn_order_fn():    │ │ (no opinion)     │ │ │
 │ │ │  sort descending  │ │  sort by alloc/    │ │                  │ │ │
@@ -234,7 +236,10 @@ session = flame.open_session(
 │ │ │ is_underused():   │ │ is_underused():    │ │ is_underused():  │ │ │
 │ │ │  Some(false) if   │ │  check alloc       │ │  check batch     │ │ │
 │ │ │  lower priority   │ │  vs deserved       │ │  capacity        │ │ │
-│ │ │  than max_needy   │ │                    │ │                  │ │ │
+│ │ │  than max_needy;  │ │                    │ │                  │ │ │
+│ │ │  else Some(true)  │ │                    │ │                  │ │ │
+│ │ │  while alloc <    │ │                    │ │                  │ │ │
+│ │ │  ssn_desired      │ │                    │ │                  │ │ │
 │ │ └───────────────────┘ └────────────────────┘ └──────────────────┘ │ │
 │ │    consulted first      consulted second      consulted third     │ │
 │ └─────────────────────────────────────────────────────────────────────┘ │
@@ -363,15 +368,29 @@ fn session_spec_to_attributes(id: &str, spec: &SessionSpec) -> SessionAttributes
 
 **Location:** `session_manager/src/scheduler/plugins/priority.rs`
 
-The `PriorityPlugin` implements the `Plugin` trait. It is stateful across a scheduling cycle: `setup()` computes the global `max_needy_priority`, which is then used by `ssn_order_fn` and `is_underused` during that cycle.
+The `PriorityPlugin` owns priority-aware resource distribution. Each scheduling cycle, `setup()`:
+
+1. Retrieves the cluster's total slot count (`total_slots`).
+2. Distributes `total_slots` across open sessions in descending order of priority. Within a priority tier, sessions are ordered by creation time ascending — earlier sessions take precedence. Each session's share is recorded as `ssn_desired[id]`.
+3. Initializes `ssn_allocated[id]` from currently-bound executors. After `setup()`, `ssn_allocated` is updated exactly as before this RFE: the plugin's existing `on_executor_*` / `on_session_*` callbacks adjust the per-session counts.
+
+`max_needy_priority` — the highest priority among sessions that still have pending tasks — is computed in the same pass and used by `is_underused` to hard-block lower-priority sessions even while the cluster still has free slots.
 
 ```rust
 pub struct PriorityPlugin {
-    /// Maximum priority among sessions that are "needy" (have pending tasks).
-    /// Computed in setup(); used during ssn_order_fn and is_underused.
+    /// Maximum priority among open sessions with pending tasks.
+    /// Computed in `setup()`; used in `is_underused`.
     max_needy_priority: u32,
 
     /// Priority for each open session, keyed by session ID.
+    /// Populated in `setup()` for fast lookup during `ssn_order_fn` / `is_underused`.
     ssn_priority: HashMap<SessionID, u32>,
+
+    /// Per-session priority-distributed share. Populated in `setup()` from the
+    /// total-slots distribution loop. Read-only thereafter for the cycle.
+    ssn_desired: HashMap<SessionID, f64>,
+
+    /// Slots currently allocated to each session.
+    /// Initialised in `setup()` from existing bound executors; updated thereafter
+    /// by the executor / session lifecycle callbacks (unchanged).
+    ssn_allocated: HashMap<SessionID, f64>,
 }
 
 impl PriorityPlugin {
@@ -379,6 +398,8 @@ impl PriorityPlugin {
         Self {
             max_needy_priority: 0,
             ssn_priority: HashMap::new(),
+            ssn_desired: HashMap::new(),
+            ssn_allocated: HashMap::new(),
         }
     }
 }
@@ -386,25 +407,57 @@ impl PriorityPlugin {
 
 impl Plugin for PriorityPlugin {
     fn setup(&mut self, ss: &SnapShot) -> Result<(), FlameError> {
         self.ssn_priority.clear();
+        self.ssn_desired.clear();
+        self.ssn_allocated.clear();
         self.max_needy_priority = 0;
 
-        for ssn in ss.sessions.values() {
-            if ssn.state != SessionState::Open {
-                continue;
-            }
-
-            let priority = ssn.priority;
-            self.ssn_priority.insert(ssn.id.clone(), priority);
-
-            // A session is "needy" if it has pending tasks.
-            // A pending task implies the session can use more executors.
+        // ── Step 1: total_slots ──────────────────────────────────────────────
+        // Cluster's physical scheduling capacity, taken from the same snapshot
+        // every plugin sees this cycle.
+        let total_slots: f64 = ss.find_nodes(ALL_NODE)?
+            .values()
+            .map(|n| n.allocatable.to_slots(&ss.unit) as f64)
+            .sum();
+
+        // ── Step 2: distribute total_slots by (priority desc, creation_time asc) ─
+        // Earlier-created sessions take precedence within a priority tier.
+        let mut sessions: Vec<&SessionInfoPtr> = ss
+            .find_sessions(OPEN_SESSION)?
+            .values()
+            .collect();
+        sessions.sort_by(|a, b| {
+            b.priority
+                .cmp(&a.priority)                             // priority descending
+                .then(a.creation_time.cmp(&b.creation_time))  // earlier first
+        });
+
+        let mut remaining = total_slots;
+        for ssn in &sessions {
+            self.ssn_priority.insert(ssn.id.clone(), ssn.priority);
+
+            let demand = compute_demand(ssn);  // task-driven ceiling
+            let granted = demand.min(remaining.max(0.0));
+
+            self.ssn_desired.insert(ssn.id.clone(), granted);
+            self.ssn_allocated.insert(ssn.id.clone(), 0.0);
+            remaining -= granted;
+
+            // max_needy_priority pass (same loop, unchanged criterion)
             let pending = ssn.tasks_status
                 .get(&TaskState::Pending)
                 .copied()
                 .unwrap_or(0);
+            if pending > 0 && ssn.priority > self.max_needy_priority {
+                self.max_needy_priority = ssn.priority;
+            }
+        }
 
-            if pending > 0 && priority > self.max_needy_priority {
-                self.max_needy_priority = priority;
+        // ── Step 3: ssn_allocated initial counts (existing logic, unchanged) ──
+        for exe in ss.find_executors(ALL_EXECUTOR)?.values() {
+            if let Some(ssn_id) = &exe.ssn_id {
+                if let Some(slot) = self.ssn_allocated.get_mut(ssn_id) {
+                    *slot += exe.slots as f64;
+                }
             }
         }
 
@@ -416,34 +469,94 @@
         let p2 = self.ssn_priority.get(&s2.id).copied().unwrap_or(0);
 
         if p1 != p2 {
-            // Higher priority sessions come first (descending order).
-            // p2.cmp(&p1) reverses the natural ascending order.
+            // Higher priority comes first (descending order).
            Some(p2.cmp(&p1))
         } else {
-            // Equal priority: no opinion; defer to FairShare for ratio-based tiebreaking.
+            // Equal priority: ssn_order_fn has no opinion. Return None and let
+            // FairShare break the tie within the priority tier by its
+            // allocated/deserved ratio; creation time already shaped ssn_desired
+            // in setup() and is not re-applied here.
             None
         }
     }
 
     fn is_underused(&self, ssn: &SessionInfoPtr) -> Option<bool> {
-        let ssn = lock_ptr!(ssn).ok()?;
-        let priority = self.ssn_priority.get(&ssn.id).copied().unwrap_or(0);
+        let priority = self.ssn_priority.get(&ssn.id).copied()?;
 
+        // Lower than the highest needy priority → hard-blocked.
         if priority < self.max_needy_priority {
-            // A higher-priority session is still needy.
-            // Block this session from receiving new resources.
-            Some(false)
+            return Some(false);
+        }
+
+        // Eligible tier: still underused while allocated < desired.
+        let desired = self.ssn_desired.get(&ssn.id).copied().unwrap_or(0.0);
+        let allocated = self.ssn_allocated.get(&ssn.id).copied().unwrap_or(0.0);
+        if desired > 0.0 && allocated < desired {
+            Some(true) // overrides FairShare's deserved-based veto
         } else {
-            // This session is at or above the highest needy priority.
-            // Defer to FairShare and GangPlugin to determine underuse.
-            None
+            None // demand met (or no demand) — defer to FairShare/Gang
+        }
+    }
+
+    // ── ssn_allocated update: existing process, UNCHANGED ────────────────────
+    // The four executor callbacks and two session callbacks below mirror the
+    // existing implementation. They are listed here only to make the contract
+    // explicit; their bodies are unchanged.
+
+    fn on_executor_allocate(&mut self, _node: NodeInfoPtr, ssn: SessionInfoPtr) {
+        if let Some(slot) = self.ssn_allocated.get_mut(&ssn.id) {
+            *slot += ssn.slots as f64;
+        }
+    }
+    fn on_executor_unallocate(&mut self, _node: NodeInfoPtr, ssn: SessionInfoPtr) {
+        if let Some(slot) = self.ssn_allocated.get_mut(&ssn.id) {
+            *slot -= ssn.slots as f64;
+        }
+    }
+    fn on_executor_pipeline(&mut self, _exec: ExecutorInfoPtr, ssn: SessionInfoPtr) {
+        if let Some(slot) = self.ssn_allocated.get_mut(&ssn.id) {
+            *slot += ssn.slots as f64;
+        }
+    }
+    fn on_executor_discard(&mut self, _exec: ExecutorInfoPtr, ssn: SessionInfoPtr) {
+        if let Some(slot) = self.ssn_allocated.get_mut(&ssn.id) {
+            *slot -= ssn.slots as f64;
+        }
+    }
+    fn on_session_bind(&mut self, ssn: SessionInfoPtr) {
+        if let Some(slot) = self.ssn_allocated.get_mut(&ssn.id) {
+            *slot += ssn.slots as f64;
+        }
+    }
+    fn on_session_unbind(&mut self, ssn: SessionInfoPtr) {
+        if let Some(slot) = self.ssn_allocated.get_mut(&ssn.id) {
+            *slot -= ssn.slots as f64;
         }
     }
 
-    // All other Plugin methods return their default (None / no-op).
-    // PriorityPlugin does not influence node ordering, preemptibility,
-    // executor availability, allocatability, reclaimability, gang readiness,
-    // or event callbacks.
+    // All other Plugin methods (node ordering, preemptibility, availability,
+    // allocatability, reclaimability, gang readiness) return defaults.
 }
 ```
 
+**`compute_demand(ssn)`** — the per-session demand ceiling used in the distribution loop; the formula is unchanged from the existing implementation:
+
+```rust
+fn compute_demand(ssn: &SessionInfo) -> f64 {
+    // Sum pending + running task counts; round down to whole batches; multiply by slots.
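+    // Worked example, mirrored by the unit tests later in this diff:
+    // pending=5, running=1, batch_size=2, slots=3
+    //   → task_count=6, batched=floor(6/2)×2=6, demand=6×3=18.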
+    let mut task_count = 0.0_f64;
+    for state in [TaskState::Pending, TaskState::Running] {
+        if let Some(c) = ssn.tasks_status.get(&state) {
+            task_count += *c as f64;
+        }
+    }
+    let batch_size = ssn.batch_size.max(1) as f64;
+    let batched = (task_count / batch_size).floor() * batch_size;
+    let mut demand = batched * ssn.slots as f64;
+
+    if let Some(max_i) = ssn.max_instances {
+        demand = demand.min((max_i * ssn.slots) as f64);
+    }
+    demand.max((ssn.min_instances * ssn.slots) as f64)
+}
+```
 
@@ -537,6 +650,8 @@ struct CreateArgs {
 | -------------------- | ------------------------- | --------------------------------------------------------------------------- |
 | `max_needy_priority` | `u32`                     | Highest priority among sessions with pending tasks; computed in `setup()`    |
 | `ssn_priority`       | `HashMap<SessionID, u32>` | Priority for each open session; populated in `setup()`; consulted in order functions |
+| `ssn_desired`        | `HashMap<SessionID, f64>` | Per-session priority-distributed share; populated in `setup()` step 2 from the `total_slots` distribution loop; read-only thereafter for the cycle |
+| `ssn_allocated`      | `HashMap<SessionID, f64>` | Slots currently allocated to each session; initialised in `setup()` step 3 from bound executors; updated thereafter by the existing executor / session lifecycle callbacks |
 
 **`SessionInfo` extension:**
 
@@ -563,6 +678,57 @@
 PluginManager chain (first non-None wins):
   PriorityPlugin → FairShare → GangPlugin → Ordering::Equal
 ```
 
+#### Priority-Aware Resource Distribution (PriorityPlugin.setup)
+
+The distribution algorithm runs once per scheduling cycle inside `PriorityPlugin::setup`. It produces `ssn_desired[id]` for every open session. `ssn_allocated[id]` is updated by the existing process — initialized from currently-bound executors and adjusted thereafter by the executor / session lifecycle callbacks.
+
+```
+Input : SnapShot ss
+Output: ssn_desired        : map<session_id, slots>
+        ssn_allocated      : map<session_id, slots>  (initial counts only; runtime updates unchanged)
+        max_needy_priority : u32
+
+# Step 1 — total_slots
+total_slots = Σ node.allocatable.to_slots(unit) for node in ss.nodes
+remaining   = total_slots
+
+# Step 2 — distribute by (priority desc, creation_time asc)
+sorted = ss.open_sessions sorted by:
+    primary  : priority      (descending)   # higher priority first
+    secondary: creation_time (ascending)    # earlier session first
+
+for ssn in sorted:
+    demand  = compute_demand(ssn)           # task-driven ceiling
+    granted = min(demand, max(remaining, 0))
+    ssn_desired[ssn.id]   = granted
+    ssn_allocated[ssn.id] = 0               # filled in step 3
+    remaining -= granted
+
+    if pending(ssn) > 0 and ssn.priority > max_needy_priority:
+        max_needy_priority = ssn.priority
+
+# Step 3 — initial ssn_allocated from bound executors (existing process, unchanged)
+for exe in ss.executors where exe.ssn_id is set:
+    ssn_allocated[exe.ssn_id] += exe.slots
+
+# After setup, ssn_allocated continues to be updated by the existing
+# on_executor_allocate / on_executor_unallocate / on_executor_pipeline /
+# on_executor_discard / on_session_bind / on_session_unbind callbacks.
+```
+
+**Invariants enforced by this algorithm:**
+
+| Invariant | Justification |
+| --------- | ------------- |
+| `Σ ssn_desired ≤ total_slots` | Each grant is capped at `remaining`, which starts at `total_slots` and is monotonically decremented. |
+| `Σ ssn_desired = total_slots` when `Σ compute_demand ≥ total_slots` | Higher-priority (and earlier within a tier) sessions saturate first; later sessions absorb the residual until cluster capacity is exhausted. |
+| Within equal priority, earlier-created sessions are filled first | `creation_time ascending` is the secondary sort key. |
+| `ssn_allocated` update flow is unchanged | Only the *initial* counts are populated in `setup()`; runtime adjustments use the same callbacks as before this RFE. |
+
+**Why creation time as the tiebreaker:**
+
+Within a priority tier, the earlier session represents work that has been waiting longer for resources. Filling it first reduces head-of-line latency for established sessions while remaining deterministic and stable across scheduling cycles. Session IDs are not used as a tiebreaker because they are not ordered by submission time and would give arbitrary winners.
+
 #### Priority Blocking (is_underused)
 
 ```
@@ -587,74 +753,88 @@ Aggregation (ANY semantics):
 A session with pending tasks has work it cannot yet run — it can benefit from additional executors. A session with zero pending tasks is either idle or satisfied; it should not block lower-priority sessions even if it holds fewer executors than its `deserved` allocation.
 
-#### FairShare `desired` Calculation (updated for priority scheduling)
-
-With priority scheduling active, each session's `desired` equals exactly one scheduling unit — the resources it was configured to use — rather than scaling with its task backlog:
-
-```
-desired = batch_size × slots   (when task_count > 0)
-desired = 0                    (when task_count == 0, no work to do)
+#### FairShare's Role with Priority Scheduling
 
-Then apply:
-  desired = min(desired, max_instances × slots)   # cap at max_instances if set
-  desired = max(desired, min_instances × slots)   # floor at min_instances guarantee
-```
+FairShare itself is **unchanged**. It continues to compute `deserved` and `allocated` per session and continues to provide `ssn_order_fn` (allocated/deserved ratio ascending) and `is_underused` (allocated < deserved). Its responsibilities and formulas inside `setup()` are not modified by this RFE.
 
-**Old formula (removed):** `desired = floor(task_count / batch_size) × batch_size × slots`
+What changes is **which plugin owns the cluster-capacity cap on `desired`**. PriorityPlugin's `setup()` performs the priority-aware total-slots distribution (see *Priority-Aware Resource Distribution* above) and writes the resulting per-session share into `ssn_desired`. PriorityPlugin's `is_underused` then uses `ssn_allocated < ssn_desired` to override FairShare's deserved-based veto for high-priority sessions. The plugin consultation order — `PriorityPlugin → FairShare → GangPlugin` with first-non-`None` semantics — guarantees PriorityPlugin's decision wins whenever it has an opinion.
 
-The old formula caused sessions with large backlogs to claim proportionally more resources than sessions with small backlogs. With priority scheduling, backlog size must not influence resource allocation — that role belongs to the PriorityPlugin's ordering and blocking logic. Each session claims its one scheduling unit; priority determines which session receives it.
+Because PriorityPlugin caps `Σ ssn_desired ≤ total_slots` (see invariants), the previous over-allocation symptom — where aggregate demand asked for more than the cluster physically has — cannot occur even when individual session demand is large.
 
 #### Example Walkthrough
 
-All sessions below use `batch_size=1`, so `desired = 1 × slots = slots`.
+All sessions below use `batch_size=1` for clarity, so `compute_demand(ssn) = task_count × slots`, capped and floored as usual.
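+
+As a quick sanity check under that simplification, the three sessions used in both cases below have these task-driven demands (before the capacity cap is applied):
+
+```
+A: pending=4 × slots=2 → demand = 8
+B: pending=3 × slots=2 → demand = 6
+C: pending=5 × slots=4 → demand = 20
+```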
+ +**Case 1 — Cluster has slack (`total_slots = 22`):** ``` -Cluster: 6 slots total, all idle +Cluster: total_slots = 22, all idle -Sessions: - Session A: priority=100, slots=2, pending=8 → desired=2 - Session B: priority=100, slots=2, pending=4 → desired=2 - Session C: priority=10, slots=2, pending=12 → desired=2 +Sessions (creation_time shown as relative t₀ < t₁ < t₂): + Session A: priority=100, slots=2, pending=4, created t₀ → demand = 8 + Session B: priority=100, slots=2, pending=3, created t₁ → demand = 6 + Session C: priority=10, slots=4, pending=5, created t₂ → demand = 20 -FairShare.setup(): - remaining = 6 (no min_instances, no executors yet) - Distribute: A gets 2, B gets 2, C gets 2 (each desired=2, cluster has enough) - A: desired=2, deserved=2, allocated=0 - B: desired=2, deserved=2, allocated=0 - C: desired=2, deserved=2, allocated=0 +PriorityPlugin.setup() — Step 1: total_slots = 22, remaining = 22 + Step 2: sort by (priority desc, creation_time asc) + → [A (100, t₀), B (100, t₁), C (10, t₂)] -PriorityPlugin.setup(): - A: pending=8 > 0, priority=100 → max_needy_priority = 100 - B: pending=4 > 0, priority=100 → max_needy_priority = 100 (unchanged) - C: pending=12 > 0, priority=10 → 10 < 100, skip - Result: max_needy_priority = 100 - -ssn_order_fn(): - A vs B: same priority (100) → None → FairShare breaks tie - FairShare: A ratio=0/2=0.0, B ratio=0/2=0.0 → equal → Ordering::Equal - A vs C: 100 > 10 → Some(Less) → A before C - B vs C: 100 > 10 → Some(Less) → B before C - Final order: [A, B, C] (A and B equal within their tier) + A: demand = 8; granted = min(8, 22) = 8 → ssn_desired[A] = 8, remaining = 14 + B: demand = 6; granted = min(6, 14) = 6 → ssn_desired[B] = 6, remaining = 8 + C: demand = 20; granted = min(20, 8) = 8 → ssn_desired[C] = 8, remaining = 0 + + Σ ssn_desired = 22 = total_slots ✓ + max_needy_priority = 100 is_underused(): - A: priority=100 == max_needy_priority(100) → None → FairShare: 0 < 2 → true - B: priority=100 == max_needy_priority(100) → None → FairShare: 0 < 2 → true - C: priority=10 < max_needy_priority(100) → Some(false) ← BLOCKED + A: priority=100 == max_needy_priority → ssn_allocated(0) < ssn_desired(8) → Some(true) + B: priority=100 == max_needy_priority → ssn_allocated(0) < ssn_desired(6) → Some(true) + C: priority=10 < max_needy_priority → Some(false) ← BLOCKED, even though C + was granted 8 in step 2 AllocateAction iterates [A, B, C]: - A: underused → allocate 2 slots (1 executor) → allocated=2 - B: underused → allocate 2 slots (1 executor) → allocated=2 + A: underused → fill ssn_desired=8 → ssn_allocated[A] = 8 + B: underused → fill ssn_desired=6 → ssn_allocated[B] = 6 C: blocked by PriorityPlugin → skip +``` -Result: - A: 1 executor (2 slots), processes its 8 pending tasks one at a time - B: 1 executor (2 slots), processes its 4 pending tasks one at a time - C: 0 executors — blocked until both A and B drain their pending queues +**Case 2 — Cluster is contended (`total_slots = 4`):** + +Same three sessions, smaller cluster. -Key point: C has 12 pending tasks but desired=2 (= slots), the same as A and B. -The large backlog does not give C any advantage in the FairShare calculation. -Priority alone determines which sessions receive resources. 
 ```
+Cluster: total_slots = 4
+
+PriorityPlugin.setup():
+    Sort: [A (100, t₀), B (100, t₁), C (10, t₂)]
+
+    A: demand = 8;  granted = min(8, 4)  = 4 → ssn_desired[A] = 4, remaining = 0
+    B: demand = 6;  granted = min(6, 0)  = 0 → ssn_desired[B] = 0, remaining = 0
+    C: demand = 20; granted = min(20, 0) = 0 → ssn_desired[C] = 0, remaining = 0
+
+    Σ ssn_desired = 4 = total_slots ✓ (capacity binds; demand exceeds cluster)
+    max_needy_priority = 100
+
+is_underused():
+    A: priority=100 == max_needy_priority → 0 < 4  → Some(true)
+    B: priority=100 == max_needy_priority → 0 == 0 → None → FairShare decides
+       (FairShare's deserved for B is also bounded by the cluster.)
+    C: priority=10 < max_needy_priority → Some(false)
+
+Result:
+    A receives all 4 cluster slots.
+    B (same priority as A but created later) waits until A drains.
+    C waits behind both A and B.
+```
+
+**Key points:**
+
+- `priority` is the primary sort key.
+- `creation_time ascending` resolves equal-priority ties — earlier sessions are filled first (Case 1 fills A before B; Case 2 starves B until A completes).
+- `Σ ssn_desired ≤ total_slots` holds by construction, with equality once cluster capacity is the binding constraint (true in both Case 1 and Case 2).
+- `ssn_allocated` is initialized in step 3 of `setup()` from currently-bound executors and tracked thereafter by the existing executor / session callbacks — that update path is unchanged.
 
 ### System Considerations
 
@@ -880,3 +1060,6 @@ Remaining 4 slots idle (cannot form another batch for llm-inference pen
 | **Default `priority = 0`** | Proto3 default for `uint32` is `0`. All existing sessions automatically start at the lowest priority without any migration or explicit opt-in. Clusters that don't use the priority feature are unaffected. |
 | **Global priority across applications** | Priority is a per-session attribute independent of application. Sessions from different applications compete globally. This is the natural consequence of `ssn_order_fn` acting on all open sessions in a single scheduler snapshot. |
 | **No preemption in V1** | Preemption adds significant complexity: partial batch reclaim must respect gang constraints, executor teardown has latency, and priority inversion must be avoided. A dedicated follow-on RFE can add `is_preemptible` logic to `PriorityPlugin` once the simpler ordering semantics are validated in production. |
+| **PriorityPlugin owns the cluster-capacity cap on `ssn_desired`** | The previous formula computed each session's `desired` independently of cluster size, allowing `Σ desired > total_slots` (the "FairShare allocated more than 22 slots" symptom). Moving the cap into `PriorityPlugin::setup` — as a single priority-ordered distribution loop bounded by `total_slots` — makes capacity an explicit invariant by construction. FairShare retains its existing role for within-tier fairness; only the source of `ssn_desired` changes. |
+| **Within-tier tiebreaker: creation time ascending (earlier first)** | When two sessions share a priority, the one that has been waiting longer should be filled first. This reduces head-of-line latency for established sessions, is deterministic across cycles, and reflects user intuition (FIFO within priority). Session IDs are not used because they are not ordered by submission time. |
+| **`ssn_allocated` update process is unchanged** | The runtime adjustments via `on_executor_allocate` / `on_executor_unallocate` / `on_executor_pipeline` / `on_executor_discard` / `on_session_bind` / `on_session_unbind` already correctly reflect bind/release events. This RFE only changes the *initial value* of `ssn_allocated` (computed in `setup()` from the snapshot's bound executors); per-event updates after `setup()` keep their existing implementation. |
diff --git a/session_manager/src/scheduler/plugins/priority.rs b/session_manager/src/scheduler/plugins/priority.rs
index d94f0fc2..50f6ac11 100644
--- a/session_manager/src/scheduler/plugins/priority.rs
+++ b/session_manager/src/scheduler/plugins/priority.rs
@@ -18,7 +18,8 @@ use common::apis::{SessionID, TaskState};
 use common::FlameError;
 
 use crate::model::{
-    ExecutorInfoPtr, NodeInfoPtr, SessionInfo, SessionInfoPtr, SnapShot, ALL_EXECUTOR, OPEN_SESSION,
+    ExecutorInfoPtr, NodeInfoPtr, SessionInfo, SessionInfoPtr, SnapShot, ALL_EXECUTOR, ALL_NODE,
+    OPEN_SESSION,
 };
 
 use crate::scheduler::plugins::{Plugin, PluginPtr};
 
@@ -27,18 +28,20 @@ use crate::scheduler::plugins::{Plugin, PluginPtr};
 /// # Behavior
 ///
 /// - Higher `priority` value = higher scheduling priority.
-/// - Sessions with the same priority use FairShare ordering (tiebreaker).
-/// - Sessions at the highest needy priority tier receive executors until their full
-///   task demand is met (`ssn_desired`). PriorityPlugin returns `Some(true)` while
-///   `allocated < desired`, so FairShare's conservative one-unit `deserved` is bypassed
-///   for high-priority sessions.
+/// - Sessions with the same priority use FairShare ordering (tiebreaker for `ssn_order_fn`)
+///   and creation_time ascending order (tiebreaker for the `setup()` distribution loop).
+/// - `setup()` distributes the cluster's `total_slots` across open sessions in
+///   `(priority desc, creation_time asc)` order, capping per-session demand at the
+///   remaining cluster capacity. This guarantees `Σ ssn_desired ≤ total_slots`.
+/// - Sessions at the highest needy priority tier remain underused until their
+///   priority-distributed share (`ssn_desired`) is filled.
 /// - All sessions at a lower priority than `max_needy_priority` are hard-blocked
 ///   (`Some(false)`).
 ///
 /// # Interaction with PluginManager
 ///
-/// `PluginManager::is_underused` uses "first non-`None` wins" ordering. Because
-/// PriorityPlugin is registered first, its opinion is always definitive. FairShare is
+/// `PluginManager::is_underused` uses "first non-`None` wins" ordering. Because
+/// PriorityPlugin is registered first, its opinion is always definitive. FairShare is
 /// consulted only when PriorityPlugin returns `None` (satisfied sessions where desired
 /// is already met).
 ///
@@ -48,14 +51,13 @@
 /// from the snapshot. Changes during the cycle are not reflected until the next cycle.
 pub struct PriorityPlugin {
     /// Maximum priority among open sessions that have pending tasks.
-    /// Computed in `setup()`; used in `ssn_order_fn` and `is_underused`.
+    /// Computed in `setup()`; used in `is_underused`.
     max_needy_priority: u32,
 
     /// Priority for each open session, keyed by session ID.
     /// Populated in `setup()`.
     ssn_priority: HashMap<SessionID, u32>,
 
-    /// Full task-demand for each session (batch-aligned pending+running × slots).
-    /// Computed in `setup()` using the task-count formula. High-priority sessions
-    /// remain underused until `ssn_allocated[id] >= ssn_desired[id]`.
+    /// Per-session priority-distributed share. Populated in `setup()` step 2 from the
+    /// `total_slots` distribution loop. Read-only thereafter for the cycle.
     ssn_desired: HashMap<SessionID, f64>,
 
     /// Executor slots currently allocated per session.
     /// Initialised from the snapshot in `setup()`; updated via callbacks.
@@ -73,57 +75,109 @@ impl PriorityPlugin {
     }
 }
 
+/// Per-session task-driven demand ceiling, in slots.
+///
+/// Sums pending + running task counts, rounds the total down to whole batches,
+/// multiplies by `slots`, then clamps to `[min_instances * slots, max_instances * slots]`.
+/// This is the same formula used by the previous (uncapped) implementation; the
+/// only behavioural change in this RFE is that the cluster-capacity cap moves into
+/// `setup()` and is applied *after* this per-session ceiling.
+fn compute_demand(ssn: &SessionInfo) -> f64 {
+    let mut task_count = 0.0_f64;
+    for state in [TaskState::Pending, TaskState::Running] {
+        if let Some(c) = ssn.tasks_status.get(&state) {
+            task_count += *c as f64;
+        }
+    }
+    let batch_size = ssn.batch_size.max(1) as f64;
+    let batched = (task_count / batch_size).floor() * batch_size;
+    let mut demand = batched * ssn.slots as f64;
+
+    if let Some(max_i) = ssn.max_instances {
+        demand = demand.min((max_i * ssn.slots) as f64);
+    }
+    demand.max((ssn.min_instances * ssn.slots) as f64)
+}
+
 impl Plugin for PriorityPlugin {
     fn name(&self) -> &'static str {
         "priority"
     }
 
+    /// Priority-aware resource distribution per FS.md §3 *PriorityPlugin*.
+    ///
+    /// Three-step algorithm executed once per scheduling cycle:
+    ///
+    /// 1. Sum `total_slots` from the snapshot's nodes.
+    /// 2. Sort open sessions by `(priority desc, creation_time asc)` and walk them in
+    ///    order, granting each session `min(compute_demand(ssn), remaining)` slots
+    ///    and decrementing `remaining`. The result is `ssn_desired[id]`.
+    ///    `max_needy_priority` is updated in the same pass for any session with
+    ///    pending tasks.
+    /// 3. Initialise `ssn_allocated[id]` from currently-bound executors.
+    ///
+    /// Invariants enforced (FS.md §3 *Invariants*):
+    /// - `Σ ssn_desired ≤ total_slots` — each grant is capped at `remaining`,
+    ///   which starts at `total_slots` and is monotonically decremented.
+    /// - Equality holds when aggregate demand exceeds cluster capacity (capacity
+    ///   binds): higher-priority and earlier sessions saturate first, later ones
+    ///   absorb the residual until the cluster is exhausted.
+    /// - Within a priority tier, earlier-created sessions are filled before later
+    ///   ones (`creation_time ascending` is the secondary sort key).
+    /// - `ssn_allocated` runtime updates remain handled by the executor / session
+    ///   lifecycle callbacks; this method only sets the *initial* counts.
     fn setup(&mut self, ss: &SnapShot) -> Result<(), FlameError> {
         self.ssn_priority.clear();
         self.ssn_desired.clear();
         self.ssn_allocated.clear();
         self.max_needy_priority = 0;
 
+        // ── Step 1: total_slots ──────────────────────────────────────────────
+        // Cluster's physical scheduling capacity, taken from the same snapshot
+        // every plugin sees this cycle.
+        let total_slots: f64 = ss
+            .find_nodes(ALL_NODE)?
+            .values()
+            .map(|n| n.allocatable.to_slots(&ss.unit) as f64)
+            .sum();
+
+        // ── Step 2: distribute total_slots by (priority desc, creation_time asc) ─
+        // Hash iteration is non-deterministic; collect and sort explicitly so that
+        // earlier-created sessions within a priority tier are filled first.
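+        // e.g. FS.md Case 1: A(100, t₀), B(100, t₁), C(10, t₂) sort to [A, B, C].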
         let open_ssns = ss.find_sessions(OPEN_SESSION)?;
-        for ssn in open_ssns.values() {
-            let priority = ssn.priority;
-            self.ssn_priority.insert(ssn.id.clone(), priority);
-
-            // Full task demand: batch-aligned (pending + running) × slots.
-            // Includes running tasks so the session retains executors that are
-            // already busy and continues to ask for more while pending work remains.
-            let mut task_count = 0.0_f64;
-            for state in [TaskState::Pending, TaskState::Running] {
-                if let Some(d) = ssn.tasks_status.get(&state) {
-                    task_count += *d as f64;
-                }
-            }
-            let batch_size = ssn.batch_size.max(1) as f64;
-            let batched = (task_count / batch_size).floor() * batch_size;
-            let mut desired = batched * ssn.slots as f64;
+        let mut sessions: Vec<SessionInfoPtr> = open_ssns.values().cloned().collect();
+        sessions.sort_by(|a, b| {
+            b.priority
+                .cmp(&a.priority)                             // priority descending
+                .then(a.creation_time.cmp(&b.creation_time))  // earlier first
+        });
 
-            if let Some(max_instances) = ssn.max_instances {
-                desired = desired.min((max_instances * ssn.slots) as f64);
-            }
-            let min_alloc = (ssn.min_instances * ssn.slots) as f64;
-            desired = desired.max(min_alloc);
+        let mut remaining = total_slots;
+        for ssn in &sessions {
+            self.ssn_priority.insert(ssn.id.clone(), ssn.priority);
 
-            self.ssn_desired.insert(ssn.id.clone(), desired);
+            let demand = compute_demand(ssn);
+            let granted = demand.min(remaining.max(0.0));
+
+            self.ssn_desired.insert(ssn.id.clone(), granted);
             self.ssn_allocated.insert(ssn.id.clone(), 0.0);
+            remaining -= granted;
 
-            // A session is "needy" if it has pending tasks — it can benefit from more executors.
+            // A session is "needy" if it has pending tasks — it can benefit from
+            // more executors. Used by is_underused() to block strictly lower tiers.
             let pending = ssn
                 .tasks_status
                 .get(&TaskState::Pending)
                 .copied()
                 .unwrap_or(0);
-
-            if pending > 0 && priority > self.max_needy_priority {
-                self.max_needy_priority = priority;
+            if pending > 0 && ssn.priority > self.max_needy_priority {
+                self.max_needy_priority = ssn.priority;
             }
         }
 
-        // Initialise allocated counts from current executor assignments.
+        // ── Step 3: ssn_allocated initial counts from bound executors ────────
+        // Runtime updates after this point flow through the on_executor_* /
+        // on_session_* callbacks (unchanged by this RFE).
         let executors = ss.find_executors(ALL_EXECUTOR)?;
         for exe in executors.values() {
             if let Some(ref ssn_id) = exe.ssn_id {
@@ -134,7 +188,8 @@ impl Plugin for PriorityPlugin {
         }
 
         tracing::debug!(
-            "[PriorityPlugin] setup: max_needy_priority={}, tracked_sessions={}",
+            "[PriorityPlugin] setup: total_slots={}, max_needy_priority={}, tracked_sessions={}",
+            total_slots,
             self.max_needy_priority,
             self.ssn_priority.len()
         );
@@ -163,7 +218,7 @@ impl Plugin for PriorityPlugin {
     /// - `priority < max_needy_priority` → `Some(false)`: hard-blocked by a higher-priority
     ///   needy session.
     /// - `priority >= max_needy_priority` and `allocated < desired` → `Some(true)`: session
-    ///   still has unmet task demand; keep allocating.
+    ///   still has unmet, priority-distributed demand; keep allocating.
     /// - `priority >= max_needy_priority` and demand satisfied → `None`: defer to FairShare.
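+    /// Worked example (FS.md Case 2): with `max_needy_priority = 100`, a
+    /// priority-10 session gets `Some(false)`; a priority-100 session with
+    /// `allocated = 0 < desired = 4` gets `Some(true)`.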
     ///
     /// Because `PluginManager::is_underused` uses "first non-`None` wins", the `Some(true)`
@@ -240,7 +295,7 @@ mod tests {
     use super::*;
     use crate::model::{AppInfo, ExecutorInfo, NodeInfo, SessionInfo, SnapShot};
-    use chrono::Utc;
+    use chrono::{DateTime, Duration, Utc};
     use common::apis::{
         ExecutorState, NodeState, ResourceRequirement, SessionState, Shim, TaskState,
     };
@@ -257,7 +312,7 @@ mod tests {
     }
 
     fn create_test_session(id: &str, priority: u32, pending: i32) -> Arc<SessionInfo> {
-        create_test_session_full(id, priority, pending, 0, 1, 1, 0, None)
+        create_test_session_full(id, priority, pending, 0, 1, 1, 0, None, Utc::now())
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -270,6 +325,7 @@ mod tests {
         batch_size: u32,
         min_instances: u32,
         max_instances: Option<u32>,
+        creation_time: DateTime<Utc>,
     ) -> Arc<SessionInfo> {
         let mut tasks_status = HashMap::new();
         if pending > 0 {
@@ -283,7 +339,7 @@ mod tests {
             application: "test-app".to_string(),
             slots,
             tasks_status,
-            creation_time: Utc::now(),
+            creation_time,
             completion_time: None,
             state: SessionState::Open,
             min_instances,
@@ -303,6 +359,38 @@ mod tests {
         })
     }
 
+    /// Build a snapshot with the given sessions and a single node providing
+    /// `total_slots` worth of capacity (1 slot = `unit`).
+    fn create_snapshot_with_capacity(
+        sessions: Vec<Arc<SessionInfo>>,
+        total_slots: u32,
+    ) -> SnapShot {
+        let unit = ResourceRequirement {
+            cpu: 1,
+            memory: 1024,
+            gpu: 0,
+        };
+        let ss = SnapShot::new(unit.clone());
+        ss.add_application(create_test_app("test-app")).unwrap();
+        for ssn in sessions {
+            ss.add_session(ssn).unwrap();
+        }
+        // Node with allocatable = total_slots × unit
+        let node = Arc::new(NodeInfo {
+            name: "node-1".to_string(),
+            allocatable: ResourceRequirement {
+                cpu: u64::from(total_slots) * unit.cpu,
+                memory: u64::from(total_slots) * unit.memory,
+                gpu: 0,
+            },
+            state: NodeState::Ready,
+        });
+        ss.add_node(node).unwrap();
+        ss
+    }
+
+    /// Snapshot with no node — total_slots = 0. Use only for tests that don't
+    /// rely on the cluster-capacity cap (e.g. blocking semantics).
     fn create_snapshot(sessions: Vec<Arc<SessionInfo>>) -> SnapShot {
         let ss = SnapShot::new(ResourceRequirement {
             cpu: 1,
@@ -320,8 +408,9 @@ mod tests {
         sessions: Vec<Arc<SessionInfo>>,
         exec_ssn_id: &str,
         slots: u32,
+        total_slots: u32,
     ) -> SnapShot {
-        let ss = create_snapshot(sessions);
+        let ss = create_snapshot_with_capacity(sessions, total_slots);
         let exec = Arc::new(ExecutorInfo {
             id: "exec-1".to_string(),
             node: "node-1".to_string(),
@@ -347,7 +436,7 @@ mod tests {
     fn test_setup_max_needy_priority() {
         let ssn_high = create_test_session("ssn-high", 100, 4);
         let ssn_low = create_test_session("ssn-low", 10, 2);
-        let ss = create_snapshot(vec![ssn_high, ssn_low]);
+        let ss = create_snapshot_with_capacity(vec![ssn_high, ssn_low], 100);
 
         let mut plugin = make_plugin();
         plugin.setup(&ss).unwrap();
@@ -358,11 +447,153 @@ mod tests {
     }
 
     #[test]
-    fn test_setup_desired_equals_batch_aligned_task_count_times_slots() {
-        // pending=5, running=1, batch_size=2, slots=3
-        // task_count=6, batched=floor(6/2)*2=6, desired=6*3=18
-        let ssn = create_test_session_full("s", 0, 5, 1, 3, 2, 0, None);
-        let ss = create_snapshot(vec![ssn]);
+    fn test_setup_distribution_case1_cluster_has_slack() {
+        // FS.md §3 Example Walkthrough Case 1.
+        // total_slots=22, three sessions, all batch_size=1.
+        // A: priority=100, slots=2, pending=4 → demand=8,  granted=8, remaining=14
+        // B: priority=100, slots=2, pending=3 → demand=6,  granted=6, remaining=8
+        // C: priority=10,  slots=4, pending=5 → demand=20, granted=8, remaining=0
+        // Σ ssn_desired = 22 = total_slots.
+        let t0 = Utc::now();
+        let t1 = t0 + Duration::seconds(1);
+        let t2 = t0 + Duration::seconds(2);
+
+        let ssn_a = create_test_session_full("A", 100, 4, 0, 2, 1, 0, None, t0);
+        let ssn_b = create_test_session_full("B", 100, 3, 0, 2, 1, 0, None, t1);
+        let ssn_c = create_test_session_full("C", 10, 5, 0, 4, 1, 0, None, t2);
+        let ss = create_snapshot_with_capacity(vec![ssn_a, ssn_b, ssn_c], 22);
+
+        let mut plugin = make_plugin();
+        plugin.setup(&ss).unwrap();
+
+        assert_eq!(plugin.ssn_desired.get("A").copied(), Some(8.0));
+        assert_eq!(plugin.ssn_desired.get("B").copied(), Some(6.0));
+        assert_eq!(plugin.ssn_desired.get("C").copied(), Some(8.0));
+        let total: f64 = plugin.ssn_desired.values().sum();
+        assert_eq!(total, 22.0);
+        assert_eq!(plugin.max_needy_priority, 100);
+    }
+
+    #[test]
+    fn test_setup_distribution_case2_cluster_contended() {
+        // FS.md §3 Example Walkthrough Case 2.
+        // total_slots=4, same sessions as Case 1.
+        // A: granted=min(8,4)=4 → remaining=0
+        // B: granted=min(6,0)=0
+        // C: granted=min(20,0)=0
+        // Σ ssn_desired = 4 = total_slots.
+        let t0 = Utc::now();
+        let t1 = t0 + Duration::seconds(1);
+        let t2 = t0 + Duration::seconds(2);
+
+        let ssn_a = create_test_session_full("A", 100, 4, 0, 2, 1, 0, None, t0);
+        let ssn_b = create_test_session_full("B", 100, 3, 0, 2, 1, 0, None, t1);
+        let ssn_c = create_test_session_full("C", 10, 5, 0, 4, 1, 0, None, t2);
+        let ss = create_snapshot_with_capacity(vec![ssn_a, ssn_b, ssn_c], 4);
+
+        let mut plugin = make_plugin();
+        plugin.setup(&ss).unwrap();
+
+        assert_eq!(plugin.ssn_desired.get("A").copied(), Some(4.0));
+        assert_eq!(plugin.ssn_desired.get("B").copied(), Some(0.0));
+        assert_eq!(plugin.ssn_desired.get("C").copied(), Some(0.0));
+        let total: f64 = plugin.ssn_desired.values().sum();
+        assert_eq!(total, 4.0);
+        assert_eq!(plugin.max_needy_priority, 100);
+    }
+
+    #[test]
+    fn test_setup_equal_priority_orders_by_creation_time() {
+        // Two sessions at the same priority must be filled in creation_time
+        // ascending order regardless of the order they were inserted into the
+        // snapshot. total_slots=5 with two demands of 4 each → earlier gets 4,
+        // later gets 1.
+        let t_early = Utc::now();
+        let t_late = t_early + Duration::seconds(60);
+
+        // Insert the later-created session first. In this test the alphabetic id
+        // order happens to match creation order ("a-early" < "z-late"), so a sort
+        // that fell back to ids would also pass; the sibling test below flips the
+        // ids to prove that creation_time — not id — is the actual key.
+        let ssn_late = create_test_session_full("z-late", 50, 4, 0, 1, 1, 0, None, t_late);
+        let ssn_early = create_test_session_full("a-early", 50, 4, 0, 1, 1, 0, None, t_early);
+
+        let ss = create_snapshot_with_capacity(vec![ssn_late, ssn_early], 5);
+
+        let mut plugin = make_plugin();
+        plugin.setup(&ss).unwrap();
+
+        // Earlier session (a-early, t_early) is filled first → gets its full demand=4.
+        // Later session (z-late, t_late) absorbs the residual → gets 1.
+        assert_eq!(plugin.ssn_desired.get("a-early").copied(), Some(4.0));
+        assert_eq!(plugin.ssn_desired.get("z-late").copied(), Some(1.0));
+    }
+
+    #[test]
+    fn test_setup_equal_priority_creation_time_wins_over_id() {
+        // Mirror image of the above: the earlier session has the alphabetically-LATER
+        // id. Confirms creation_time alone determines fill order within a priority tier.
+        let t_early = Utc::now();
+        let t_late = t_early + Duration::seconds(60);
+
+        let ssn_early = create_test_session_full("z-id", 50, 4, 0, 1, 1, 0, None, t_early);
+        let ssn_late = create_test_session_full("a-id", 50, 4, 0, 1, 1, 0, None, t_late);
+        let ss = create_snapshot_with_capacity(vec![ssn_early, ssn_late], 5);
+
+        let mut plugin = make_plugin();
+        plugin.setup(&ss).unwrap();
+
+        assert_eq!(plugin.ssn_desired.get("z-id").copied(), Some(4.0));
+        assert_eq!(plugin.ssn_desired.get("a-id").copied(), Some(1.0));
+    }
+
+    #[test]
+    fn test_setup_sum_desired_never_exceeds_total_slots() {
+        // Regression test for the over-allocation bug: many high-demand sessions on
+        // a small cluster must not collectively exceed total_slots.
+        let total_slots: u32 = 22;
+        let mut sessions = Vec::new();
+        let base = Utc::now();
+        for i in 0..10 {
+            // priority varies, demand large (pending=50 × slots=2 = 100 each)
+            let priority = (i as u32) * 10;
+            let creation_time = base + Duration::seconds(i as i64);
+            sessions.push(create_test_session_full(
+                &format!("s-{i}"),
+                priority,
+                50,
+                0,
+                2,
+                1,
+                0,
+                None,
+                creation_time,
+            ));
+        }
+        let ss = create_snapshot_with_capacity(sessions, total_slots);
+
+        let mut plugin = make_plugin();
+        plugin.setup(&ss).unwrap();
+
+        let sum: f64 = plugin.ssn_desired.values().sum();
+        assert!(
+            sum <= total_slots as f64,
+            "Σ ssn_desired ({sum}) must not exceed total_slots ({total_slots})"
+        );
+        // With aggregate demand far exceeding capacity, equality must hold.
+        assert_eq!(sum, total_slots as f64);
+    }
+
+    #[test]
+    fn test_setup_compute_demand_batch_aligned_and_capped() {
+        // Exercise compute_demand through a single-session snapshot.
+        // pending=5, running=1, batch_size=2, slots=3; total_slots large enough
+        // not to bind. task_count=6, batched=floor(6/2)*2=6, demand=6*3=18.
+        let ssn = create_test_session_full("s", 0, 5, 1, 3, 2, 0, None, Utc::now());
+        let ss = create_snapshot_with_capacity(vec![ssn], 100);
 
         let mut plugin = make_plugin();
         plugin.setup(&ss).unwrap();
@@ -372,10 +603,11 @@ mod tests {
     }
 
     #[test]
-    fn test_setup_desired_respects_max_instances() {
-        // pending=10, slots=1, max_instances=4 → desired capped at 4
-        let ssn = create_test_session_full("s", 0, 10, 0, 1, 1, 0, Some(4));
-        let ss = create_snapshot(vec![ssn]);
+    fn test_setup_compute_demand_respects_max_instances() {
+        // pending=10, slots=1, max_instances=4 → per-session demand capped at 4.
+        // The cluster has plenty of slack, so the per-session cap, not capacity, binds.
+        let ssn = create_test_session_full("s", 0, 10, 0, 1, 1, 0, Some(4), Utc::now());
+        let ss = create_snapshot_with_capacity(vec![ssn], 100);
 
         let mut plugin = make_plugin();
         plugin.setup(&ss).unwrap();
@@ -384,10 +616,10 @@ mod tests {
     }
 
     #[test]
-    fn test_setup_desired_respects_min_instances() {
-        // pending=0, min_instances=2, slots=3 → desired floored at 6
-        let ssn = create_test_session_full("s", 0, 0, 0, 3, 1, 2, None);
-        let ss = create_snapshot(vec![ssn]);
+    fn test_setup_compute_demand_respects_min_instances() {
+        // pending=0, min_instances=2, slots=3 → per-session demand floored at 6.
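+        // batched = 0 (no tasks), so demand = max(0 × 3, 2 × 3) = 6.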
+ let ssn = create_test_session_full("s", 0, 0, 0, 3, 1, 2, None, Utc::now()); + let ss = create_snapshot_with_capacity(vec![ssn], 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -397,9 +629,9 @@ mod tests { #[test] fn test_setup_allocated_counts_existing_executors() { - // Session with 1 executor (slots=2) already bound - let ssn = create_test_session_full("s", 0, 4, 0, 2, 1, 0, None); - let ss = create_snapshot_with_executor(vec![ssn], "s", 2); + // Session with 1 executor (slots=2) already bound. + let ssn = create_test_session_full("s", 0, 4, 0, 2, 1, 0, None, Utc::now()); + let ss = create_snapshot_with_executor(vec![ssn], "s", 2, 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -411,7 +643,7 @@ mod tests { fn test_setup_no_pending_does_not_set_max_needy() { let ssn_high = create_test_session("ssn-high", 100, 0); // 0 pending let ssn_low = create_test_session("ssn-low", 10, 5); - let ss = create_snapshot(vec![ssn_high, ssn_low]); + let ss = create_snapshot_with_capacity(vec![ssn_high, ssn_low], 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -423,7 +655,7 @@ mod tests { fn test_setup_all_same_priority_no_blocking() { let ssn_a = create_test_session("ssn-a", 0, 5); let ssn_b = create_test_session("ssn-b", 0, 3); - let ss = create_snapshot(vec![ssn_a, ssn_b]); + let ss = create_snapshot_with_capacity(vec![ssn_a, ssn_b], 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -431,13 +663,25 @@ mod tests { assert_eq!(plugin.max_needy_priority, 0); } + #[test] + fn test_setup_zero_total_slots_grants_zero() { + // No nodes registered → total_slots=0; every session gets ssn_desired=0. + let ssn = create_test_session_full("s", 100, 5, 0, 1, 1, 0, None, Utc::now()); + let ss = create_snapshot(vec![ssn]); + + let mut plugin = make_plugin(); + plugin.setup(&ss).unwrap(); + + assert_eq!(plugin.ssn_desired.get("s").copied(), Some(0.0)); + } + // ── ssn_order_fn() tests ───────────────────────────────────────────────── #[test] fn test_ssn_order_fn_different_priorities() { let ssn_high = create_test_session("ssn-high", 100, 4); let ssn_low = create_test_session("ssn-low", 10, 4); - let ss = create_snapshot(vec![ssn_high.clone(), ssn_low.clone()]); + let ss = create_snapshot_with_capacity(vec![ssn_high.clone(), ssn_low.clone()], 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -456,7 +700,7 @@ mod tests { fn test_ssn_order_fn_equal_priorities_defers() { let ssn_a = create_test_session("ssn-a", 50, 4); let ssn_b = create_test_session("ssn-b", 50, 4); - let ss = create_snapshot(vec![ssn_a.clone(), ssn_b.clone()]); + let ss = create_snapshot_with_capacity(vec![ssn_a.clone(), ssn_b.clone()], 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -470,7 +714,7 @@ mod tests { fn test_is_underused_lower_priority_blocked() { let ssn_high = create_test_session("ssn-high", 100, 4); let ssn_low = create_test_session("ssn-low", 10, 4); - let ss = create_snapshot(vec![ssn_high, ssn_low.clone()]); + let ss = create_snapshot_with_capacity(vec![ssn_high, ssn_low.clone()], 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -483,7 +727,7 @@ mod tests { // High-priority session with pending tasks and no executors yet → Some(true) let ssn_high = create_test_session("ssn-high", 100, 4); let ssn_low = create_test_session("ssn-low", 10, 4); - let ss = create_snapshot(vec![ssn_high.clone(), ssn_low]); + let ss = create_snapshot_with_capacity(vec![ssn_high.clone(), ssn_low], 100); let mut plugin = 
make_plugin(); plugin.setup(&ss).unwrap(); @@ -497,7 +741,7 @@ mod tests { // Session at max priority but already fully allocated → None (defer to FairShare) let ssn_high = create_test_session("ssn-high", 100, 4); // desired=4 let ssn_low = create_test_session("ssn-low", 10, 4); - let ss = create_snapshot_with_executor(vec![ssn_high.clone(), ssn_low], "ssn-high", 1); + let ss = create_snapshot_with_executor(vec![ssn_high.clone(), ssn_low], "ssn-high", 1, 100); // Setup gives allocated=1 for ssn-high. desired=4, so still underused. let mut plugin = make_plugin(); @@ -517,7 +761,7 @@ mod tests { // All sessions at same priority with demand → Some(true) for each let ssn_a = create_test_session("ssn-a", 0, 5); let ssn_b = create_test_session("ssn-b", 0, 3); - let ss = create_snapshot(vec![ssn_a.clone(), ssn_b.clone()]); + let ss = create_snapshot_with_capacity(vec![ssn_a.clone(), ssn_b.clone()], 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -530,7 +774,7 @@ mod tests { fn test_is_underused_no_demand_defers() { // Session with no tasks (desired=0) → None regardless of priority let ssn = create_test_session("s", 100, 0); - let ss = create_snapshot(vec![ssn.clone()]); + let ss = create_snapshot_with_capacity(vec![ssn.clone()], 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -542,8 +786,8 @@ mod tests { #[test] fn test_on_executor_allocate_increments_allocated() { - let ssn = create_test_session_full("s", 0, 4, 0, 2, 1, 0, None); - let ss = create_snapshot(vec![ssn.clone()]); + let ssn = create_test_session_full("s", 0, 4, 0, 2, 1, 0, None, Utc::now()); + let ss = create_snapshot_with_capacity(vec![ssn.clone()], 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap(); @@ -560,8 +804,8 @@ mod tests { #[test] fn test_on_executor_unallocate_decrements_allocated() { - let ssn = create_test_session_full("s", 0, 4, 0, 2, 1, 0, None); - let ss = create_snapshot_with_executor(vec![ssn.clone()], "s", 2); + let ssn = create_test_session_full("s", 0, 4, 0, 2, 1, 0, None, Utc::now()); + let ss = create_snapshot_with_executor(vec![ssn.clone()], "s", 2, 100); let mut plugin = make_plugin(); plugin.setup(&ss).unwrap();