tokio/task/local.rs
1//! Runs `!Send` futures on the current thread.
2use crate::loom::cell::UnsafeCell;
3use crate::loom::sync::{Arc, Mutex};
4#[cfg(tokio_unstable)]
5use crate::runtime;
6use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task, TaskHarnessScheduleHooks};
7use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD};
8use crate::sync::AtomicWaker;
9use crate::util::trace::SpawnMeta;
10use crate::util::RcCell;
11
12use std::cell::Cell;
13use std::collections::VecDeque;
14use std::fmt;
15use std::future::Future;
16use std::marker::PhantomData;
17use std::mem;
18use std::pin::Pin;
19use std::rc::Rc;
20use std::task::Poll;
21
22use pin_project_lite::pin_project;
23
24cfg_rt! {
25 /// A set of tasks which are executed on the same thread.
26 ///
27 /// In some cases, it is necessary to run one or more futures that do not
28 /// implement [`Send`] and thus are unsafe to send between threads. In these
29 /// cases, a [local task set] may be used to schedule one or more `!Send`
30 /// futures to run together on the same thread.
31 ///
32 /// For example, the following code will not compile:
33 ///
34 /// ```rust,compile_fail
35 /// use std::rc::Rc;
36 ///
37 /// #[tokio::main]
38 /// async fn main() {
39 /// // `Rc` does not implement `Send`, and thus may not be sent between
40 /// // threads safely.
41 /// let nonsend_data = Rc::new("my nonsend data...");
42 ///
43 /// let nonsend_data = nonsend_data.clone();
44 /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
45 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
46 /// // will not compile.
47 /// tokio::spawn(async move {
48 /// println!("{}", nonsend_data);
49 /// // ...
50 /// }).await.unwrap();
51 /// }
52 /// ```
53 ///
54 /// # Use with `run_until`
55 ///
56 /// To spawn `!Send` futures, we can use a local task set to schedule them
57 /// on the thread calling [`Runtime::block_on`]. When running inside of the
58 /// local task set, we can use [`task::spawn_local`], which can spawn
59 /// `!Send` futures. For example:
60 ///
61 /// ```rust
62 /// use std::rc::Rc;
63 /// use tokio::task;
64 ///
65 /// #[tokio::main]
66 /// async fn main() {
67 /// let nonsend_data = Rc::new("my nonsend data...");
68 ///
69 /// // Construct a local task set that can run `!Send` futures.
70 /// let local = task::LocalSet::new();
71 ///
72 /// // Run the local task set.
73 /// local.run_until(async move {
74 /// let nonsend_data = nonsend_data.clone();
75 /// // `spawn_local` ensures that the future is spawned on the local
76 /// // task set.
77 /// task::spawn_local(async move {
78 /// println!("{}", nonsend_data);
79 /// // ...
80 /// }).await.unwrap();
81 /// }).await;
82 /// }
83 /// ```
84 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
85 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
86 /// cannot be used inside a task spawned with `tokio::spawn`.
87 ///
88 /// ## Awaiting a `LocalSet`
89 ///
90 /// Additionally, a `LocalSet` itself implements `Future`, completing when
91 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
92 /// several futures on a `LocalSet` and drive the whole set until they
93 /// complete. For example,
94 ///
95 /// ```rust
96 /// use tokio::{task, time};
97 /// use std::rc::Rc;
98 ///
99 /// #[tokio::main]
100 /// async fn main() {
101 /// let nonsend_data = Rc::new("world");
102 /// let local = task::LocalSet::new();
103 ///
104 /// let nonsend_data2 = nonsend_data.clone();
105 /// local.spawn_local(async move {
106 /// // ...
107 /// println!("hello {}", nonsend_data2)
108 /// });
109 ///
110 /// local.spawn_local(async move {
111 /// time::sleep(time::Duration::from_millis(100)).await;
112 /// println!("goodbye {}", nonsend_data)
113 /// });
114 ///
115 /// // ...
116 ///
117 /// local.await;
118 /// }
119 /// ```
120 /// **Note:** Awaiting a `LocalSet` can only be done inside
121 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
122 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
123 /// `tokio::spawn`.
124 ///
125 /// ## Use inside `tokio::spawn`
126 ///
127 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
128 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
129 /// something else. The solution is to create the `LocalSet` somewhere else,
130 /// and communicate with it using an [`mpsc`] channel.
131 ///
132 /// The following example puts the `LocalSet` inside a new thread.
133 /// ```
134 /// use tokio::runtime::Builder;
135 /// use tokio::sync::{mpsc, oneshot};
136 /// use tokio::task::LocalSet;
137 ///
138 /// // This struct describes the task you want to spawn. Here we include
139 /// // some simple examples. The oneshot channel allows sending a response
140 /// // to the spawner.
141 /// #[derive(Debug)]
142 /// enum Task {
143 /// PrintNumber(u32),
144 /// AddOne(u32, oneshot::Sender<u32>),
145 /// }
146 ///
147 /// #[derive(Clone)]
148 /// struct LocalSpawner {
149 /// send: mpsc::UnboundedSender<Task>,
150 /// }
151 ///
152 /// impl LocalSpawner {
153 /// pub fn new() -> Self {
154 /// let (send, mut recv) = mpsc::unbounded_channel();
155 ///
156 /// let rt = Builder::new_current_thread()
157 /// .enable_all()
158 /// .build()
159 /// .unwrap();
160 ///
161 /// std::thread::spawn(move || {
162 /// let local = LocalSet::new();
163 ///
164 /// local.spawn_local(async move {
165 /// while let Some(new_task) = recv.recv().await {
166 /// tokio::task::spawn_local(run_task(new_task));
167 /// }
168 /// // If the while loop returns, then all the LocalSpawner
169 /// // objects have been dropped.
170 /// });
171 ///
172 /// // This will return once all senders are dropped and all
173 /// // spawned tasks have returned.
174 /// rt.block_on(local);
175 /// });
176 ///
177 /// Self {
178 /// send,
179 /// }
180 /// }
181 ///
182 /// pub fn spawn(&self, task: Task) {
183 /// self.send.send(task).expect("Thread with LocalSet has shut down.");
184 /// }
185 /// }
186 ///
187 /// // This task may do !Send stuff. We use printing a number as an example,
188 /// // but it could be anything.
189 /// //
190 /// // The Task struct is an enum to support spawning many different kinds
191 /// // of operations.
192 /// async fn run_task(task: Task) {
193 /// match task {
194 /// Task::PrintNumber(n) => {
195 /// println!("{}", n);
196 /// },
197 /// Task::AddOne(n, response) => {
198 /// // We ignore failures to send the response.
199 /// let _ = response.send(n + 1);
200 /// },
201 /// }
202 /// }
203 ///
204 /// #[tokio::main]
205 /// async fn main() {
206 /// let spawner = LocalSpawner::new();
207 ///
208 /// let (send, response) = oneshot::channel();
209 /// spawner.spawn(Task::AddOne(10, send));
210 /// let eleven = response.await.unwrap();
211 /// assert_eq!(eleven, 11);
212 /// }
213 /// ```
214 ///
215 /// [`Send`]: trait@std::marker::Send
216 /// [local task set]: struct@LocalSet
217 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
218 /// [`task::spawn_local`]: fn@spawn_local
219 /// [`mpsc`]: mod@crate::sync::mpsc
    pub struct LocalSet {
        /// Current scheduler tick.
        ///
        /// Advanced (with wrap-around) every time the next task is selected;
        /// its value decides whether the remote queue is consulted before
        /// the local queue.
        tick: Cell<u8>,

        /// State available from thread-local.
        context: Rc<Context>,

        /// This type should not be Send. The raw-pointer marker opts the
        /// struct out of the `Send` auto trait.
        _not_send: PhantomData<*const ()>,
    }
230}
231
/// State available from the thread-local.
///
/// An `Rc<Context>` is owned by the `LocalSet` and is also stored in the
/// `CURRENT` thread-local while the set is entered or being polled.
struct Context {
    /// State shared between threads.
    shared: Arc<Shared>,

