-
Notifications
You must be signed in to change notification settings - Fork 6
fix(perf): features for evaluation #678
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4b34c9a
dda7c64
29e5173
9cabca9
9832297
9bac530
47de021
efb1f02
b1b921b
1b18749
3c112f9
1cfa9ed
5b5bcaf
4e8a8c3
60d50db
028ca6a
f6e2fcf
b75b75c
48f29f1
aed1c5c
f024138
3aca1f9
87742e5
5d30fc2
a1f0619
59b5354
b77385e
85d9042
c88633a
8495759
8e03ef4
a728ff8
6585af5
bc19c69
adfa4fd
377d1b1
ddd69c2
37cae77
3ff0804
d8e28cd
2de13a9
9a9d2da
14f92ca
968c058
efc54b6
64e87b6
12fb54a
0e3156a
bf7ba7b
5ff88eb
465f75e
74b6d27
f8589f5
160c9eb
2c08c1c
696a272
6545deb
1e85519
ef6f24e
dd39f3e
74fef12
9b76322
786398a
b54e541
50794a2
95c8fef
c5ed577
e83e3cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,3 +45,4 @@ no_preempt = [] | |
| spinlock = ["awkernel_lib/spinlock"] | ||
| clippy = [] | ||
| perf = [] | ||
| need-get-period = ["perf"] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,6 +67,13 @@ use crate::{ | |
| time_interval::interval, | ||
| Attribute, MultipleReceiver, MultipleSender, VectorToPublishers, VectorToSubscribers, | ||
| }; | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| use crate::task::perf::{ | ||
| get_period_count, increment_period_count, subscribe_timestamp_at, | ||
| update_fin_recv_outer_timestamp_at, update_pre_send_outer_timestamp_at, | ||
| }; | ||
|
|
||
| use alloc::{ | ||
| borrow::Cow, | ||
| boxed::Box, | ||
|
|
@@ -926,10 +933,31 @@ where | |
| Args::create_subscribers(subscribe_topic_names, Attribute::default()); | ||
|
|
||
| loop { | ||
| let args: <<Args as VectorToSubscribers>::Subscribers as MultipleReceiver>::Item = | ||
| subscribers.recv_all().await; | ||
| let results = f(args); | ||
| publishers.send_all(results).await; | ||
| #[cfg(feature = "need-get-period")] | ||
| { | ||
| let (args, count_st): ( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The naming of
|
||
| <<Args as VectorToSubscribers>::Subscribers as MultipleReceiver>::Item, | ||
| u32, | ||
| ) = subscribers.recv_all_with_period().await; | ||
|
|
||
| // [end] pubsub communication latency | ||
| let end = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; | ||
| subscribe_timestamp_at(count_st as usize, end, 1, dag_info.node_id.clone()); | ||
|
|
||
| let results = f(args); | ||
| publishers | ||
| .send_all_with_meta(results, 1, count_st as usize, dag_info.node_id) | ||
|
nokosaaan marked this conversation as resolved.
|
||
| .await; | ||
| } | ||
|
|
||
| #[cfg(not(feature = "need-get-period"))] | ||
| { | ||
| let args: <<Args as VectorToSubscribers>::Subscribers as MultipleReceiver>::Item = | ||
| subscribers.recv_all().await; | ||
|
|
||
| let results = f(args); | ||
| publishers.send_all(results).await; | ||
| } | ||
| } | ||
| }; | ||
|
|
||
|
|
@@ -966,13 +994,31 @@ where | |
| Attribute::default(), | ||
| ); | ||
|
|
||
| let mut interval = interval(period); | ||
| let mut interval = interval(period, dag_info.dag_id); | ||
| // Consume the first tick here to start the loop's main body without an initial delay. | ||
| interval.tick().await; | ||
|
|
||
| loop { | ||
| let results = f(); | ||
| publishers.send_all(results).await; | ||
| #[cfg(feature = "need-get-period")] | ||
| { | ||
| let index = get_period_count(dag_info.dag_id) as usize; | ||
| if index != 0 { | ||
| // [start] cycle deviation index >= 1 | ||
| let release_time = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; | ||
| update_pre_send_outer_timestamp_at(index, release_time, dag_info.dag_id); | ||
| } | ||
| let results = f(); | ||
| publishers | ||
| .send_all_with_meta(results, 0, index, dag_info.node_id) | ||
| .await; | ||
| increment_period_count(dag_info.dag_id); | ||
| } | ||
|
nokosaaan marked this conversation as resolved.
|
||
|
|
||
| #[cfg(not(feature = "need-get-period"))] | ||
| { | ||
| let results = f(); | ||
| publishers.send_all(results).await; | ||
| } | ||
|
|
||
| #[cfg(feature = "perf")] | ||
| periodic_measure(); | ||
|
|
@@ -1006,8 +1052,29 @@ where | |
| Args::create_subscribers(subscribe_topic_names, Attribute::default()); | ||
|
|
||
| loop { | ||
| let args: <Args::Subscribers as MultipleReceiver>::Item = subscribers.recv_all().await; | ||
| f(args); | ||
| #[cfg(feature = "need-get-period")] | ||
| { | ||
| let (args, count_st): (<Args::Subscribers as MultipleReceiver>::Item, u32) = | ||
| subscribers.recv_all_with_period().await; | ||
|
|
||
| // [end] pubsub communication latency | ||
| let end = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; | ||
| subscribe_timestamp_at(count_st as usize, end, 2, dag_info.node_id.clone()); | ||
|
|
||
| let timenow = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; | ||
| if count_st != 0 { | ||
| update_fin_recv_outer_timestamp_at(count_st as usize, timenow, dag_info.dag_id); | ||
| } | ||
|
|
||
| f(args); | ||
| } | ||
|
|
||
| #[cfg(not(feature = "need-get-period"))] | ||
| { | ||
| let args: <Args::Subscribers as MultipleReceiver>::Item = | ||
| subscribers.recv_all().await; | ||
| f(args); | ||
| } | ||
| } | ||
| }; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,11 +52,16 @@ use core::{ | |
| use futures::Future; | ||
| use pin_project::pin_project; | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| use crate::task::perf::publish_timestamp_at; | ||
|
|
||
| /// Data and timestamp. | ||
| #[derive(Clone)] | ||
| pub struct Data<T> { | ||
| pub timestamp: awkernel_lib::time::Time, | ||
| pub data: T, | ||
| #[cfg(feature = "need-get-period")] | ||
| pub index: u32, | ||
| } | ||
|
|
||
| /// Publisher. | ||
|
|
@@ -260,6 +265,8 @@ struct Sender<'a, T: 'static + Send> { | |
| subscribers: VecDeque<ArcInner<T>>, | ||
| state: SenderState, | ||
| timestamp: awkernel_lib::time::Time, | ||
| #[cfg(feature = "need-get-period")] | ||
| index: u32, | ||
| } | ||
|
|
||
| enum SenderState { | ||
|
|
@@ -276,8 +283,16 @@ impl<'a, T: Send> Sender<'a, T> { | |
| subscribers: Default::default(), | ||
| state: SenderState::Start, | ||
| timestamp: awkernel_lib::time::Time::now(), | ||
| #[cfg(feature = "need-get-period")] | ||
| index: 0, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| } | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| pub(super) fn with_period(mut self, index: u32) -> Self { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| self.index = index; | ||
| self | ||
| } | ||
| } | ||
|
|
||
| impl<T> Future for Sender<'_, T> | ||
|
|
@@ -309,6 +324,8 @@ where | |
| if let Err(data) = guard.push(Data { | ||
| timestamp: awkernel_lib::time::Time::now(), | ||
| data: data.clone(), | ||
| #[cfg(feature = "need-get-period")] | ||
| index: *this.index, | ||
| }) { | ||
| // If the send buffer is full, then remove the oldest one and store again. | ||
| guard.pop(); | ||
|
|
@@ -342,6 +359,8 @@ where | |
| match inner.queue.push(Data { | ||
| timestamp: *this.timestamp, | ||
| data: data.clone(), | ||
| #[cfg(feature = "need-get-period")] | ||
| index: *this.index, | ||
| }) { | ||
| Ok(_) => { | ||
| // Wake the subscriber up. | ||
|
|
@@ -386,8 +405,94 @@ where | |
| sender.await; | ||
| r#yield().await; | ||
| } | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| pub async fn send_with_meta(&self, data: T, pub_id: u32, index: usize, node_id: u32) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| // [start] pubsub communication latency | ||
| let start = awkernel_lib::time::Time::now().uptime().as_nanos() as u64; | ||
| publish_timestamp_at(index, start, pub_id, node_id); | ||
| let period = match u32::try_from(index) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| Ok(period) => period, | ||
| Err(_) => { | ||
| log::warn!( | ||
| "Period index {} exceeds u32::MAX; saturating period metadata", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "saturating period metadata" → "saturating period index" |
||
| index | ||
| ); | ||
| u32::MAX | ||
| } | ||
| }; | ||
| let sender = Sender::new(self, data).with_period(period); | ||
| sender.await; | ||
| r#yield().await; | ||
| } | ||
| } | ||
|
|
||
|
nokosaaan marked this conversation as resolved.
|
||
| #[cfg(all(test, feature = "need-get-period"))] | ||
| mod need_get_period_tests { | ||
| use super::*; | ||
| use core::{ | ||
| future::Future, | ||
| pin::Pin, | ||
| task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, | ||
| }; | ||
|
|
||
| fn block_on<F: Future>(future: F) -> F::Output { | ||
| fn raw_waker() -> RawWaker { | ||
| fn clone(_: *const ()) -> RawWaker { | ||
| raw_waker() | ||
| } | ||
| fn wake(_: *const ()) {} | ||
| fn wake_by_ref(_: *const ()) {} | ||
| fn drop(_: *const ()) {} | ||
|
|
||
| RawWaker::new( | ||
| core::ptr::null(), | ||
| &RawWakerVTable::new(clone, wake, wake_by_ref, drop), | ||
| ) | ||
| } | ||
|
|
||
| let waker = unsafe { Waker::from_raw(raw_waker()) }; | ||
| let mut future = Box::pin(future); | ||
|
|
||
| loop { | ||
| let mut context = Context::from_waker(&waker); | ||
| match Pin::as_mut(&mut future).poll(&mut context) { | ||
| Poll::Ready(output) => return output, | ||
| Poll::Pending => {} | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn send_with_meta_propagates_period_to_receiver() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. send_with_period_index_propagates_period_index_to_receiver |
||
| block_on(async { | ||
| let (publisher, subscriber) = create_pubsub::<u32>(Attribute::default()); | ||
| publisher.send_with_meta(42, 1, 7, 99).await; | ||
|
|
||
| let received = subscriber.recv().await; | ||
| assert_eq!(received.data, 42); | ||
| assert_eq!(received.index, 7); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. received.period_index |
||
| }); | ||
| } | ||
|
|
||
| #[test] | ||
| fn tuple_recv_all_with_period_returns_shared_period() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tuple_recv_all_with_period_index_returns_shared_period_index |
||
| block_on(async { | ||
| let (publisher1, subscriber1) = create_pubsub::<u32>(Attribute::default()); | ||
| let (publisher2, subscriber2) = create_pubsub::<u32>(Attribute::default()); | ||
|
|
||
| publisher1.send_with_meta(10, 11, 3, 21).await; | ||
| publisher2.send_with_meta(20, 12, 3, 22).await; | ||
|
|
||
| let ((value1, value2), period) = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. period_index |
||
| (subscriber1, subscriber2).recv_all_with_period().await; | ||
|
|
||
| assert_eq!(value1, 10); | ||
| assert_eq!(value2, 20); | ||
| assert_eq!(period, 3); | ||
| }); | ||
| } | ||
| } | ||
| /// Create an anonymous publisher and an anonymous subscriber. | ||
| /// This channel works as a channel of multiple producers and multiple consumers. | ||
| /// | ||
|
|
@@ -756,12 +861,24 @@ pub trait MultipleReceiver { | |
| type Item; | ||
|
|
||
| fn recv_all(&self) -> Pin<Box<dyn Future<Output = Self::Item> + Send + '_>>; | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| fn recv_all_with_period(&self) -> Pin<Box<dyn Future<Output = (Self::Item, u32)> + Send + '_>>; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. recv_all_with_period_index |
||
| } | ||
|
|
||
| pub trait MultipleSender { | ||
| type Item; | ||
|
|
||
| fn send_all(&self, item: Self::Item) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>; | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| fn send_all_with_meta( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. send_all_with_period_index |
||
| &self, | ||
| item: Self::Item, | ||
| pub_id: u32, | ||
| index: usize, | ||
| node_id: u32, | ||
| ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>; | ||
|
nokosaaan marked this conversation as resolved.
|
||
| } | ||
|
nokosaaan marked this conversation as resolved.
|
||
| pub trait VectorToPublishers { | ||
| type Publishers: MultipleSender; | ||
|
|
@@ -834,6 +951,13 @@ macro_rules! impl_async_receiver_for_tuple { | |
| fn recv_all(&self) -> Pin<Box<dyn Future<Output = Self::Item> + Send + '_>> { | ||
| Box::pin(async move{}) | ||
| } | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| fn recv_all_with_period( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. recv_all_with_period_index |
||
| &self, | ||
| ) -> Pin<Box<dyn Future<Output = (Self::Item, u32)> + Send + '_>> { | ||
| Box::pin(async move { ((), 0) }) | ||
| } | ||
| } | ||
|
|
||
| impl MultipleSender for () { | ||
|
|
@@ -842,6 +966,17 @@ macro_rules! impl_async_receiver_for_tuple { | |
| fn send_all(&self, _item: Self::Item) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> { | ||
| Box::pin(async move{}) | ||
| } | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| fn send_all_with_meta( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. send_all_with_period_index |
||
| &self, | ||
| _item: Self::Item, | ||
| _pub_id: u32, | ||
| _index: usize, | ||
| _node_id: u32, | ||
| ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> { | ||
| Box::pin(async move{}) | ||
| } | ||
| } | ||
| }; | ||
| ($(($T:ident, $idx:tt, $idx2:tt)),+) => { | ||
|
|
@@ -854,6 +989,34 @@ macro_rules! impl_async_receiver_for_tuple { | |
| ($($idx.recv().await.data,)+) | ||
| }) | ||
| } | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| fn recv_all_with_period( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. recv_all_with_period_index |
||
| &self, | ||
| ) -> Pin<Box<dyn Future<Output = (Self::Item, u32)> + Send + '_>> { | ||
| let ($($idx,)+) = self; | ||
| Box::pin(async move { | ||
| let mut period: Option<u32> = None; | ||
| $( | ||
| let item = $idx.recv().await; | ||
| match period { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. period_index |
||
| Some(expected) => { | ||
| assert!( | ||
| expected == item.index, | ||
| "recv_all_with_period received mismatched periods: expected {}, got {}", | ||
| expected, | ||
| item.index | ||
| ); | ||
| } | ||
| None => { | ||
| period = Some(item.index); | ||
| } | ||
| } | ||
| let $idx2 = item.data; | ||
| )+ | ||
| (($($idx2,)+), period.expect("recv_all_with_period requires at least one subscriber")) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| impl<$($T: Clone + Sync + Send + 'static),+> MultipleSender for ($(Publisher<$T>,)+) { | ||
|
|
@@ -868,6 +1031,23 @@ macro_rules! impl_async_receiver_for_tuple { | |
| )+ | ||
| }) | ||
| } | ||
|
|
||
| #[cfg(feature = "need-get-period")] | ||
| fn send_all_with_meta( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. send_all_with_period_index |
||
| &self, | ||
| item: Self::Item, | ||
| pub_id: u32, | ||
| index: usize, | ||
| node_id: u32, | ||
| ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> { | ||
| let ($($idx,)+) = self; | ||
| let ($($idx2,)+) = item; | ||
| Box::pin(async move { | ||
| $( | ||
| $idx.send_with_meta($idx2, pub_id, index, node_id).await; | ||
| )+ | ||
| }) | ||
| } | ||
| } | ||
| }; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While the name
need-get-periodsuggests that weneed to retrieve the duration, what is actually being sent and received is not the duration but the index.Some more accurate name candidates might include:
I believe these would better reflect the actual functionality.