synchronoise/
event.rs

1//! "Event" primitives, allowing one thread to wait on a signal or countdown from other threads.
2//!
3//! The primary types in this module are the [`CountdownEvent`] and the [`SignalEvent`] structs. See
4//! the documentation on those types for further information.
5//!
6//! [`CountdownEvent`]: struct.CountdownEvent.html
7//! [`SignalEvent`]: struct.SignalEvent.html
8
9use std::convert::identity;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11use std::thread;
12use std::time::Duration;
13
14use crossbeam_queue::SegQueue;
15
16/// A synchronization primitive that signals when its count reaches zero.
17///
18/// With a `CountdownEvent`, it's possible to cause one thread to wait on a set of computations
19/// occurring in other threads by making the other threads interact with the counter as they
20/// perform their work.
21///
22/// The main limitation of a CountdownEvent is that once its counter reaches zero (even by starting
23/// there), any attempts to update the counter will return `CountdownError::AlreadySet` until the
24/// counter is reset by calling `reset` or `reset_to_count`.
25///
26/// `CountdownEvent` is a port of [System.Threading.CountdownEvent][src-link] from .NET (also
27/// called [`CountDownLatch`][java-src] in Java).
28///
29/// [src-link]: https://msdn.microsoft.com/en-us/library/system.threading.countdownevent(v=vs.110).aspx
30/// [java-src]: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
31///
32/// # Example
33///
34/// This example uses a `CountdownEvent` to make the "coordinator" thread sleep until all of its
35/// "worker" threads have finished. Each thread calls `signal.decrement()` to signal to the Event
36/// that its work has completed. When the last thread does this (and brings the counter to zero),
37/// the "coordinator" thread wakes up and prints `all done!`.
38///
39/// ```
40/// use synchronoise::CountdownEvent;
41/// use std::sync::Arc;
42/// use std::thread;
43/// use std::time::Duration;
44///
45/// let thread_count = 5;
46/// let counter = Arc::new(CountdownEvent::new(thread_count));
47///
48/// for i in 0..thread_count {
49///     let signal = counter.clone();
50///     thread::spawn(move || {
51///         thread::sleep(Duration::from_secs(i as u64));
52///         println!("thread {} activated!", i);
53///         signal.decrement().unwrap();
54///     });
55/// }
56///
57/// counter.wait();
58///
59/// println!("all done!");
60/// ```
61pub struct CountdownEvent {
62    initial: usize,
63    counter: AtomicUsize,
64    waiting: SegQueue<thread::Thread>,
65}
66
/// The collection of errors that can be returned by [`CountdownEvent`] methods.
///
/// This type implements `std::error::Error`, so it can be propagated with `?` into
/// `Box<dyn Error>` or any error type that wraps standard errors.
///
/// See [`CountdownEvent`] for more details.
///
/// [`CountdownEvent`]: struct.CountdownEvent.html
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CountdownError {
    /// Returned when adding to a counter would have caused it to overflow.
    SaturatedCounter,
    /// Returned when attempting to signal would have caused the counter to go below zero.
    TooManySignals,
    /// Returned when attempting to modify the counter after it has reached zero.
    AlreadySet,
}

impl std::fmt::Display for CountdownError {
    /// Writes a short, lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            CountdownError::SaturatedCounter => "adding to the counter would overflow it",
            CountdownError::TooManySignals => "signaling would bring the counter below zero",
            CountdownError::AlreadySet => "the counter has already reached zero",
        };
        f.write_str(message)
    }
}

