Skip to content

Commit 024bd60

Browse files
authored
task: improve the example of poll_proceed (#7586)
Signed-off-by: ADD-SP <[email protected]>
1 parent 7127e25 commit 024bd60

File tree

1 file changed

+22
-58
lines changed

1 file changed

+22
-58
lines changed

tokio/src/task/coop/mod.rs

Lines changed: 22 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -306,74 +306,38 @@ cfg_coop! {
306306
///
307307
/// # Examples
308308
///
309-
/// This example shows a simple countdown latch that uses [`poll_proceed`] to participate in
310-
/// cooperative scheduling.
309+
/// This example wraps the `futures::channel::mpsc::UnboundedReceiver` to
310+
/// cooperate with the Tokio scheduler. Each time a value is received, task budget
311+
/// is consumed. If no budget is available, the task yields to the scheduler.
311312
///
312313
/// ```
313-
/// use std::future::{Future};
314314
/// use std::pin::Pin;
315-
/// use std::task::{ready, Context, Poll, Waker};
315+
/// use std::task::{ready, Context, Poll};
316316
/// use tokio::task::coop;
317+
/// use futures::stream::{Stream, StreamExt};
318+
/// use futures::channel::mpsc::UnboundedReceiver;
317319
///
318-
/// struct CountdownLatch<T> {
319-
/// counter: usize,
320-
/// value: Option<T>,
321-
/// waker: Option<Waker>
320+
/// struct CoopUnboundedReceiver<T> {
321+
/// receiver: UnboundedReceiver<T>,
322322
/// }
323323
///
324-
/// impl<T> CountdownLatch<T> {
325-
/// fn new(value: T, count: usize) -> Self {
326-
/// CountdownLatch {
327-
/// counter: count,
328-
/// value: Some(value),
329-
/// waker: None
330-
/// }
331-
/// }
332-
/// fn count_down(&mut self) {
333-
/// if self.counter <= 0 {
334-
/// return;
335-
/// }
336-
///
337-
/// self.counter -= 1;
338-
/// if self.counter == 0 {
339-
/// if let Some(w) = self.waker.take() {
340-
/// w.wake();
341-
/// }
342-
/// }
343-
/// }
344-
/// }
345-
///
346-
/// impl<T> Future for CountdownLatch<T> {
347-
/// type Output = T;
348-
///
349-
/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
350-
/// // `poll_proceed` checks with the runtime if this task is still allowed to proceed
351-
/// // with performing work.
352-
/// // If not, `Pending` is returned and `ready!` ensures this function returns.
353-
/// // If we are allowed to proceed, coop now represents the budget consumption
324+
/// impl<T> Stream for CoopUnboundedReceiver<T> {
325+
/// type Item = T;
326+
/// fn poll_next(
327+
/// mut self: Pin<&mut Self>,
328+
/// cx: &mut Context<'_>
329+
/// ) -> Poll<Option<T>> {
354330
/// let coop = ready!(coop::poll_proceed(cx));
355-
///
356-
/// // Get a mutable reference to the CountdownLatch
357-
/// let this = Pin::get_mut(self);
358-
///
359-
/// // Next we check if the latch is ready to release its value
360-
/// if this.counter == 0 {
361-
/// let t = this.value.take();
362-
/// // The latch made progress so call `made_progress` to ensure the budget
363-
/// // is not reverted.
364-
/// coop.made_progress();
365-
/// Poll::Ready(t.unwrap())
366-
/// } else {
367-
/// // If the latch is not ready so return pending and simply drop `coop`.
368-
/// // This will restore the budget making it available again to perform any
369-
/// // other work.
370-
/// this.waker = Some(cx.waker().clone());
371-
/// Poll::Pending
372-
/// }
331+
/// match self.receiver.poll_next_unpin(cx) {
332+
/// Poll::Ready(v) => {
333+
/// // We received a value, so consume budget.
334+
/// coop.made_progress();
335+
/// Poll::Ready(v)
336+
/// }
337+
/// Poll::Pending => Poll::Pending,
338+
/// }
373339
/// }
374340
/// }
375-
///
376-
/// impl<T> Unpin for CountdownLatch<T> {}
377341
/// ```
378342
#[inline]
379343
pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {

0 commit comments

Comments
 (0)