tokio/runtime/scheduler/current_thread/mod.rs

use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{
    self, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks,
};
use crate::runtime::{
    blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::thread::ThreadId;
use std::time::Duration;
use std::{fmt, thread};

/// Executes tasks on the current thread
pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the
    /// driver.
    notify: Notify,
}

/// Handle to the current thread scheduler
pub(crate) struct Handle {
    /// Scheduler state shared across threads
    shared: Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,

    /// User-supplied hooks to invoke for task-related events (spawn and
    /// terminate).
    pub(crate) task_hooks: TaskHooks,

    /// If this is a `LocalRuntime`, the ID of the thread that owns it.
    pub(crate) local_tid: Option<ThreadId>,
}

/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue
    tasks: VecDeque<Notified>,

    /// Current tick
    tick: u32,

    /// Runtime driver
    ///
    /// The driver is removed before starting to park the thread
    driver: Option<Driver>,

    /// Metrics batch
    metrics: MetricsBatch,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shut down on unhandled panic.
    unhandled_panic: bool,
}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the thread blocked on `block_on` was woken.
    woken: AtomicBool,

    /// Scheduler configuration options
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}

/// Thread-local context.
///
/// This is `pub(crate)` so it can be stored in `runtime::context`.
pub(crate) struct Context {
    /// Scheduler handle
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}

type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if no global queue interval is specified. This is a temporary constant
/// and will be removed as we unify tuning logic between the multi-thread and
/// current-thread schedulers.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;

impl CurrentThread {
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
        local_tid: Option<ThreadId>,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);
        worker_metrics.set_thread_id(thread::current().id());

        // Get the configured global queue interval, or use the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            task_hooks: TaskHooks {
                task_spawn_callback: config.before_spawn.clone(),
                task_terminate_callback: config.after_termination.clone(),
            },
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
            local_tid,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and block on the future
            // there; otherwise, select on a notification that the core is
            // available or on the future completing.
            loop {
                if let Some(core) = self.take_core(handle) {
                    handle
                        .shared
                        .worker_metrics
                        .set_thread_id(thread::current().id());
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

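    /// Attempts to take exclusive ownership of the scheduler core.
    ///
    /// Returns `None` if another thread currently holds the core. On success,
    /// the returned `CoreGuard` places the core back into its slot when
    /// dropped, even if the driven future panics.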
    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if we are currently panicking and
        // the lock may be poisoned.

        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shutdown without setting the context. `tokio::spawn` calls will
            // fail, but those will fail either way because the thread-local is
            // not available anymore.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}

fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the
    // collection, ensuring that no tasks are ever pushed after this
    // call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain local queue
    // We already shut down every task, so we just need to drop each one.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue
    handle.shared.inject.close();

    // Drain remote queue
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics
    core.submit_metrics(handle);

    // Shut down the resource drivers
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}

impl fmt::Debug for CurrentThread {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("CurrentThread").finish()
    }
}

// ===== impl Core =====

impl Core {
    /// Increment the current tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

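    /// Picks the next task to poll. Every `global_queue_interval` ticks the
    /// remote (injection) queue is checked first so that remotely scheduled
    /// tasks are not starved by the local queue; otherwise the local queue
    /// has priority.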
    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            handle
                .next_remote_task()
                .or_else(|| self.next_local_task(handle))
        } else {
            self.next_local_task(handle)
                .or_else(|| handle.next_remote_task())
        }
    }

    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
        let ret = self.tasks.pop_front();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
        ret
    }

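    /// Pushes a task onto the local run queue, recording the schedule in the
    /// metrics batch and updating the reported queue depth.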
    fn push_task(&mut self, handle: &Handle, task: Notified) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
    }

    fn submit_metrics(&mut self, handle: &Handle) {
        self.metrics.submit(&handle.shared.worker_metrics, 0);
    }
}

#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    let wakers = context.defer.take_deferred();
    for waker in wakers {
        waker.wake();
    }
}

// ===== impl Context =====

impl Context {
    /// Execute the closure with the given scheduler core stored in the
    /// thread-local context.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread until an event is received by the driver,
    /// including I/O events, timer events, ...
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // This check will fail if `before_park` spawns a task for us to run
        // instead of parking the thread
        if core.tasks.is_empty() {
            // Park until the thread is signaled
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            let (c, ()) = self.enter(core, || {
                driver.park(&handle.driver);
                self.defer.wake();
            });

            core = c;

            core.metrics.unparked();
            core.submit_metrics(handle);
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread.
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        let (mut core, ()) = self.enter(core, || {
            driver.park_timeout(&handle.driver, Duration::from_millis(0));
            self.defer.wake();
        });

        core.driver = Some(driver);
        core
    }

    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context
        //
        // A drop-guard is employed at a higher level.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while tracking the execution budget
        let ret = f();

        // Take the scheduler core back
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

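    /// Stores a waker to be woken the next time the scheduler parks or yields
    /// to the driver, used for tasks that called `task::yield_now()`.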
    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

// ===== impl Handle =====

impl Handle {
    /// Spawns a future onto the `CurrentThread` scheduler
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

        me.task_hooks.spawn(&TaskMeta {
            id,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Spawn a task which isn't safe to send across thread boundaries onto the runtime.
    ///
    /// # Safety
    /// This should only be used when this is a `LocalRuntime` or in another case where the runtime
    /// provably cannot be driven from or moved to different threads from the one on which the task
    /// is spawned.
    pub(crate) unsafe fn spawn_local<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + 'static,
        F::Output: 'static,
    {
        let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id);

        me.task_hooks.spawn(&TaskMeta {
            id,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Capture a snapshot of this runtime's state.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        // todo: how to make this work outside of a runtime context?
        context::with_scheduler(|maybe_context| {
            // drain the local queue
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(|(id, trace)| dump::Task::new(id, trace))
                .collect();

            // Avoid double borrow panic
            drop(maybe_core);

            // Taking a taskdump could wake every task, but we probably don't want
            // the `yield_now` vector to be that large under normal circumstances.
            // Therefore, we free its allocation.
            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

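    /// Pops the next task from the remote (injection) queue, if any.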
    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        // Set woken to true when entering block_on, ensuring the outer future
        // is polled on the first iteration of the loop
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    // Reset woken to false and return the original value
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }

    pub(crate) fn num_alive_tasks(&self) -> usize {
        self.shared.owned.num_alive_tasks()
    }

    pub(crate) fn injection_queue_depth(&self) -> usize {
        self.shared.inject.len()
    }
}

cfg_unstable_metrics! {
    impl Handle {
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
            assert_eq!(0, worker);
            &self.shared.worker_metrics
        }

        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        cfg_64bit_metrics! {
            pub(crate) fn spawned_tasks_count(&self) -> u64 {
                self.shared.owned.spawned_tasks_count()
            }
        }
    }
}

cfg_unstable! {
    use std::num::NonZeroU64;

    impl Handle {
        pub(crate) fn owned_id(&self) -> NonZeroU64 {
            self.shared.owned.id
        }
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("current_thread::Handle { ... }").finish()
    }
}

// ===== impl Shared =====

impl Schedule for Arc<Handle> {
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

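    /// Schedules a task for execution. When called from the thread currently
    /// driving the scheduler, the task is pushed onto the local run queue;
    /// otherwise it is pushed onto the remote injection queue and the driver
    /// is unparked.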
    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `None`, the runtime is shutting down, so there is no need
                // to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // `context::with_scheduler` should match with `&self`, i.e.
                    // there is no opportunity for a nested scheduler to be
                    // called.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    /// Wake by reference
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.shared.woken.store(true, Release);
        arc_self.driver.unpark();
    }
}

// ===== CoreGuard =====

/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
    context: scheduler::Context,
    scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
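    /// Drives the scheduler and the `block_on` future on the current thread,
    /// returning the future's output. Panics if a spawned task panicked and
    /// the runtime is configured to shut down on unhandled panic.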
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

            'outer: loop {
                let handle = &context.handle;

                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

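                // Run up to `event_interval` tasks from the run queues before
                // yielding back to the driver to process I/O and timer events.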
                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled_panic
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            core = if !context.defer.is_empty() {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    let (c, ()) = context.run_task(core, || {
                        task.run();
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Yield to the driver: this drives the timer and pulls any
                // pending I/O events.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // A spawned task panicked and the runtime shut down in response.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context. This sets the queue and other necessary
    /// scheduler state in the thread-local.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure and place `core` back
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        if let Some(core) = context.core.borrow_mut().take() {
            // Place the `Core` back into its slot so other threads can pick
            // it up and drive it.
            self.scheduler.core.set(core);

            // Wake up other possible threads that could steal the driver.
            self.scheduler.notify.notify_one();
        }
    }
}