tokio_util/time/wheel/
mod.rs1mod level;
2pub(crate) use self::level::Expiration;
3use self::level::Level;
4
5mod stack;
6pub(crate) use self::stack::Stack;
7
8use std::borrow::Borrow;
9use std::fmt::Debug;
10
11#[derive(Debug)]
23pub(crate) struct Wheel<T> {
24 elapsed: u64,
26
27 levels: Box<[Level<T>]>,
38}
39
40const NUM_LEVELS: usize = 6;
44
45const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
47
48#[derive(Debug)]
49pub(crate) enum InsertError {
50 Elapsed,
51 Invalid,
52}
53
54impl<T> Wheel<T>
55where
56 T: Stack,
57{
58 pub(crate) fn new() -> Wheel<T> {
60 let levels = (0..NUM_LEVELS).map(Level::new).collect();
61
62 Wheel { elapsed: 0, levels }
63 }
64
65 pub(crate) fn elapsed(&self) -> u64 {
68 self.elapsed
69 }
70
71 pub(crate) fn insert(
93 &mut self,
94 when: u64,
95 item: T::Owned,
96 store: &mut T::Store,
97 ) -> Result<(), (T::Owned, InsertError)> {
98 if when <= self.elapsed {
99 return Err((item, InsertError::Elapsed));
100 } else if when - self.elapsed > MAX_DURATION {
101 return Err((item, InsertError::Invalid));
102 }
103
104 let level = self.level_for(when);
106
107 self.levels[level].add_entry(when, item, store);
108
109 debug_assert!({
110 self.levels[level]
111 .next_expiration(self.elapsed)
112 .map(|e| e.deadline >= self.elapsed)
113 .unwrap_or(true)
114 });
115
116 Ok(())
117 }
118
119 #[track_caller]
121 pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
122 let when = T::when(item, store);
123
124 assert!(
125 self.elapsed <= when,
126 "elapsed={}; when={}",
127 self.elapsed,
128 when
129 );
130
131 let level = self.level_for(when);
132
133 self.levels[level].remove_entry(when, item, store);
134 }
135
136 pub(crate) fn poll_at(&self) -> Option<u64> {
138 self.next_expiration().map(|expiration| expiration.deadline)
139 }
140
141 pub(crate) fn peek(&self) -> Option<T::Owned> {
143 self.next_expiration()
144 .and_then(|expiration| self.peek_entry(&expiration))
145 }
146
147 pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
149 loop {
150 let expiration = self.next_expiration().and_then(|expiration| {
151 if expiration.deadline > now {
152 None
153 } else {
154 Some(expiration)
155 }
156 });
157
158 match expiration {
159 Some(ref expiration) => {
160 if let Some(item) = self.poll_expiration(expiration, store) {
161 return Some(item);
162 }
163
164 self.set_elapsed(expiration.deadline);
165 }
166 None => {
167 self.set_elapsed(now);
172 return None;
173 }
174 }
175 }
176 }
177
178 fn next_expiration(&self) -> Option<Expiration> {
180 for level in 0..NUM_LEVELS {
182 if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
183 debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
186
187 return Some(expiration);
188 }
189 }
190
191 None
192 }
193
194 fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
196 let mut res = true;
197
198 for l2 in start_level..NUM_LEVELS {
199 if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
200 if e2.deadline < before {
201 res = false;
202 }
203 }
204 }
205
206 res
207 }
208
209 pub(crate) fn poll_expiration(
214 &mut self,
215 expiration: &Expiration,
216 store: &mut T::Store,
217 ) -> Option<T::Owned> {
218 while let Some(item) = self.pop_entry(expiration, store) {
219 if expiration.level == 0 {
220 debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline);
221
222 return Some(item);
223 } else {
224 let when = T::when(item.borrow(), store);
225
226 let next_level = expiration.level - 1;
227
228 self.levels[next_level].add_entry(when, item, store);
229 }
230 }
231
232 None
233 }
234
235 fn set_elapsed(&mut self, when: u64) {
236 assert!(
237 self.elapsed <= when,
238 "elapsed={:?}; when={:?}",
239 self.elapsed,
240 when
241 );
242
243 if when > self.elapsed {
244 self.elapsed = when;
245 }
246 }
247
248 fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> {
249 self.levels[expiration.level].pop_entry_slot(expiration.slot, store)
250 }
251
252 fn peek_entry(&self, expiration: &Expiration) -> Option<T::Owned> {
253 self.levels[expiration.level].peek_entry_slot(expiration.slot)
254 }
255
256 fn level_for(&self, when: u64) -> usize {
257 level_for(self.elapsed, when)
258 }
259}
260
261fn level_for(elapsed: u64, when: u64) -> usize {
262 const SLOT_MASK: u64 = (1 << 6) - 1;
263
264 let mut masked = elapsed ^ when | SLOT_MASK;
267 if masked >= MAX_DURATION {
268 masked = MAX_DURATION - 1;
270 }
271 let leading_zeros = masked.leading_zeros() as usize;
272 let significant = 63 - leading_zeros;
273 significant / 6
274}
275
276#[cfg(all(test, not(loom)))]
277mod test {
278 use super::*;
279
280 #[test]
281 fn test_level_for() {
282 for pos in 0..64 {
283 assert_eq!(0, level_for(0, pos), "level_for({pos}) -- binary = {pos:b}");
284 }
285
286 for level in 1..5 {
287 for pos in level..64 {
288 let a = pos * 64_usize.pow(level as u32);
289 assert_eq!(
290 level,
291 level_for(0, a as u64),
292 "level_for({a}) -- binary = {a:b}"
293 );
294
295 if pos > level {
296 let a = a - 1;
297 assert_eq!(
298 level,
299 level_for(0, a as u64),
300 "level_for({a}) -- binary = {a:b}"
301 );
302 }
303
304 if pos < 64 {
305 let a = a + 1;
306 assert_eq!(
307 level,
308 level_for(0, a as u64),
309 "level_for({a}) -- binary = {a:b}"
310 );
311 }
312 }
313 }
314 }
315}