    /// True if a task panicked without being handled and the local set is
    /// configured to shutdown on unhandled panic.
    ///
    /// `LocalSet::tick` asserts that this flag is unset before running each
    /// task.
    unhandled_panic: Cell<bool>,
}
241
/// `LocalSet` state shared between threads.
struct Shared {
    /// # Safety
    ///
    /// This field must *only* be accessed from the thread that owns the
    /// `LocalSet` (i.e., `Thread::current().id() == owner`).
    local_state: LocalState,

    /// Remote run queue.
    ///
    /// Holds `Some(queue)` while the `LocalSet` is alive. When the
    /// `LocalSet` is dropped the queue is taken out, leaving `None`, so that
    /// no further notifications can be pushed to it.
    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,

    /// Wake the `LocalSet` task.
    waker: AtomicWaker,

    /// How to respond to unhandled task panics.
    #[cfg(tokio_unstable)]
    pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}
260
/// Tracks the `LocalSet` state that must only be accessed from the thread that
/// created the `LocalSet`.
struct LocalState {
    /// The `ThreadId` of the thread that owns the `LocalSet`.
    ///
    /// Recorded when the `LocalSet` is constructed.
    owner: ThreadId,

    /// Local run queue, holding notified tasks that were scheduled from the
    /// owning thread.
    local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: LocalOwnedTasks<Arc<Shared>>,
}
273
// Future created by `LocalSet::run_until`. Each poll first polls the wrapped
// future and, if it is still pending, ticks the borrowed `LocalSet`.
pin_project! {
    #[derive(Debug)]
    struct RunUntil<'a, F> {
        // The set whose spawned tasks are driven while `future` is pending.
        local_set: &'a LocalSet,
        // The caller's future; structurally pinned so it can be polled in place.
        #[pin]
        future: F,
    }
}
282
// Per-thread record of the `LocalSet` context that is currently entered or
// being polled. It starts out with no context and `wake_on_schedule == false`.
tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
    ctx: RcCell::new(),
    wake_on_schedule: Cell::new(false),
} });
287
/// Per-thread state stored in the `CURRENT` thread-local.
struct LocalData {
    /// Context of the `LocalSet` that is currently entered or being polled
    /// on this thread, if any.
    ctx: RcCell<Context>,
    /// Set to `true` by `LocalSet::enter` and to `false` while a `LocalSet`
    /// is being polled. `Shared::schedule` reads it to decide whether the
    /// task can go straight onto the local queue without waking the set.
    wake_on_schedule: Cell<bool>,
}
292
293impl LocalData {
294 /// Should be called except when we call `LocalSet::enter`.
295 /// Especially when we poll a `LocalSet`.
296 #[must_use = "dropping this guard will reset the entered state"]
297 fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
298 let ctx = self.ctx.replace(Some(ctx));
299 let wake_on_schedule = self.wake_on_schedule.replace(false);
300 LocalDataEnterGuard {
301 local_data_ref: self,
302 ctx,
303 wake_on_schedule,
304 }
305 }
306}
307
/// A guard for `LocalData::enter()`
///
/// On drop, the saved context and `wake_on_schedule` value are written back
/// into the referenced `LocalData`.
struct LocalDataEnterGuard<'a> {
    /// The thread-local data that was entered.
    local_data_ref: &'a LocalData,
    /// The context that was installed before `enter` was called.
    ctx: Option<Rc<Context>>,
    /// The `wake_on_schedule` value that was set before `enter` was called.
    wake_on_schedule: bool,
}
314
315impl<'a> Drop for LocalDataEnterGuard<'a> {
316 fn drop(&mut self) {
317 self.local_data_ref.ctx.set(self.ctx.take());
318 self.local_data_ref
319 .wake_on_schedule
320 .set(self.wake_on_schedule)
321 }
322}
323
324cfg_rt! {
325 /// Spawns a `!Send` future on the current [`LocalSet`] or [`LocalRuntime`].
326 ///
327 /// The spawned future will run on the same thread that called `spawn_local`.
328 ///
329 /// The provided future will start running in the background immediately
330 /// when `spawn_local` is called, even if you don't await the returned
331 /// `JoinHandle`.
332 ///
333 /// # Panics
334 ///
    /// This function panics if called outside of a [`LocalSet`] or [`LocalRuntime`].
336 ///
337 /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
338 /// resulting new task will _not_ be inside the `LocalSet`, so you must use
339 /// `spawn_local` if you want to stay within the `LocalSet`.
340 ///
341 /// # Examples
342 ///
343 /// ```rust
344 /// use std::rc::Rc;
345 /// use tokio::task;
346 ///
347 /// #[tokio::main]
348 /// async fn main() {
349 /// let nonsend_data = Rc::new("my nonsend data...");
350 ///
351 /// let local = task::LocalSet::new();
352 ///
353 /// // Run the local task set.
354 /// local.run_until(async move {
355 /// let nonsend_data = nonsend_data.clone();
356 /// task::spawn_local(async move {
357 /// println!("{}", nonsend_data);
358 /// // ...
359 /// }).await.unwrap();
360 /// }).await;
361 /// }
362 /// ```
363 ///
364 /// [`LocalSet`]: struct@crate::task::LocalSet
365 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
366 /// [`tokio::spawn`]: fn@crate::task::spawn
367 #[track_caller]
368 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
369 where
370 F: Future + 'static,
371 F::Output: 'static,
372 {
373 let fut_size = std::mem::size_of::<F>();
374 if fut_size > BOX_FUTURE_THRESHOLD {
375 spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
376 } else {
377 spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size))
378 }
379 }
380
381
    /// Spawns `future` as a local task, choosing the target from the
    /// current runtime handle and the `CURRENT` thread-local.
    ///
    /// * If a runtime handle is available and reports `is_local()`, the task
    ///   is spawned on that runtime, provided
    ///   `can_spawn_local_on_local_runtime()` allows it.
    /// * If a handle is available but is not local, or if no handle is
    ///   available at all, the task is spawned on the `LocalSet` context
    ///   stored in `CURRENT`.
    ///
    /// # Panics
    ///
    /// Panics if the local runtime refuses the spawn, or if there is no
    /// `LocalSet` context to fall back on.
    #[track_caller]
    pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
    where F: Future + 'static,
          F::Output: 'static
    {
        use crate::runtime::{context, task};

        // The future is kept in an `Option` so that it can be moved out
        // either inside the closure below or, if the closure never runs,
        // in the fallback path after it.
        let mut future = Some(future);

        let res = context::with_current(|handle| {
            Some(if handle.is_local() {
                // `None` is turned into a panic by the `match` below.
                if !handle.can_spawn_local_on_local_runtime() {
                    return None;
                }

                let future = future.take().unwrap();

                // Only when task dumps are enabled on a supported target is
                // the future wrapped in a trace root.
                #[cfg(all(
                    tokio_unstable,
                    tokio_taskdump,
                    feature = "rt",
                    target_os = "linux",
                    any(
                        target_arch = "aarch64",
                        target_arch = "x86",
                        target_arch = "x86_64"
                    )
                ))]
                let future = task::trace::Trace::root(future);
                let id = task::Id::next();
                let task = crate::util::trace::task(future, "task", meta, id.as_u64());

                // safety: we have verified that this is a `LocalRuntime` owned by the current thread
                unsafe { handle.spawn_local(task, id) }
            } else {
                // Not a local runtime: use the `LocalSet` context, if any.
                match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
                    None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"),
                    Some(cx) => cx.spawn(future.take().unwrap(), meta)
                }
            })
        });

        match res {
            Ok(None) => panic!("Local tasks can only be spawned on a LocalRuntime from the thread the runtime was created on"),
            Ok(Some(join_handle)) => join_handle,
            // No runtime handle could be obtained; the future is still in
            // the `Option`, so spawn it on the `LocalSet` context, if any.
            Err(_) => match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
                None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"),
                Some(cx) => cx.spawn(future.unwrap(), meta)
            }
        }
    }
