1use crate::loom::sync::atomic::AtomicBool;
2use crate::loom::sync::Arc;
3use crate::runtime::driver::{self, Driver};
4use crate::runtime::scheduler::{self, Defer, Inject};
5use crate::runtime::task::{
6 self, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks,
7};
8use crate::runtime::{
9 blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
10};
11use crate::sync::notify::Notify;
12use crate::util::atomic_cell::AtomicCell;
13use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
14
15use std::cell::RefCell;
16use std::collections::VecDeque;
17use std::future::{poll_fn, Future};
18use std::sync::atomic::Ordering::{AcqRel, Release};
19use std::task::Poll::{Pending, Ready};
20use std::task::Waker;
21use std::thread::ThreadId;
22use std::time::Duration;
23use std::{fmt, thread};
24
25pub(crate) struct CurrentThread {
27 core: AtomicCell<Core>,
29
30 notify: Notify,
33}
34
35pub(crate) struct Handle {
37 shared: Shared,
39
40 pub(crate) driver: driver::Handle,
42
43 pub(crate) blocking_spawner: blocking::Spawner,
45
46 pub(crate) seed_generator: RngSeedGenerator,
48
49 pub(crate) task_hooks: TaskHooks,
51
52 pub(crate) local_tid: Option<ThreadId>,
54}
55
56struct Core {
59 tasks: VecDeque<Notified>,
61
62 tick: u32,
64
65 driver: Option<Driver>,
69
70 metrics: MetricsBatch,
72
73 global_queue_interval: u32,
75
76 unhandled_panic: bool,
79}
80
81struct Shared {
83 inject: Inject<Arc<Handle>>,
85
86 owned: OwnedTasks<Arc<Handle>>,
88
89 woken: AtomicBool,
91
92 config: Config,
94
95 scheduler_metrics: SchedulerMetrics,
97
98 worker_metrics: WorkerMetrics,
100}
101
102pub(crate) struct Context {
106 handle: Arc<Handle>,
108
109 core: RefCell<Option<Box<Core>>>,
112
113 pub(crate) defer: Defer,
115}
116
117type Notified = task::Notified<Arc<Handle>>;
118
119const INITIAL_CAPACITY: usize = 64;
121
122const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
126
127impl CurrentThread {
128 pub(crate) fn new(
129 driver: Driver,
130 driver_handle: driver::Handle,
131 blocking_spawner: blocking::Spawner,
132 seed_generator: RngSeedGenerator,
133 config: Config,
134 local_tid: Option<ThreadId>,
135 ) -> (CurrentThread, Arc<Handle>) {
136 let worker_metrics = WorkerMetrics::from_config(&config);
137 worker_metrics.set_thread_id(thread::current().id());
138
139 let global_queue_interval = config
141 .global_queue_interval
142 .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);
143
144 let handle = Arc::new(Handle {
145 task_hooks: TaskHooks {
146 task_spawn_callback: config.before_spawn.clone(),
147 task_terminate_callback: config.after_termination.clone(),
148 },
149 shared: Shared {
150 inject: Inject::new(),
151 owned: OwnedTasks::new(1),
152 woken: AtomicBool::new(false),
153 config,
154 scheduler_metrics: SchedulerMetrics::new(),
155 worker_metrics,
156 },
157 driver: driver_handle,
158 blocking_spawner,
159 seed_generator,
160 local_tid,
161 });
162
163 let core = AtomicCell::new(Some(Box::new(Core {
164 tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
165 tick: 0,
166 driver: Some(driver),
167 metrics: MetricsBatch::new(&handle.shared.worker_metrics),
168 global_queue_interval,
169 unhandled_panic: false,
170 })));
171
172 let scheduler = CurrentThread {
173 core,
174 notify: Notify::new(),
175 };
176
177 (scheduler, handle)
178 }
179
180 #[track_caller]
181 pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
182 pin!(future);
183
184 crate::runtime::context::enter_runtime(handle, false, |blocking| {
185 let handle = handle.as_current_thread();
186
187 loop {
191 if let Some(core) = self.take_core(handle) {
192 handle
193 .shared
194 .worker_metrics
195 .set_thread_id(thread::current().id());
196 return core.block_on(future);
197 } else {
198 let notified = self.notify.notified();
199 pin!(notified);
200
201 if let Some(out) = blocking
202 .block_on(poll_fn(|cx| {
203 if notified.as_mut().poll(cx).is_ready() {
204 return Ready(None);
205 }
206
207 if let Ready(out) = future.as_mut().poll(cx) {
208 return Ready(Some(out));
209 }
210
211 Pending
212 }))
213 .expect("Failed to `Enter::block_on`")
214 {
215 return out;
216 }
217 }
218 }
219 })
220 }
221
222 fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
223 let core = self.core.take()?;
224
225 Some(CoreGuard {
226 context: scheduler::Context::CurrentThread(Context {
227 handle: handle.clone(),
228 core: RefCell::new(Some(core)),
229 defer: Defer::new(),
230 }),
231 scheduler: self,
232 })
233 }
234
235 pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
236 let handle = handle.as_current_thread();
237
238 let core = match self.take_core(handle) {
242 Some(core) => core,
243 None if std::thread::panicking() => return,
244 None => panic!("Oh no! We never placed the Core back, this is a bug!"),
245 };
246
247 let tls_available = context::with_current(|_| ()).is_ok();
249
250 if tls_available {
251 core.enter(|core, _context| {
252 let core = shutdown2(core, handle);
253 (core, ())
254 });
255 } else {
256 let context = core.context.expect_current_thread();
260 let core = context.core.borrow_mut().take().unwrap();
261
262 let core = shutdown2(core, handle);
263 *context.core.borrow_mut() = Some(core);
264 }
265 }
266}
267
268fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
269 handle.shared.owned.close_and_shutdown_all(0);
273
274 while let Some(task) = core.next_local_task(handle) {
277 drop(task);
278 }
279
280 handle.shared.inject.close();
282
283 while let Some(task) = handle.shared.inject.pop() {
285 drop(task);
286 }
287
288 assert!(handle.shared.owned.is_empty());
289
290 core.submit_metrics(handle);
292
293 if let Some(driver) = core.driver.as_mut() {
295 driver.shutdown(&handle.driver);
296 }
297
298 core
299}
300
301impl fmt::Debug for CurrentThread {
302 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
303 fmt.debug_struct("CurrentThread").finish()
304 }
305}
306
307impl Core {
310 fn tick(&mut self) {
312 self.tick = self.tick.wrapping_add(1);
313 }
314
315 fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
316 if self.tick % self.global_queue_interval == 0 {
317 handle
318 .next_remote_task()
319 .or_else(|| self.next_local_task(handle))
320 } else {
321 self.next_local_task(handle)
322 .or_else(|| handle.next_remote_task())
323 }
324 }
325
326 fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
327 let ret = self.tasks.pop_front();
328 handle
329 .shared
330 .worker_metrics
331 .set_queue_depth(self.tasks.len());
332 ret
333 }
334
335 fn push_task(&mut self, handle: &Handle, task: Notified) {
336 self.tasks.push_back(task);
337 self.metrics.inc_local_schedule_count();
338 handle
339 .shared
340 .worker_metrics
341 .set_queue_depth(self.tasks.len());
342 }
343
344 fn submit_metrics(&mut self, handle: &Handle) {
345 self.metrics.submit(&handle.shared.worker_metrics, 0);
346 }
347}
348
349#[cfg(tokio_taskdump)]
350fn wake_deferred_tasks_and_free(context: &Context) {
351 let wakers = context.defer.take_deferred();
352 for waker in wakers {
353 waker.wake();
354 }
355}
356
357impl Context {
360 fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
363 core.metrics.start_poll();
364 let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
365 ret.0.metrics.end_poll();
366 ret
367 }
368
369 fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
372 let mut driver = core.driver.take().expect("driver missing");
373
374 if let Some(f) = &handle.shared.config.before_park {
375 let (c, ()) = self.enter(core, || f());
376 core = c;
377 }
378
379 if core.tasks.is_empty() {
382 core.metrics.about_to_park();
384 core.submit_metrics(handle);
385
386 let (c, ()) = self.enter(core, || {
387 driver.park(&handle.driver);
388 self.defer.wake();
389 });
390
391 core = c;
392
393 core.metrics.unparked();
394 core.submit_metrics(handle);
395 }
396
397 if let Some(f) = &handle.shared.config.after_unpark {
398 let (c, ()) = self.enter(core, || f());
399 core = c;
400 }
401
402 core.driver = Some(driver);
403 core
404 }
405
406 fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
408 let mut driver = core.driver.take().expect("driver missing");
409
410 core.submit_metrics(handle);
411
412 let (mut core, ()) = self.enter(core, || {
413 driver.park_timeout(&handle.driver, Duration::from_millis(0));
414 self.defer.wake();
415 });
416
417 core.driver = Some(driver);
418 core
419 }
420
421 fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
422 *self.core.borrow_mut() = Some(core);
426
427 let ret = f();
429
430 let core = self.core.borrow_mut().take().expect("core missing");
432 (core, ret)
433 }
434
435 pub(crate) fn defer(&self, waker: &Waker) {
436 self.defer.defer(waker);
437 }
438}
439
440impl Handle {
443 pub(crate) fn spawn<F>(
445 me: &Arc<Self>,
446 future: F,
447 id: crate::runtime::task::Id,
448 ) -> JoinHandle<F::Output>
449 where
450 F: crate::future::Future + Send + 'static,
451 F::Output: Send + 'static,
452 {
453 let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
454
455 me.task_hooks.spawn(&TaskMeta {
456 id,
457 _phantom: Default::default(),
458 });
459
460 if let Some(notified) = notified {
461 me.schedule(notified);
462 }
463
464 handle
465 }
466
467 pub(crate) unsafe fn spawn_local<F>(
474 me: &Arc<Self>,
475 future: F,
476 id: crate::runtime::task::Id,
477 ) -> JoinHandle<F::Output>
478 where
479 F: crate::future::Future + 'static,
480 F::Output: 'static,
481 {
482 let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id);
483
484 me.task_hooks.spawn(&TaskMeta {
485 id,
486 _phantom: Default::default(),
487 });
488
489 if let Some(notified) = notified {
490 me.schedule(notified);
491 }
492
493 handle
494 }
495
496 #[cfg(all(
498 tokio_unstable,
499 tokio_taskdump,
500 target_os = "linux",
501 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
502 ))]
503 pub(crate) fn dump(&self) -> crate::runtime::Dump {
504 use crate::runtime::dump;
505 use task::trace::trace_current_thread;
506
507 let mut traces = vec![];
508
509 context::with_scheduler(|maybe_context| {
511 let context = if let Some(context) = maybe_context {
513 context.expect_current_thread()
514 } else {
515 return;
516 };
517 let mut maybe_core = context.core.borrow_mut();
518 let core = if let Some(core) = maybe_core.as_mut() {
519 core
520 } else {
521 return;
522 };
523 let local = &mut core.tasks;
524
525 if self.shared.inject.is_closed() {
526 return;
527 }
528
529 traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
530 .into_iter()
531 .map(|(id, trace)| dump::Task::new(id, trace))
532 .collect();
533
534 drop(maybe_core);
536
537 wake_deferred_tasks_and_free(context);
541 });
542
543 dump::Dump::new(traces)
544 }
545
546 fn next_remote_task(&self) -> Option<Notified> {
547 self.shared.inject.pop()
548 }
549
550 fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
551 me.shared.woken.store(true, Release);
554 waker_ref(me)
555 }
556
557 pub(crate) fn reset_woken(&self) -> bool {
559 self.shared.woken.swap(false, AcqRel)
560 }
561
562 pub(crate) fn num_alive_tasks(&self) -> usize {
563 self.shared.owned.num_alive_tasks()
564 }
565
566 pub(crate) fn injection_queue_depth(&self) -> usize {
567 self.shared.inject.len()
568 }
569}
570
571cfg_unstable_metrics! {
572 impl Handle {
573 pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
574 &self.shared.scheduler_metrics
575 }
576
577 pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
578 assert_eq!(0, worker);
579 &self.shared.worker_metrics
580 }
581
582 pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
583 self.worker_metrics(worker).queue_depth()
584 }
585
586 pub(crate) fn num_blocking_threads(&self) -> usize {
587 self.blocking_spawner.num_threads()
588 }
589
590 pub(crate) fn num_idle_blocking_threads(&self) -> usize {
591 self.blocking_spawner.num_idle_threads()
592 }
593
594 pub(crate) fn blocking_queue_depth(&self) -> usize {
595 self.blocking_spawner.queue_depth()
596 }
597
598 cfg_64bit_metrics! {
599 pub(crate) fn spawned_tasks_count(&self) -> u64 {
600 self.shared.owned.spawned_tasks_count()
601 }
602 }
603 }
604}
605
606cfg_unstable! {
607 use std::num::NonZeroU64;
608
609 impl Handle {
610 pub(crate) fn owned_id(&self) -> NonZeroU64 {
611 self.shared.owned.id
612 }
613 }
614}
615
616impl fmt::Debug for Handle {
617 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
618 fmt.debug_struct("current_thread::Handle { ... }").finish()
619 }
620}
621
622impl Schedule for Arc<Handle> {
625 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
626 self.shared.owned.remove(task)
627 }
628
629 fn schedule(&self, task: task::Notified<Self>) {
630 use scheduler::Context::CurrentThread;
631
632 context::with_scheduler(|maybe_cx| match maybe_cx {
633 Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
634 let mut core = cx.core.borrow_mut();
635
636 if let Some(core) = core.as_mut() {
639 core.push_task(self, task);
640 }
641 }
642 _ => {
643 self.shared.scheduler_metrics.inc_remote_schedule_count();
645
646 self.shared.inject.push(task);
648 self.driver.unpark();
649 }
650 });
651 }
652
653 fn hooks(&self) -> TaskHarnessScheduleHooks {
654 TaskHarnessScheduleHooks {
655 task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
656 }
657 }
658
659 cfg_unstable! {
660 fn unhandled_panic(&self) {
661 use crate::runtime::UnhandledPanic;
662
663 match self.shared.config.unhandled_panic {
664 UnhandledPanic::Ignore => {
665 }
667 UnhandledPanic::ShutdownRuntime => {
668 use scheduler::Context::CurrentThread;
669
670 context::with_scheduler(|maybe_cx| match maybe_cx {
675 Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
676 let mut core = cx.core.borrow_mut();
677
678 if let Some(core) = core.as_mut() {
680 core.unhandled_panic = true;
681 self.shared.owned.close_and_shutdown_all(0);
682 }
683 }
684 _ => unreachable!("runtime core not set in CURRENT thread-local"),
685 })
686 }
687 }
688 }
689 }
690}
691
692impl Wake for Handle {
693 fn wake(arc_self: Arc<Self>) {
694 Wake::wake_by_ref(&arc_self);
695 }
696
697 fn wake_by_ref(arc_self: &Arc<Self>) {
699 arc_self.shared.woken.store(true, Release);
700 arc_self.driver.unpark();
701 }
702}
703
704struct CoreGuard<'a> {
709 context: scheduler::Context,
710 scheduler: &'a CurrentThread,
711}
712
713impl CoreGuard<'_> {
714 #[track_caller]
715 fn block_on<F: Future>(self, future: F) -> F::Output {
716 let ret = self.enter(|mut core, context| {
717 let waker = Handle::waker_ref(&context.handle);
718 let mut cx = std::task::Context::from_waker(&waker);
719
720 pin!(future);
721
722 core.metrics.start_processing_scheduled_tasks();
723
724 'outer: loop {
725 let handle = &context.handle;
726
727 if handle.reset_woken() {
728 let (c, res) = context.enter(core, || {
729 crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
730 });
731
732 core = c;
733
734 if let Ready(v) = res {
735 return (core, Some(v));
736 }
737 }
738
739 for _ in 0..handle.shared.config.event_interval {
740 if core.unhandled_panic {
742 return (core, None);
743 }
744
745 core.tick();
746
747 let entry = core.next_task(handle);
748
749 let task = match entry {
750 Some(entry) => entry,
751 None => {
752 core.metrics.end_processing_scheduled_tasks();
753
754 core = if !context.defer.is_empty() {
755 context.park_yield(core, handle)
756 } else {
757 context.park(core, handle)
758 };
759
760 core.metrics.start_processing_scheduled_tasks();
761
762 continue 'outer;
764 }
765 };
766
767 let task = context.handle.shared.owned.assert_owner(task);
768
769 let (c, ()) = context.run_task(core, || {
770 task.run();
771 });
772
773 core = c;
774 }
775
776 core.metrics.end_processing_scheduled_tasks();
777
778 core = context.park_yield(core, handle);
781
782 core.metrics.start_processing_scheduled_tasks();
783 }
784 });
785
786 match ret {
787 Some(ret) => ret,
788 None => {
789 panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
791 }
792 }
793 }
794
795 fn enter<F, R>(self, f: F) -> R
798 where
799 F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
800 {
801 let context = self.context.expect_current_thread();
802
803 let core = context.core.borrow_mut().take().expect("core missing");
805
806 let (core, ret) = context::set_scheduler(&self.context, || f(core, context));
808
809 *context.core.borrow_mut() = Some(core);
810
811 ret
812 }
813}
814
815impl Drop for CoreGuard<'_> {
816 fn drop(&mut self) {
817 let context = self.context.expect_current_thread();
818
819 if let Some(core) = context.core.borrow_mut().take() {
820 self.scheduler.core.set(core);
823
824 self.scheduler.notify.notify_one();
826 }
827 }
828}