tokio_util/time/wheel/
mod.rs

1mod level;
2pub(crate) use self::level::Expiration;
3use self::level::Level;
4
5mod stack;
6pub(crate) use self::stack::Stack;
7
8use std::borrow::Borrow;
9use std::fmt::Debug;
10
/// Timing wheel implementation.
///
/// This type provides the hashed timing wheel implementation that backs `Timer`
/// and `DelayQueue`.
///
/// The structure is generic over `T: Stack`. This allows handling timeout data
/// being stored on the heap or in a slab. In order to support the latter case,
/// the slab must be passed into each function, allowing the implementation to
/// look up timer entries.
///
/// See `Timer` documentation for some implementation notes.
#[derive(Debug)]
pub(crate) struct Wheel<T> {
    /// The number of milliseconds elapsed since the wheel started.
    ///
    /// Initialized to `0` by `new` and advanced through `set_elapsed`, which
    /// panics if asked to move backwards.
    elapsed: u64,

    /// Timer wheel, indexed by level with the finest-grained level first.
    ///
    /// Levels:
    ///
    /// * 1 ms slots / 64 ms range
    /// * 64 ms slots / ~ 4 sec range
    /// * ~ 4 sec slots / ~ 4 min range
    /// * ~ 4 min slots / ~ 4 hr range
    /// * ~ 4 hr slots / ~ 12 day range
    /// * ~ 12 day slots / ~ 2 yr range
    levels: Box<[Level<T>]>,
}
39
/// Number of levels in the wheel.
///
/// Every level has 64 slots, so six levels let the timer track deadlines up
/// to roughly 2 years into the future with a precision of 1 millisecond.
const NUM_LEVELS: usize = 6;

