tokio/task/
local.rs

1//! Runs `!Send` futures on the current thread.
2use crate::loom::cell::UnsafeCell;
3use crate::loom::sync::{Arc, Mutex};
4#[cfg(tokio_unstable)]
5use crate::runtime;
6use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task, TaskHarnessScheduleHooks};
7use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD};
8use crate::sync::AtomicWaker;
9use crate::util::trace::SpawnMeta;
10use crate::util::RcCell;
11
12use std::cell::Cell;
13use std::collections::VecDeque;
14use std::fmt;
15use std::future::Future;
16use std::marker::PhantomData;
17use std::mem;
18use std::pin::Pin;
19use std::rc::Rc;
20use std::task::Poll;
21
22use pin_project_lite::pin_project;
23
24cfg_rt! {
25    /// A set of tasks which are executed on the same thread.
26    ///
27    /// In some cases, it is necessary to run one or more futures that do not
28    /// implement [`Send`] and thus are unsafe to send between threads. In these
29    /// cases, a [local task set] may be used to schedule one or more `!Send`
30    /// futures to run together on the same thread.
31    ///
32    /// For example, the following code will not compile:
33    ///
34    /// ```rust,compile_fail
35    /// use std::rc::Rc;
36    ///
37    /// #[tokio::main]
38    /// async fn main() {
39    ///     // `Rc` does not implement `Send`, and thus may not be sent between
40    ///     // threads safely.
41    ///     let nonsend_data = Rc::new("my nonsend data...");
42    ///
43    ///     let nonsend_data = nonsend_data.clone();
44    ///     // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
45    ///     // Since `tokio::spawn` requires the spawned future to implement `Send`, this
46    ///     // will not compile.
47    ///     tokio::spawn(async move {
48    ///         println!("{}", nonsend_data);
49    ///         // ...
50    ///     }).await.unwrap();
51    /// }
52    /// ```
53    ///
54    /// # Use with `run_until`
55    ///
56    /// To spawn `!Send` futures, we can use a local task set to schedule them
57    /// on the thread calling [`Runtime::block_on`]. When running inside of the
58    /// local task set, we can use [`task::spawn_local`], which can spawn
59    /// `!Send` futures. For example:
60    ///
61    /// ```rust
62    /// use std::rc::Rc;
63    /// use tokio::task;
64    ///
65    /// #[tokio::main]
66    /// async fn main() {
67    ///     let nonsend_data = Rc::new("my nonsend data...");
68    ///
69    ///     // Construct a local task set that can run `!Send` futures.
70    ///     let local = task::LocalSet::new();
71    ///
72    ///     // Run the local task set.
73    ///     local.run_until(async move {
74    ///         let nonsend_data = nonsend_data.clone();
75    ///         // `spawn_local` ensures that the future is spawned on the local
76    ///         // task set.
77    ///         task::spawn_local(async move {
78    ///             println!("{}", nonsend_data);
79    ///             // ...
80    ///         }).await.unwrap();
81    ///     }).await;
82    /// }
83    /// ```
84    /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
85    /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
86    /// cannot be used inside a task spawned with `tokio::spawn`.
87    ///
88    /// ## Awaiting a `LocalSet`
89    ///
90    /// Additionally, a `LocalSet` itself implements `Future`, completing when
91    /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
92    /// several futures on a `LocalSet` and drive the whole set until they
93    /// complete. For example,
94    ///
95    /// ```rust
96    /// use tokio::{task, time};
97    /// use std::rc::Rc;
98    ///
99    /// #[tokio::main]
100    /// async fn main() {
101    ///     let nonsend_data = Rc::new("world");
102    ///     let local = task::LocalSet::new();
103    ///
104    ///     let nonsend_data2 = nonsend_data.clone();
105    ///     local.spawn_local(async move {
106    ///         // ...
107    ///         println!("hello {}", nonsend_data2)
108    ///     });
109    ///
110    ///     local.spawn_local(async move {
111    ///         time::sleep(time::Duration::from_millis(100)).await;
112    ///         println!("goodbye {}", nonsend_data)
113    ///     });
114    ///
115    ///     // ...
116    ///
117    ///     local.await;
118    /// }
119    /// ```
120    /// **Note:** Awaiting a `LocalSet` can only be done inside
121    /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
122    /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
123    /// `tokio::spawn`.
124    ///
125    /// ## Use inside `tokio::spawn`
126    ///
127    /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
128    /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
129    /// something else. The solution is to create the `LocalSet` somewhere else,
130    /// and communicate with it using an [`mpsc`] channel.
131    ///
132    /// The following example puts the `LocalSet` inside a new thread.
133    /// ```
134    /// use tokio::runtime::Builder;
135    /// use tokio::sync::{mpsc, oneshot};
136    /// use tokio::task::LocalSet;
137    ///
138    /// // This struct describes the task you want to spawn. Here we include
139    /// // some simple examples. The oneshot channel allows sending a response
140    /// // to the spawner.
141    /// #[derive(Debug)]
142    /// enum Task {
143    ///     PrintNumber(u32),
144    ///     AddOne(u32, oneshot::Sender<u32>),
145    /// }
146    ///
147    /// #[derive(Clone)]
148    /// struct LocalSpawner {
149    ///    send: mpsc::UnboundedSender<Task>,
150    /// }
151    ///
152    /// impl LocalSpawner {
153    ///     pub fn new() -> Self {
154    ///         let (send, mut recv) = mpsc::unbounded_channel();
155    ///
156    ///         let rt = Builder::new_current_thread()
157    ///             .enable_all()
158    ///             .build()
159    ///             .unwrap();
160    ///
161    ///         std::thread::spawn(move || {
162    ///             let local = LocalSet::new();
163    ///
164    ///             local.spawn_local(async move {
165    ///                 while let Some(new_task) = recv.recv().await {
166    ///                     tokio::task::spawn_local(run_task(new_task));
167    ///                 }
168    ///                 // If the while loop returns, then all the LocalSpawner
169    ///                 // objects have been dropped.
170    ///             });
171    ///
172    ///             // This will return once all senders are dropped and all
173    ///             // spawned tasks have returned.
174    ///             rt.block_on(local);
175    ///         });
176    ///
177    ///         Self {
178    ///             send,
179    ///         }
180    ///     }
181    ///
182    ///     pub fn spawn(&self, task: Task) {
183    ///         self.send.send(task).expect("Thread with LocalSet has shut down.");
184    ///     }
185    /// }
186    ///
187    /// // This task may do !Send stuff. We use printing a number as an example,
188    /// // but it could be anything.
189    /// //
190    /// // The Task struct is an enum to support spawning many different kinds
191    /// // of operations.
192    /// async fn run_task(task: Task) {
193    ///     match task {
194    ///         Task::PrintNumber(n) => {
195    ///             println!("{}", n);
196    ///         },
197    ///         Task::AddOne(n, response) => {
198    ///             // We ignore failures to send the response.
199    ///             let _ = response.send(n + 1);
200    ///         },
201    ///     }
202    /// }
203    ///
204    /// #[tokio::main]
205    /// async fn main() {
206    ///     let spawner = LocalSpawner::new();
207    ///
208    ///     let (send, response) = oneshot::channel();
209    ///     spawner.spawn(Task::AddOne(10, send));
210    ///     let eleven = response.await.unwrap();
211    ///     assert_eq!(eleven, 11);
212    /// }
213    /// ```
214    ///
215    /// [`Send`]: trait@std::marker::Send
216    /// [local task set]: struct@LocalSet
217    /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
218    /// [`task::spawn_local`]: fn@spawn_local
219    /// [`mpsc`]: mod@crate::sync::mpsc
    pub struct LocalSet {
        /// Current scheduler tick.
        ///
        /// Incremented (with wrap-around) each time the next task is
        /// selected; its value decides whether the remote or the local run
        /// queue is consulted first.
        tick: Cell<u8>,

        /// State available from thread-local.
        ///
        /// A clone of this handle is installed in the `CURRENT` thread-local
        /// while the set is entered, which is how the free function
        /// `spawn_local` locates the set.
        context: Rc<Context>,

        /// This type should not be Send.
        ///
        /// The raw-pointer marker keeps the struct out of the `Send` and
        /// `Sync` auto traits.
        _not_send: PhantomData<*const ()>,
    }
230}
231
/// State available from the thread-local.
///
/// Held through an `Rc` by the owning `LocalSet` and, while the set is
/// entered, by the `CURRENT` thread-local slot.
struct Context {
    /// State shared between threads.
    shared: Arc<Shared>,