433}
434
/// Initial queue capacity, used for both the local and the remote run queue.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often the remote queue is checked before the local queue: the remote
/// queue goes first whenever the tick counter is a multiple of this value.
const REMOTE_FIRST_INTERVAL: u8 = 31;
443
/// Context guard for `LocalSet`
///
/// Returned by `LocalSet::enter`. Dropping the guard restores the
/// thread-local state that was in place before the set was entered.
pub struct LocalEnterGuard {
    /// The context that was installed before entering, if any.
    ctx: Option<Rc<Context>>,

    /// The previous value of the thread-local `wake_on_schedule` flag.
    ///
    /// The flag distinguishes whether the context was entered or is being
    /// polled. Entering sets the flag: in that case `spawn_local` refers to
    /// the context even though the set is not being polled at the moment.
    wake_on_schedule: bool,
}
453
454impl Drop for LocalEnterGuard {
455 fn drop(&mut self) {
456 CURRENT.with(
457 |LocalData {
458 ctx,
459 wake_on_schedule,
460 }| {
461 ctx.set(self.ctx.take());
462 wake_on_schedule.set(self.wake_on_schedule);
463 },
464 );
465 }
466}
467
468impl fmt::Debug for LocalEnterGuard {
469 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470 f.debug_struct("LocalEnterGuard").finish()
471 }
472}
473
474impl LocalSet {
475 /// Returns a new local task set.
476 pub fn new() -> LocalSet {
477 let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
478
479 LocalSet {
480 tick: Cell::new(0),
481 context: Rc::new(Context {
482 shared: Arc::new(Shared {
483 local_state: LocalState {
484 owner,
485 owned: LocalOwnedTasks::new(),
486 local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
487 },
488 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
489 waker: AtomicWaker::new(),
490 #[cfg(tokio_unstable)]
491 unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
492 }),
493 unhandled_panic: Cell::new(false),
494 }),
495 _not_send: PhantomData,
496 }
497 }
498
499 /// Enters the context of this `LocalSet`.
500 ///
501 /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
502 /// context you are inside.
503 ///
504 /// [`spawn_local`]: fn@crate::task::spawn_local
505 pub fn enter(&self) -> LocalEnterGuard {
506 CURRENT.with(
507 |LocalData {
508 ctx,
509 wake_on_schedule,
510 ..
511 }| {
512 let ctx = ctx.replace(Some(self.context.clone()));
513 let wake_on_schedule = wake_on_schedule.replace(true);
514 LocalEnterGuard {
515 ctx,
516 wake_on_schedule,
517 }
518 },
519 )
520 }
521
522 /// Spawns a `!Send` task onto the local task set.
523 ///
524 /// This task is guaranteed to be run on the current thread.
525 ///
526 /// Unlike the free function [`spawn_local`], this method may be used to
527 /// spawn local tasks when the `LocalSet` is _not_ running. The provided
528 /// future will start running once the `LocalSet` is next started, even if
529 /// you don't await the returned `JoinHandle`.
530 ///
531 /// # Examples
532 ///
533 /// ```rust
534 /// use tokio::task;
535 ///
536 /// #[tokio::main]
537 /// async fn main() {
538 /// let local = task::LocalSet::new();
539 ///
540 /// // Spawn a future on the local set. This future will be run when
541 /// // we call `run_until` to drive the task set.
542 /// local.spawn_local(async {
543 /// // ...
544 /// });
545 ///
546 /// // Run the local task set.
547 /// local.run_until(async move {
548 /// // ...
549 /// }).await;
550 ///
    /// // When `run_until` finishes, we can spawn _more_ futures, which will
552 /// // run in subsequent calls to `run_until`.
553 /// local.spawn_local(async {
554 /// // ...
555 /// });
556 ///
557 /// local.run_until(async move {
558 /// // ...
559 /// }).await;
560 /// }
561 /// ```
562 /// [`spawn_local`]: fn@spawn_local
563 #[track_caller]
564 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
565 where
566 F: Future + 'static,
567 F::Output: 'static,
568 {
569 let fut_size = mem::size_of::<F>();
570 if fut_size > BOX_FUTURE_THRESHOLD {
571 self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
572 } else {
573 self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
574 }
575 }
576
577 /// Runs a future to completion on the provided runtime, driving any local
578 /// futures spawned on this task set on the current thread.
579 ///
580 /// This runs the given future on the runtime, blocking until it is
581 /// complete, and yielding its resolved result. Any tasks or timers which
582 /// the future spawns internally will be executed on the runtime. The future
583 /// may also call [`spawn_local`] to `spawn_local` additional local futures on the
584 /// current thread.
585 ///
586 /// This method should not be called from an asynchronous context.
587 ///
588 /// # Panics
589 ///
590 /// This function panics if the executor is at capacity, if the provided
591 /// future panics, or if called within an asynchronous execution context.
592 ///
593 /// # Notes
594 ///
595 /// Since this function internally calls [`Runtime::block_on`], and drives
596 /// futures in the local task set inside that call to `block_on`, the local
597 /// futures may not use [in-place blocking]. If a blocking call needs to be
598 /// issued from a local task, the [`spawn_blocking`] API may be used instead.
599 ///
600 /// For example, this will panic:
601 /// ```should_panic
602 /// use tokio::runtime::Runtime;
603 /// use tokio::task;
604 ///
605 /// let rt = Runtime::new().unwrap();
606 /// let local = task::LocalSet::new();
607 /// local.block_on(&rt, async {
608 /// let join = task::spawn_local(async {
609 /// let blocking_result = task::block_in_place(|| {
610 /// // ...
611 /// });
612 /// // ...
613 /// });
614 /// join.await.unwrap();
615 /// })
616 /// ```
617 /// This, however, will not panic:
618 /// ```
619 /// use tokio::runtime::Runtime;
620 /// use tokio::task;
621 ///
622 /// let rt = Runtime::new().unwrap();
623 /// let local = task::LocalSet::new();
624 /// local.block_on(&rt, async {
625 /// let join = task::spawn_local(async {
626 /// let blocking_result = task::spawn_blocking(|| {
627 /// // ...
628 /// }).await;
629 /// // ...
630 /// });
631 /// join.await.unwrap();
632 /// })
633 /// ```
634 ///
635 /// [`spawn_local`]: fn@spawn_local
636 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
637 /// [in-place blocking]: fn@crate::task::block_in_place
638 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
639 #[track_caller]
640 #[cfg(feature = "rt")]
641 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
642 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
643 where
644 F: Future,
645 {
646 rt.block_on(self.run_until(future))
647 }
648
649 /// Runs a future to completion on the local set, returning its output.
650 ///
651 /// This returns a future that runs the given future with a local set,
652 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
653 /// Any local futures spawned on the local set will be driven in the
654 /// background until the future passed to `run_until` completes. When the future
655 /// passed to `run_until` finishes, any local futures which have not completed
656 /// will remain on the local set, and will be driven on subsequent calls to
657 /// `run_until` or when [awaiting the local set] itself.
658 ///
659 /// # Cancel safety
660 ///
661 /// This method is cancel safe when `future` is cancel safe.
662 ///
663 /// # Examples
664 ///
665 /// ```rust
666 /// use tokio::task;
667 ///
668 /// #[tokio::main]
669 /// async fn main() {
670 /// task::LocalSet::new().run_until(async {
671 /// task::spawn_local(async move {
672 /// // ...
673 /// }).await.unwrap();
674 /// // ...
675 /// }).await;
676 /// }
677 /// ```
678 ///
679 /// [`spawn_local`]: fn@spawn_local
680 /// [awaiting the local set]: #awaiting-a-localset
681 pub async fn run_until<F>(&self, future: F) -> F::Output
682 where
683 F: Future,
684 {
685 let run_until = RunUntil {
686 future,
687 local_set: self,
688 };
689 run_until.await
690 }
691
    /// Spawns `future` onto this set using the supplied spawn metadata.
    ///
    /// Visible within `crate::task`; forwards directly to
    /// `spawn_named_inner`.
    #[track_caller]
    pub(in crate::task) fn spawn_named<F>(
        &self,
        future: F,
        meta: SpawnMeta<'_>,
    ) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.spawn_named_inner(future, meta)
    }