/// The maximum duration of a delay, in milliseconds: six bits per level, all
/// set.
const MAX_DURATION: u64 = (1_u64 << (6 * NUM_LEVELS)) - 1;
47
/// Reason an entry was rejected by `Wheel::insert`.
#[derive(Debug)]
pub(crate) enum InsertError {
    /// The requested instant is not after the wheel's current elapsed time,
    /// i.e. it has already passed.
    Elapsed,
    /// The requested instant lies more than `MAX_DURATION` milliseconds past
    /// the wheel's current elapsed time.
    Invalid,
}
53
54impl<T> Wheel<T>
55where
56    T: Stack,
57{
58    /// Create a new timing wheel
59    pub(crate) fn new() -> Wheel<T> {
60        let levels = (0..NUM_LEVELS).map(Level::new).collect();
61
62        Wheel { elapsed: 0, levels }
63    }
64
65    /// Return the number of milliseconds that have elapsed since the timing
66    /// wheel's creation.
67    pub(crate) fn elapsed(&self) -> u64 {
68        self.elapsed
69    }
70
71    /// Insert an entry into the timing wheel.
72    ///
73    /// # Arguments
74    ///
75    /// * `when`: is the instant at which the entry should be fired. It is
76    ///           represented as the number of milliseconds since the creation
77    ///           of the timing wheel.
78    ///
79    /// * `item`: The item to insert into the wheel.
80    ///
81    /// * `store`: The slab or `()` when using heap storage.
82    ///
83    /// # Return
84    ///
85    /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
86    ///
87    /// `Err(Elapsed)` indicates that `when` represents an instant that has
88    /// already passed. In this case, the caller should fire the timeout
89    /// immediately.
90    ///
91    /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
92    pub(crate) fn insert(
93        &mut self,
94        when: u64,
95        item: T::Owned,
96        store: &mut T::Store,
97    ) -> Result<(), (T::Owned, InsertError)> {
98        if when <= self.elapsed {
99            return Err((item, InsertError::Elapsed));
100        } else if when - self.elapsed > MAX_DURATION {
101            return Err((item, InsertError::Invalid));
102        }
103
104        // Get the level at which the entry should be stored
105        let level = self.level_for(when);
106
107        self.levels[level].add_entry(when, item, store);
108
109        debug_assert!({
110            self.levels[level]
111                .next_expiration(self.elapsed)
112                .map(|e| e.deadline >= self.elapsed)
113                .unwrap_or(true)
114        });
115
116        Ok(())
117    }
118
119    /// Remove `item` from the timing wheel.
120    #[track_caller]
121    pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
122        let when = T::when(item, store);
123
124        assert!(
125            self.elapsed <= when,
126            "elapsed={}; when={}",
127            self.elapsed,
128            when
129        );
130
131        let level = self.level_for(when);
132
133        self.levels[level].remove_entry(when, item, store);
134    }
135
136    /// Instant at which to poll
137    pub(crate) fn poll_at(&self) -> Option<u64> {
138        self.next_expiration().map(|expiration| expiration.deadline)
139    }
140
141    /// Next key that will expire
142    pub(crate) fn peek(&self) -> Option<T::Owned> {
143        self.next_expiration()
144            .and_then(|expiration| self.peek_entry(&expiration))
145    }
146
147    /// Advances the timer up to the instant represented by `now`.
148    pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
149        loop {
150            let expiration = self.next_expiration().and_then(|expiration| {
151                if expiration.deadline > now {
152                    None
153                } else {
154                    Some(expiration)
155                }
156            });
157
158            match expiration {
159                Some(ref expiration) => {
160                    if let Some(item) = self.poll_expiration(expiration, store) {
161                        return Some(item);
162                    }
163
164                    self.set_elapsed(expiration.deadline);
165                }
166                None => {
167                    // in this case the poll did not indicate an expiration
168                    // _and_ we were not able to find a next expiration in
169                    // the current list of timers.  advance to the poll's
170                    // current time and do nothing else.
171                    self.set_elapsed(now);
172                    return None;
173                }
174            }
175        }
176    }
177
178    /// Returns the instant at which the next timeout expires.
179    fn next_expiration(&self) -> Option<Expiration> {
180        // Check all levels
181        for level in 0..NUM_LEVELS {
182            if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
183                // There cannot be any expirations at a higher level that happen
184                // before this one.
185                debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
186
187                return Some(expiration);
188            }
189        }
190
191        None
192    }
193
194    /// Used for debug assertions
195    fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
196        let mut res = true;
197
198        for l2 in start_level..NUM_LEVELS {
199            if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
200                if e2.deadline < before {
201                    res = false;
202                }
203            }
204        }
205
206        res
207    }
208
209    /// iteratively find entries that are between the wheel's current
210    /// time and the expiration time.  for each in that population either
211    /// return it for notification (in the case of the last level) or tier
212    /// it down to the next level (in all other cases).
213    pub(crate) fn poll_expiration(
214        &mut self,
215        expiration: &Expiration,
216        store: &mut T::Store,
217    ) -> Option<T::Owned> {
218        while let Some(item) = self.pop_entry(expiration, store) {
219            if expiration.level == 0 {
220                debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline);
221
222                return Some(item);
223            } else {
224                let when = T::when(item.borrow(), store);
225
226                let next_level = expiration.level - 1;
227
228                self.levels[next_level].add_entry(when, item, store);
229            }
230        }
231
232        None
233    }
234
235    fn set_elapsed(&mut self, when: u64) {
236        assert!(
237            self.elapsed <= when,
238            "elapsed={:?}; when={:?}",
239            self.elapsed,
240            when
241        );
242
243        if when > self.elapsed {
244            self.elapsed = when;
245        }
246    }
247
248    fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> {
249        self.levels[expiration.level].pop_entry_slot(expiration.slot, store)
250    }
251
252    fn peek_entry(&self, expiration: &Expiration) -> Option<T::Owned> {
253        self.levels[expiration.level].peek_entry_slot(expiration.slot)
254    }
255
256    fn level_for(&self, when: u64) -> usize {
257        level_for(self.elapsed, when)
258    }
259}
260
261fn level_for(elapsed: u64, when: u64) -> usize {
262    const SLOT_MASK: u64 = (1 << 6) - 1;
263
264    // Mask in the trailing bits ignored by the level calculation in order to cap
265    // the possible leading zeros
266    let mut masked = elapsed ^ when | SLOT_MASK;
267    if masked >= MAX_DURATION {
268        // Fudge the timer into the top level
269        masked = MAX_DURATION - 1;
270    }
271    let leading_zeros = masked.leading_zeros() as usize;
272    let significant = 63 - leading_zeros;
273    significant / 6
274}
275
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    /// Checks `level_for` at and around every slot boundary of every level,
    /// measured from an elapsed time of zero.
    #[test]
    fn test_level_for() {
        // Everything within the first 64 ms belongs to the lowest level.
        for pos in 0..64 {
            assert_eq!(0, level_for(0, pos), "level_for({pos}) -- binary = {pos:b}");
        }

        for level in 1..NUM_LEVELS {
            // Width, in milliseconds, of one slot at this level (64^level).
            let slot_width = 1_u64 << (6 * level);

            for pos in 1..64_u64 {
                // Exactly on the slot boundary.
                let a = pos * slot_width;
                assert_eq!(
                    level,
                    level_for(0, a),
                    "level_for({a}) -- binary = {a:b}"
                );

                // One millisecond before the boundary. For the first slot that
                // instant belongs to the level below, so it is skipped.
                if pos > 1 {
                    let a = a - 1;
                    assert_eq!(
                        level,
                        level_for(0, a),
                        "level_for({a}) -- binary = {a:b}"
                    );
                }

                // One millisecond after the boundary.
                let a = a + 1;
                assert_eq!(
                    level,
                    level_for(0, a),
                    "level_for({a}) -- binary = {a:b}"
                );
            }
        }
    }
}