tokio/runtime/time/entry.rs
1//! Timer state structures.
2//!
3//! This module contains the heart of the intrusive timer implementation, and as
4//! such the structures inside are full of tricky concurrency and unsafe code.
5//!
6//! # Ground rules
7//!
8//! The heart of the timer implementation here is the [`TimerShared`] structure,
9//! shared between the [`TimerEntry`] and the driver. Generally, we permit access
10//! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or
11//! 2) a held driver lock.
12//!
//! It follows from this that any changes made while holding BOTH 1 and 2 will
//! be reliably visible, regardless of ordering. This is because of the `acq/rel`
//! fences on the driver lock ensuring ordering with 2, and Rust's mutable
//! reference rules for 1 (a mutable reference to an object can't be passed
//! between threads without an `acq/rel` barrier, and on the same thread we
//! have local happens-before ordering).
19//!
20//! # State field
21//!
22//! Each timer has a state field associated with it. This field contains either
23//! the current scheduled time, or a special flag value indicating its state.
24//! This state can either indicate that the timer is on the 'pending' queue (and
25//! thus will be fired with an `Ok(())` result soon) or that it has already been
26//! fired/deregistered.
27//!
28//! This single state field allows for code that is firing the timer to
29//! synchronize with any racing `reset` calls reliably.
30//!
31//! # Cached vs true timeouts
32//!
33//! To allow for the use case of a timeout that is periodically reset before
34//! expiration to be as lightweight as possible, we support optimistically
35//! lock-free timer resets, in the case where a timer is rescheduled to a later
36//! point than it was originally scheduled for.
37//!
//! This is accomplished by lazily rescheduling timers. That is, we update the
//! state field with the true expiration of the timer from the holder of
//! the [`TimerEntry`]. When the driver services timers (i.e., whenever it's
//! walking lists of timers), it checks this "true when" value, and reschedules
//! based on it.
43//!
44//! We do, however, also need to track what the expiration time was when we
45//! originally registered the timer; this is used to locate the right linked
46//! list when the timer is being cancelled. This is referred to as the "cached
47//! when" internally.
48//!
49//! There is of course a race condition between timer reset and timer
50//! expiration. If the driver fails to observe the updated expiration time, it
51//! could trigger expiration of the timer too early. However, because
52//! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and
53//! refuse to mark the timer as pending.
54//!
55//! [mark_pending]: TimerHandle::mark_pending
56
57use crate::loom::cell::UnsafeCell;
58use crate::loom::sync::atomic::AtomicU64;
59use crate::loom::sync::atomic::Ordering;
60
61use crate::runtime::context;
62use crate::runtime::scheduler;
63use crate::sync::AtomicWaker;
64use crate::time::Instant;
65use crate::util::linked_list;
66
67use std::cell::UnsafeCell as StdUnsafeCell;
68use std::task::{Context, Poll, Waker};
69use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
70
/// Result delivered to the owner of a timer once it completes: `Ok(())`, or a
/// timer error.
type TimerResult = Result<(), crate::time::error::Error>;

