tokio/runtime/scheduler/inject/
shared.rs

1use super::{Pop, Synced};
2
3use crate::loom::sync::atomic::AtomicUsize;
4use crate::runtime::task;
5
6use std::marker::PhantomData;
7use std::sync::atomic::Ordering::{Acquire, Release};
8
9pub(crate) struct Shared<T: 'static> {
10    /// Number of pending tasks in the queue. This helps prevent unnecessary
11    /// locking in the hot path.
12    pub(super) len: AtomicUsize,
13
14    _p: PhantomData<T>,
15}
16
17unsafe impl<T> Send for Shared<T> {}
18unsafe impl<T> Sync for Shared<T> {}
19
20impl<T: 'static> Shared<T> {
21    pub(crate) fn new() -> (Shared<T>, Synced) {
22        let inject = Shared {
23            len: AtomicUsize::new(0),
24            _p: PhantomData,
25        };
26
27        let synced = Synced {
28            is_closed: false,
29            head: None,
30            tail: None,
31        };
32
33        (inject, synced)
34    }
35
36    pub(crate) fn is_empty(&self) -> bool {
37        self.len() == 0
38    }
39
40    // Kind of annoying to have to include the cfg here
41    #[cfg(any(tokio_taskdump, feature = "rt-multi-thread"))]
42    pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
43        synced.is_closed
44    }
45
46    /// Closes the injection queue, returns `true` if the queue is open when the
47    /// transition is made.
48    pub(crate) fn close(&self, synced: &mut Synced) -> bool {
49        if synced.is_closed {
50            return false;
51        }
52
53        synced.is_closed = true;
54        true
55    }
56
57    pub(crate) fn len(&self) -> usize {
58        self.len.load(Acquire)
59    }
60
61    /// Pushes a value into the queue.
62    ///
63    /// This does nothing if the queue is closed.
64    ///
65    /// # Safety
66    ///
67    /// Must be called with the same `Synced` instance returned by `Inject::new`
68    pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
69        if synced.is_closed {
70            return;
71        }
72
73        // safety: only mutated with the lock held
74        let len = self.len.unsync_load();
75        let task = task.into_raw();
76
77        // The next pointer should already be null
78        debug_assert!(unsafe { task.get_queue_next().is_none() });
79
80        if let Some(tail) = synced.tail {
81            // safety: Holding the Notified for a task guarantees exclusive
82            // access to the `queue_next` field.
83            unsafe { tail.set_queue_next(Some(task)) };
84        } else {
85            synced.head = Some(task);
86        }
87
88        synced.tail = Some(task);
89        self.len.store(len + 1, Release);
90    }
91
92    /// Pop a value from the queue.
93    ///
94    /// # Safety
95    ///
96    /// Must be called with the same `Synced` instance returned by `Inject::new`
97    pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
98        self.pop_n(synced, 1).next()
99    }
100
101    /// Pop `n` values from the queue
102    ///
103    /// # Safety
104    ///
105    /// Must be called with the same `Synced` instance returned by `Inject::new`
106    pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
107        use std::cmp;
108
109        debug_assert!(n > 0);
110
111        // safety: All updates to the len atomic are guarded by the mutex. As
112        // such, a non-atomic load followed by a store is safe.
113        let len = self.len.unsync_load();
114        let n = cmp::min(n, len);
115
116        // Decrement the count.
117        self.len.store(len - n, Release);
118
119        Pop::new(n, synced)
120    }
121}