tokio/runtime/
coop.rs

1#![cfg_attr(not(feature = "full"), allow(dead_code))]
2
3//! Yield points for improved cooperative scheduling.
4//!
5//! Documentation for this can be found in the [`tokio::task`] module.
6//!
//! [`tokio::task`]: crate::task
8
9// ```ignore
10// # use tokio_stream::{Stream, StreamExt};
11// async fn drop_all<I: Stream + Unpin>(mut input: I) {
12//     while let Some(_) = input.next().await {
13//         tokio::coop::proceed().await;
14//     }
15// }
16// ```
17//
18// The `proceed` future will coordinate with the executor to make sure that
19// every so often control is yielded back to the executor so it can run other
20// tasks.
21//
22// # Placing yield points
23//
24// Voluntary yield points should be placed _after_ at least some work has been
25// done. If they are not, a future sufficiently deep in the task hierarchy may
26// end up _never_ getting to run because of the number of yield points that
27// inevitably appear before it is reached. In general, you will want yield
28// points to only appear in "leaf" futures -- those that do not themselves poll
29// other futures. By doing this, you avoid double-counting each iteration of
30// the outer future against the cooperating budget.
31
32use crate::runtime::context;
33
/// Cooperative-scheduling allowance: how much more "work" a task may perform
/// before it has to hand control back to the scheduler.
///
/// `Some(n)` means `n` units are left; `None` means the task is unconstrained.
#[derive(Clone, Copy, Debug)]
pub(crate) struct Budget(Option<u8>);
38
/// Outcome of trying to take one unit out of a `Budget`.
pub(crate) struct BudgetDecrement {
    /// `true` when the decrement was allowed: either the budget was
    /// unconstrained or it still had at least one unit left.
    success: bool,
    /// `true` when this decrement brought a constrained budget down to zero.
    hit_zero: bool,
}
43
44impl Budget {
45    /// Budget assigned to a task on each poll.
46    ///
47    /// The value itself is chosen somewhat arbitrarily. It needs to be high
48    /// enough to amortize wakeup and scheduling costs, but low enough that we
49    /// do not starve other tasks for too long. The value also needs to be high
50    /// enough that particularly deep tasks are able to do at least some useful
51    /// work at all.
52    ///
53    /// Note that as more yield points are added in the ecosystem, this value
54    /// will probably also have to be raised.
55    const fn initial() -> Budget {
56        Budget(Some(128))
57    }
58
59    /// Returns an unconstrained budget. Operations will not be limited.
60    pub(super) const fn unconstrained() -> Budget {
61        Budget(None)
62    }
63
64    fn has_remaining(self) -> bool {
65        self.0.map_or(true, |budget| budget > 0)
66    }
67}
68
69/// Runs the given closure with a cooperative task budget. When the function
70/// returns, the budget is reset to the value prior to calling the function.
71#[inline(always)]
72pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
73    with_budget(Budget::initial(), f)
74}
75
76/// Runs the given closure with an unconstrained task budget. When the function returns, the budget
77/// is reset to the value prior to calling the function.
78#[inline(always)]
79pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
80    with_budget(Budget::unconstrained(), f)
81}
82
83#[inline(always)]
84fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
85    struct ResetGuard {
86        prev: Budget,
87    }
88
89    impl Drop for ResetGuard {
90        fn drop(&mut self) {
91            let _ = context::budget(|cell| {
92                cell.set(self.prev);
93            });
94        }
95    }
96
97    #[allow(unused_variables)]
98    let maybe_guard = context::budget(|cell| {
99        let prev = cell.get();
100        cell.set(budget);
101
102        ResetGuard { prev }
103    });
104
105    // The function is called regardless even if the budget is not successfully
106    // set due to the thread-local being destroyed.
107    f()
108}
109
110#[inline(always)]
111pub(crate) fn has_budget_remaining() -> bool {
112    // If the current budget cannot be accessed due to the thread-local being
113    // shutdown, then we assume there is budget remaining.
114    context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
115}
116
117cfg_rt_multi_thread! {
118    /// Sets the current task's budget.
119    pub(crate) fn set(budget: Budget) {
120        let _ = context::budget(|cell| cell.set(budget));
121    }
122}
123
124cfg_rt! {
125    /// Forcibly removes the budgeting constraints early.
126    ///
127    /// Returns the remaining budget
128    pub(crate) fn stop() -> Budget {
129        context::budget(|cell| {
130            let prev = cell.get();
131            cell.set(Budget::unconstrained());
132            prev
133        }).unwrap_or(Budget::unconstrained())
134    }
135}
136
137cfg_coop! {
138    use pin_project_lite::pin_project;
139    use std::cell::Cell;
140    use std::future::Future;
141    use std::pin::Pin;
142    use std::task::{ready, Context, Poll};
143
    /// Guard handed out by `poll_proceed`.
    ///
    /// It remembers the budget as it was before the decrement. When dropped,
    /// it writes that remembered budget back unless the remembered value is
    /// unconstrained, which is also how `made_progress` marks the guard as
    /// "do not restore".
    #[must_use]
    pub(crate) struct RestoreOnPending(Cell<Budget>);
146
147    impl RestoreOnPending {
148        pub(crate) fn made_progress(&self) {
149            self.0.set(Budget::unconstrained());
150        }
151    }
152
153    impl Drop for RestoreOnPending {
154        fn drop(&mut self) {
155            // Don't reset if budget was unconstrained or if we made progress.
156            // They are both represented as the remembered budget being unconstrained.
157            let budget = self.0.get();
158            if !budget.is_unconstrained() {
159                let _ = context::budget(|cell| {
160                    cell.set(budget);
161                });
162            }
163        }
164    }
165
166    /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
167    ///
168    /// When you call this method, the current budget is decremented. However, to ensure that
169    /// progress is made every time a task is polled, the budget is automatically restored to its
170    /// former value if the returned `RestoreOnPending` is dropped. It is the caller's
171    /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
172    /// that the budget empties appropriately.
173    ///
174    /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
175    /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
176    /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
177    /// that progress was made.
178    #[inline]
179    pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
180        context::budget(|cell| {
181            let mut budget = cell.get();
182
183            let decrement = budget.decrement();
184
185            if decrement.success {
186                let restore = RestoreOnPending(Cell::new(cell.get()));
187                cell.set(budget);
188
189                // avoid double counting
190                if decrement.hit_zero {
191                    inc_budget_forced_yield_count();
192                }
193
194                Poll::Ready(restore)
195            } else {
196                cx.waker().wake_by_ref();
197                Poll::Pending
198            }
199        }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
200    }
201
    cfg_rt! {
        cfg_unstable_metrics! {
            /// Bumps the scheduler metric that counts budget-forced yields on
            /// the current runtime handle. The result of `with_current` is
            /// discarded, so a failed lookup is ignored.
            #[inline(always)]
            fn inc_budget_forced_yield_count() {
                let _ = context::with_current(|handle| {
                    handle.scheduler_metrics().inc_budget_forced_yield_count();
                });
            }
        }

        cfg_not_unstable_metrics! {
            /// No-op stand-in used when unstable metrics are not compiled in.
            #[inline(always)]
            fn inc_budget_forced_yield_count() {}
        }
    }
217
    cfg_not_rt! {
        /// No-op stand-in used when the runtime is not compiled in.
        #[inline(always)]
        fn inc_budget_forced_yield_count() {}
    }
222
223    impl Budget {
224        /// Decrements the budget. Returns `true` if successful. Decrementing fails
225        /// when there is not enough remaining budget.
226        fn decrement(&mut self) -> BudgetDecrement {
227            if let Some(num) = &mut self.0 {
228                if *num > 0 {
229                    *num -= 1;
230
231                    let hit_zero = *num == 0;
232
233                    BudgetDecrement { success: true, hit_zero }
234                } else {
235                    BudgetDecrement { success: false, hit_zero: false }
236                }
237            } else {
238                BudgetDecrement { success: true, hit_zero: false }
239            }
240        }
241
242        fn is_unconstrained(self) -> bool {
243            self.0.is_none()
244        }
245    }
246
    pin_project! {
        /// Future wrapper to ensure cooperative scheduling.
        ///
        /// Each time this wrapper is polled, `poll_proceed` is called first to check whether
        /// the budget has been exceeded; if it has, the wrapper returns `Poll::Pending`
        /// without polling the inner future. If the inner future resolves, the wrapper calls
        /// `RestoreOnPending::made_progress` before resolving with the inner result. If the
        /// inner future is pending, the wrapper returns `Poll::Pending` as well.
        #[must_use = "futures do nothing unless polled"]
        pub(crate) struct Coop<F: Future> {
            // The wrapped future; structurally pinned so it can be polled in place.
            #[pin]
            pub(crate) fut: F,
        }
    }
261
262    impl<F: Future> Future for Coop<F> {
263        type Output = F::Output;
264
265        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
266            let coop = ready!(poll_proceed(cx));
267            let me = self.project();
268            if let Poll::Ready(ret) = me.fut.poll(cx) {
269                coop.made_progress();
270                Poll::Ready(ret)
271            } else {
272                Poll::Pending
273            }
274        }
275    }
276
    /// Wraps `fut` in a [`Coop`] so that it runs under the cooperative scheduling budget.
    ///
    /// If the budget is exhausted when the returned future is polled, it returns
    /// `Poll::Pending` and control is yielded back to the runtime.
    #[inline]
    pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> {
        Coop { fut }
    }