    /// True if a task panicked without being handled and the local set is
    /// configured to shutdown on unhandled panic.
    ///
    /// `LocalSet::tick` asserts that this is `false` before running each
    /// task, so a `true` value makes the next tick panic.
    unhandled_panic: Cell<bool>,
}
241
/// `LocalSet` state shared between threads.
struct Shared {
    /// # Safety
    ///
    /// This field must *only* be accessed from the thread that owns the
    /// `LocalSet` (i.e., `Thread::current().id() == owner`).
    local_state: LocalState,

    /// Remote run queue sender.
    ///
    /// Protected by a mutex. `LocalSet::next_task` pops from this queue
    /// either before or after the local queue, depending on the tick.
    ///
    /// NOTE(review): the `Option` presumably turns `None` once the set is
    /// closed — confirm against the code that shuts the queue down (not in
    /// this part of the file).
    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,

    /// Wake the `LocalSet` task.
    ///
    /// Signalled by `LocalSet::spawn_named_inner` after a task has been
    /// spawned through the `LocalSet` handle.
    waker: AtomicWaker,

    /// How to respond to unhandled task panics.
    ///
    /// Initialised to `UnhandledPanic::Ignore` by `LocalSet::new` and
    /// changed through `LocalSet::unhandled_panic`.
    #[cfg(tokio_unstable)]
    pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}
260
/// Tracks the `LocalSet` state that must only be accessed from the thread that
/// created the `LocalSet`.
struct LocalState {
    /// The `ThreadId` of the thread that owns the `LocalSet`.
    ///
    /// Taken from `context::thread_id()` when the set is constructed.
    owner: ThreadId,

    /// Local run queue sender and receiver.
    ///
    /// Wrapped in an `UnsafeCell` rather than a lock: accesses rely on the
    /// owner-thread rule stated on this type.
    local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,

