mod level;
pub(crate) use self::level::Expiration;
use self::level::Level;

mod stack;
pub(crate) use self::stack::Stack;

use std::borrow::Borrow;
use std::fmt::Debug;
1011/// Timing wheel implementation.
12///
13/// This type provides the hashed timing wheel implementation that backs `Timer`
14/// and `DelayQueue`.
15///
16/// The structure is generic over `T: Stack`. This allows handling timeout data
17/// being stored on the heap or in a slab. In order to support the latter case,
18/// the slab must be passed into each function allowing the implementation to
19/// lookup timer entries.
20///
21/// See `Timer` documentation for some implementation notes.
22#[derive(Debug)]
23pub(crate) struct Wheel<T> {
24/// The number of milliseconds elapsed since the wheel started.
25elapsed: u64,
2627/// Timer wheel.
28 ///
29 /// Levels:
30 ///
31 /// * 1 ms slots / 64 ms range
32 /// * 64 ms slots / ~ 4 sec range
33 /// * ~ 4 sec slots / ~ 4 min range
34 /// * ~ 4 min slots / ~ 4 hr range
35 /// * ~ 4 hr slots / ~ 12 day range
36 /// * ~ 12 day slots / ~ 2 yr range
37levels: Box<[Level<T>]>,
38}
/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
/// each, the timer is able to track time up to roughly 2 years into the future
/// with a precision of 1 millisecond.
const NUM_LEVELS: usize = 6;