704
705 #[track_caller]
706 fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
707 where
708 F: Future + 'static,
709 F::Output: 'static,
710 {
711 let handle = self.context.spawn(future, meta);
712
713 // Because a task was spawned from *outside* the `LocalSet`, wake the
714 // `LocalSet` future to execute the new task, if it hasn't been woken.
715 //
716 // Spawning via the free fn `spawn` does not require this, as it can
717 // only be called from *within* a future executing on the `LocalSet` —
718 // in that case, the `LocalSet` must already be awake.
719 self.context.shared.waker.wake();
720 handle
721 }
722
723 /// Ticks the scheduler, returning whether the local future needs to be
724 /// notified again.
725 fn tick(&self) -> bool {
726 for _ in 0..MAX_TASKS_PER_TICK {
727 // Make sure we didn't hit an unhandled panic
728 assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
729
730 match self.next_task() {
731 // Run the task
732 //
733 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
734 // used. We are responsible for maintaining the invariant that
735 // `run_unchecked` is only called on threads that spawned the
736 // task initially. Because `LocalSet` itself is `!Send`, and
737 // `spawn_local` spawns into the `LocalSet` on the current
738 // thread, the invariant is maintained.
739 Some(task) => crate::runtime::coop::budget(|| task.run()),
740 // We have fully drained the queue of notified tasks, so the
741 // local future doesn't need to be notified again — it can wait
742 // until something else wakes a task in the local set.
743 None => return false,
744 }
745 }
746
747 true
748 }
749
750 fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
751 let tick = self.tick.get();
752 self.tick.set(tick.wrapping_add(1));
753
754 let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
755 self.context
756 .shared
757 .queue
758 .lock()
759 .as_mut()
760 .and_then(|queue| queue.pop_front())
761 .or_else(|| self.pop_local())
762 } else {
763 self.pop_local().or_else(|| {
764 self.context
765 .shared
766 .queue
767 .lock()
768 .as_mut()
769 .and_then(VecDeque::pop_front)
770 })
771 };
772
773 task.map(|task| unsafe {
774 // Safety: because the `LocalSet` itself is `!Send`, we know we are
775 // on the same thread if we have access to the `LocalSet`, and can
776 // therefore access the local run queue.
777 self.context.shared.local_state.assert_owner(task)
778 })
779 }
780
781 fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
782 unsafe {
783 // Safety: because the `LocalSet` itself is `!Send`, we know we are
784 // on the same thread if we have access to the `LocalSet`, and can
785 // therefore access the local run queue.
786 self.context.shared.local_state.task_pop_front()
787 }
788 }
789
790 fn with<T>(&self, f: impl FnOnce() -> T) -> T {
791 CURRENT.with(|local_data| {
792 let _guard = local_data.enter(self.context.clone());
793 f()
794 })
795 }
796
797 /// This method is like `with`, but it just calls `f` without setting the thread-local if that
798 /// fails.
799 fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
800 let mut f = Some(f);
801
802 let res = CURRENT.try_with(|local_data| {
803 let _guard = local_data.enter(self.context.clone());
804 (f.take().unwrap())()
805 });
806
807 match res {
808 Ok(res) => res,
809 Err(_access_error) => (f.take().unwrap())(),
810 }
811 }
812}
813
814cfg_unstable! {
815 impl LocalSet {
816 /// Configure how the `LocalSet` responds to an unhandled panic on a
817 /// spawned task.
818 ///
819 /// By default, an unhandled panic (i.e. a panic not caught by
820 /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
        /// execution. The panic's error value is forwarded to the task's
822 /// [`JoinHandle`] and all other spawned tasks continue running.
823 ///
824 /// The `unhandled_panic` option enables configuring this behavior.
825 ///
826 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
827 /// spawned tasks have no impact on the `LocalSet`'s execution.
828 /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
829 /// shutdown immediately when a spawned task panics even if that
830 /// task's `JoinHandle` has not been dropped. All other spawned tasks
831 /// will immediately terminate and further calls to
832 /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
833 ///
834 /// # Panics
835 ///
836 /// This method panics if called after the `LocalSet` has started
837 /// running.
838 ///
839 /// # Unstable
840 ///
841 /// This option is currently unstable and its implementation is
842 /// incomplete. The API may change or be removed in the future. See
843 /// tokio-rs/tokio#4516 for more details.
844 ///
845 /// # Examples
846 ///
847 /// The following demonstrates a `LocalSet` configured to shutdown on
848 /// panic. The first spawned task panics and results in the `LocalSet`
849 /// shutting down. The second spawned task never has a chance to
850 /// execute. The call to `run_until` will panic due to the runtime being
851 /// forcibly shutdown.
852 ///
853 /// ```should_panic
854 /// use tokio::runtime::UnhandledPanic;
855 ///
856 /// # #[tokio::main]
857 /// # async fn main() {
858 /// tokio::task::LocalSet::new()
859 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
860 /// .run_until(async {
861 /// tokio::task::spawn_local(async { panic!("boom"); });
862 /// tokio::task::spawn_local(async {
863 /// // This task never completes
864 /// });
865 ///
866 /// // Do some work, but `run_until` will panic before it completes
867 /// # loop { tokio::task::yield_now().await; }
868 /// })
869 /// .await;
870 /// # }
871 /// ```
872 ///
873 /// [`JoinHandle`]: struct@crate::task::JoinHandle
874 pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
875 // TODO: This should be set as a builder
876 Rc::get_mut(&mut self.context)
877 .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
878 .expect("Unhandled Panic behavior modified after starting LocalSet")
879 .unhandled_panic = behavior;
880 self
881 }
882
883 /// Returns the [`Id`] of the current `LocalSet` runtime.
884 ///
885 /// # Examples
886 ///
887 /// ```rust
888 /// use tokio::task;
889 ///
890 /// #[tokio::main]
891 /// async fn main() {
892 /// let local_set = task::LocalSet::new();
893 /// println!("Local set id: {}", local_set.id());
894 /// }
895 /// ```
896 ///
897 /// **Note**: This is an [unstable API][unstable]. The public API of this type
898 /// may break in 1.x releases. See [the documentation on unstable
899 /// features][unstable] for details.
900 ///
901 /// [unstable]: crate#unstable-features
902 /// [`Id`]: struct@crate::runtime::Id
903 pub fn id(&self) -> runtime::Id {
904 self.context.shared.local_state.owned.id.into()
905 }
906 }
907}
908
909impl fmt::Debug for LocalSet {
910 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
911 fmt.debug_struct("LocalSet").finish()
912 }
913}
914
915impl Future for LocalSet {
916 type Output = ();
917
918 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
919 // Register the waker before starting to work
920 self.context.shared.waker.register_by_ref(cx.waker());
921
922 if self.with(|| self.tick()) {
923 // If `tick` returns true, we need to notify the local future again:
924 // there are still tasks remaining in the run queue.
925 cx.waker().wake_by_ref();
926 Poll::Pending
927
928 // Safety: called from the thread that owns `LocalSet`. Because
929 // `LocalSet` is `!Send`, this is safe.
930 } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
931 // If the scheduler has no remaining futures, we're done!
932 Poll::Ready(())
933 } else {
934 // There are still futures in the local set, but we've polled all the
935 // futures in the run queue. Therefore, we can just return Pending
936 // since the remaining futures will be woken from somewhere else.
937 Poll::Pending
938 }
939 }
940}
941
942impl Default for LocalSet {
943 fn default() -> LocalSet {
944 LocalSet::new()
945 }
946}
947
948impl Drop for LocalSet {
949 fn drop(&mut self) {
950 self.with_if_possible(|| {
951 // Shut down all tasks in the LocalOwnedTasks and close it to
952 // prevent new tasks from ever being added.
953 unsafe {
954 // Safety: called from the thread that owns `LocalSet`
955 self.context.shared.local_state.close_and_shutdown_all();
956 }
957
958 // We already called shutdown on all tasks above, so there is no
959 // need to call shutdown.
960
961 // Safety: note that this *intentionally* bypasses the unsafe
962 // `Shared::local_queue()` method. This is in order to avoid the
963 // debug assertion that we are on the thread that owns the
964 // `LocalSet`, because on some systems (e.g. at least some macOS
965 // versions), attempting to get the current thread ID can panic due
966 // to the thread's local data that stores the thread ID being
967 // dropped *before* the `LocalSet`.
968 //
969 // Despite avoiding the assertion here, it is safe for us to access
970 // the local queue in `Drop`, because the `LocalSet` itself is
971 // `!Send`, so we can reasonably guarantee that it will not be
972 // `Drop`ped from another thread.
973 let local_queue = unsafe {
974 // Safety: called from the thread that owns `LocalSet`
975 self.context.shared.local_state.take_local_queue()
976 };
977 for task in local_queue {
978 drop(task);
979 }
980
981 // Take the queue from the Shared object to prevent pushing
982 // notifications to it in the future.
983 let queue = self.context.shared.queue.lock().take().unwrap();
984 for task in queue {
985 drop(task);
986 }
987
988 // Safety: called from the thread that owns `LocalSet`
989 assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
990 });
991 }
992}
993
994// === impl Context ===
995
996impl Context {
997 #[track_caller]
998 fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
999 where
1000 F: Future + 'static,
1001 F::Output: 'static,
1002 {
1003 let id = crate::runtime::task::Id::next();
1004 let future = crate::util::trace::task(future, "local", meta, id.as_u64());
1005
1006 // Safety: called from the thread that owns the `LocalSet`
1007 let (handle, notified) = {
1008 self.shared.local_state.assert_called_from_owner_thread();
1009 self.shared
1010 .local_state
1011 .owned
1012 .bind(future, self.shared.clone(), id)
1013 };
1014
1015 if let Some(notified) = notified {
1016 self.shared.schedule(notified);
1017 }
1018
1019 handle
1020 }
1021}
1022
// === impl RunUntil ===
1024
1025impl<T: Future> Future for RunUntil<'_, T> {
1026 type Output = T::Output;
1027
1028 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1029 let me = self.project();
1030
1031 me.local_set.with(|| {
1032 me.local_set
1033 .context
1034 .shared
1035 .waker
1036 .register_by_ref(cx.waker());
1037
1038 let _no_blocking = crate::runtime::context::disallow_block_in_place();
1039 let f = me.future;
1040
1041 if let Poll::Ready(output) = f.poll(cx) {
1042 return Poll::Ready(output);
1043 }
1044
1045 if me.local_set.tick() {
1046 // If `tick` returns `true`, we need to notify the local future again:
1047 // there are still tasks remaining in the run queue.
1048 cx.waker().wake_by_ref();
1049 }
1050
1051 Poll::Pending
1052 })
1053 }
1054}
1055
1056impl Shared {
1057 /// Schedule the provided task on the scheduler.
1058 fn schedule(&self, task: task::Notified<Arc<Self>>) {
1059 CURRENT.with(|localdata| {
1060 match localdata.ctx.get() {
1061 // If the current `LocalSet` is being polled, we don't need to wake it.
1062 // When we `enter` it, then the value `wake_on_schedule` is set to be true.
1063 // In this case it is not being polled, so we need to wake it.
1064 Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
1065 // Safety: if the current `LocalSet` context points to this
1066 // `LocalSet`, then we are on the thread that owns it.
1067 cx.shared.local_state.task_push_back(task);
1068 },
1069
1070 // We are on the thread that owns the `LocalSet`, so we can
1071 // wake to the local queue.
1072 _ if context::thread_id().ok() == Some(self.local_state.owner) => {
1073 unsafe {
1074 // Safety: we just checked that the thread ID matches
1075 // the localset's owner, so this is safe.
1076 self.local_state.task_push_back(task);
1077 }
1078 // We still have to wake the `LocalSet`, because it isn't
1079 // currently being polled.
1080 self.waker.wake();
1081 }
1082
1083 // We are *not* on the thread that owns the `LocalSet`, so we
1084 // have to wake to the remote queue.
1085 _ => {
1086 // First, check whether the queue is still there (if not, the
1087 // LocalSet is dropped). Then push to it if so, and if not,
1088 // do nothing.
1089 let mut lock = self.queue.lock();
1090
1091 if let Some(queue) = lock.as_mut() {
1092 queue.push_back(task);
1093 drop(lock);
1094 self.waker.wake();
1095 }
1096 }
1097 }
1098 });
1099 }
1100
1101 fn ptr_eq(&self, other: &Shared) -> bool {
1102 std::ptr::eq(self, other)
1103 }
1104}
1105
// SAFETY: `Shared` holds the `LocalSet`'s local run queue, which is accessed
// without synchronization. Marking `Shared` as `Sync` is sound because (and
// only because) we promise to never touch the local run queue except from the
// thread that owns the `LocalSet`.
unsafe impl Sync for Shared {}
1109
1110impl task::Schedule for Arc<Shared> {
1111 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
1112 // Safety, this is always called from the thread that owns `LocalSet`
1113 unsafe { self.local_state.task_remove(task) }
1114 }
1115
1116 fn schedule(&self, task: task::Notified<Self>) {
1117 Shared::schedule(self, task);
1118 }
1119
1120 // localset does not currently support task hooks
1121 fn hooks(&self) -> TaskHarnessScheduleHooks {
1122 TaskHarnessScheduleHooks {
1123 task_terminate_callback: None,
1124 }
1125 }
1126
1127 cfg_unstable! {
1128 fn unhandled_panic(&self) {
1129 use crate::runtime::UnhandledPanic;
1130
1131 match self.unhandled_panic {
1132 UnhandledPanic::Ignore => {
1133 // Do nothing
1134 }
1135 UnhandledPanic::ShutdownRuntime => {
1136 // This hook is only called from within the runtime, so
1137 // `CURRENT` should match with `&self`, i.e. there is no
1138 // opportunity for a nested scheduler to be called.
1139 CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1140 Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1141 cx.unhandled_panic.set(true);
1142 // Safety: this is always called from the thread that owns `LocalSet`
1143 unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1144 }
1145 _ => unreachable!("runtime core not set in CURRENT thread-local"),
1146 })
1147 }
1148 }
1149 }
1150 }
1151}
1152
1153impl LocalState {
1154 unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1155 // The caller ensures it is called from the same thread that owns
1156 // the LocalSet.
1157 self.assert_called_from_owner_thread();
1158
1159 self.local_queue.with_mut(|ptr| (*ptr).pop_front())
1160 }
1161
1162 unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1163 // The caller ensures it is called from the same thread that owns
1164 // the LocalSet.
1165 self.assert_called_from_owner_thread();
1166
1167 self.local_queue.with_mut(|ptr| (*ptr).push_back(task));
1168 }
1169
1170 unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1171 // The caller ensures it is called from the same thread that owns
1172 // the LocalSet.
1173 self.assert_called_from_owner_thread();
1174
1175 self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
1176 }
1177
1178 unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1179 // The caller ensures it is called from the same thread that owns
1180 // the LocalSet.
1181 self.assert_called_from_owner_thread();
1182
1183 self.owned.remove(task)
1184 }
1185
1186 /// Returns true if the `LocalSet` does not have any spawned tasks
1187 unsafe fn owned_is_empty(&self) -> bool {
1188 // The caller ensures it is called from the same thread that owns
1189 // the LocalSet.
1190 self.assert_called_from_owner_thread();
1191
1192 self.owned.is_empty()
1193 }
1194
1195 unsafe fn assert_owner(
1196 &self,
1197 task: task::Notified<Arc<Shared>>,
1198 ) -> task::LocalNotified<Arc<Shared>> {
1199 // The caller ensures it is called from the same thread that owns
1200 // the LocalSet.
1201 self.assert_called_from_owner_thread();
1202
1203 self.owned.assert_owner(task)
1204 }
1205
1206 unsafe fn close_and_shutdown_all(&self) {
1207 // The caller ensures it is called from the same thread that owns
1208 // the LocalSet.
1209 self.assert_called_from_owner_thread();
1210
1211 self.owned.close_and_shutdown_all();
1212 }
1213
1214 #[track_caller]
1215 fn assert_called_from_owner_thread(&self) {
1216 // FreeBSD has some weirdness around thread-local destruction.
1217 // TODO: remove this hack when thread id is cleaned up
1218 #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1219 debug_assert!(
1220 // if we couldn't get the thread ID because we're dropping the local
1221 // data, skip the assertion --- the `Drop` impl is not going to be
1222 // called from another thread, because `LocalSet` is `!Send`
1223 context::thread_id()
1224 .map(|id| id == self.owner)
1225 .unwrap_or(true),
1226 "`LocalSet`'s local run queue must not be accessed by another thread!"
1227 );
1228 }
1229}
1230
// SAFETY: `LocalState` has to be `Send` because it is stored in `Shared`.
// Its accessors are `unsafe fn`s: it is up to each caller to ensure they are
// on the same thread that owns the `LocalSet`.
unsafe impl Send for LocalState {}
1234
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    // Smoke test: a `LocalSet` driven by a current-thread runtime can spawn
    // and await a local task.
    //
    // This duplicates a test in `tests/task_local_set.rs`, but because this is
    // a lib test, it will run under Miri, so this is necessary to catch stacked
    // borrows violations in the `LocalSet` implementation.
    #[test]
    fn local_current_thread_scheduler() {
        let rt = crate::runtime::Builder::new_current_thread()
            .build()
            .expect("rt");

        rt.block_on(async {
            LocalSet::new()
                .run_until(async {
                    spawn_local(async {}).await.unwrap();
                })
                .await;
        })
    }

    // Tests that when a task on a `LocalSet` is woken from the thread that
    // owns the `LocalSet` (here via a `Notify`), the task is woken to the
    // localset's local queue rather than its remote queue.
    //
    // This test has to be defined in the `local.rs` file as a lib test, rather
    // than in `tests/`, because it makes assertions about the local set's
    // internal state.
    #[test]
    fn wakes_to_local_queue() {
        use crate::sync::Notify;

        let rt = crate::runtime::Builder::new_current_thread()
            .build()
            .expect("rt");

        rt.block_on(async {
            let local = LocalSet::new();
            let notify = Arc::new(Notify::new());

            let waiter = Arc::clone(&notify);
            let task = local.spawn_local(async move {
                waiter.notified().await;
            });

            let mut run_until = Box::pin(local.run_until(async move {
                task.await.unwrap();
            }));

            // Poll the `run_until` future exactly once, ignoring its result.
            std::future::poll_fn(|cx| {
                let _ = run_until.as_mut().poll(cx);
                Poll::Ready(())
            })
            .await;

            notify.notify_one();

            // Safety: the test body runs on the thread that created `local`.
            let woken = unsafe { local.context.shared.local_state.task_pop_front() };
            // TODO(eliza): it would be nice to be able to assert that this is
            // the local task.
            assert!(
                woken.is_some(),
                "task should have been notified to the LocalSet's local queue"
            );
        })
    }
}