1use super::{Pop, Synced};
23use crate::loom::sync::atomic::AtomicUsize;
4use crate::runtime::task;
56use std::marker::PhantomData;
7use std::sync::atomic::Ordering::{Acquire, Release};
89pub(crate) struct Shared<T: 'static> {
10/// Number of pending tasks in the queue. This helps prevent unnecessary
11 /// locking in the hot path.
12pub(super) len: AtomicUsize,
1314 _p: PhantomData<T>,
15}
1617unsafe impl<T> Send for Shared<T> {}
18unsafe impl<T> Sync for Shared<T> {}
1920impl<T: 'static> Shared<T> {
21pub(crate) fn new() -> (Shared<T>, Synced) {
22let inject = Shared {
23 len: AtomicUsize::new(0),
24 _p: PhantomData,
25 };
2627let synced = Synced {
28 is_closed: false,
29 head: None,
30 tail: None,
31 };
3233 (inject, synced)
34 }
3536pub(crate) fn is_empty(&self) -> bool {
37self.len() == 0
38}
3940// Kind of annoying to have to include the cfg here
41#[cfg(any(tokio_taskdump, feature = "rt-multi-thread"))]
42pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
43 synced.is_closed
44 }
4546/// Closes the injection queue, returns `true` if the queue is open when the
47 /// transition is made.
48pub(crate) fn close(&self, synced: &mut Synced) -> bool {
49if synced.is_closed {
50return false;
51 }
5253 synced.is_closed = true;
54true
55}
5657pub(crate) fn len(&self) -> usize {
58self.len.load(Acquire)
59 }
6061/// Pushes a value into the queue.
62 ///
63 /// This does nothing if the queue is closed.
64 ///
65 /// # Safety
66 ///
67 /// Must be called with the same `Synced` instance returned by `Inject::new`
68pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
69if synced.is_closed {
70return;
71 }
7273// safety: only mutated with the lock held
74let len = self.len.unsync_load();
75let task = task.into_raw();
7677// The next pointer should already be null
78debug_assert!(unsafe { task.get_queue_next().is_none() });
7980if let Some(tail) = synced.tail {
81// safety: Holding the Notified for a task guarantees exclusive
82 // access to the `queue_next` field.
83unsafe { tail.set_queue_next(Some(task)) };
84 } else {
85 synced.head = Some(task);
86 }
8788 synced.tail = Some(task);
89self.len.store(len + 1, Release);
90 }
9192/// Pop a value from the queue.
93 ///
94 /// # Safety
95 ///
96 /// Must be called with the same `Synced` instance returned by `Inject::new`
97pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
98self.pop_n(synced, 1).next()
99 }
100101/// Pop `n` values from the queue
102 ///
103 /// # Safety
104 ///
105 /// Must be called with the same `Synced` instance returned by `Inject::new`
106pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
107use std::cmp;
108109debug_assert!(n > 0);
110111// safety: All updates to the len atomic are guarded by the mutex. As
112 // such, a non-atomic load followed by a store is safe.
113let len = self.len.unsync_load();
114let n = cmp::min(n, len);
115116// Decrement the count.
117self.len.store(len - n, Release);
118119 Pop::new(n, synced)
120 }
121}