synchronoise/event.rs
1//! "Event" primitives, allowing one thread to wait on a signal or countdown from other threads.
2//!
3//! The primary types in this module are the [`CountdownEvent`] and the [`SignalEvent`] structs. See
4//! the documentation on those types for further information.
5//!
6//! [`CountdownEvent`]: struct.CountdownEvent.html
7//! [`SignalEvent`]: struct.SignalEvent.html
8
9use std::convert::identity;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11use std::thread;
12use std::time::Duration;
13
14use crossbeam_queue::SegQueue;
15
16/// A synchronization primitive that signals when its count reaches zero.
17///
18/// With a `CountdownEvent`, it's possible to cause one thread to wait on a set of computations
19/// occurring in other threads by making the other threads interact with the counter as they
20/// perform their work.
21///
22/// The main limitation of a CountdownEvent is that once its counter reaches zero (even by starting
23/// there), any attempts to update the counter will return `CountdownError::AlreadySet` until the
24/// counter is reset by calling `reset` or `reset_to_count`.
25///
26/// `CountdownEvent` is a port of [System.Threading.CountdownEvent][src-link] from .NET (also
27/// called [`CountDownLatch`][java-src] in Java).
28///
29/// [src-link]: https://msdn.microsoft.com/en-us/library/system.threading.countdownevent(v=vs.110).aspx
30/// [java-src]: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
31///
32/// # Example
33///
34/// This example uses a `CountdownEvent` to make the "coordinator" thread sleep until all of its
35/// "worker" threads have finished. Each thread calls `signal.decrement()` to signal to the Event
36/// that its work has completed. When the last thread does this (and brings the counter to zero),
37/// the "coordinator" thread wakes up and prints `all done!`.
38///
39/// ```
40/// use synchronoise::CountdownEvent;
41/// use std::sync::Arc;
42/// use std::thread;
43/// use std::time::Duration;
44///
45/// let thread_count = 5;
46/// let counter = Arc::new(CountdownEvent::new(thread_count));
47///
48/// for i in 0..thread_count {
49/// let signal = counter.clone();
50/// thread::spawn(move || {
51/// thread::sleep(Duration::from_secs(i as u64));
52/// println!("thread {} activated!", i);
53/// signal.decrement().unwrap();
54/// });
55/// }
56///
57/// counter.wait();
58///
59/// println!("all done!");
60/// ```
61pub struct CountdownEvent {
62 initial: usize,
63 counter: AtomicUsize,
64 waiting: SegQueue<thread::Thread>,
65}
66
/// The collection of errors that can be returned by [`CountdownEvent`] methods.
///
/// This type implements `std::error::Error`, so it can be propagated with `?` into boxed or
/// wrapping error types.
///
/// See [`CountdownEvent`] for more details.
///
/// [`CountdownEvent`]: struct.CountdownEvent.html
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CountdownError {
    /// Returned when adding to a counter would have caused it to overflow.
    SaturatedCounter,
    /// Returned when attempting to signal would have caused the counter to go below zero.
    TooManySignals,
    /// Returned when attempting to modify the counter after it has reached zero.
    AlreadySet,
}

impl std::fmt::Display for CountdownError {
    /// Writes a short, lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            CountdownError::SaturatedCounter => "adding to the counter would overflow it",
            CountdownError::TooManySignals => "signaling would take the counter below zero",
            CountdownError::AlreadySet => "the counter has already reached zero",
        };
        f.write_str(message)
    }
}