/// The maximum duration of a delay, in milliseconds.
///
/// Every level contributes 6 bits (64 slots), so this is `2^(6 * NUM_LEVELS) - 1`.
const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
/// Reason why `Wheel::insert` rejected an entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InsertError {
    /// The requested instant is not later than the wheel's elapsed time, so
    /// the entry should be fired immediately by the caller.
    Elapsed,
    /// The requested instant lies more than `MAX_DURATION` milliseconds past
    /// the wheel's elapsed time and cannot be tracked.
    Invalid,
}
5354impl<T> Wheel<T>
55where
56T: Stack,
57{
58/// Create a new timing wheel
59pub(crate) fn new() -> Wheel<T> {
60let levels = (0..NUM_LEVELS).map(Level::new).collect();
6162 Wheel { elapsed: 0, levels }
63 }
6465/// Return the number of milliseconds that have elapsed since the timing
66 /// wheel's creation.
67pub(crate) fn elapsed(&self) -> u64 {
68self.elapsed
69 }
7071/// Insert an entry into the timing wheel.
72 ///
73 /// # Arguments
74 ///
75 /// * `when`: is the instant at which the entry should be fired. It is
76 /// represented as the number of milliseconds since the creation
77 /// of the timing wheel.
78 ///
79 /// * `item`: The item to insert into the wheel.
80 ///
81 /// * `store`: The slab or `()` when using heap storage.
82 ///
83 /// # Return
84 ///
85 /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
86 ///
87 /// `Err(Elapsed)` indicates that `when` represents an instant that has
88 /// already passed. In this case, the caller should fire the timeout
89 /// immediately.
90 ///
91 /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
92pub(crate) fn insert(
93&mut self,
94 when: u64,
95 item: T::Owned,
96 store: &mut T::Store,
97 ) -> Result<(), (T::Owned, InsertError)> {
98if when <= self.elapsed {
99return Err((item, InsertError::Elapsed));
100 } else if when - self.elapsed > MAX_DURATION {
101return Err((item, InsertError::Invalid));
102 }
103104// Get the level at which the entry should be stored
105let level = self.level_for(when);
106107self.levels[level].add_entry(when, item, store);
108109debug_assert!({
110self.levels[level]
111 .next_expiration(self.elapsed)
112 .map(|e| e.deadline >= self.elapsed)
113 .unwrap_or(true)
114 });
115116Ok(())
117 }
118119/// Remove `item` from the timing wheel.
120#[track_caller]
121pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
122let when = T::when(item, store);
123124assert!(
125self.elapsed <= when,
126"elapsed={}; when={}",
127self.elapsed,
128 when
129 );
130131let level = self.level_for(when);
132133self.levels[level].remove_entry(when, item, store);
134 }
135136/// Instant at which to poll
137pub(crate) fn poll_at(&self) -> Option<u64> {
138self.next_expiration().map(|expiration| expiration.deadline)
139 }
140141/// Next key that will expire
142pub(crate) fn peek(&self) -> Option<T::Owned> {
143self.next_expiration()
144 .and_then(|expiration| self.peek_entry(&expiration))
145 }
146147/// Advances the timer up to the instant represented by `now`.
148pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
149loop {
150let expiration = self.next_expiration().and_then(|expiration| {
151if expiration.deadline > now {
152None
153} else {
154Some(expiration)
155 }
156 });
157158match expiration {
159Some(ref expiration) => {
160if let Some(item) = self.poll_expiration(expiration, store) {
161return Some(item);
162 }
163164self.set_elapsed(expiration.deadline);
165 }
166None => {
167// in this case the poll did not indicate an expiration
168 // _and_ we were not able to find a next expiration in
169 // the current list of timers. advance to the poll's
170 // current time and do nothing else.
171self.set_elapsed(now);
172return None;
173 }
174 }
175 }
176 }
177178/// Returns the instant at which the next timeout expires.
179fn next_expiration(&self) -> Option<Expiration> {
180// Check all levels
181for level in 0..NUM_LEVELS {
182if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
183// There cannot be any expirations at a higher level that happen
184 // before this one.
185debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
186187return Some(expiration);
188 }
189 }
190191None
192}
193194/// Used for debug assertions
195fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
196let mut res = true;
197198for l2 in start_level..NUM_LEVELS {
199if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
200if e2.deadline < before {
201 res = false;
202 }
203 }
204 }
205206 res
207 }
208209/// iteratively find entries that are between the wheel's current
210 /// time and the expiration time. for each in that population either
211 /// return it for notification (in the case of the last level) or tier
212 /// it down to the next level (in all other cases).
213pub(crate) fn poll_expiration(
214&mut self,
215 expiration: &Expiration,
216 store: &mut T::Store,
217 ) -> Option<T::Owned> {
218while let Some(item) = self.pop_entry(expiration, store) {
219if expiration.level == 0 {
220debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline);
221222return Some(item);
223 } else {
224let when = T::when(item.borrow(), store);
225226let next_level = expiration.level - 1;
227228self.levels[next_level].add_entry(when, item, store);
229 }
230 }
231232None
233}
234235fn set_elapsed(&mut self, when: u64) {
236assert!(
237self.elapsed <= when,
238"elapsed={:?}; when={:?}",
239self.elapsed,
240 when
241 );
242243if when > self.elapsed {
244self.elapsed = when;
245 }
246 }
247248fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> {
249self.levels[expiration.level].pop_entry_slot(expiration.slot, store)
250 }
251252fn peek_entry(&self, expiration: &Expiration) -> Option<T::Owned> {
253self.levels[expiration.level].peek_entry_slot(expiration.slot)
254 }
255256fn level_for(&self, when: u64) -> usize {
257 level_for(self.elapsed, when)
258 }
259}
260261fn level_for(elapsed: u64, when: u64) -> usize {
262const SLOT_MASK: u64 = (1 << 6) - 1;
263264// Mask in the trailing bits ignored by the level calculation in order to cap
265 // the possible leading zeros
266let mut masked = elapsed ^ when | SLOT_MASK;
267if masked >= MAX_DURATION {
268// Fudge the timer into the top level
269masked = MAX_DURATION - 1;
270 }
271let leading_zeros = masked.leading_zeros() as usize;
272let significant = 63 - leading_zeros;
273 significant / 6
274}
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    /// Checks `level_for` at multiples of each level's slot width and at the
    /// values directly next to them.
    #[test]
    fn test_level_for() {
        // Every deadline less than 64 ms away belongs to the finest level.
        for pos in 0..64 {
            assert_eq!(0, level_for(0, pos), "level_for({pos}) -- binary = {pos:b}");
        }

        for level in 1..NUM_LEVELS {
            // Width of one slot at this level, in milliseconds. `level` is
            // smaller than `NUM_LEVELS`, so the casts below are lossless and
            // the `u64` products cannot overflow.
            let slot_width = 64_u64.pow(level as u32);

            for pos in (level as u64)..64 {
                let a = pos * slot_width;
                assert_eq!(
                    level,
                    level_for(0, a),
                    "level_for({a}) -- binary = {a:b}"
                );

                // One millisecond below the first tested multiple can fall
                // into a lower level (e.g. 64 - 1 = 63 is level 0), so that
                // case is skipped.
                if pos > level as u64 {
                    let a = a - 1;
                    assert_eq!(
                        level,
                        level_for(0, a),
                        "level_for({a}) -- binary = {a:b}"
                    );
                }

                // `pos` never exceeds 63, so one millisecond above the
                // multiple always stays within the same level.
                let a = a + 1;
                assert_eq!(
                    level,
                    level_for(0, a),
                    "level_for({a}) -- binary = {a:b}"
                );
            }
        }
    }
}