    /// Collection of all active tasks spawned onto this executor.
    ///
    /// Its `id` is what `LocalSet::id` reports.
    owned: LocalOwnedTasks<Arc<Shared>>,
}
273
pin_project! {
    // Future built by `LocalSet::run_until`: pairs the caller's future with
    // the `LocalSet` it should run on.
    // NOTE(review): the `Future` impl is not part of this chunk; see it for
    // how the two are driven together.
    #[derive(Debug)]
    struct RunUntil<'a, F> {
        // The set passed to `run_until` (`self` at the call site).
        local_set: &'a LocalSet,
        // The caller's future; marked `#[pin]` so it is projected as a
        // pinned reference.
        #[pin]
        future: F,
    }
}
282
// Per-thread record of the `LocalSet` context that is currently entered on
// this thread. It is initialised with a fresh `RcCell` and with
// `wake_on_schedule` cleared.
tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
    ctx: RcCell::new(),
    wake_on_schedule: Cell::new(false),
} });
287
/// Contents of the `CURRENT` thread-local.
struct LocalData {
    /// Context of the `LocalSet` entered on this thread, if any. The free
    /// function `spawn_local` reads it to find the set to spawn on.
    ctx: RcCell<Context>,
    /// Set to `true` by `LocalSet::enter` and to `false` by
    /// `LocalData::enter`; the respective guards restore the previous value
    /// when dropped.
    wake_on_schedule: Cell<bool>,
}
292
293impl LocalData {
294    /// Should be called except when we call `LocalSet::enter`.
295    /// Especially when we poll a `LocalSet`.
296    #[must_use = "dropping this guard will reset the entered state"]
297    fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
298        let ctx = self.ctx.replace(Some(ctx));
299        let wake_on_schedule = self.wake_on_schedule.replace(false);
300        LocalDataEnterGuard {
301            local_data_ref: self,
302            ctx,
303            wake_on_schedule,
304        }
305    }
306}
307
/// A guard for `LocalData::enter()`
///
/// Carries the thread-local state that was replaced on entry and writes it
/// back when dropped.
struct LocalDataEnterGuard<'a> {
    /// The thread-local record that is restored on drop.
    local_data_ref: &'a LocalData,
    /// Context that was current before `LocalData::enter` was called.
    ctx: Option<Rc<Context>>,
    /// Value of the `wake_on_schedule` flag before `LocalData::enter` was
    /// called.
    wake_on_schedule: bool,
}
314
315impl<'a> Drop for LocalDataEnterGuard<'a> {
316    fn drop(&mut self) {
317        self.local_data_ref.ctx.set(self.ctx.take());
318        self.local_data_ref
319            .wake_on_schedule
320            .set(self.wake_on_schedule)
321    }
322}
323
324cfg_rt! {
325    /// Spawns a `!Send` future on the current [`LocalSet`] or [`LocalRuntime`].
326    ///
327    /// The spawned future will run on the same thread that called `spawn_local`.
328    ///
329    /// The provided future will start running in the background immediately
330    /// when `spawn_local` is called, even if you don't await the returned
331    /// `JoinHandle`.
332    ///
333    /// # Panics
334    ///
    /// This function panics if called outside of a [`LocalSet`] or [`LocalRuntime`].
336    ///
337    /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
338    /// resulting new task will _not_ be inside the `LocalSet`, so you must use
339    /// `spawn_local` if you want to stay within the `LocalSet`.
340    ///
341    /// # Examples
342    ///
343    /// ```rust
344    /// use std::rc::Rc;
345    /// use tokio::task;
346    ///
347    /// #[tokio::main]
348    /// async fn main() {
349    ///     let nonsend_data = Rc::new("my nonsend data...");
350    ///
351    ///     let local = task::LocalSet::new();
352    ///
353    ///     // Run the local task set.
354    ///     local.run_until(async move {
355    ///         let nonsend_data = nonsend_data.clone();
356    ///         task::spawn_local(async move {
357    ///             println!("{}", nonsend_data);
358    ///             // ...
359    ///         }).await.unwrap();
360    ///     }).await;
361    /// }
362    /// ```
363    ///
364    /// [`LocalSet`]: struct@crate::task::LocalSet
365    /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
366    /// [`tokio::spawn`]: fn@crate::task::spawn
367    #[track_caller]
368    pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
369    where
370        F: Future + 'static,
371        F::Output: 'static,
372    {
373        let fut_size = std::mem::size_of::<F>();
374        if fut_size > BOX_FUTURE_THRESHOLD {
375            spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
376        } else {
377            spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size))
378        }
379    }
380
381
    /// Implementation shared by the free function `spawn_local`: spawns
    /// `future` on whichever local executor is current for this thread.
    ///
    /// If a runtime handle can be obtained and it reports itself as local,
    /// the task is spawned on that runtime. Otherwise — including when no
    /// runtime handle is available at all — the task is spawned on the
    /// `LocalSet` context stored in the `CURRENT` thread-local.
    ///
    /// # Panics
    ///
    /// Panics if the handle is local but
    /// `can_spawn_local_on_local_runtime` returns `false`, or if there is
    /// neither a local runtime nor an entered `LocalSet`.
    #[track_caller]
    pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
    where F: Future + 'static,
          F::Output: 'static
    {
        use crate::runtime::{context, task};

        // Kept in an `Option` so the future can be moved out either inside the
        // closure below or, if the closure never runs, afterwards.
        let mut future = Some(future);

        let res = context::with_current(|handle| {
            Some(if handle.is_local() {
                // Returning `None` here is reported as a panic by the `match`
                // on `res` below.
                if !handle.can_spawn_local_on_local_runtime() {
                    return None;
                }

                let future = future.take().unwrap();

                // Only in task-dump builds: wrap the future with
                // `Trace::root` before it is spawned.
                #[cfg(all(
                    tokio_unstable,
                    tokio_taskdump,
                    feature = "rt",
                    target_os = "linux",
                    any(
                        target_arch = "aarch64",
                        target_arch = "x86",
                        target_arch = "x86_64"
                    )
                ))]
                let future = task::trace::Trace::root(future);
                let id = task::Id::next();
                let task = crate::util::trace::task(future, "task", meta, id.as_u64());

                // safety: we have verified that this is a `LocalRuntime` owned by the current thread
                unsafe { handle.spawn_local(task, id) }
            } else {
                // The current runtime is not a local one: use the `LocalSet`
                // entered on this thread, if there is one.
                match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
                    None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"),
                    Some(cx) => cx.spawn(future.take().unwrap(), meta)
                }
            })
        });

        match res {
            Ok(None) => panic!("Local tasks can only be spawned on a LocalRuntime from the thread the runtime was created on"),
            Ok(Some(join_handle)) => join_handle,
            // No runtime handle could be obtained, so the closure above did
            // not take the future; an entered `LocalSet` can still accept it.
            Err(_) => match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
                None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"),
                Some(cx) => cx.spawn(future.unwrap(), meta)
            }
        }
    }
