tokio/sync/mpsc/
list.rs

1//! A concurrent, lock-free, FIFO list.
2
3use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
4use crate::loom::thread;
5use crate::sync::mpsc::block::{self, Block};
6
7use std::fmt;
8use std::ptr::NonNull;
9use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
10
11/// List queue transmit handle.
12pub(crate) struct Tx<T> {
13    /// Tail in the `Block` mpmc list.
14    block_tail: AtomicPtr<Block<T>>,
15
16    /// Position to push the next message. This references a block and offset
17    /// into the block.
18    tail_position: AtomicUsize,
19}
20
21/// List queue receive handle
22pub(crate) struct Rx<T> {
23    /// Pointer to the block being processed.
24    head: NonNull<Block<T>>,
25
26    /// Next slot index to process.
27    index: usize,
28
29    /// Pointer to the next block pending release.
30    free_head: NonNull<Block<T>>,
31}
32
/// Outcome reported by `Rx::try_pop`.
pub(crate) enum TryPopResult<T> {
    /// A value was removed from the queue.
    Ok(T),
    /// Nothing is queued: no slot beyond the receiver's position had been
    /// claimed when `try_pop` sampled the tail.
    Empty,
    /// Nothing is queued and the send half has been closed.
    Closed,
    /// A slot has been claimed by a sender, but the value at the front of
    /// the queue has not finished being written yet.
    Busy,
}
44
45pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
46    // Create the initial block shared between the tx and rx halves.
47    let initial_block = Block::new(0);
48    let initial_block_ptr = Box::into_raw(initial_block);
49
50    let tx = Tx {
51        block_tail: AtomicPtr::new(initial_block_ptr),
52        tail_position: AtomicUsize::new(0),
53    };
54
55    let head = NonNull::new(initial_block_ptr).unwrap();
56
57    let rx = Rx {
58        head,
59        index: 0,
60        free_head: head,
61    };
62
63    (tx, rx)
64}
65
66impl<T> Tx<T> {
67    /// Pushes a value into the list.
68    pub(crate) fn push(&self, value: T) {
69        // First, claim a slot for the value. `Acquire` is used here to
70        // synchronize with the `fetch_add` in `reclaim_blocks`.
71        let slot_index = self.tail_position.fetch_add(1, Acquire);
72
73        // Load the current block and write the value
74        let block = self.find_block(slot_index);
75
76        unsafe {
77            // Write the value to the block
78            block.as_ref().write(slot_index, value);
79        }
80    }
81
82    /// Closes the send half of the list.
83    ///
84    /// Similar process as pushing a value, but instead of writing the value &
85    /// setting the ready flag, the `TX_CLOSED` flag is set on the block.
86    pub(crate) fn close(&self) {
87        // First, claim a slot for the value. This is the last slot that will be
88        // claimed.
89        let slot_index = self.tail_position.fetch_add(1, Acquire);
90
91        let block = self.find_block(slot_index);
92
93        unsafe { block.as_ref().tx_close() }
94    }
95
96    fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
97        // The start index of the block that contains `index`.
98        let start_index = block::start_index(slot_index);
99
100        // The index offset into the block
101        let offset = block::offset(slot_index);
102
103        // Load the current head of the block
104        let mut block_ptr = self.block_tail.load(Acquire);
105
106        let block = unsafe { &*block_ptr };
107
108        // Calculate the distance between the tail ptr and the target block
109        let distance = block.distance(start_index);
110
111        // Decide if this call to `find_block` should attempt to update the
112        // `block_tail` pointer.
113        //
114        // Updating `block_tail` is not always performed in order to reduce
115        // contention.
116        //
117        // When set, as the routine walks the linked list, it attempts to update
118        // `block_tail`. If the update cannot be performed, `try_updating_tail`
119        // is unset.
120        let mut try_updating_tail = distance > offset;
121
122        // Walk the linked list of blocks until the block with `start_index` is
123        // found.
124        loop {
125            let block = unsafe { &(*block_ptr) };
126
127            if block.is_at_index(start_index) {
128                return unsafe { NonNull::new_unchecked(block_ptr) };
129            }
130
131            let next_block = block
132                .load_next(Acquire)
133                // There is no allocated next block, grow the linked list.
134                .unwrap_or_else(|| block.grow());
135
136            // If the block is **not** final, then the tail pointer cannot be
137            // advanced any more.
138            try_updating_tail &= block.is_final();
139
140            if try_updating_tail {
141                // Advancing `block_tail` must happen when walking the linked
142                // list. `block_tail` may not advance passed any blocks that are
143                // not "final". At the point a block is finalized, it is unknown
144                // if there are any prior blocks that are unfinalized, which
145                // makes it impossible to advance `block_tail`.
146                //
147                // While walking the linked list, `block_tail` can be advanced
148                // as long as finalized blocks are traversed.
149                //
150                // Release ordering is used to ensure that any subsequent reads
151                // are able to see the memory pointed to by `block_tail`.
152                //
153                // Acquire is not needed as any "actual" value is not accessed.
154                // At this point, the linked list is walked to acquire blocks.
155                if self
156                    .block_tail
157                    .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
158                    .is_ok()
159                {
160                    // Synchronize with any senders
161                    let tail_position = self.tail_position.fetch_add(0, Release);
162
163                    unsafe {
164                        block.tx_release(tail_position);
165                    }
166                } else {
167                    // A concurrent sender is also working on advancing
168                    // `block_tail` and this thread is falling behind.
169                    //
170                    // Stop trying to advance the tail pointer
171                    try_updating_tail = false;
172                }
173            }
174
175            block_ptr = next_block.as_ptr();
176
177            thread::yield_now();
178        }
179    }
180
181    pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
182        // The block has been removed from the linked list and ownership
183        // is reclaimed.
184        //
185        // Before dropping the block, see if it can be reused by
186        // inserting it back at the end of the linked list.
187        //
188        // First, reset the data
189        block.as_mut().reclaim();
190
191        let mut reused = false;
192
193        // Attempt to insert the block at the end
194        //
195        // Walk at most three times
196        //
197        let curr_ptr = self.block_tail.load(Acquire);
198
199        // The pointer can never be null
200        debug_assert!(!curr_ptr.is_null());
201
202        let mut curr = NonNull::new_unchecked(curr_ptr);
203
204        // TODO: Unify this logic with Block::grow
205        for _ in 0..3 {
206            match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
207                Ok(()) => {
208                    reused = true;
209                    break;
210                }
211                Err(next) => {
212                    curr = next;
213                }
214            }
215        }
216
217        if !reused {
218            let _ = Box::from_raw(block.as_ptr());
219        }
220    }
221
222    pub(crate) fn is_closed(&self) -> bool {
223        let tail = self.block_tail.load(Acquire);
224
225        unsafe {
226            let tail_block = &*tail;
227            tail_block.is_closed()
228        }
229    }
230}
231
232impl<T> fmt::Debug for Tx<T> {
233    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
234        fmt.debug_struct("Tx")
235            .field("block_tail", &self.block_tail.load(Relaxed))
236            .field("tail_position", &self.tail_position.load(Relaxed))
237            .finish()
238    }
239}
240
241impl<T> Rx<T> {
242    pub(crate) fn is_empty(&self, tx: &Tx<T>) -> bool {
243        let block = unsafe { self.head.as_ref() };
244        if block.has_value(self.index) {
245            return false;
246        }
247
248        // It is possible that a block has no value "now" but the list is still not empty.
249        // To be sure, it is necessary to check the length of the list.
250        self.len(tx) == 0
251    }
252
253    pub(crate) fn len(&self, tx: &Tx<T>) -> usize {
254        // When all the senders are dropped, there will be a last block in the tail position,
255        // but it will be closed
256        let tail_position = tx.tail_position.load(Acquire);
257        tail_position - self.index - (tx.is_closed() as usize)
258    }
259
260    /// Pops the next value off the queue.
261    pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
262        // Advance `head`, if needed
263        if !self.try_advancing_head() {
264            return None;
265        }
266
267        self.reclaim_blocks(tx);
268
269        unsafe {
270            let block = self.head.as_ref();
271
272            let ret = block.read(self.index);
273
274            if let Some(block::Read::Value(..)) = ret {
275                self.index = self.index.wrapping_add(1);
276            }
277
278            ret
279        }
280    }
281
282    /// Pops the next value off the queue, detecting whether the block
283    /// is busy or empty on failure.
284    ///
285    /// This function exists because `Rx::pop` can return `None` even if the
286    /// channel's queue contains a message that has been completely written.
287    /// This can happen if the fully delivered message is behind another message
288    /// that is in the middle of being written to the block, since the channel
289    /// can't return the messages out of order.
290    pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> {
291        let tail_position = tx.tail_position.load(Acquire);
292        let result = self.pop(tx);
293
294        match result {
295            Some(block::Read::Value(t)) => TryPopResult::Ok(t),
296            Some(block::Read::Closed) => TryPopResult::Closed,
297            None if tail_position == self.index => TryPopResult::Empty,
298            None => TryPopResult::Busy,
299        }
300    }
301
302    /// Tries advancing the block pointer to the block referenced by `self.index`.
303    ///
304    /// Returns `true` if successful, `false` if there is no next block to load.
305    fn try_advancing_head(&mut self) -> bool {
306        let block_index = block::start_index(self.index);
307
308        loop {
309            let next_block = {
310                let block = unsafe { self.head.as_ref() };
311
312                if block.is_at_index(block_index) {
313                    return true;
314                }
315
316                block.load_next(Acquire)
317            };
318
319            let next_block = match next_block {
320                Some(next_block) => next_block,
321                None => {
322                    return false;
323                }
324            };
325
326            self.head = next_block;
327
328            thread::yield_now();
329        }
330    }
331
332    fn reclaim_blocks(&mut self, tx: &Tx<T>) {
333        while self.free_head != self.head {
334            unsafe {
335                // Get a handle to the block that will be freed and update
336                // `free_head` to point to the next block.
337                let block = self.free_head;
338
339                let observed_tail_position = block.as_ref().observed_tail_position();
340
341                let required_index = match observed_tail_position {
342                    Some(i) => i,
343                    None => return,
344                };
345
346                if required_index > self.index {
347                    return;
348                }
349
350                // We may read the next pointer with `Relaxed` ordering as it is
351                // guaranteed that the `reclaim_blocks` routine trails the `recv`
352                // routine. Any memory accessed by `reclaim_blocks` has already
353                // been acquired by `recv`.
354                let next_block = block.as_ref().load_next(Relaxed);
355
356                // Update the free list head
357                self.free_head = next_block.unwrap();
358
359                // Push the emptied block onto the back of the queue, making it
360                // available to senders.
361                tx.reclaim_block(block);
362            }
363
364            thread::yield_now();
365        }
366    }
367
368    /// Effectively `Drop` all the blocks. Should only be called once, when
369    /// the list is dropping.
370    pub(super) unsafe fn free_blocks(&mut self) {
371        debug_assert_ne!(self.free_head, NonNull::dangling());
372
373        let mut cur = Some(self.free_head);
374
375        #[cfg(debug_assertions)]
376        {
377            // to trigger the debug assert above so as to catch that we
378            // don't call `free_blocks` more than once.
379            self.free_head = NonNull::dangling();
380            self.head = NonNull::dangling();
381        }
382
383        while let Some(block) = cur {
384            cur = block.as_ref().load_next(Relaxed);
385            drop(Box::from_raw(block.as_ptr()));
386        }
387    }
388}
389
390impl<T> fmt::Debug for Rx<T> {
391    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
392        fmt.debug_struct("Rx")
393            .field("head", &self.head)
394            .field("index", &self.index)
395            .field("free_head", &self.free_head)
396            .finish()
397    }
398}