rayon_core/sleep/mod.rs

//! Code that decides when workers should go to sleep. See README.md
//! for an overview.

use crate::latch::CoreLatch;
use crate::sync::{Condvar, Mutex};
use crossbeam_utils::CachePadded;
use std::sync::atomic::Ordering;
use std::thread;
use std::usize;

mod counters;
pub(crate) use self::counters::THREADS_MAX;
use self::counters::{AtomicCounters, JobsEventCounter};

/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
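///
/// The registry's worker loop is expected to drive these callbacks roughly as
/// in the sketch below. This is illustrative only: `find_work`, `execute`, and
/// `has_injected_jobs` are hypothetical stand-ins for the real worker-thread
/// code, which lives in `registry.rs`.
///
/// ```ignore
/// let mut idle_state = sleep.start_looking(worker_index);
/// while !latch.probe() {
///     if let Some(job) = find_work() {
///         // Found work: leave the idle state and run the job.
///         sleep.work_found();
///         execute(job);
///         idle_state = sleep.start_looking(worker_index);
///     } else {
///         // Nothing to steal: yield, become sleepy, and eventually block.
///         sleep.no_work_found(&mut idle_state, latch, || has_injected_jobs());
///     }
/// }
/// // The latch was set: balance the `add_inactive_thread` from `start_looking`.
/// sleep.work_found();
/// ```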
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    counters: AtomicCounters,
}

/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is the worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, this records the jobs event counter value at
    /// that time; it is set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}

/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    condvar: Condvar,
}

const ROUNDS_UNTIL_SLEEPY: u32 = 32;
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
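// Taken together, these constants define the phases an idle worker moves
// through in `no_work_found`: for the first `ROUNDS_UNTIL_SLEEPY` rounds it
// simply yields; on the next round it announces itself as "sleepy" (recording
// the jobs event counter); and once it reaches `ROUNDS_UNTIL_SLEEPING` rounds
// it attempts to actually block in `sleep`.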

impl Sleep {
    pub(super) fn new(n_threads: usize) -> Sleep {
        assert!(n_threads <= THREADS_MAX);
        Sleep {
            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
            counters: AtomicCounters::new(),
        }
    }

    #[inline]
    pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
        self.counters.add_inactive_thread();

        IdleState {
            worker_index,
            rounds: 0,
            jobs_counter: JobsEventCounter::DUMMY,
        }
    }

    #[inline]
    pub(super) fn work_found(&self) {
        // If we were the last idle thread and other threads are still sleeping,
        // then we should wake up another thread.
        let threads_to_wake = self.counters.sub_inactive_thread();
        self.wake_any_threads(threads_to_wake as u32);
    }

    #[inline]
    pub(super) fn no_work_found(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            idle_state.rounds += 1;
        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
            idle_state.jobs_counter = self.announce_sleepy();
            idle_state.rounds += 1;
            thread::yield_now();
        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
            idle_state.rounds += 1;
            thread::yield_now();
        } else {
            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
            self.sleep(idle_state, latch, has_injected_jobs);
        }
    }

    #[cold]
    fn announce_sleepy(&self) -> JobsEventCounter {
        self.counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_active)
            .jobs_counter()
    }

    #[cold]
    fn sleep(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        let worker_index = idle_state.worker_index;

        if !latch.get_sleepy() {
            return;
        }

        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // If our latch was signalled, we should wake back up fully, as we
        // will have some stuff to do.
        if !latch.fall_asleep() {
            idle_state.wake_fully();
            return;
        }

        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
        }

        // Successfully registered as asleep.

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers a rollover of the JEC such that we don't see it
        // - we are the last active worker thread
        std::sync::atomic::fence(Ordering::SeqCst);
        if has_injected_jobs() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the thread that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we incremented the sleeping-thread counter. This
            // means that whoever is coming to wake us will have to wait until we
            // release the mutex in the call to `wait`, so they will see this
            // boolean as true.)
            *is_blocked = true;
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }
        }

        // Update other state:
        idle_state.wake_fully();
        latch.wake_up();
    }

    /// Notify the given thread that it should wake up (if it is
    /// sleeping). When this method is invoked, we typically know the
    /// thread is asleep, though in rare cases it could have been
    /// awoken by (e.g.) new work having been posted.
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.wake_specific_thread(target_worker_index);
    }

    /// Signals that `num_jobs` new jobs were injected into the thread
    /// pool from outside. This function will ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // This fence is needed to guarantee that threads, as they are
        // about to fall asleep, observe any new jobs that may have been
        // injected.
        std::sync::atomic::fence(Ordering::SeqCst);

        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Signals that `num_jobs` new jobs were pushed onto a thread's
    /// local deque. This function will try to ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary. However, this is not guaranteed: under certain
    /// race conditions, the function may fail to wake any additional
    /// threads; in that case the worker that pushed the jobs will
    /// eventually pop them itself.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
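    ///
    /// As a worked example of the logic below: if `num_jobs == 3`, one thread
    /// is awake but idle, and five threads are sleeping, then with
    /// `queue_was_empty == true` we wake `min(3 - 1, 5) = 2` sleepers, while
    /// with `queue_was_empty == false` we wake `min(3, 5) = 3`, since a
    /// non-empty queue shows the idle threads are not keeping up.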
    #[inline]
    fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // Read the counters and -- if sleepy workers have announced themselves
        // -- announce that there is now work available. The final value of
        // `counters` with which we exit the loop thus corresponds to a state in
        // which any sleepy (but not yet sleeping) workers will notice the
        // changed JEC on their own, so only fully sleeping workers still need
        // an explicit wakeup below.
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
        let num_awake_but_idle = counters.awake_but_idle_threads();
        let num_sleepers = counters.sleeping_threads();

        if num_sleepers == 0 {
            // nobody to wake
            return;
        }

        // Promote from u16 to u32 so we can interoperate with
        // num_jobs more easily.
        let num_awake_but_idle = num_awake_but_idle as u32;
        let num_sleepers = num_sleepers as u32;

        // If the queue is non-empty, then we always wake up a worker
        // -- clearly the existing idle threads aren't enough. Otherwise,
        // check to see if we have enough idle workers.
        if !queue_was_empty {
            let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
            self.wake_any_threads(num_to_wake);
        } else if num_awake_but_idle < num_jobs {
            let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
            self.wake_any_threads(num_to_wake);
        }
    }

    #[cold]
    fn wake_any_threads(&self, mut num_to_wake: u32) {
        if num_to_wake > 0 {
            for i in 0..self.worker_sleep_states.len() {
                if self.wake_specific_thread(i) {
                    num_to_wake -= 1;
                    if num_to_wake == 0 {
                        return;
                    }
                }
            }
        }
    }

    fn wake_specific_thread(&self, index: usize) -> bool {
        let sleep_state = &self.worker_sleep_states[index];

        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        if *is_blocked {
            *is_blocked = false;
            sleep_state.condvar.notify_one();

            // When the thread went to sleep, it incremented the sleeping-thread
            // counter. When we wake it, it's our job to decrement that counter.
            // We could have the thread do it, but that would introduce a delay
            // between when the thread was *notified* and when the counter was
            // decremented. That might mislead people with new work into
            // thinking that there are sleeping threads they should try to wake,
            // when in fact there is nothing left for them to do.
            self.counters.sub_sleeping_thread();

            true
        } else {
            false
        }
    }
}

impl IdleState {
    fn wake_fully(&mut self) {
        self.rounds = 0;
        self.jobs_counter = JobsEventCounter::DUMMY;
    }

    fn wake_partly(&mut self) {
        self.rounds = ROUNDS_UNTIL_SLEEPY;
        self.jobs_counter = JobsEventCounter::DUMMY;
    }
}