433}
434
/// Initial queue capacity.
///
/// Used by `LocalSet::new` to pre-size both the local and the remote run
/// queue.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often the remote queue is checked first: whenever the tick counter is
/// a multiple of this value, the remote queue is consulted before the local
/// one.
const REMOTE_FIRST_INTERVAL: u8 = 31;
443
/// Context guard for `LocalSet`
///
/// Returned by `LocalSet::enter`. Dropping the guard puts back the
/// thread-local state that was in place before the set was entered.
pub struct LocalEnterGuard {
    /// Context that was current before `LocalSet::enter` was called; written
    /// back on drop.
    ctx: Option<Rc<Context>>,

    /// Distinguishes whether the context was entered or being polled.
    ///
    /// `LocalSet::enter` sets the thread-local `wake_on_schedule` flag to
    /// `true`, whereas polling the set clears it. This field holds the value
    /// the flag had before entering, so that it can be restored on drop.
    wake_on_schedule: bool,
}
453
454impl Drop for LocalEnterGuard {
455    fn drop(&mut self) {
456        CURRENT.with(
457            |LocalData {
458                 ctx,
459                 wake_on_schedule,
460             }| {
461                ctx.set(self.ctx.take());
462                wake_on_schedule.set(self.wake_on_schedule);
463            },
464        );
465    }
466}
467
468impl fmt::Debug for LocalEnterGuard {
469    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470        f.debug_struct("LocalEnterGuard").finish()
471    }
472}
473
474impl LocalSet {
475    /// Returns a new local task set.
476    pub fn new() -> LocalSet {
477        let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
478
479        LocalSet {
480            tick: Cell::new(0),
481            context: Rc::new(Context {
482                shared: Arc::new(Shared {
483                    local_state: LocalState {
484                        owner,
485                        owned: LocalOwnedTasks::new(),
486                        local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
487                    },
488                    queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
489                    waker: AtomicWaker::new(),
490                    #[cfg(tokio_unstable)]
491                    unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
492                }),
493                unhandled_panic: Cell::new(false),
494            }),
495            _not_send: PhantomData,
496        }
497    }
498
499    /// Enters the context of this `LocalSet`.
500    ///
501    /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
502    /// context you are inside.
503    ///
504    /// [`spawn_local`]: fn@crate::task::spawn_local
505    pub fn enter(&self) -> LocalEnterGuard {
506        CURRENT.with(
507            |LocalData {
508                 ctx,
509                 wake_on_schedule,
510                 ..
511             }| {
512                let ctx = ctx.replace(Some(self.context.clone()));
513                let wake_on_schedule = wake_on_schedule.replace(true);
514                LocalEnterGuard {
515                    ctx,
516                    wake_on_schedule,
517                }
518            },
519        )
520    }
521
522    /// Spawns a `!Send` task onto the local task set.
523    ///
524    /// This task is guaranteed to be run on the current thread.
525    ///
526    /// Unlike the free function [`spawn_local`], this method may be used to
527    /// spawn local tasks when the `LocalSet` is _not_ running. The provided
528    /// future will start running once the `LocalSet` is next started, even if
529    /// you don't await the returned `JoinHandle`.
530    ///
531    /// # Examples
532    ///
533    /// ```rust
534    /// use tokio::task;
535    ///
536    /// #[tokio::main]
537    /// async fn main() {
538    ///     let local = task::LocalSet::new();
539    ///
540    ///     // Spawn a future on the local set. This future will be run when
541    ///     // we call `run_until` to drive the task set.
542    ///     local.spawn_local(async {
543    ///        // ...
544    ///     });
545    ///
546    ///     // Run the local task set.
547    ///     local.run_until(async move {
548    ///         // ...
549    ///     }).await;
550    ///
    ///     // When `run_until` finishes, we can spawn _more_ futures, which will
    ///     // run in subsequent calls to `run_until`.
553    ///     local.spawn_local(async {
554    ///        // ...
555    ///     });
556    ///
557    ///     local.run_until(async move {
558    ///         // ...
559    ///     }).await;
560    /// }
561    /// ```
562    /// [`spawn_local`]: fn@spawn_local
563    #[track_caller]
564    pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
565    where
566        F: Future + 'static,
567        F::Output: 'static,
568    {
569        let fut_size = mem::size_of::<F>();
570        if fut_size > BOX_FUTURE_THRESHOLD {
571            self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
572        } else {
573            self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
574        }
575    }
576
577    /// Runs a future to completion on the provided runtime, driving any local
578    /// futures spawned on this task set on the current thread.
579    ///
580    /// This runs the given future on the runtime, blocking until it is
581    /// complete, and yielding its resolved result. Any tasks or timers which
582    /// the future spawns internally will be executed on the runtime. The future
    /// may also call [`spawn_local`] to spawn additional local futures on the
    /// current thread.
585    ///
586    /// This method should not be called from an asynchronous context.
587    ///
588    /// # Panics
589    ///
590    /// This function panics if the executor is at capacity, if the provided
591    /// future panics, or if called within an asynchronous execution context.
592    ///
593    /// # Notes
594    ///
595    /// Since this function internally calls [`Runtime::block_on`], and drives
596    /// futures in the local task set inside that call to `block_on`, the local
597    /// futures may not use [in-place blocking]. If a blocking call needs to be
598    /// issued from a local task, the [`spawn_blocking`] API may be used instead.
599    ///
600    /// For example, this will panic:
601    /// ```should_panic
602    /// use tokio::runtime::Runtime;
603    /// use tokio::task;
604    ///
605    /// let rt  = Runtime::new().unwrap();
606    /// let local = task::LocalSet::new();
607    /// local.block_on(&rt, async {
608    ///     let join = task::spawn_local(async {
609    ///         let blocking_result = task::block_in_place(|| {
610    ///             // ...
611    ///         });
612    ///         // ...
613    ///     });
614    ///     join.await.unwrap();
615    /// })
616    /// ```
617    /// This, however, will not panic:
618    /// ```
619    /// use tokio::runtime::Runtime;
620    /// use tokio::task;
621    ///
622    /// let rt  = Runtime::new().unwrap();
623    /// let local = task::LocalSet::new();
624    /// local.block_on(&rt, async {
625    ///     let join = task::spawn_local(async {
626    ///         let blocking_result = task::spawn_blocking(|| {
627    ///             // ...
628    ///         }).await;
629    ///         // ...
630    ///     });
631    ///     join.await.unwrap();
632    /// })
633    /// ```
634    ///
635    /// [`spawn_local`]: fn@spawn_local
636    /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
637    /// [in-place blocking]: fn@crate::task::block_in_place
638    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
639    #[track_caller]
640    #[cfg(feature = "rt")]
641    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
642    pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
643    where
644        F: Future,
645    {
646        rt.block_on(self.run_until(future))
647    }
648
649    /// Runs a future to completion on the local set, returning its output.
650    ///
651    /// This returns a future that runs the given future with a local set,
652    /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
653    /// Any local futures spawned on the local set will be driven in the
654    /// background until the future passed to `run_until` completes. When the future
655    /// passed to `run_until` finishes, any local futures which have not completed
656    /// will remain on the local set, and will be driven on subsequent calls to
657    /// `run_until` or when [awaiting the local set] itself.
658    ///
659    /// # Cancel safety
660    ///
661    /// This method is cancel safe when `future` is cancel safe.
662    ///
663    /// # Examples
664    ///
665    /// ```rust
666    /// use tokio::task;
667    ///
668    /// #[tokio::main]
669    /// async fn main() {
670    ///     task::LocalSet::new().run_until(async {
671    ///         task::spawn_local(async move {
672    ///             // ...
673    ///         }).await.unwrap();
674    ///         // ...
675    ///     }).await;
676    /// }
677    /// ```
678    ///
679    /// [`spawn_local`]: fn@spawn_local
680    /// [awaiting the local set]: #awaiting-a-localset
681    pub async fn run_until<F>(&self, future: F) -> F::Output
682    where
683        F: Future,
684    {
685        let run_until = RunUntil {
686            future,
687            local_set: self,
688        };
689        run_until.await
690    }
691
    /// Spawns `future` onto this `LocalSet` using caller-supplied spawn
    /// metadata.
    ///
    /// Visible throughout `crate::task`; it simply forwards to
    /// `spawn_named_inner`.
    #[track_caller]
    pub(in crate::task) fn spawn_named<F>(
        &self,
        future: F,
        meta: SpawnMeta<'_>,
    ) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.spawn_named_inner(future, meta)
    }