impl std::error::Error for CountdownError {}
81
82impl CountdownEvent {
83 /// Creates a new `CountdownEvent`, initialized to the given count.
84 ///
85 /// Remember that once the counter reaches zero, calls to `add` or `signal` will fail, so
86 /// passing zero to this function will create a `CountdownEvent` that is permanently signaled.
87 pub fn new(count: usize) -> CountdownEvent {
88 CountdownEvent {
89 initial: count,
90 counter: AtomicUsize::new(count),
91 waiting: SegQueue::new(),
92 }
93 }
94
95 /// Resets the counter to the count given to `new`.
96 ///
97 /// This function is safe because the `&mut self` enforces that no other references or locks
98 /// exist.
99 pub fn reset(&mut self) {
100 self.counter = AtomicUsize::new(self.initial);
101 // there shouldn't be any remaining thread handles in here, but let's clear it out anyway
102 while let Some(thread) = self.waiting.pop() {
103 thread.unpark();
104 }
105 }
106
107 /// Resets the counter to the given count.
108 ///
109 /// This function is safe because the `&mut self` enforces that no other references or locks
110 /// exist.
111 pub fn reset_to_count(&mut self, count: usize) {
112 self.initial = count;
113 self.reset();
114 }
115
116 /// Returns the current counter value.
117 pub fn count(&self) -> usize {
118 self.counter.load(Ordering::SeqCst)
119 }
120
121 /// Adds the given count to the counter.
122 ///
123 /// # Errors
124 ///
125 /// If the counter is already at zero, this function will return `CountdownError::AlreadySet`.
126 ///
127 /// If the given count would cause the counter to overflow `usize`, this function will return
128 /// `CountdownError::SaturatedCounter`.
129 pub fn add(&self, count: usize) -> Result<(), CountdownError> {
130 let mut current = self.count();
131
132 loop {
133 if current == 0 {
134 return Err(CountdownError::AlreadySet);
135 }
136
137 if let Some(new_count) = current.checked_add(count) {
138 let exchange_result = self.counter.compare_exchange_weak(
139 current,
140 new_count,
141 Ordering::SeqCst,
142 Ordering::SeqCst,
143 );
144 match exchange_result {
145 Ok(_) => return Ok(()),
146 Err(last_count) => current = last_count,
147 }
148 } else {
149 return Err(CountdownError::SaturatedCounter);
150 }
151 }
152 }
153
154 /// Subtracts the given count to the counter, and returns whether this caused any waiting
155 /// threads to wake up.
156 ///
157 /// # Errors
158 ///
159 /// If the counter is already at zero, this function will return `CountdownError::AlreadySet`.
160 ///
161 /// If the given count would cause the counter to go *below* zero (instead of reaching zero),
162 /// this function will return `CountdownError::TooManySignals`.
163 pub fn signal(&self, count: usize) -> Result<bool, CountdownError> {
164 let mut current = self.count();
165
166 loop {
167 if current == 0 {
168 return Err(CountdownError::AlreadySet);
169 }
170
171 if let Some(new_count) = current.checked_sub(count) {
172 let exchange_result = self.counter.compare_exchange_weak(
173 current,
174 new_count,
175 Ordering::SeqCst,
176 Ordering::SeqCst,
177 );
178 match exchange_result {
179 Ok(_) => {
180 current = new_count;
181 break;
182 }
183 Err(last_count) => current = last_count,
184 }
185 } else {
186 return Err(CountdownError::TooManySignals);
187 }
188 }
189
190 if current == 0 {
191 while let Some(thread) = self.waiting.pop() {
192 thread.unpark();
193 }
194 Ok(true)
195 } else {
196 Ok(false)
197 }
198 }
199
200 /// Adds one to the count.
201 ///
202 /// # Errors
203 ///
204 /// See [`add`] for the situations where this function will return an error.
205 ///
206 /// [`add`]: #method.add
207 pub fn increment(&self) -> Result<(), CountdownError> {
208 self.add(1)
209 }
210
211 /// Subtracts one from the counter, and returns whether this caused any waiting threads to wake
212 /// up.
213 ///
214 /// # Errors
215 ///
216 /// See [`signal`] for the situations where this function will return an error.
217 ///
218 /// [`signal`]: #method.signal
219 pub fn decrement(&self) -> Result<bool, CountdownError> {
220 self.signal(1)
221 }
222
223 /// Increments the counter, then returns a guard object that will decrement the counter upon
224 /// drop.
225 ///
226 /// # Errors
227 ///
228 /// This function will return the same errors as `add`. If the event has already signaled by
229 /// the time the guard is dropped (and would cause its `decrement` call to return an error),
230 /// then the error will be silently ignored.
231 ///
232 /// # Example
233 ///
234 /// Here's the sample from the main docs, using `CountdownGuard`s instead of manually
235 /// decrementing:
236 ///
237 /// ```
238 /// use synchronoise::CountdownEvent;
239 /// use std::sync::Arc;
240 /// use std::thread;
241 /// use std::time::Duration;
242 ///
243 /// let thread_count = 5;
244 /// // counter can't start from zero, but the guard increments on its own, so start at one and
245 /// // just decrement once when we're ready to wait
246 /// let counter = Arc::new(CountdownEvent::new(1));
247 ///
248 /// for i in 0..thread_count {
249 /// let signal = counter.clone();
250 /// thread::spawn(move || {
251 /// let _guard = signal.guard().unwrap();
252 /// thread::sleep(Duration::from_secs(i));
253 /// println!("thread {} activated!", i);
254 /// });
255 /// }
256 ///
257 /// // give all the threads time to increment the counter before continuing
258 /// thread::sleep(Duration::from_millis(100));
259 /// counter.decrement().unwrap();
260 /// counter.wait();
261 ///
262 /// println!("all done!");
263 /// ```
264 pub fn guard(&self) -> Result<CountdownGuard, CountdownError> {
265 CountdownGuard::new(self)
266 }
267
268 /// Blocks the current thread until the counter reaches zero.
269 ///
270 /// This function will block indefinitely until the counter reaches zero. It will return
271 /// immediately if it is already at zero.
272 pub fn wait(&self) {
273 // see SignalEvent::wait for why we push first even if the count is already set
274 self.waiting.push(thread::current());
275
276 let mut first = true;
277 while self.count() > 0 {
278 if first {
279 first = false;
280 } else {
281 self.waiting.push(thread::current());
282 }
283
284 thread::park();
285 }
286 }
287
288 /// Blocks the current thread until the timer reaches zero, or until the given timeout elapses,
289 /// returning the count at the time of wakeup.
290 ///
291 /// This function will return immediately if the counter was already at zero. Otherwise, it
292 /// will block for roughly no longer than `timeout`, or when the counter reaches zero,
293 /// whichever comes first.
294 pub fn wait_timeout(&self, timeout: Duration) -> usize {
295 use std::time::Instant;
296
297 // see SignalEvent::wait for why we push first even if the count is already set
298 self.waiting.push(thread::current());
299
300 let begin = Instant::now();
301 let mut first = true;
302 let mut remaining = timeout;
303 loop {
304 let current = self.count();
305
306 if current == 0 {
307 return 0;
308 }
309
310 if first {
311 first = false;
312 } else {
313 let elapsed = begin.elapsed();
314 if elapsed >= timeout {
315 return current;
316 } else {
317 remaining = timeout - elapsed;
318 }
319
320 self.waiting.push(thread::current());
321 }
322
323 thread::park_timeout(remaining);
324 }
325 }
326}
327
328/// An opaque guard struct that decrements the count of a borrowed `CountdownEvent` on drop.
329///
330/// See [`CountdownEvent::guard`] for more information about this struct.
331///
332/// [`CountdownEvent::guard`]: struct.CountdownEvent.html#method.guard
333pub struct CountdownGuard<'a> {
334 event: &'a CountdownEvent,
335}
336
337impl<'a> CountdownGuard<'a> {
338 fn new(event: &'a CountdownEvent) -> Result<CountdownGuard<'a>, CountdownError> {
339 event.increment()?;
340 Ok(CountdownGuard { event })
341 }
342}
343
344/// Upon drop, this guard will decrement the counter of its parent `CountdownEvent`. If this would
345/// cause an error (see [`CountdownEvent::signal`] for details), the error is silently ignored.
346///
347/// [`CountdownEvent::signal`]: struct.CountdownEvent.html#method.signal
348impl<'a> Drop for CountdownGuard<'a> {
349 fn drop(&mut self) {
350 // if decrement() returns an error, then the event has already been signaled somehow. i'm
351 // not gonna care about it tho
352 self.event.decrement().ok();
353 }
354}
355
/// Determines the reset behavior of a [`SignalEvent`].
///
/// See [`SignalEvent`] for more information.
///
/// [`SignalEvent`]: struct.SignalEvent.html
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
pub enum SignalKind {
    /// An activated `SignalEvent` automatically resets when a thread is resumed.
    ///
    /// `SignalEvent`s with this kind will only resume one thread at a time.
    Auto,
    /// An activated `SignalEvent` must be manually reset to block threads again.
    ///
    /// `SignalEvent`s with this kind will signal every waiting thread to continue at once.
    Manual,
}
372
373/// A synchronization primitive that allows one or more threads to wait on a signal from another
374/// thread.
375///
376/// With a `SignalEvent`, it's possible to have one or more threads gate on a signal from another
377/// thread. The behavior for what happens when an event is signaled depends on the value of the
378/// `signal_kind` parameter given to `new`, or whether `auto` or `manual` is used to construct the
379/// `SignalEvent`:
380///
381/// * A value of `SignalKind::Auto` (or a `SignalEvent` created via `SignalEvent::auto()`) will
382/// automatically reset the signal when a thread is resumed by this event. If more than one
383/// thread is waiting on the event when it is signaled, only one will be resumed.
384/// * A value of `SignalKind::Manual` (or a `SignalEvent` created via `SignalEvent::manual()`) will
385/// remain signaled until it is manually reset. If more than one thread is waiting on the event
386/// when it is signaled, all of them will be resumed. Any other thread that tries to wait on the
387/// signal before it is reset will not be blocked at all.
388///
389/// `SignalEvent` is a port of [System.Threading.EventWaitHandle][src-link] from .NET.
390///
391/// [src-link]: https://msdn.microsoft.com/en-us/library/system.threading.eventwaithandle(v=vs.110).aspx
392///
393/// # Example
394///
395/// The following example uses two `SignalEvent`s:
396///
397/// * `start_signal` is used as a kind of `std::sync::Barrier`, that keeps all the threads inside
398/// the loop from starting until they all have been spawned. All the `start.wait()` calls resume
399/// when `start_signal.signal()` is called after the initial loop.
400/// * Note that because the "coordinator" doesn't wait for each thread to be scheduled before
401/// signaling, it's possible that some later threads may not have had a chance to enter
402/// `start.wait()` before the signal is set. In this case they won't block in the first place,
403/// and immediately return.
404/// * `stop_signal` is used to wake up the "coordinator" thread when each "worker" thread is
405/// finished with its work. This allows it to keep a count of the number of threads yet to
406/// finish, so it can exit its final loop when all the threads have stopped.
407///
408/// ```
409/// use synchronoise::SignalEvent;
410/// use std::sync::Arc;
411/// use std::thread;
412/// use std::time::Duration;
413///
414/// let start_signal = Arc::new(SignalEvent::manual(false));
415/// let stop_signal = Arc::new(SignalEvent::auto(false));
416/// let mut thread_count = 5;
417///
418/// for i in 0..thread_count {
419/// let start = start_signal.clone();
420/// let stop = stop_signal.clone();
421/// thread::spawn(move || {
422/// // as a Manual-reset signal, all the threads will start at the same time
423/// start.wait();
424/// thread::sleep(Duration::from_secs(i));
425/// println!("thread {} activated!", i);
426/// stop.signal();
427/// });
428/// }
429///
430/// start_signal.signal();
431///
432/// while thread_count > 0 {
433/// // as an Auto-reset signal, this will automatically reset when resuming
434/// // so when the loop comes back, we don't have to reset before blocking again
435/// stop_signal.wait();
436/// thread_count -= 1;
437/// }
438///
439/// println!("all done!");
440/// ```
441pub struct SignalEvent {
442 reset: SignalKind,
443 signal: AtomicBool,
444 waiting: SegQueue<thread::Thread>,
445}
446
447impl SignalEvent {
448 /// Creates a new `SignalEvent` with the given starting state and reset behavior.
449 ///
450 /// If `init_state` is `true`, then this `SignalEvent` will start with the signal already set,
451 /// so that threads that wait will immediately unblock.
452 pub fn new(init_state: bool, signal_kind: SignalKind) -> SignalEvent {
453 SignalEvent {
454 reset: signal_kind,
455 signal: AtomicBool::new(init_state),
456 waiting: SegQueue::new(),
457 }
458 }
459
460 /// Creates a new automatically-resetting `SignalEvent` with the given starting state.
461 ///
462 /// If `init_state` is `true`, then this `SignalEvent` will start with the signal already set,
463 /// so that the first thread that tries to wait will immediately unblock.
464 pub fn auto(init_state: bool) -> SignalEvent {
465 SignalEvent::new(init_state, SignalKind::Auto)
466 }
467
468 /// Creates a new manually-resetting `SignalEvent` with the given starting state.
469 ///
470 /// If `init_state` is `true`, then this `SignalEvent` will start with the signal alraedy set,
471 /// so that threads that wait will immediately unblock until `reset` is called.
472 pub fn manual(init_state: bool) -> SignalEvent {
473 SignalEvent::new(init_state, SignalKind::Manual)
474 }
475
476 /// Returns the current signal status of the `SignalEvent`.
477 pub fn status(&self) -> bool {
478 self.signal.load(Ordering::SeqCst)
479 }
480
481 /// Sets the signal on this `SignalEvent`, potentially waking up one or all threads waiting on
482 /// it.
483 ///
484 /// If more than one thread is waiting on the event, the behavior is different depending on the
485 /// `SignalKind` passed to the event when it was created. For a value of `Auto`, one thread
486 /// will be resumed. For a value of `Manual`, all waiting threads will be resumed.
487 ///
488 /// If no thread is currently waiting on the event, its state will be set regardless. Any
489 /// future attempts to wait on the event will unblock immediately, except for a `SignalKind` of
490 /// Auto, which will immediately unblock the first thread only.
491 pub fn signal(&self) {
492 self.signal.store(true, Ordering::SeqCst);
493
494 match self.reset {
495 // there may be duplicate handles in the queue due to spurious wakeups, so just loop
496 // until we know the signal got reset - any that got woken up wrongly will also observe
497 // the reset signal and push their handle back in
498 SignalKind::Auto => {
499 while self.signal.load(Ordering::SeqCst) {
500 if let Some(thread) = self.waiting.pop() {
501 thread.unpark();
502 } else {
503 break;
504 }
505 }
506 }
507 // for manual resets, just unilaterally drain the queue
508 SignalKind::Manual => {
509 while let Some(thread) = self.waiting.pop() {
510 thread.unpark();
511 }
512 }
513 }
514 }
515
516 /// Resets the signal on this `SignalEvent`, allowing threads that wait on it to block.
517 pub fn reset(&self) {
518 self.signal.store(false, Ordering::SeqCst);
519 }
520
521 /// Blocks this thread until another thread calls `signal`.
522 ///
523 /// If this event is already set, then this function will immediately return without blocking.
524 /// For events with a `SignalKind` of `Auto`, this will reset the signal so that the next
525 /// thread to wait will block.
526 pub fn wait(&self) {
527 // Push first, regardless, because in SignalEvent's doctest there's a thorny race condition
528 // where (1) the waiting thread will see an unset signal, (2) the signalling thread will
529 // set the signal and drain the queue, and only then (3) the waiting thread will push its
530 // handle. Having erroneous handles is ultimately harmless from a correctness standpoint
531 // because signal loops properly anyway, and if the park handle is already set when a
532 // thread tries to wait it will just immediately unpark, see that the signal is still
533 // unset, and park again. Shame about those spent cycles dealing with it though.
534 self.waiting.push(thread::current());
535
536 // loop on the park in case we spuriously wake up
537 let mut first = true;
538 while !self.check_signal() {
539 // push every time in case there's a race between `signal` and this, since on
540 // `SignalKind::Auto` it will loop until someone turns it off - but only one will
541 // actually exit this loop, because `check_signal` does a CAS
542 if first {
543 first = false;
544 } else {
545 self.waiting.push(thread::current());
546 }
547
548 thread::park();
549 }
550 }
551
552 /// Blocks this thread until either another thread calls `signal`, or until the timeout
553 /// elapses.
554 ///
555 /// This function returns the status of the signal when it woke up. If this function exits
556 /// because the signal was set, and this event has a `SignalKind` of `Auto`, the signal will be
557 /// reset so that the next thread to wait will block.
558 pub fn wait_timeout(&self, timeout: Duration) -> bool {
559 use std::time::Instant;
560
561 // see SignalEvent::wait for why we push first even if the signal is already set
562 self.waiting.push(thread::current());
563
564 let begin = Instant::now();
565 let mut first = true;
566 let mut remaining = timeout;
567 loop {
568 if self.check_signal() {
569 return true;
570 }
571
572 if first {
573 first = false;
574 } else {
575 let elapsed = begin.elapsed();
576 if elapsed >= timeout {
577 return self.status();
578 } else {
579 remaining = timeout - elapsed;
580 }
581
582 self.waiting.push(thread::current());
583 }
584
585 thread::park_timeout(remaining);
586 }
587 }
588
589 /// Perfoms an atomic compare-exchange on the signal, resetting it if (1) it was set, and (2)
590 /// this `SignalEvent` was configured with `SignalKind::Auto`. Returns whether the signal was
591 /// previously set.
592 fn check_signal(&self) -> bool {
593 self.signal
594 .compare_exchange_weak(
595 true,
596 self.reset == SignalKind::Manual,
597 Ordering::SeqCst,
598 Ordering::SeqCst,
599 )
600 .unwrap_or_else(identity)
601 }
602}