/// Sentinel `state` value: the timer has been fired or is otherwise not
/// registered. This is also the initial state of a fresh `StateCell`.
const STATE_DEREGISTERED: u64 = u64::MAX;
/// Sentinel `state` value: the timer has been marked as pending fire.
const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
/// The smallest sentinel value. Any `state` strictly below this is an
/// expiration timestamp rather than a flag.
const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
/// The largest safe integer to use for ticks.
///
/// This value should be updated if any other signal values are added above.
pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = STATE_MIN_VALUE - 1;
80
/// This structure holds the current shared state of the timer - its scheduled
/// time (if registered), or otherwise the result of the timer completing, as
/// well as the registered waker.
///
/// Generally, the `StateCell` is only permitted to be accessed from two contexts:
/// Either a thread holding the corresponding `&mut TimerEntry`, or a thread
/// holding the timer driver lock. The write actions on the `StateCell` amount to
/// passing "ownership" of the `StateCell` between these contexts; moving a timer
/// from the `TimerEntry` to the driver requires _both_ holding the `&mut
/// TimerEntry` and the driver lock, while moving it back (firing the timer)
/// requires only the driver lock.
pub(super) struct StateCell {
    /// Holds one of:
    /// * the scheduled expiration time for this timer (any value below
    ///   `STATE_MIN_VALUE`),
    /// * `STATE_PENDING_FIRE`, if the timer has been marked as pending, or
    /// * `STATE_DEREGISTERED` (`u64::MAX`), if the timer has been fired and
    ///   is unregistered.
    state: AtomicU64,
    /// If the timer is fired (an Acquire order read on state shows
    /// `u64::MAX`), holds the result that should be returned from
    /// polling the timer. Otherwise, the contents are unspecified and reading
    /// without holding the driver lock is undefined behavior.
    result: UnsafeCell<TimerResult>,
    /// The currently-registered waker
    waker: AtomicWaker,
}
104
impl Default for StateCell {
    /// Creates a cell in its initial state; simply forwards to `StateCell::new`.
    fn default() -> Self {
        Self::new()
    }
}
110
111impl std::fmt::Debug for StateCell {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "StateCell({:?})", self.read_state())
114 }
115}
116
117impl StateCell {
118 fn new() -> Self {
119 Self {
120 state: AtomicU64::new(STATE_DEREGISTERED),
121 result: UnsafeCell::new(Ok(())),
122 waker: AtomicWaker::new(),
123 }
124 }
125
126 fn is_pending(&self) -> bool {
127 self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE
128 }
129
130 /// Returns the current expiration time, or None if not currently scheduled.
131 fn when(&self) -> Option<u64> {
132 let cur_state = self.state.load(Ordering::Relaxed);
133
134 if cur_state == STATE_DEREGISTERED {
135 None
136 } else {
137 Some(cur_state)
138 }
139 }
140
141 /// If the timer is completed, returns the result of the timer. Otherwise,
142 /// returns None and registers the waker.
143 fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
144 // We must register first. This ensures that either `fire` will
145 // observe the new waker, or we will observe a racing fire to have set
146 // the state, or both.
147 self.waker.register_by_ref(waker);
148
149 self.read_state()
150 }
151
152 fn read_state(&self) -> Poll<TimerResult> {
153 let cur_state = self.state.load(Ordering::Acquire);
154
155 if cur_state == STATE_DEREGISTERED {
156 // SAFETY: The driver has fired this timer; this involves writing
157 // the result, and then writing (with release ordering) the state
158 // field.
159 Poll::Ready(unsafe { self.result.with(|p| *p) })
160 } else {
161 Poll::Pending
162 }
163 }
164
165 /// Marks this timer as being moved to the pending list, if its scheduled
166 /// time is not after `not_after`.
167 ///
168 /// If the timer is scheduled for a time after `not_after`, returns an Err
169 /// containing the current scheduled time.
170 ///
171 /// SAFETY: Must hold the driver lock.
172 unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
173 // Quick initial debug check to see if the timer is already fired. Since
174 // firing the timer can only happen with the driver lock held, we know
175 // we shouldn't be able to "miss" a transition to a fired state, even
176 // with relaxed ordering.
177 let mut cur_state = self.state.load(Ordering::Relaxed);
178
179 loop {
180 // improve the error message for things like
181 // https://github.com/tokio-rs/tokio/issues/3675
182 assert!(
183 cur_state < STATE_MIN_VALUE,
184 "mark_pending called when the timer entry is in an invalid state"
185 );
186
187 if cur_state > not_after {
188 break Err(cur_state);
189 }
190
191 match self.state.compare_exchange_weak(
192 cur_state,
193 STATE_PENDING_FIRE,
194 Ordering::AcqRel,
195 Ordering::Acquire,
196 ) {
197 Ok(_) => break Ok(()),
198 Err(actual_state) => cur_state = actual_state,
199 }
200 }
201 }
202
203 /// Fires the timer, setting the result to the provided result.
204 ///
205 /// Returns:
206 /// * `Some(waker)` - if fired and a waker needs to be invoked once the
207 /// driver lock is released
208 /// * `None` - if fired and a waker does not need to be invoked, or if
209 /// already fired
210 ///
211 /// SAFETY: The driver lock must be held.
212 unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
213 // Quick initial check to see if the timer is already fired. Since
214 // firing the timer can only happen with the driver lock held, we know
215 // we shouldn't be able to "miss" a transition to a fired state, even
216 // with relaxed ordering.
217 let cur_state = self.state.load(Ordering::Relaxed);
218 if cur_state == STATE_DEREGISTERED {
219 return None;
220 }
221
222 // SAFETY: We assume the driver lock is held and the timer is not
223 // fired, so only the driver is accessing this field.
224 //
225 // We perform a release-ordered store to state below, to ensure this
226 // write is visible before the state update is visible.
227 unsafe { self.result.with_mut(|p| *p = result) };
228
229 self.state.store(STATE_DEREGISTERED, Ordering::Release);
230
231 self.waker.take_waker()
232 }
233
234 /// Marks the timer as registered (poll will return None) and sets the
235 /// expiration time.
236 ///
237 /// While this function is memory-safe, it should only be called from a
238 /// context holding both `&mut TimerEntry` and the driver lock.
239 fn set_expiration(&self, timestamp: u64) {
240 debug_assert!(timestamp < STATE_MIN_VALUE);
241
242 // We can use relaxed ordering because we hold the driver lock and will
243 // fence when we release the lock.
244 self.state.store(timestamp, Ordering::Relaxed);
245 }
246
247 /// Attempts to adjust the timer to a new timestamp.
248 ///
249 /// If the timer has already been fired, is pending firing, or the new
250 /// timestamp is earlier than the old timestamp, (or occasionally
251 /// spuriously) returns Err without changing the timer's state. In this
252 /// case, the timer must be deregistered and re-registered.
253 fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
254 let mut prior = self.state.load(Ordering::Relaxed);
255 loop {
256 if new_timestamp < prior || prior >= STATE_MIN_VALUE {
257 return Err(());
258 }
259
260 match self.state.compare_exchange_weak(
261 prior,
262 new_timestamp,
263 Ordering::AcqRel,
264 Ordering::Acquire,
265 ) {
266 Ok(_) => return Ok(()),
267 Err(true_prior) => prior = true_prior,
268 }
269 }
270 }
271
272 /// Returns true if the state of this timer indicates that the timer might
273 /// be registered with the driver. This check is performed with relaxed
274 /// ordering, but is conservative - if it returns false, the timer is
275 /// definitely _not_ registered.
276 pub(super) fn might_be_registered(&self) -> bool {
277 self.state.load(Ordering::Relaxed) != u64::MAX
278 }
279}
280
/// A timer entry.
///
/// This is the handle to a timer that is controlled by the requester of the
/// timer. As this participates in intrusive data structures, it must be pinned
/// before polling.
#[derive(Debug)]
pub(crate) struct TimerEntry {
    /// Arc reference to the runtime handle. We can only free the driver after
    /// deregistering everything from their respective timer wheels.
    driver: scheduler::Handle,
    /// Shared inner structure; this is part of an intrusive linked list, and
    /// therefore other references can exist to it while mutable references to
    /// Entry exist.
    ///
    /// Starts out as `None` and is lazily populated by `TimerEntry::inner`.
    ///
    /// This is manipulated only under the inner mutex. TODO: Can we use loom
    /// cells for this?
    inner: StdUnsafeCell<Option<TimerShared>>,
    /// Deadline for the timer. This is used to register on the first
    /// poll, as we can't register prior to being pinned.
    deadline: Instant,
    /// Whether the deadline has been registered.
    registered: bool,
    /// Ensure the type is !Unpin
    _m: std::marker::PhantomPinned,
}
306
// NOTE(review): these impls assert that a `TimerEntry` may be sent to and
// shared between threads even though it contains an `UnsafeCell`. The
// justification is not stated here; presumably it rests on the access rules
// laid out in the module documentation (`&mut TimerEntry` or the driver
// lock) — confirm.
unsafe impl Send for TimerEntry {}
unsafe impl Sync for TimerEntry {}
309
/// A `TimerHandle` is the (non-enforced) "unique" pointer from the driver to the
/// timer entry. Generally, at most one `TimerHandle` exists for a timer at a time
/// (enforced by the timer state machine).
///
/// SAFETY: A `TimerHandle` is essentially a raw pointer, and the usual caveats
/// of pointer safety apply. In particular, `TimerHandle` does not itself enforce
/// that the timer does still exist; however, normally a `TimerHandle` is created
/// immediately before registering the timer, and is consumed when firing the
/// timer, to help minimize mistakes. Still, because `TimerHandle` cannot enforce
/// memory safety, all operations are unsafe.
#[derive(Debug)]
pub(crate) struct TimerHandle {
    /// Raw, non-null pointer to the shared timer state this handle refers to.
    inner: NonNull<TimerShared>,
}
324
/// Intrusive linked list of timers, threaded through the `pointers` field of
/// each [`TimerShared`].
pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;
326
/// The shared state structure of a timer. This structure is shared between the
/// frontend (`Entry`) and driver backend.
///
/// Note that this structure is located inside the `TimerEntry` structure.
pub(crate) struct TimerShared {
    /// The shard id. We should never change it.
    shard_id: u32,
    /// A link within the doubly-linked list of timers on a particular level and
    /// slot. Valid only if state is equal to Registered.
    ///
    /// Only accessed under the entry lock.
    pointers: linked_list::Pointers<TimerShared>,