704
705    #[track_caller]
706    fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
707    where
708        F: Future + 'static,
709        F::Output: 'static,
710    {
711        let handle = self.context.spawn(future, meta);
712
713        // Because a task was spawned from *outside* the `LocalSet`, wake the
714        // `LocalSet` future to execute the new task, if it hasn't been woken.
715        //
716        // Spawning via the free fn `spawn` does not require this, as it can
717        // only be called from *within* a future executing on the `LocalSet` —
718        // in that case, the `LocalSet` must already be awake.
719        self.context.shared.waker.wake();
720        handle
721    }
722
723    /// Ticks the scheduler, returning whether the local future needs to be
724    /// notified again.
725    fn tick(&self) -> bool {
726        for _ in 0..MAX_TASKS_PER_TICK {
727            // Make sure we didn't hit an unhandled panic
728            assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
729
730            match self.next_task() {
731                // Run the task
732                //
733                // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
734                // used. We are responsible for maintaining the invariant that
735                // `run_unchecked` is only called on threads that spawned the
736                // task initially. Because `LocalSet` itself is `!Send`, and
737                // `spawn_local` spawns into the `LocalSet` on the current
738                // thread, the invariant is maintained.
739                Some(task) => crate::runtime::coop::budget(|| task.run()),
740                // We have fully drained the queue of notified tasks, so the
741                // local future doesn't need to be notified again — it can wait
742                // until something else wakes a task in the local set.
743                None => return false,
744            }
745        }
746
747        true
748    }
749
    /// Selects the next task to run, if any, and advances the tick counter.
    ///
    /// When the tick (before incrementing) is a multiple of
    /// `REMOTE_FIRST_INTERVAL` the remote queue is tried before the local
    /// one; otherwise the local queue is tried first. In both cases the other
    /// queue is the fallback.
    fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
        // The counter wraps around at `u8::MAX`.
        let tick = self.tick.get();
        self.tick.set(tick.wrapping_add(1));

        let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
            // Remote queue first, local queue as the fallback.
            self.context
                .shared
                .queue
                .lock()
                .as_mut()
                .and_then(|queue| queue.pop_front())
                .or_else(|| self.pop_local())
        } else {
            // Local queue first; the remote queue is only locked when the
            // local one yields nothing.
            self.pop_local().or_else(|| {
                self.context
                    .shared
                    .queue
                    .lock()
                    .as_mut()
                    .and_then(VecDeque::pop_front)
            })
        };

        // Convert the notification into its local form via the owner state.
        task.map(|task| unsafe {
            // Safety: because the `LocalSet` itself is `!Send`, we know we are
            // on the same thread if we have access to the `LocalSet`, and can
            // therefore access the local run queue.
            self.context.shared.local_state.assert_owner(task)
        })
    }