284}
285
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Reads the current budget, falling back to an unconstrained budget when
    /// the budget cell cannot be accessed.
    fn get() -> Budget {
        context::budget(|cell| cell.get()).unwrap_or(Budget::unconstrained())
    }

    /// Exercises `budget`, `poll_proceed` and `RestoreOnPending` together:
    /// restore-on-drop, `made_progress`, nesting, and exhaustion.
    #[test]
    fn budgeting() {
        use std::future::poll_fn;
        use tokio_test::*;

        // Outside of any `budget` call the budget is expected to be unconstrained.
        assert!(get().0.is_none());

        let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));

        // Proceeding and dropping the guard leaves an unconstrained budget untouched.
        assert!(get().0.is_none());
        drop(coop);
        assert!(get().0.is_none());

        budget(|| {
            // Inside `budget` the initial allowance is in effect.
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            drop(coop);
            // we didn't make progress
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            coop.made_progress();
            drop(coop);
            // we _did_ make progress
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
            coop.made_progress();
            drop(coop);
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);

            // A nested `budget` call starts over from the initial allowance...
            budget(|| {
                assert_eq!(get().0, Budget::initial().0);

                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
                coop.made_progress();
                drop(coop);
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            });

            // ...and the outer budget is back in place once it returns.
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
        });

        // Leaving the outermost `budget` call restores the unconstrained budget.
        assert!(get().0.is_none());

        budget(|| {
            let n = get().0.unwrap();

            // Spend every unit of the budget, marking progress each time.
            for _ in 0..n {
                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                coop.made_progress();
            }

            let mut task = task::spawn(poll_fn(|cx| {
                let coop = std::task::ready!(poll_proceed(cx));
                coop.made_progress();
                Poll::Ready(())
            }));

            // With the budget exhausted, the next `poll_proceed` must be pending.
            assert_pending!(task.poll());
        });
    }
}
364}