tokio/util/
idle_notified_set.rs

1//! This module defines an `IdleNotifiedSet`, which is a collection of elements.
2//! Each element is intended to correspond to a task, and the collection will
3//! keep track of which tasks have had their waker notified, and which have not.
4//!
5//! Each entry in the set holds some user-specified value. The value's type is
6//! specified using the `T` parameter. It will usually be a `JoinHandle` or
7//! similar.
8
9use std::marker::PhantomPinned;
10use std::mem::ManuallyDrop;
11use std::ptr::NonNull;
12use std::task::{Context, Waker};
13
14use crate::loom::cell::UnsafeCell;
15use crate::loom::sync::{Arc, Mutex};
16use crate::util::linked_list::{self, Link};
17use crate::util::{waker_ref, Wake};
18
19type LinkedList<T> =
20    linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>;
21
22/// This is the main handle to the collection.
23pub(crate) struct IdleNotifiedSet<T> {
24    lists: Arc<Lists<T>>,
25    length: usize,
26}
27
28/// A handle to an entry that is guaranteed to be stored in the idle or notified
29/// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet`
30/// mutably to prevent the entry from being moved to the `Neither` list, which
31/// only the `IdleNotifiedSet` may do.
32///
33/// The main consequence of being stored in one of the lists is that the `value`
34/// field has not yet been consumed.
35///
36/// Note: This entry can be moved from the idle to the notified list while this
37/// object exists by waking its waker.
38pub(crate) struct EntryInOneOfTheLists<'a, T> {
39    entry: Arc<ListEntry<T>>,
40    set: &'a mut IdleNotifiedSet<T>,
41}
42
43type Lists<T> = Mutex<ListsInner<T>>;
44
45/// The linked lists hold strong references to the `ListEntry` items, and the
46/// `ListEntry` items also hold a strong reference back to the Lists object, but
47/// the destructor of the `IdleNotifiedSet` will clear the two lists, so once
48/// that object is destroyed, no ref-cycles will remain.
49struct ListsInner<T> {
50    notified: LinkedList<T>,
51    idle: LinkedList<T>,
52    /// Whenever an element in the `notified` list is woken, this waker will be
53    /// notified and consumed, if it exists.
54    waker: Option<Waker>,
55}
56
57/// Which of the two lists in the shared Lists object is this entry stored in?
58///
59/// If the value is `Idle`, then an entry's waker may move it to the notified
60/// list. Otherwise, only the `IdleNotifiedSet` may move it.
61///
62/// If the value is `Neither`, then it is still possible that the entry is in
63/// some third external list (this happens in `drain`).
64#[derive(Copy, Clone, Eq, PartialEq)]
65enum List {
66    Notified,
67    Idle,
68    Neither,
69}
70
71/// An entry in the list.
72///
73/// # Safety
74///
75/// The `my_list` field must only be accessed while holding the mutex in
76/// `parent`. It is an invariant that the value of `my_list` corresponds to
77/// which linked list in the `parent` holds this entry. Once this field takes
78/// the value `Neither`, then it may never be modified again.
79///
80/// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field
81/// must only be accessed while holding the mutex. If the value of `my_list` is
82/// `Neither`, then the `pointers` field may be accessed by the
83/// `IdleNotifiedSet` (this happens inside `drain`).
84///
85/// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed
86/// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to
87/// `Neither` assumes ownership of the `value`, and it must either drop it or
88/// move it out from this entry to prevent it from getting leaked. (Since the
89/// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the
90/// value should not be leaked.)
91struct ListEntry<T> {
92    /// The linked list pointers of the list this entry is in.
93    pointers: linked_list::Pointers<ListEntry<T>>,
94    /// Pointer to the shared `Lists` struct.
95    parent: Arc<Lists<T>>,
96    /// The value stored in this entry.
97    value: UnsafeCell<ManuallyDrop<T>>,
98    /// Used to remember which list this entry is in.
99    my_list: UnsafeCell<List>,
100    /// Required by the `linked_list::Pointers` field.
101    _pin: PhantomPinned,
102}
103
104generate_addr_of_methods! {
105    impl<T> ListEntry<T> {
106        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
107            &self.pointers
108        }
109    }
110}
111
112// With mutable access to the `IdleNotifiedSet`, you can get mutable access to
113// the values.
114unsafe impl<T: Send> Send for IdleNotifiedSet<T> {}
115// With the current API we strictly speaking don't even need `T: Sync`, but we
116// require it anyway to support adding &self APIs that access the values in the
117// future.
118unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {}
119
120// These impls control when it is safe to create a Waker. Since the waker does
121// not allow access to the value in any way (including its destructor), it is
122// not necessary for `T` to be Send or Sync.
123unsafe impl<T> Send for ListEntry<T> {}
124unsafe impl<T> Sync for ListEntry<T> {}
125
126impl<T> IdleNotifiedSet<T> {
127    /// Create a new `IdleNotifiedSet`.
128    pub(crate) fn new() -> Self {
129        let lists = Mutex::new(ListsInner {
130            notified: LinkedList::new(),
131            idle: LinkedList::new(),
132            waker: None,
133        });
134
135        IdleNotifiedSet {
136            lists: Arc::new(lists),
137            length: 0,
138        }
139    }
140
141    pub(crate) fn len(&self) -> usize {
142        self.length
143    }
144
145    pub(crate) fn is_empty(&self) -> bool {
146        self.length == 0
147    }
148
149    /// Insert the given value into the `idle` list.
150    pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> {
151        self.length += 1;
152
153        let entry = Arc::new(ListEntry {
154            parent: self.lists.clone(),
155            value: UnsafeCell::new(ManuallyDrop::new(value)),
156            my_list: UnsafeCell::new(List::Idle),
157            pointers: linked_list::Pointers::new(),
158            _pin: PhantomPinned,
159        });
160
161        {
162            let mut lock = self.lists.lock();
163            lock.idle.push_front(entry.clone());
164        }
165
166        // Safety: We just put the entry in the idle list, so it is in one of the lists.
167        EntryInOneOfTheLists { entry, set: self }
168    }
169
170    /// Pop an entry from the notified list to poll it. The entry is moved to
171    /// the idle list atomically.
172    pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> {
173        // We don't decrement the length because this call moves the entry to
174        // the idle list rather than removing it.
175        if self.length == 0 {
176            // Fast path.
177            return None;
178        }
179
180        let mut lock = self.lists.lock();
181
182        let should_update_waker = match lock.waker.as_mut() {
183            Some(cur_waker) => !waker.will_wake(cur_waker),
184            None => true,
185        };
186        if should_update_waker {
187            lock.waker = Some(waker.clone());
188        }
189
190        // Pop the entry, returning None if empty.
191        let entry = lock.notified.pop_back()?;
192
193        lock.idle.push_front(entry.clone());
194
195        // Safety: We are holding the lock.
196        entry.my_list.with_mut(|ptr| unsafe {
197            *ptr = List::Idle;
198        });
199
200        drop(lock);
201
202        // Safety: We just put the entry in the idle list, so it is in one of the lists.
203        Some(EntryInOneOfTheLists { entry, set: self })
204    }
205
206    /// Tries to pop an entry from the notified list to poll it. The entry is moved to
207    /// the idle list atomically.
208    pub(crate) fn try_pop_notified(&mut self) -> Option<EntryInOneOfTheLists<'_, T>> {
209        // We don't decrement the length because this call moves the entry to
210        // the idle list rather than removing it.
211        if self.length == 0 {
212            // Fast path.
213            return None;
214        }
215
216        let mut lock = self.lists.lock();
217
218        // Pop the entry, returning None if empty.
219        let entry = lock.notified.pop_back()?;
220
221        lock.idle.push_front(entry.clone());
222
223        // Safety: We are holding the lock.
224        entry.my_list.with_mut(|ptr| unsafe {
225            *ptr = List::Idle;
226        });
227
228        drop(lock);
229
230        // Safety: We just put the entry in the idle list, so it is in one of the lists.
231        Some(EntryInOneOfTheLists { entry, set: self })
232    }
233
234    /// Call a function on every element in this list.
235    pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) {
236        fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) {
237            let mut node = list.last();
238
239            while let Some(entry) = node {
240                ptrs.push(entry.value.with_mut(|ptr| {
241                    let ptr: *mut ManuallyDrop<T> = ptr;
242                    let ptr: *mut T = ptr.cast();
243                    ptr
244                }));
245
246                let prev = entry.pointers.get_prev();
247                node = prev.map(|prev| unsafe { &*prev.as_ptr() });
248            }
249        }
250
251        // Atomically get a raw pointer to the value of every entry.
252        //
253        // Since this only locks the mutex once, it is not possible for a value
254        // to get moved from the idle list to the notified list during the
255        // operation, which would otherwise result in some value being listed
256        // twice.
257        let mut ptrs = Vec::with_capacity(self.len());
258        {
259            let mut lock = self.lists.lock();
260
261            get_ptrs(&mut lock.idle, &mut ptrs);
262            get_ptrs(&mut lock.notified, &mut ptrs);
263        }
264        debug_assert_eq!(ptrs.len(), ptrs.capacity());
265
266        for ptr in ptrs {
267            // Safety: When we grabbed the pointers, the entries were in one of
268            // the two lists. This means that their value was valid at the time,
269            // and it must still be valid because we are the IdleNotifiedSet,
270            // and only we can remove an entry from the two lists. (It's
271            // possible that an entry is moved from one list to the other during
272            // this loop, but that is ok.)
273            func(unsafe { &mut *ptr });
274        }
275    }
276
277    /// Remove all entries in both lists, applying some function to each element.
278    ///
279    /// The closure is called on all elements even if it panics. Having it panic
280    /// twice is a double-panic, and will abort the application.
281    pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) {
282        if self.length == 0 {
283            // Fast path.
284            return;
285        }
286        self.length = 0;
287
288        // The LinkedList is not cleared on panic, so we use a bomb to clear it.
289        //
290        // This value has the invariant that any entry in its `all_entries` list
291        // has `my_list` set to `Neither` and that the value has not yet been
292        // dropped.
293        struct AllEntries<T, F: FnMut(T)> {
294            all_entries: LinkedList<T>,
295            func: F,
296        }
297
298        impl<T, F: FnMut(T)> AllEntries<T, F> {
299            fn pop_next(&mut self) -> bool {
300                if let Some(entry) = self.all_entries.pop_back() {
301                    // Safety: We just took this value from the list, so we can
302                    // destroy the value in the entry.
303                    entry
304                        .value
305                        .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) });
306                    true
307                } else {
308                    false
309                }
310            }
311        }
312
313        impl<T, F: FnMut(T)> Drop for AllEntries<T, F> {
314            fn drop(&mut self) {
315                while self.pop_next() {}
316            }
317        }
318
319        let mut all_entries = AllEntries {
320            all_entries: LinkedList::new(),
321            func,
322        };
323
324        // Atomically move all entries to the new linked list in the AllEntries
325        // object.
326        {
327            let mut lock = self.lists.lock();
328            unsafe {
329                // Safety: We are holding the lock and `all_entries` is a new
330                // LinkedList.
331                move_to_new_list(&mut lock.idle, &mut all_entries.all_entries);
332                move_to_new_list(&mut lock.notified, &mut all_entries.all_entries);
333            }
334        }
335
336        // Keep destroying entries in the list until it is empty.
337        //
338        // If the closure panics, then the destructor of the `AllEntries` bomb
339        // ensures that we keep running the destructor on the remaining values.
340        // A second panic will abort the program.
341        while all_entries.pop_next() {}
342    }
343}
344
345/// # Safety
346///
347/// The mutex for the entries must be held, and the target list must be such
348/// that setting `my_list` to `Neither` is ok.
349unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) {
350    while let Some(entry) = from.pop_back() {
351        entry.my_list.with_mut(|ptr| {
352            *ptr = List::Neither;
353        });
354        to.push_front(entry);
355    }
356}
357
358impl<'a, T> EntryInOneOfTheLists<'a, T> {
359    /// Remove this entry from the list it is in, returning the value associated
360    /// with the entry.
361    ///
362    /// This consumes the value, since it is no longer guaranteed to be in a
363    /// list.
364    pub(crate) fn remove(self) -> T {
365        self.set.length -= 1;
366
367        {
368            let mut lock = self.set.lists.lock();
369
370            // Safety: We are holding the lock so there is no race, and we will
371            // remove the entry afterwards to uphold invariants.
372            let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe {
373                let old_my_list = *ptr;
374                *ptr = List::Neither;
375                old_my_list
376            });
377
378            let list = match old_my_list {
379                List::Idle => &mut lock.idle,
380                List::Notified => &mut lock.notified,
381                // An entry in one of the lists is in one of the lists.
382                List::Neither => unreachable!(),
383            };
384
385            unsafe {
386                // Safety: We just checked that the entry is in this particular
387                // list.
388                list.remove(ListEntry::as_raw(&self.entry)).unwrap();
389            }
390        }
391
392        // By setting `my_list` to `Neither`, we have taken ownership of the
393        // value. We return it to the caller.
394        //
395        // Safety: We have a mutable reference to the `IdleNotifiedSet` that
396        // owns this entry, so we can use its permission to access the value.
397        self.entry
398            .value
399            .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) })
400    }
401
402    /// Access the value in this entry together with a context for its waker.
403    pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U
404    where
405        F: FnOnce(&mut T, &mut Context<'_>) -> U,
406        T: 'static,
407    {
408        let waker = waker_ref(&self.entry);
409
410        let mut context = Context::from_waker(&waker);
411
412        // Safety: We have a mutable reference to the `IdleNotifiedSet` that
413        // owns this entry, so we can use its permission to access the value.
414        self.entry
415            .value
416            .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) })
417    }
418}
419
420impl<T> Drop for IdleNotifiedSet<T> {
421    fn drop(&mut self) {
422        // Clear both lists.
423        self.drain(drop);
424
425        #[cfg(debug_assertions)]
426        if !std::thread::panicking() {
427            let lock = self.lists.lock();
428            assert!(lock.idle.is_empty());
429            assert!(lock.notified.is_empty());
430        }
431    }
432}
433
434impl<T: 'static> Wake for ListEntry<T> {
435    fn wake_by_ref(me: &Arc<Self>) {
436        let mut lock = me.parent.lock();
437
438        // Safety: We are holding the lock and we will update the lists to
439        // maintain invariants.
440        let old_my_list = me.my_list.with_mut(|ptr| unsafe {
441            let old_my_list = *ptr;
442            if old_my_list == List::Idle {
443                *ptr = List::Notified;
444            }
445            old_my_list
446        });
447
448        if old_my_list == List::Idle {
449            // We move ourself to the notified list.
450            let me = unsafe {
451                // Safety: We just checked that we are in this particular list.
452                lock.idle.remove(ListEntry::as_raw(me)).unwrap()
453            };
454            lock.notified.push_front(me);
455
456            if let Some(waker) = lock.waker.take() {
457                drop(lock);
458                waker.wake();
459            }
460        }
461    }
462
463    fn wake(me: Arc<Self>) {
464        Self::wake_by_ref(&me);
465    }
466}
467
468/// # Safety
469///
470/// `ListEntry` is forced to be !Unpin.
471unsafe impl<T> linked_list::Link for ListEntry<T> {
472    type Handle = Arc<ListEntry<T>>;
473    type Target = ListEntry<T>;
474
475    fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> {
476        let ptr: *const ListEntry<T> = Arc::as_ptr(handle);
477        // Safety: We can't get a null pointer from `Arc::as_ptr`.
478        unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) }
479    }
480
481    unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> {
482        Arc::from_raw(ptr.as_ptr())
483    }
484
485    unsafe fn pointers(
486        target: NonNull<ListEntry<T>>,
487    ) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
488        ListEntry::addr_of_pointers(target)
489    }
490}
491
492#[cfg(all(test, not(loom)))]
493mod tests {
494    use crate::runtime::Builder;
495    use crate::task::JoinSet;
496
497    // A test that runs under miri.
498    //
499    // https://github.com/tokio-rs/tokio/pull/5693
500    #[test]
501    fn join_set_test() {
502        let rt = Builder::new_current_thread().build().unwrap();
503
504        let mut set = JoinSet::new();
505        set.spawn_on(futures::future::ready(()), rt.handle());
506
507        rt.block_on(set.join_next()).unwrap().unwrap();
508    }
509}