Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions applications/tests/test_partitioned_edf/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "test_partitioned_edf"
version = "0.1.0"
edition = "2021"

[dependencies]
log = "0.4"

# Async runtime: spawn, sleep, yield, and the scheduler API.
# default-features disabled for the no_std kernel build.
[dependencies.awkernel_async_lib]
path = "../../../awkernel_async_lib"
default-features = false

# Kernel primitives: CPU identification (cpu_id/num_cpu) and busy-wait delays.
[dependencies.awkernel_lib]
path = "../../../awkernel_lib"
default-features = false
108 changes: 108 additions & 0 deletions applications/tests/test_partitioned_edf/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#![no_std]

extern crate alloc;
use alloc::string::String;
use alloc::string::ToString;

use awkernel_async_lib::sleep;
use awkernel_async_lib::{scheduler::SchedulerType, spawn};
use awkernel_lib::{
cpu::{cpu_id, num_cpu},
delay::{uptime, wait_microsec},
};
use core::time::Duration;

/// Entry point: runs the partitioned-EDF scheduler test suite.
///
/// Requires at least two CPUs (core 0 plus one worker core); otherwise the
/// whole suite is skipped with a warning.
pub async fn run() {
    // Give the kernel time to finish bringing up the worker cores.
    wait_microsec(2_000_000);

    let cpus = num_cpu();
    if cpus < 2 {
        log::warn!("test_partitioned_edf: requires at least 2 CPUs, skipping");
        return;
    }

    test_core_pinning().await;
    test_edf_preemption().await;
    test_multi_core().await;

    // Keep this task alive while the spawned periodic tasks run and log.
    wait_microsec(100_000_000);
}

/// Test 1: verify that tasks run only on their assigned cores.
///
/// One periodic task is spawned per worker core (`1..num_cpu()`); each task
/// compares `cpu_id()` against its assigned core three times and logs
/// `[OK]` or `[FAIL]` accordingly.
async fn test_core_pinning() {
    log::info!("=== test_core_pinning start ===");
    for core in 1..num_cpu() {
        let assigned = core as u16;
        // The future checks its placement three times, yielding between checks
        // so the scheduler has repeated opportunities to (mis)place it.
        let body = async move {
            let mut iteration = 0;
            while iteration < 3 {
                let actual = cpu_id();
                if actual == core {
                    log::info!("core_pinning: task on core {core} ran on cpu {actual} [OK]");
                } else {
                    log::error!("core_pinning: task on core {core} ran on cpu {actual} [FAIL]");
                }
                wait_microsec(100_000);
                sleep(Duration::from_millis(200)).await;
                awkernel_async_lib::r#yield().await;
                iteration += 1;
            }
        };
        spawn(
            alloc::format!("pinning_core{core}").into(),
            body,
            SchedulerType::PartitionedEDF(1_000_000, assigned),
        )
        .await;
    }
}

/// Test 2: verify EDF preemption on a single core.
///
/// `heavy` (deadline 9900 ms) should be preempted by `light`
/// (deadline 990 ms); both are pinned to core 1.
async fn test_edf_preemption() {
    log::info!("=== test_edf_preemption start ===");

    // Arguments: name, execution time, period, relative deadline (µs), core.
    spawn_periodic_task(String::from("heavy"), 9_600_000, 10_000_000, 9_900_000, 1).await;
    spawn_periodic_task(String::from("light"), 900_000, 1_000_000, 990_000, 1).await;
}

/// Test 3: verify core independence (requires `num_cpu() >= 3`).
///
/// Identical workloads are pinned to cores 1 and 2; they should run in
/// parallel without interfering with each other.
async fn test_multi_core() {
    let cpus = num_cpu();
    if cpus < 3 {
        log::info!("test_multi_core: skipped (num_cpu < 3)");
        return;
    }
    log::info!("=== test_multi_core start ===");

    spawn_periodic_task(String::from("task_core1"), 4_000_000, 5_000_000, 4_900_000, 1).await;
    spawn_periodic_task(String::from("task_core2"), 4_000_000, 5_000_000, 4_900_000, 2).await;
}

/// Spawn a pseudo-periodic task pinned to `core`.
///
/// The task loops forever: it busy-waits for `exe_time` microseconds, logs its
/// execution window, then sleeps for the remainder of `period` before yielding.
///
/// * `task_name` - name registered with the scheduler and used in log output.
/// * `exe_time` - busy execution time per period, in microseconds.
/// * `period` - activation period, in microseconds.
/// * `relative_deadline` - EDF relative deadline, in microseconds.
/// * `core` - CPU core the task is pinned to (`1..num_cpu()`).
async fn spawn_periodic_task(
    task_name: String,
    exe_time: u64,
    period: u64,
    relative_deadline: u64,
    core: u16,
) {
    // Cloned because `task_name` is moved into `spawn` as the task name.
    let task_name_clone = task_name.clone();
    spawn(
        task_name.into(),
        async move {
            loop {
                let start_time = uptime();
                wait_microsec(exe_time);
                let end_time = uptime();
                log::debug!(
                    "{}: cpu={}, start={}, end={}",
                    task_name_clone,
                    cpu_id(),
                    start_time,
                    end_time,
                );
                // Saturate so a caller passing exe_time > period cannot
                // underflow u64 (panic in debug builds, near-infinite sleep
                // in release builds).
                sleep(Duration::from_micros(period.saturating_sub(exe_time))).await;
                awkernel_async_lib::r#yield().await;
            }
        },
        SchedulerType::PartitionedEDF(relative_deadline, core),
    )
    .await;
}
140 changes: 127 additions & 13 deletions awkernel_async_lib/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use alloc::boxed::Box;