780
781    fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
782        unsafe {
783            // Safety: because the `LocalSet` itself is `!Send`, we know we are
784            // on the same thread if we have access to the `LocalSet`, and can
785            // therefore access the local run queue.
786            self.context.shared.local_state.task_pop_front()
787        }
788    }
789
790    fn with<T>(&self, f: impl FnOnce() -> T) -> T {
791        CURRENT.with(|local_data| {
792            let _guard = local_data.enter(self.context.clone());
793            f()
794        })
795    }
796
797    /// This method is like `with`, but it just calls `f` without setting the thread-local if that
798    /// fails.
799    fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
800        let mut f = Some(f);
801
802        let res = CURRENT.try_with(|local_data| {
803            let _guard = local_data.enter(self.context.clone());
804            (f.take().unwrap())()
805        });
806
807        match res {
808            Ok(res) => res,
809            Err(_access_error) => (f.take().unwrap())(),
810        }
811    }
812}
813
814cfg_unstable! {
815    impl LocalSet {
816        /// Configure how the `LocalSet` responds to an unhandled panic on a
817        /// spawned task.
818        ///
819        /// By default, an unhandled panic (i.e. a panic not caught by
820        /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
        /// execution. The panic's error value is forwarded to the task's
        /// [`JoinHandle`] and all other spawned tasks continue running.
823        ///
824        /// The `unhandled_panic` option enables configuring this behavior.
825        ///
826        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
827        ///   spawned tasks have no impact on the `LocalSet`'s execution.
828        /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
829        ///   shutdown immediately when a spawned task panics even if that
830        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
831        ///   will immediately terminate and further calls to
832        ///   [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
833        ///
834        /// # Panics
835        ///
836        /// This method panics if called after the `LocalSet` has started
837        /// running.
838        ///
839        /// # Unstable
840        ///
841        /// This option is currently unstable and its implementation is
842        /// incomplete. The API may change or be removed in the future. See
843        /// tokio-rs/tokio#4516 for more details.
844        ///
845        /// # Examples
846        ///
847        /// The following demonstrates a `LocalSet` configured to shutdown on
848        /// panic. The first spawned task panics and results in the `LocalSet`
849        /// shutting down. The second spawned task never has a chance to
850        /// execute. The call to `run_until` will panic due to the runtime being
851        /// forcibly shutdown.
852        ///
853        /// ```should_panic
854        /// use tokio::runtime::UnhandledPanic;
855        ///
856        /// # #[tokio::main]
857        /// # async fn main() {
858        /// tokio::task::LocalSet::new()
859        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
860        ///     .run_until(async {
861        ///         tokio::task::spawn_local(async { panic!("boom"); });
862        ///         tokio::task::spawn_local(async {
863        ///             // This task never completes
864        ///         });
865        ///
866        ///         // Do some work, but `run_until` will panic before it completes
867        /// # loop { tokio::task::yield_now().await; }
868        ///     })
869        ///     .await;
870        /// # }
871        /// ```
872        ///
873        /// [`JoinHandle`]: struct@crate::task::JoinHandle
874        pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
875            // TODO: This should be set as a builder
876            Rc::get_mut(&mut self.context)
877                .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
878                .expect("Unhandled Panic behavior modified after starting LocalSet")
879                .unhandled_panic = behavior;
880            self
881        }
882
883        /// Returns the [`Id`] of the current `LocalSet` runtime.
884        ///
885        /// # Examples
886        ///
887        /// ```rust
888        /// use tokio::task;
889        ///
890        /// #[tokio::main]
891        /// async fn main() {
892        ///     let local_set = task::LocalSet::new();
893        ///     println!("Local set id: {}", local_set.id());
894        /// }
895        /// ```
896        ///
897        /// **Note**: This is an [unstable API][unstable]. The public API of this type
898        /// may break in 1.x releases. See [the documentation on unstable
899        /// features][unstable] for details.
900        ///
901        /// [unstable]: crate#unstable-features
902        /// [`Id`]: struct@crate::runtime::Id
903        pub fn id(&self) -> runtime::Id {
904            self.context.shared.local_state.owned.id.into()
905        }
906    }
907}
908
909impl fmt::Debug for LocalSet {
910    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
911        fmt.debug_struct("LocalSet").finish()
912    }
913}
914
915impl Future for LocalSet {
916    type Output = ();
917
918    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
919        // Register the waker before starting to work
920        self.context.shared.waker.register_by_ref(cx.waker());
921
922        if self.with(|| self.tick()) {
923            // If `tick` returns true, we need to notify the local future again:
924            // there are still tasks remaining in the run queue.
925            cx.waker().wake_by_ref();
926            Poll::Pending
927
928        // Safety: called from the thread that owns `LocalSet`. Because
929        // `LocalSet` is `!Send`, this is safe.
930        } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
931            // If the scheduler has no remaining futures, we're done!
932            Poll::Ready(())
933        } else {
934            // There are still futures in the local set, but we've polled all the
935            // futures in the run queue. Therefore, we can just return Pending
936            // since the remaining futures will be woken from somewhere else.
937            Poll::Pending
938        }
939    }
940}
941
942impl Default for LocalSet {
943    fn default() -> LocalSet {
944        LocalSet::new()
945    }
946}
947
948impl Drop for LocalSet {
949    fn drop(&mut self) {
950        self.with_if_possible(|| {
951            // Shut down all tasks in the LocalOwnedTasks and close it to
952            // prevent new tasks from ever being added.
953            unsafe {
954                // Safety: called from the thread that owns `LocalSet`
955                self.context.shared.local_state.close_and_shutdown_all();
956            }
957
958            // We already called shutdown on all tasks above, so there is no
959            // need to call shutdown.
960
961            // Safety: note that this *intentionally* bypasses the unsafe
962            // `Shared::local_queue()` method. This is in order to avoid the
963            // debug assertion that we are on the thread that owns the
964            // `LocalSet`, because on some systems (e.g. at least some macOS
965            // versions), attempting to get the current thread ID can panic due
966            // to the thread's local data that stores the thread ID being
967            // dropped *before* the `LocalSet`.
968            //
969            // Despite avoiding the assertion here, it is safe for us to access
970            // the local queue in `Drop`, because the `LocalSet` itself is
971            // `!Send`, so we can reasonably guarantee that it will not be
972            // `Drop`ped from another thread.
973            let local_queue = unsafe {
974                // Safety: called from the thread that owns `LocalSet`
975                self.context.shared.local_state.take_local_queue()
976            };
977            for task in local_queue {
978                drop(task);
979            }
980
981            // Take the queue from the Shared object to prevent pushing
982            // notifications to it in the future.
983            let queue = self.context.shared.queue.lock().take().unwrap();
984            for task in queue {
985                drop(task);
986            }
987
988            // Safety: called from the thread that owns `LocalSet`
989            assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
990        });
991    }
992}
993
994// === impl Context ===
995
996impl Context {
997    #[track_caller]
998    fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
999    where
1000        F: Future + 'static,
1001        F::Output: 'static,
1002    {
1003        let id = crate::runtime::task::Id::next();
1004        let future = crate::util::trace::task(future, "local", meta, id.as_u64());
1005
1006        // Safety: called from the thread that owns the `LocalSet`
1007        let (handle, notified) = {
1008            self.shared.local_state.assert_called_from_owner_thread();
1009            self.shared
1010                .local_state
1011                .owned
1012                .bind(future, self.shared.clone(), id)
1013        };
1014
1015        if let Some(notified) = notified {
1016            self.shared.schedule(notified);
1017        }
1018
1019        handle
1020    }
1021}
1022
// === impl RunUntil ===
1024
1025impl<T: Future> Future for RunUntil<'_, T> {
1026    type Output = T::Output;
1027
1028    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1029        let me = self.project();
1030
1031        me.local_set.with(|| {
1032            me.local_set
1033                .context
1034                .shared
1035                .waker
1036                .register_by_ref(cx.waker());
1037
1038            let _no_blocking = crate::runtime::context::disallow_block_in_place();
1039            let f = me.future;
1040
1041            if let Poll::Ready(output) = f.poll(cx) {
1042                return Poll::Ready(output);
1043            }
1044
1045            if me.local_set.tick() {
1046                // If `tick` returns `true`, we need to notify the local future again:
1047                // there are still tasks remaining in the run queue.
1048                cx.waker().wake_by_ref();
1049            }
1050
1051            Poll::Pending
1052        })
1053    }
1054}
1055
1056impl Shared {
1057    /// Schedule the provided task on the scheduler.
1058    fn schedule(&self, task: task::Notified<Arc<Self>>) {
1059        CURRENT.with(|localdata| {
1060            match localdata.ctx.get() {
1061                // If the current `LocalSet` is being polled, we don't need to wake it.
1062                // When we `enter` it, then the value `wake_on_schedule` is set to be true.
1063                // In this case it is not being polled, so we need to wake it.
1064                Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
1065                    // Safety: if the current `LocalSet` context points to this
1066                    // `LocalSet`, then we are on the thread that owns it.
1067                    cx.shared.local_state.task_push_back(task);
1068                },
1069
1070                // We are on the thread that owns the `LocalSet`, so we can
1071                // wake to the local queue.
1072                _ if context::thread_id().ok() == Some(self.local_state.owner) => {
1073                    unsafe {
1074                        // Safety: we just checked that the thread ID matches
1075                        // the localset's owner, so this is safe.
1076                        self.local_state.task_push_back(task);
1077                    }
1078                    // We still have to wake the `LocalSet`, because it isn't
1079                    // currently being polled.
1080                    self.waker.wake();
1081                }
1082
1083                // We are *not* on the thread that owns the `LocalSet`, so we
1084                // have to wake to the remote queue.
1085                _ => {
1086                    // First, check whether the queue is still there (if not, the
1087                    // LocalSet is dropped). Then push to it if so, and if not,
1088                    // do nothing.
1089                    let mut lock = self.queue.lock();
1090
1091                    if let Some(queue) = lock.as_mut() {
1092                        queue.push_back(task);
1093                        drop(lock);
1094                        self.waker.wake();
1095                    }
1096                }
1097            }
1098        });
1099    }
1100
1101    fn ptr_eq(&self, other: &Shared) -> bool {
1102        std::ptr::eq(self, other)
1103    }
1104}
1105
// SAFETY: `Shared` may be shared between threads because (and only because) the
// local run queue is never touched except from the thread that owns the
// `LocalSet`. This is an invariant upheld by convention; the accessors on
// `LocalState` check it with a debug assertion.
unsafe impl Sync for Shared {}
1109
1110impl task::Schedule for Arc<Shared> {
1111    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
1112        // Safety, this is always called from the thread that owns `LocalSet`
1113        unsafe { self.local_state.task_remove(task) }
1114    }
1115
1116    fn schedule(&self, task: task::Notified<Self>) {
1117        Shared::schedule(self, task);
1118    }
1119
1120    // localset does not currently support task hooks
1121    fn hooks(&self) -> TaskHarnessScheduleHooks {
1122        TaskHarnessScheduleHooks {
1123            task_terminate_callback: None,
1124        }
1125    }
1126
1127    cfg_unstable! {
1128        fn unhandled_panic(&self) {
1129            use crate::runtime::UnhandledPanic;
1130
1131            match self.unhandled_panic {
1132                UnhandledPanic::Ignore => {
1133                    // Do nothing
1134                }
1135                UnhandledPanic::ShutdownRuntime => {
1136                    // This hook is only called from within the runtime, so
1137                    // `CURRENT` should match with `&self`, i.e. there is no
1138                    // opportunity for a nested scheduler to be called.
1139                    CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1140                        Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1141                            cx.unhandled_panic.set(true);
1142                            // Safety: this is always called from the thread that owns `LocalSet`
1143                            unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1144                        }
1145                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
1146                    })
1147                }
1148            }
1149        }
1150    }
1151}
1152
1153impl LocalState {
1154    unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1155        // The caller ensures it is called from the same thread that owns
1156        // the LocalSet.
1157        self.assert_called_from_owner_thread();
1158
1159        self.local_queue.with_mut(|ptr| (*ptr).pop_front())
1160    }
1161
1162    unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1163        // The caller ensures it is called from the same thread that owns
1164        // the LocalSet.
1165        self.assert_called_from_owner_thread();
1166
1167        self.local_queue.with_mut(|ptr| (*ptr).push_back(task));
1168    }
1169
1170    unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1171        // The caller ensures it is called from the same thread that owns
1172        // the LocalSet.
1173        self.assert_called_from_owner_thread();
1174
1175        self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
1176    }
1177
1178    unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1179        // The caller ensures it is called from the same thread that owns
1180        // the LocalSet.
1181        self.assert_called_from_owner_thread();
1182
1183        self.owned.remove(task)
1184    }
1185
1186    /// Returns true if the `LocalSet` does not have any spawned tasks
1187    unsafe fn owned_is_empty(&self) -> bool {
1188        // The caller ensures it is called from the same thread that owns
1189        // the LocalSet.
1190        self.assert_called_from_owner_thread();
1191
1192        self.owned.is_empty()
1193    }
1194
1195    unsafe fn assert_owner(
1196        &self,
1197        task: task::Notified<Arc<Shared>>,
1198    ) -> task::LocalNotified<Arc<Shared>> {
1199        // The caller ensures it is called from the same thread that owns
1200        // the LocalSet.
1201        self.assert_called_from_owner_thread();
1202
1203        self.owned.assert_owner(task)
1204    }
1205
1206    unsafe fn close_and_shutdown_all(&self) {
1207        // The caller ensures it is called from the same thread that owns
1208        // the LocalSet.
1209        self.assert_called_from_owner_thread();
1210
1211        self.owned.close_and_shutdown_all();
1212    }
1213
1214    #[track_caller]
1215    fn assert_called_from_owner_thread(&self) {
1216        // FreeBSD has some weirdness around thread-local destruction.
1217        // TODO: remove this hack when thread id is cleaned up
1218        #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1219        debug_assert!(
1220            // if we couldn't get the thread ID because we're dropping the local
1221            // data, skip the assertion --- the `Drop` impl is not going to be
1222            // called from another thread, because `LocalSet` is `!Send`
1223            context::thread_id()
1224                .map(|id| id == self.owner)
1225                .unwrap_or(true),
1226            "`LocalSet`'s local run queue must not be accessed by another thread!"
1227        );
1228    }
1229}
1230
// SAFETY: `LocalState` is marked `Send` because it is stored in `Shared`. This
// impl does not by itself make cross-thread use sound: it is up to the caller
// of each accessor to ensure they are on the same thread that owns the
// `LocalSet`.
unsafe impl Send for LocalState {}
1234
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    // Smoke test: does a `LocalSet` running on a current-thread runtime
    // basically work?
    //
    // This duplicates a test in `tests/task_local_set.rs`, but because this is
    // a lib test, it will run under Miri, so this is necessary to catch stacked
    // borrows violations in the `LocalSet` implementation.
    #[test]
    fn local_current_thread_scheduler() {
        let rt = crate::runtime::Builder::new_current_thread()
            .build()
            .expect("rt");

        rt.block_on(async {
            LocalSet::new()
                .run_until(async {
                    spawn_local(async {}).await.unwrap();
                })
                .await;
        })
    }

    // Tests that when a task on a `LocalSet` is woken by an io driver on the
    // same thread, the task is woken to the localset's local queue rather than
    // its remote queue.
    //
    // This test has to be defined in the `local.rs` file as a lib test, rather
    // than in `tests/`, because it makes assertions about the local set's
    // internal state.
    #[test]
    fn wakes_to_local_queue() {
        use crate::sync::Notify;

        let rt = crate::runtime::Builder::new_current_thread()
            .build()
            .expect("rt");

        rt.block_on(async {
            let local = LocalSet::new();
            let notify = Arc::new(Notify::new());

            // The spawned task parks on its own handle to the `Notify`.
            let waiter = notify.clone();
            let task = local.spawn_local(async move {
                waiter.notified().await;
            });

            let mut run_until = Box::pin(local.run_until(async move {
                task.await.unwrap();
            }));

            // Poll the `run_until` future exactly once, ignoring its result.
            std::future::poll_fn(|cx| {
                let _ = run_until.as_mut().poll(cx);
                Poll::Ready(())
            })
            .await;

            notify.notify_one();

            // Safety: `local` was created inside this future, which is driven
            // by `block_on` on a current-thread runtime, so we are on the
            // thread that owns the `LocalSet`.
            let woken = unsafe { local.context.shared.local_state.task_pop_front() };
            // TODO(eliza): it would be nice to be able to assert that this is
            // the local task.
            assert!(
                woken.is_some(),
                "task should have been notified to the LocalSet's local queue"
            );
        })
    }
}