    /// The expiration time for which this entry is currently registered.
    /// Generally owned by the driver, but is accessed by the entry when not
    /// registered.
    ///
    /// `TimerHandle::mark_pending` sets this to `u64::MAX` to record that the
    /// entry has been moved to the pending queue.
    cached_when: AtomicU64,

    /// Current state. This records whether the timer entry is currently under
    /// the ownership of the driver, and if not, its current state (not
    /// complete, fired, error, etc).
    state: StateCell,

    /// Ensures the type is `!Unpin`.
    _p: PhantomPinned,
}
352
// NOTE(review): these impls assert that `TimerShared` may be sent to and shared
// between threads. The justification is not stated here; presumably it relies
// on the non-atomic parts only being touched under the access rules from the
// module documentation (`&mut TimerEntry` or the driver lock) — confirm.
unsafe impl Send for TimerShared {}
unsafe impl Sync for TimerShared {}
355
356impl std::fmt::Debug for TimerShared {
357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358 f.debug_struct("TimerShared")
359 .field("cached_when", &self.cached_when.load(Ordering::Relaxed))
360 .field("state", &self.state)
361 .finish()
362 }
363}
364
// Generates `TimerShared::addr_of_pointers`, an unsafe accessor that maps a
// pointer to a `TimerShared` to a pointer to its `pointers` field. It is used
// by the `linked_list::Link` implementation below.
// NOTE(review): presumably the macro computes the field address without
// materializing an intermediate reference — confirm against the macro
// definition.
generate_addr_of_methods! {
    impl<> TimerShared {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<TimerShared>> {
            &self.pointers
        }
    }
}
372
373impl TimerShared {
374 pub(super) fn new(shard_id: u32) -> Self {
375 Self {
376 shard_id,
377 cached_when: AtomicU64::new(0),
378 pointers: linked_list::Pointers::new(),
379 state: StateCell::default(),
380 _p: PhantomPinned,
381 }
382 }
383
384 /// Gets the cached time-of-expiration value.
385 pub(super) fn cached_when(&self) -> u64 {
386 // Cached-when is only accessed under the driver lock, so we can use relaxed
387 self.cached_when.load(Ordering::Relaxed)
388 }
389
390 /// Gets the true time-of-expiration value, and copies it into the cached
391 /// time-of-expiration value.
392 ///
393 /// SAFETY: Must be called with the driver lock held, and when this entry is
394 /// not in any timer wheel lists.
395 pub(super) unsafe fn sync_when(&self) -> u64 {
396 let true_when = self.true_when();
397
398 self.cached_when.store(true_when, Ordering::Relaxed);
399
400 true_when
401 }
402
403 /// Sets the cached time-of-expiration value.
404 ///
405 /// SAFETY: Must be called with the driver lock held, and when this entry is
406 /// not in any timer wheel lists.
407 unsafe fn set_cached_when(&self, when: u64) {
408 self.cached_when.store(when, Ordering::Relaxed);
409 }
410
411 /// Returns the true time-of-expiration value, with relaxed memory ordering.
412 pub(super) fn true_when(&self) -> u64 {
413 self.state.when().expect("Timer already fired")
414 }
415
416 /// Sets the true time-of-expiration value, even if it is less than the
417 /// current expiration or the timer is deregistered.
418 ///
419 /// SAFETY: Must only be called with the driver lock held and the entry not
420 /// in the timer wheel.
421 pub(super) unsafe fn set_expiration(&self, t: u64) {
422 self.state.set_expiration(t);
423 self.cached_when.store(t, Ordering::Relaxed);
424 }
425
426 /// Sets the true time-of-expiration only if it is after the current.
427 pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> {
428 self.state.extend_expiration(t)
429 }
430
431 /// Returns a `TimerHandle` for this timer.
432 pub(super) fn handle(&self) -> TimerHandle {
433 TimerHandle {
434 inner: NonNull::from(self),
435 }
436 }
437
438 /// Returns true if the state of this timer indicates that the timer might
439 /// be registered with the driver. This check is performed with relaxed
440 /// ordering, but is conservative - if it returns false, the timer is
441 /// definitely _not_ registered.
442 pub(super) fn might_be_registered(&self) -> bool {
443 self.state.might_be_registered()
444 }
445
446 /// Gets the shard id.
447 pub(super) fn shard_id(&self) -> u32 {
448 self.shard_id
449 }
450}
451
452unsafe impl linked_list::Link for TimerShared {
453 type Handle = TimerHandle;
454
455 type Target = TimerShared;
456
457 fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> {
458 handle.inner
459 }
460
461 unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
462 TimerHandle { inner: ptr }
463 }
464
465 unsafe fn pointers(
466 target: NonNull<Self::Target>,
467 ) -> NonNull<linked_list::Pointers<Self::Target>> {
468 TimerShared::addr_of_pointers(target)
469 }
470}
471
472// ===== impl Entry =====
473
474impl TimerEntry {
475 #[track_caller]
476 pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self {
477 // Panic if the time driver is not enabled
478 let _ = handle.driver().time();
479
480 Self {
481 driver: handle,
482 inner: StdUnsafeCell::new(None),
483 deadline,
484 registered: false,
485 _m: std::marker::PhantomPinned,
486 }
487 }
488
489 fn is_inner_init(&self) -> bool {
490 unsafe { &*self.inner.get() }.is_some()
491 }
492
493 // This lazy initialization is for performance purposes.
494 fn inner(&self) -> &TimerShared {
495 let inner = unsafe { &*self.inner.get() };
496 if inner.is_none() {
497 let shard_size = self.driver.driver().time().inner.get_shard_size();
498 let shard_id = generate_shard_id(shard_size);
499 unsafe {
500 *self.inner.get() = Some(TimerShared::new(shard_id));
501 }
502 }
503 return inner.as_ref().unwrap();
504 }
505
506 pub(crate) fn deadline(&self) -> Instant {
507 self.deadline
508 }
509
510 pub(crate) fn is_elapsed(&self) -> bool {
511 self.is_inner_init() && !self.inner().state.might_be_registered() && self.registered
512 }
513
514 /// Cancels and deregisters the timer. This operation is irreversible.
515 pub(crate) fn cancel(self: Pin<&mut Self>) {
516 // Avoid calling the `clear_entry` method, because it has not been initialized yet.
517 if !self.is_inner_init() {
518 return;
519 }
520 // We need to perform an acq/rel fence with the driver thread, and the
521 // simplest way to do so is to grab the driver lock.
522 //
523 // Why is this necessary? We're about to release this timer's memory for
524 // some other non-timer use. However, we've been doing a bunch of
525 // relaxed (or even non-atomic) writes from the driver thread, and we'll
526 // be doing more from _this thread_ (as this memory is interpreted as
527 // something else).
528 //
529 // It is critical to ensure that, from the point of view of the driver,
530 // those future non-timer writes happen-after the timer is fully fired,
531 // and from the purpose of this thread, the driver's writes all
532 // happen-before we drop the timer. This in turn requires us to perform
533 // an acquire-release barrier in _both_ directions between the driver
534 // and dropping thread.
535 //
536 // The lock acquisition in clear_entry serves this purpose. All of the
537 // driver manipulations happen with the lock held, so we can just take
538 // the lock and be sure that this drop happens-after everything the
539 // driver did so far and happens-before everything the driver does in
540 // the future. While we have the lock held, we also go ahead and
541 // deregister the entry if necessary.
542 unsafe { self.driver().clear_entry(NonNull::from(self.inner())) };
543 }
544
545 pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
546 let this = unsafe { self.as_mut().get_unchecked_mut() };
547 this.deadline = new_time;
548 this.registered = reregister;
549
550 let tick = self.driver().time_source().deadline_to_tick(new_time);
551
552 if self.inner().extend_expiration(tick).is_ok() {
553 return;
554 }
555
556 if reregister {
557 unsafe {
558 self.driver()
559 .reregister(&self.driver.driver().io, tick, self.inner().into());
560 }
561 }
562 }
563
564 pub(crate) fn poll_elapsed(
565 mut self: Pin<&mut Self>,
566 cx: &mut Context<'_>,
567 ) -> Poll<Result<(), super::Error>> {
568 assert!(
569 !self.driver().is_shutdown(),
570 "{}",
571 crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
572 );
573
574 if !self.registered {
575 let deadline = self.deadline;
576 self.as_mut().reset(deadline, true);
577 }
578
579 self.inner().state.poll(cx.waker())
580 }
581
582 pub(crate) fn driver(&self) -> &super::Handle {
583 self.driver.driver().time()
584 }
585
586 #[cfg(all(tokio_unstable, feature = "tracing"))]
587 pub(crate) fn clock(&self) -> &super::Clock {
588 self.driver.driver().clock()
589 }
590}
591
592impl TimerHandle {
593 pub(super) unsafe fn cached_when(&self) -> u64 {
594 unsafe { self.inner.as_ref().cached_when() }
595 }
596
597 pub(super) unsafe fn sync_when(&self) -> u64 {
598 unsafe { self.inner.as_ref().sync_when() }
599 }
600
601 pub(super) unsafe fn is_pending(&self) -> bool {
602 unsafe { self.inner.as_ref().state.is_pending() }
603 }
604
605 /// Forcibly sets the true and cached expiration times to the given tick.
606 ///
607 /// SAFETY: The caller must ensure that the handle remains valid, the driver
608 /// lock is held, and that the timer is not in any wheel linked lists.
609 pub(super) unsafe fn set_expiration(&self, tick: u64) {
610 self.inner.as_ref().set_expiration(tick);
611 }
612
613 /// Attempts to mark this entry as pending. If the expiration time is after
614 /// `not_after`, however, returns an Err with the current expiration time.
615 ///
616 /// If an `Err` is returned, the `cached_when` value will be updated to this
617 /// new expiration time.
618 ///
619 /// SAFETY: The caller must ensure that the handle remains valid, the driver
620 /// lock is held, and that the timer is not in any wheel linked lists.
621 /// After returning Ok, the entry must be added to the pending list.
622 pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
623 match self.inner.as_ref().state.mark_pending(not_after) {
624 Ok(()) => {
625 // mark this as being on the pending queue in cached_when
626 self.inner.as_ref().set_cached_when(u64::MAX);
627 Ok(())
628 }
629 Err(tick) => {
630 self.inner.as_ref().set_cached_when(tick);
631 Err(tick)
632 }
633 }
634 }
635
636 /// Attempts to transition to a terminal state. If the state is already a
637 /// terminal state, does nothing.
638 ///
639 /// Because the entry might be dropped after the state is moved to a
640 /// terminal state, this function consumes the handle to ensure we don't
641 /// access the entry afterwards.
642 ///
643 /// Returns the last-registered waker, if any.
644 ///
645 /// SAFETY: The driver lock must be held while invoking this function, and
646 /// the entry must not be in any wheel linked lists.
647 pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
648 self.inner.as_ref().state.fire(completed_state)
649 }
650}
651
652impl Drop for TimerEntry {
653 fn drop(&mut self) {
654 unsafe { Pin::new_unchecked(self) }.as_mut().cancel();
655 }
656}
657
658// Generates a shard id. If current thread is a worker thread, we use its worker index as a shard id.
659// Otherwise, we use a random number generator to obtain the shard id.
660cfg_rt! {
661 fn generate_shard_id(shard_size: u32) -> u32 {
662 let id = context::with_scheduler(|ctx| match ctx {
663 Some(scheduler::Context::CurrentThread(_ctx)) => 0,
664 #[cfg(feature = "rt-multi-thread")]
665 Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32,
666 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
667 Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32,
668 None => context::thread_rng_n(shard_size),
669 });
670 id % shard_size
671 }
672}
673
// Variant used when the `rt` feature is disabled: the shard id is simply
// whatever `context::thread_rng_n(shard_size)` returns.
// NOTE(review): presumably that value lies in `0..shard_size` — confirm
// against `thread_rng_n`.
cfg_not_rt! {
    fn generate_shard_id(shard_size: u32) -> u32 {
        context::thread_rng_n(shard_size)
    }
}