pub mod gedf;
pub(super) mod panicked;
mod partitioned_edf;
mod prioritized_fifo;
mod prioritized_rr;

Expand Down Expand Up @@ -72,7 +73,8 @@ pub fn move_preemption_pending(cpu_id: usize) -> Option<BinaryHeap<Arc<Task>>> {
/// 0 is the lowest priority and 31 is the highest priority.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerType {
GEDF(u64), // relative deadline
PartitionedEDF(u64, u16), // relative deadline and partitioned core
GEDF(u64), // relative deadline
PrioritizedFIFO(u8),
PrioritizedRR(u8),
Panicked,
Expand All @@ -83,6 +85,10 @@ impl SchedulerType {
matches!(
(self, other),
(SchedulerType::GEDF(_), SchedulerType::GEDF(_))
| (
SchedulerType::PartitionedEDF(_, _),
SchedulerType::PartitionedEDF(_, _)
)
| (
SchedulerType::PrioritizedFIFO(_),
SchedulerType::PrioritizedFIFO(_)
Expand All @@ -94,28 +100,55 @@ impl SchedulerType {
| (SchedulerType::Panicked, SchedulerType::Panicked)
)
}

/// Return the partitioned core index if this is a [`SchedulerType::PartitionedEDF`] scheduler.
///
/// Returns `Some(n)` where `n` is the CPU core index (`1..num_cpu()`) assigned to the
/// partitioned EDF scheduler. Returns `None` for all other scheduler variants.
pub const fn partitioned_core(&self) -> Option<u16> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new public method has no doc comment. The semantics of Some(n) (the assigned core index, expected to be in 1..num_cpu()) and the fact that all non-PartitionedEDF variants return None should be documented for consumers.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. A doc comment has been added:

/// Return the partitioned core index if this is a [`SchedulerType::PartitionedEDF`] scheduler.
///
/// Returns `Some(n)` where `n` is the CPU core index (`1..num_cpu()`) assigned to the
/// partitioned EDF scheduler. Returns `None` for all other scheduler variants.

match self {
SchedulerType::PartitionedEDF(_, n) => Some(*n),
_ => None,
}
}
}

/// # Priority
///
/// `priority()` returns the priority of the scheduler for preemption.
///
/// - The highest priority.
///     - Partitioned EDF scheduler.
/// - The second highest priority.
///     - GEDF scheduler.
/// - The third highest priority.
///     - Prioritized FIFO scheduler.
/// - The fourth highest priority.
///     - Prioritized Round-Robin scheduler.
/// - The lowest priority.
///     - Panicked scheduler.
static PRIORITY_LIST: [SchedulerType; 4] = [
static PRIORITY_LIST: [SchedulerType; 5] = [
SchedulerType::PartitionedEDF(0, 0),
SchedulerType::GEDF(0),
SchedulerType::PrioritizedFIFO(0),
SchedulerType::PrioritizedRR(0),
SchedulerType::Panicked,
];

/// Return the number of partitioned schedulers in `PRIORITY_LIST`.
///
/// Counts the contiguous run of partitioned schedulers at the head of the
/// list; partitioned schedulers must therefore precede all other entries.
/// Update this function if you add a new partitioned scheduler to `PRIORITY_LIST`.
const fn get_num_partitioned_schedulers() -> usize {
    let mut i = 0;
    while i < PRIORITY_LIST.len()
        && matches!(PRIORITY_LIST[i], SchedulerType::PartitionedEDF(_, _))
    {
        i += 1;
    }
    i
}

/// For exclusion execution of `wake_task` and `get_next` across all schedulers.
/// In order to resolve priority inversion in multiple priority-based schedulers,
/// the decision to preempt, dequeuing, enqueuing, and updating of RUNNING must be executed exclusively.
Expand All @@ -139,18 +172,32 @@ pub(crate) trait Scheduler {
/// Get the next executable task.
///
/// If the current CPU has pending partitioned tasks
/// (`NUM_PARTITIONED_TASKS_IN_QUEUE[cpu_id] > 0`), only the partitioned
/// schedulers at the head of `PRIORITY_LIST` are consulted; otherwise every
/// scheduler is consulted in priority order.
#[inline]
pub(crate) fn get_next_task(execution_ensured: bool) -> Option<Arc<Task>> {
    let cpu_id = awkernel_lib::cpu::cpu_id();

    let mut node = MCSNode::new();
    let _guard = GLOBAL_WAKE_GET_MUTEX.lock(&mut node);

    let num_partitioned_tasks =
        crate::task::NUM_PARTITIONED_TASKS_IN_QUEUE[cpu_id].load(Ordering::Relaxed);

    if num_partitioned_tasks > 0 {
        // Decrement of the partitioned counter is handled by
        // `PartitionedTask::Drop` inside `get_next()`.
        // NOTE(review): when the counter is positive but the partitioned
        // scheduler yields no task, this returns `None` without falling back
        // to the other schedulers — confirm that is the intended behavior.
        PRIORITY_LIST[..get_num_partitioned_schedulers()]
            .iter()
            .find_map(|&scheduler_type| get_scheduler(scheduler_type).get_next(execution_ensured))
    } else {
        let task = PRIORITY_LIST
            .iter()
            .find_map(|&scheduler_type| get_scheduler(scheduler_type).get_next(execution_ensured));

        if task.is_some() {
            crate::task::NUM_TASK_IN_QUEUE.fetch_sub(1, Ordering::Relaxed);
        }

        task
    }
}

/// Get a scheduler.
Expand All @@ -159,6 +206,7 @@ pub(crate) fn get_scheduler(sched_type: SchedulerType) -> &'static dyn Scheduler
SchedulerType::PrioritizedFIFO(_) => &prioritized_fifo::SCHEDULER,
SchedulerType::PrioritizedRR(_) => &prioritized_rr::SCHEDULER,
SchedulerType::GEDF(_) => &gedf::SCHEDULER,
SchedulerType::PartitionedEDF(_, _) => &partitioned_edf::SCHEDULER,
SchedulerType::Panicked => &panicked::SCHEDULER,
}
}
Expand All @@ -174,6 +222,72 @@ pub const fn get_priority(sched_type: SchedulerType) -> u8 {
panic!("Scheduler type not registered in PRIORITY_LIST or equals()")
}

/// RAII wrapper representing one slot in `NUM_PARTITIONED_TASKS_IN_QUEUE[cpu_id]`.
///
/// Constructing a `PartitionedTask` increments the counter for `cpu_id`.
/// The counter is decremented exactly once — either via [`take`] (explicit
/// ownership transfer) or via `Drop` (e.g. when a terminated task is
/// discarded). If `take` has already been called, `Drop` is a no-op.
///
/// [`take`]: PartitionedTask::take
pub(crate) struct PartitionedTask<T> {
    // `Some` while this wrapper still owns its counter slot; `None` after `take()`.
    inner: Option<T>,
    // Index into `NUM_PARTITIONED_TASKS_IN_QUEUE` identifying which CPU's
    // counter this slot belongs to.
    cpu_id: usize,
}

impl<T> PartitionedTask<T> {
    /// Wrap `inner`, incrementing the partitioned-task counter for `cpu_id`.
    pub(crate) fn new(inner: T, cpu_id: usize) -> Self {
        crate::task::NUM_PARTITIONED_TASKS_IN_QUEUE[cpu_id].fetch_add(1, Ordering::Relaxed);
        Self {
            inner: Some(inner),
            cpu_id,
        }
    }

    /// Take the inner value and decrement the counter.
    ///
    /// Returns `None` if the value has already been taken.
    /// After this call `Drop` will be a no-op.
    pub(crate) fn take(&mut self) -> Option<T> {
        match self.inner.take() {
            Some(value) => {
                crate::task::NUM_PARTITIONED_TASKS_IN_QUEUE[self.cpu_id]
                    .fetch_sub(1, Ordering::Relaxed);
                Some(value)
            }
            None => None,
        }
    }
}

impl<T> Drop for PartitionedTask<T> {
    fn drop(&mut self) {
        // After take() the slot has already been released; nothing to do.
        if self.inner.is_none() {
            return;
        }
        // A live (never-taken) wrapper still owns one counter slot — release it.
        crate::task::NUM_PARTITIONED_TASKS_IN_QUEUE[self.cpu_id].fetch_sub(1, Ordering::Relaxed);
    }
}

// Comparisons delegate to the wrapped value through `Option`'s ordering.
// NOTE(review): `Option` orders `None` before any `Some(_)`, so a wrapper
// whose value was already taken compares less than every live wrapper —
// confirm taken wrappers never remain inside an ordered queue.
impl<T: PartialEq> PartialEq for PartitionedTask<T> {
    fn eq(&self, other: &Self) -> bool {
        self.inner == other.inner
    }
}

impl<T: Eq> Eq for PartitionedTask<T> {}

impl<T: PartialOrd> PartialOrd for PartitionedTask<T> {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        self.inner.partial_cmp(&other.inner)
    }
}

impl<T: Ord> Ord for PartitionedTask<T> {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.inner.cmp(&other.inner)
    }
}

/// Maintain sleeping tasks by a delta list.
struct SleepingTasks {
delta_list: DeltaList<Box<dyn FnOnce() + Send>>,
Expand Down
Loading
Loading