impl std::error::Error for CountdownError {}
81
82impl CountdownEvent {
83    /// Creates a new `CountdownEvent`, initialized to the given count.
84    ///
85    /// Remember that once the counter reaches zero, calls to `add` or `signal` will fail, so
86    /// passing zero to this function will create a `CountdownEvent` that is permanently signaled.
87    pub fn new(count: usize) -> CountdownEvent {
88        CountdownEvent {
89            initial: count,
90            counter: AtomicUsize::new(count),
91            waiting: SegQueue::new(),
92        }
93    }
94
95    /// Resets the counter to the count given to `new`.
96    ///
97    /// This function is safe because the `&mut self` enforces that no other references or locks
98    /// exist.
99    pub fn reset(&mut self) {
100        self.counter = AtomicUsize::new(self.initial);
101        // there shouldn't be any remaining thread handles in here, but let's clear it out anyway
102        while let Some(thread) = self.waiting.pop() {
103            thread.unpark();
104        }
105    }
106
107    /// Resets the counter to the given count.
108    ///
109    /// This function is safe because the `&mut self` enforces that no other references or locks
110    /// exist.
111    pub fn reset_to_count(&mut self, count: usize) {
112        self.initial = count;
113        self.reset();
114    }
115
116    /// Returns the current counter value.
117    pub fn count(&self) -> usize {
118        self.counter.load(Ordering::SeqCst)
119    }
120
121    /// Adds the given count to the counter.
122    ///
123    /// # Errors
124    ///
125    /// If the counter is already at zero, this function will return `CountdownError::AlreadySet`.
126    ///
127    /// If the given count would cause the counter to overflow `usize`, this function will return
128    /// `CountdownError::SaturatedCounter`.
129    pub fn add(&self, count: usize) -> Result<(), CountdownError> {
130        let mut current = self.count();
131
132        loop {
133            if current == 0 {
134                return Err(CountdownError::AlreadySet);
135            }
136
137            if let Some(new_count) = current.checked_add(count) {
138                let exchange_result = self.counter.compare_exchange_weak(
139                    current,
140                    new_count,
141                    Ordering::SeqCst,
142                    Ordering::SeqCst,
143                );
144                match exchange_result {
145                    Ok(_) => return Ok(()),
146                    Err(last_count) => current = last_count,
147                }
148            } else {
149                return Err(CountdownError::SaturatedCounter);
150            }
151        }
152    }
153
154    /// Subtracts the given count to the counter, and returns whether this caused any waiting
155    /// threads to wake up.
156    ///
157    /// # Errors
158    ///
159    /// If the counter is already at zero, this function will return `CountdownError::AlreadySet`.
160    ///
161    /// If the given count would cause the counter to go *below* zero (instead of reaching zero),
162    /// this function will return `CountdownError::TooManySignals`.
163    pub fn signal(&self, count: usize) -> Result<bool, CountdownError> {
164        let mut current = self.count();
165
166        loop {
167            if current == 0 {
168                return Err(CountdownError::AlreadySet);
169            }
170
171            if let Some(new_count) = current.checked_sub(count) {
172                let exchange_result = self.counter.compare_exchange_weak(
173                    current,
174                    new_count,
175                    Ordering::SeqCst,
176                    Ordering::SeqCst,
177                );
178                match exchange_result {
179                    Ok(_) => {
180                        current = new_count;
181                        break;
182                    }
183                    Err(last_count) => current = last_count,
184                }
185            } else {
186                return Err(CountdownError::TooManySignals);
187            }
188        }
189
190        if current == 0 {
191            while let Some(thread) = self.waiting.pop() {
192                thread.unpark();
193            }
194            Ok(true)
195        } else {
196            Ok(false)
197        }
198    }
199
200    /// Adds one to the count.
201    ///
202    /// # Errors
203    ///
204    /// See [`add`] for the situations where this function will return an error.
205    ///
206    /// [`add`]: #method.add
207    pub fn increment(&self) -> Result<(), CountdownError> {
208        self.add(1)
209    }
210
211    /// Subtracts one from the counter, and returns whether this caused any waiting threads to wake
212    /// up.
213    ///
214    /// # Errors
215    ///
216    /// See [`signal`] for the situations where this function will return an error.
217    ///
218    /// [`signal`]: #method.signal
219    pub fn decrement(&self) -> Result<bool, CountdownError> {
220        self.signal(1)
221    }
222
223    /// Increments the counter, then returns a guard object that will decrement the counter upon
224    /// drop.
225    ///
226    /// # Errors
227    ///
228    /// This function will return the same errors as `add`. If the event has already signaled by
229    /// the time the guard is dropped (and would cause its `decrement` call to return an error),
230    /// then the error will be silently ignored.
231    ///
232    /// # Example
233    ///
234    /// Here's the sample from the main docs, using `CountdownGuard`s instead of manually
235    /// decrementing:
236    ///
237    /// ```
238    /// use synchronoise::CountdownEvent;
239    /// use std::sync::Arc;
240    /// use std::thread;
241    /// use std::time::Duration;
242    ///
243    /// let thread_count = 5;
244    /// // counter can't start from zero, but the guard increments on its own, so start at one and
245    /// // just decrement once when we're ready to wait
246    /// let counter = Arc::new(CountdownEvent::new(1));
247    ///
248    /// for i in 0..thread_count {
249    ///     let signal = counter.clone();
250    ///     thread::spawn(move || {
251    ///         let _guard = signal.guard().unwrap();
252    ///         thread::sleep(Duration::from_secs(i));
253    ///         println!("thread {} activated!", i);
254    ///     });
255    /// }
256    ///
257    /// // give all the threads time to increment the counter before continuing
258    /// thread::sleep(Duration::from_millis(100));
259    /// counter.decrement().unwrap();
260    /// counter.wait();
261    ///
262    /// println!("all done!");
263    /// ```
264    pub fn guard(&self) -> Result<CountdownGuard, CountdownError> {
265        CountdownGuard::new(self)
266    }
267
268    /// Blocks the current thread until the counter reaches zero.
269    ///
270    /// This function will block indefinitely until the counter reaches zero. It will return
271    /// immediately if it is already at zero.
272    pub fn wait(&self) {
273        // see SignalEvent::wait for why we push first even if the count is already set
274        self.waiting.push(thread::current());
275
276        let mut first = true;
277        while self.count() > 0 {
278            if first {
279                first = false;
280            } else {
281                self.waiting.push(thread::current());
282            }
283
284            thread::park();
285        }
286    }
287
288    /// Blocks the current thread until the timer reaches zero, or until the given timeout elapses,
289    /// returning the count at the time of wakeup.
290    ///
291    /// This function will return immediately if the counter was already at zero. Otherwise, it
292    /// will block for roughly no longer than `timeout`, or when the counter reaches zero,
293    /// whichever comes first.
294    pub fn wait_timeout(&self, timeout: Duration) -> usize {
295        use std::time::Instant;
296
297        // see SignalEvent::wait for why we push first even if the count is already set
298        self.waiting.push(thread::current());
299
300        let begin = Instant::now();
301        let mut first = true;
302        let mut remaining = timeout;
303        loop {
304            let current = self.count();
305
306            if current == 0 {
307                return 0;
308            }
309
310            if first {
311                first = false;
312            } else {
313                let elapsed = begin.elapsed();
314                if elapsed >= timeout {
315                    return current;
316                } else {
317                    remaining = timeout - elapsed;
318                }
319
320                self.waiting.push(thread::current());
321            }
322
323            thread::park_timeout(remaining);
324        }
325    }
326}
327
328/// An opaque guard struct that decrements the count of a borrowed `CountdownEvent` on drop.
329///
330/// See [`CountdownEvent::guard`] for more information about this struct.
331///
332/// [`CountdownEvent::guard`]: struct.CountdownEvent.html#method.guard
333pub struct CountdownGuard<'a> {
334    event: &'a CountdownEvent,
335}
336
337impl<'a> CountdownGuard<'a> {
338    fn new(event: &'a CountdownEvent) -> Result<CountdownGuard<'a>, CountdownError> {
339        event.increment()?;
340        Ok(CountdownGuard { event })
341    }
342}
343
344/// Upon drop, this guard will decrement the counter of its parent `CountdownEvent`. If this would
345/// cause an error (see [`CountdownEvent::signal`] for details), the error is silently ignored.
346///
347/// [`CountdownEvent::signal`]: struct.CountdownEvent.html#method.signal
348impl<'a> Drop for CountdownGuard<'a> {
349    fn drop(&mut self) {
350        // if decrement() returns an error, then the event has already been signaled somehow. i'm
351        // not gonna care about it tho
352        self.event.decrement().ok();
353    }
354}
355
/// Determines the reset behavior of a [`SignalEvent`].
///
/// See [`SignalEvent`] for more information.
///
/// [`SignalEvent`]: struct.SignalEvent.html
// `Eq` is derived alongside `PartialEq` (as on `CountdownError`): equality on a field-less enum
// is total, and `Eq` lets the kind be used wherever full equivalence is required.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum SignalKind {
    /// An activated `SignalEvent` automatically resets when a thread is resumed.
    ///
    /// `SignalEvent`s with this kind will only resume one thread at a time.
    Auto,
    /// An activated `SignalEvent` must be manually reset to block threads again.
    ///
    /// `SignalEvent`s with this kind will signal every waiting thread to continue at once.
    Manual,
}
372
373/// A synchronization primitive that allows one or more threads to wait on a signal from another
374/// thread.
375///
376/// With a `SignalEvent`, it's possible to have one or more threads gate on a signal from another
377/// thread. The behavior for what happens when an event is signaled depends on the value of the
378/// `signal_kind` parameter given to `new`, or whether `auto` or `manual` is used to construct the
379/// `SignalEvent`:
380///
381/// * A value of `SignalKind::Auto` (or a `SignalEvent` created via `SignalEvent::auto()`) will
382///   automatically reset the signal when a thread is resumed by this event. If more than one
383///   thread is waiting on the event when it is signaled, only one will be resumed.
384/// * A value of `SignalKind::Manual` (or a `SignalEvent` created via `SignalEvent::manual()`) will
385///   remain signaled until it is manually reset. If more than one thread is waiting on the event
386///   when it is signaled, all of them will be resumed. Any other thread that tries to wait on the
387///   signal before it is reset will not be blocked at all.
388///
389/// `SignalEvent` is a port of [System.Threading.EventWaitHandle][src-link] from .NET.
390///
391/// [src-link]: https://msdn.microsoft.com/en-us/library/system.threading.eventwaithandle(v=vs.110).aspx
392///
393/// # Example
394///
395/// The following example uses two `SignalEvent`s:
396///
397/// * `start_signal` is used as a kind of `std::sync::Barrier`, that keeps all the threads inside
398///   the loop from starting until they all have been spawned. All the `start.wait()` calls resume
399///   when `start_signal.signal()` is called after the initial loop.
400///   * Note that because the "coordinator" doesn't wait for each thread to be scheduled before
401///     signaling, it's possible that some later threads may not have had a chance to enter
402///     `start.wait()` before the signal is set. In this case they won't block in the first place,
403///     and immediately return.
404/// * `stop_signal` is used to wake up the "coordinator" thread when each "worker" thread is
405///   finished with its work. This allows it to keep a count of the number of threads yet to
406///   finish, so it can exit its final loop when all the threads have stopped.
407///
408/// ```
409/// use synchronoise::SignalEvent;
410/// use std::sync::Arc;
411/// use std::thread;
412/// use std::time::Duration;
413///
414/// let start_signal = Arc::new(SignalEvent::manual(false));
415/// let stop_signal = Arc::new(SignalEvent::auto(false));
416/// let mut thread_count = 5;
417///
418/// for i in 0..thread_count {
419///     let start = start_signal.clone();
420///     let stop = stop_signal.clone();
421///     thread::spawn(move || {
422///         // as a Manual-reset signal, all the threads will start at the same time
423///         start.wait();
424///         thread::sleep(Duration::from_secs(i));
425///         println!("thread {} activated!", i);
426///         stop.signal();
427///     });
428/// }
429///
430/// start_signal.signal();
431///
432/// while thread_count > 0 {
433///     // as an Auto-reset signal, this will automatically reset when resuming
434///     // so when the loop comes back, we don't have to reset before blocking again
435///     stop_signal.wait();
436///     thread_count -= 1;
437/// }
438///
439/// println!("all done!");
440/// ```
441pub struct SignalEvent {
442    reset: SignalKind,
443    signal: AtomicBool,
444    waiting: SegQueue<thread::Thread>,
445}
446
447impl SignalEvent {
448    /// Creates a new `SignalEvent` with the given starting state and reset behavior.
449    ///
450    /// If `init_state` is `true`, then this `SignalEvent` will start with the signal already set,
451    /// so that threads that wait will immediately unblock.
452    pub fn new(init_state: bool, signal_kind: SignalKind) -> SignalEvent {
453        SignalEvent {
454            reset: signal_kind,
455            signal: AtomicBool::new(init_state),
456            waiting: SegQueue::new(),
457        }
458    }
459
460    /// Creates a new automatically-resetting `SignalEvent` with the given starting state.
461    ///
462    /// If `init_state` is `true`, then this `SignalEvent` will start with the signal already set,
463    /// so that the first thread that tries to wait will immediately unblock.
464    pub fn auto(init_state: bool) -> SignalEvent {
465        SignalEvent::new(init_state, SignalKind::Auto)
466    }
467
468    /// Creates a new manually-resetting `SignalEvent` with the given starting state.
469    ///
470    /// If `init_state` is `true`, then this `SignalEvent` will start with the signal alraedy set,
471    /// so that threads that wait will immediately unblock until `reset` is called.
472    pub fn manual(init_state: bool) -> SignalEvent {
473        SignalEvent::new(init_state, SignalKind::Manual)
474    }
475
476    /// Returns the current signal status of the `SignalEvent`.
477    pub fn status(&self) -> bool {
478        self.signal.load(Ordering::SeqCst)
479    }
480
481    /// Sets the signal on this `SignalEvent`, potentially waking up one or all threads waiting on
482    /// it.
483    ///
484    /// If more than one thread is waiting on the event, the behavior is different depending on the
485    /// `SignalKind` passed to the event when it was created. For a value of `Auto`, one thread
486    /// will be resumed. For a value of `Manual`, all waiting threads will be resumed.
487    ///
488    /// If no thread is currently waiting on the event, its state will be set regardless. Any
489    /// future attempts to wait on the event will unblock immediately, except for a `SignalKind` of
490    /// Auto, which will immediately unblock the first thread only.
491    pub fn signal(&self) {
492        self.signal.store(true, Ordering::SeqCst);
493
494        match self.reset {
495            // there may be duplicate handles in the queue due to spurious wakeups, so just loop
496            // until we know the signal got reset - any that got woken up wrongly will also observe
497            // the reset signal and push their handle back in
498            SignalKind::Auto => {
499                while self.signal.load(Ordering::SeqCst) {
500                    if let Some(thread) = self.waiting.pop() {
501                        thread.unpark();
502                    } else {
503                        break;
504                    }
505                }
506            }
507            // for manual resets, just unilaterally drain the queue
508            SignalKind::Manual => {
509                while let Some(thread) = self.waiting.pop() {
510                    thread.unpark();
511                }
512            }
513        }
514    }
515
516    /// Resets the signal on this `SignalEvent`, allowing threads that wait on it to block.
517    pub fn reset(&self) {
518        self.signal.store(false, Ordering::SeqCst);
519    }
520
521    /// Blocks this thread until another thread calls `signal`.
522    ///
523    /// If this event is already set, then this function will immediately return without blocking.
524    /// For events with a `SignalKind` of `Auto`, this will reset the signal so that the next
525    /// thread to wait will block.
526    pub fn wait(&self) {
527        // Push first, regardless, because in SignalEvent's doctest there's a thorny race condition
528        // where (1) the waiting thread will see an unset signal, (2) the signalling thread will
529        // set the signal and drain the queue, and only then (3) the waiting thread will push its
530        // handle. Having erroneous handles is ultimately harmless from a correctness standpoint
531        // because signal loops properly anyway, and if the park handle is already set when a
532        // thread tries to wait it will just immediately unpark, see that the signal is still
533        // unset, and park again. Shame about those spent cycles dealing with it though.
534        self.waiting.push(thread::current());
535
536        // loop on the park in case we spuriously wake up
537        let mut first = true;
538        while !self.check_signal() {
539            // push every time in case there's a race between `signal` and this, since on
540            // `SignalKind::Auto` it will loop until someone turns it off - but only one will
541            // actually exit this loop, because `check_signal` does a CAS
542            if first {
543                first = false;
544            } else {
545                self.waiting.push(thread::current());
546            }
547
548            thread::park();
549        }
550    }
551
552    /// Blocks this thread until either another thread calls `signal`, or until the timeout
553    /// elapses.
554    ///
555    /// This function returns the status of the signal when it woke up. If this function exits
556    /// because the signal was set, and this event has a `SignalKind` of `Auto`, the signal will be
557    /// reset so that the next thread to wait will block.
558    pub fn wait_timeout(&self, timeout: Duration) -> bool {
559        use std::time::Instant;
560
561        // see SignalEvent::wait for why we push first even if the signal is already set
562        self.waiting.push(thread::current());
563
564        let begin = Instant::now();
565        let mut first = true;
566        let mut remaining = timeout;
567        loop {
568            if self.check_signal() {
569                return true;
570            }
571
572            if first {
573                first = false;
574            } else {
575                let elapsed = begin.elapsed();
576                if elapsed >= timeout {
577                    return self.status();
578                } else {
579                    remaining = timeout - elapsed;
580                }
581
582                self.waiting.push(thread::current());
583            }
584
585            thread::park_timeout(remaining);
586        }
587    }
588
589    /// Perfoms an atomic compare-exchange on the signal, resetting it if (1) it was set, and (2)
590    /// this `SignalEvent` was configured with `SignalKind::Auto`. Returns whether the signal was
591    /// previously set.
592    fn check_signal(&self) -> bool {
593        self.signal
594            .compare_exchange_weak(
595                true,
596                self.reset == SignalKind::Manual,
597                Ordering::SeqCst,
598                Ordering::SeqCst,
599            )
600            .unwrap_or_else(identity)
601    }
602}