synchronoise/
phaser.rs

1//! Support module for `WriterReaderPhaser` and related structs.
2//!
3//! See the documentation of the [`WriterReaderPhaser`] struct for more information.
4//!
5//! [`WriterReaderPhaser`]: struct.WriterReaderPhaser.html
6
7use std::sync::{Arc, Mutex, LockResult, MutexGuard};
8use std::sync::atomic::{AtomicIsize, Ordering};
9use std::isize::MIN as ISIZE_MIN;
10use std::time::Duration;
11use std::thread;
12
13/// A synchronization primitive that allows for multiple concurrent wait-free "writer critical
14/// sections" and a "reader phase flip" that can wait for all currently-active writers to finish.
15///
16/// The basic interaction setup for a `WriterReaderPhaser` is as follows:
17///
18/// * Any number of writers can open and close a "writer critical section" with no waiting.
19/// * Zero or one readers can be active at one time, by holding a "read lock". Any reader who
20///   wishes to open a "read lock" while another one is active is blocked until the previous one
21///   finishes.
22/// * The holder of a read lock may request a "phase flip", which causes the reader to wait until
23///   all current writer critical sections are finished before continuing.
24///
25/// `WriterReaderPhaser` is a port of the primitive of the same name from `HdrHistogram`. For a
26/// summary of the rationale behind its design, see [this post by its author][wrp-blog]. Part of
27/// its assumptions is that this primitive is synchronizing access to a double-buffered set of
28/// counters, and the readers are expected to swap the buffers while holding a read lock but before
29/// flipping the phase. This allows them to access a stable sample to read and perform calculations
30/// from, while writers still have wait-free synchronization.
31///
32/// [wrp-blog]: https://stuff-gil-says.blogspot.com/2014/11/writerreaderphaser-story-about-new.html
33///
34/// "Writer critical sections" and "read locks" are represented by guard structs that allow
35/// scope-based resource management of the counters and locks.
36///
37/// * The `PhaserCriticalSection` atomically increments and decrements the phase counters upon
38///   creation and drop. These operations use `std::sync::atomic::AtomicIsize` from the standard
39///   library, and provide no-wait handling for platforms with atomic addition instructions.
40/// * The `PhaserReadLock` is kept in the `WriterReaderPhaser` as a Mutex, enforcing the mutual
41///   exclusion of the read lock. The "phase flip" operation is defined on the read lock guard
42///   itself, enforcing that only the holder of a read lock can execute one.
43pub struct WriterReaderPhaser {
44    start_epoch: Arc<AtomicIsize>,
45    even_end_epoch: Arc<AtomicIsize>,
46    odd_end_epoch: Arc<AtomicIsize>,
47    read_lock: Mutex<PhaserReadLock>,
48}
49
50/// Guard struct that represents a "writer critical section" for a `WriterReaderPhaser`.
51///
52/// `PhaserCriticalSection` is a scope-based guard to signal the beginning and end of a "writer
53/// critical section" to the phaser. Upon calling `writer_critical_section`, the phaser atomically
54/// increments a counter, and when the returned `PhaserCriticalSection` drops, the `drop` call
55/// atomically increments another counter. On platforms with atomic increment instructions, this
56/// should result in wait-free synchronization.
57///
58/// # Example
59///
60/// ```
61/// # let phaser = synchronoise::WriterReaderPhaser::new();
62/// {
63///     let _guard = phaser.writer_critical_section();
64///     // perform writes
65/// } // _guard drops, signaling the end of the section
66/// ```
67pub struct PhaserCriticalSection {
68    end_epoch: Arc<AtomicIsize>,
69}
70
71/// Upon drop, a `PhaserCriticalSection` will signal its parent `WriterReaderPhaser` that the
72/// critical section has ended.
73impl Drop for PhaserCriticalSection {
74    fn drop(&mut self) {
75        self.end_epoch.fetch_add(1, Ordering::Release);
76    }
77}
78
79/// Guard struct for a `WriterReaderPhaser` that allows a reader to perform a "phase flip".
80///
81/// The `PhaserReadLock` struct allows one to perform a "phase flip" on its parent
82/// `WriterReaderPhaser`. It is held in a `std::sync::Mutex` in its parent phaser, enforcing that
83/// only one reader may be active at once.
84///
85/// The `flip_phase` call performs a spin-wait while waiting the the currently-active writers to
86/// finish. A sleep time may be added between checks by calling `flip_with_sleep` instead.
87///
88/// # Example
89///
90/// ```
91/// # let phaser = synchronoise::WriterReaderPhaser::new();
92/// {
93///     let lock = phaser.read_lock().unwrap();
94///     // swap buffers
95///     lock.flip_phase();
96///     // reader now has access to a stable snapshot
97/// } // lock drops, relinquishing the read lock and allowing another reader to lock
98/// ```
99pub struct PhaserReadLock {
100    start_epoch: Arc<AtomicIsize>,
101    even_end_epoch: Arc<AtomicIsize>,
102    odd_end_epoch: Arc<AtomicIsize>,
103}
104
105impl WriterReaderPhaser {
106    /// Creates a new `WriterReaderPhaser`.
107    pub fn new() -> WriterReaderPhaser {
108        let start = Arc::new(AtomicIsize::new(0));
109        let even = Arc::new(AtomicIsize::new(0));
110        let odd = Arc::new(AtomicIsize::new(ISIZE_MIN));
111        let read_lock = PhaserReadLock {
112            start_epoch: start.clone(),
113            even_end_epoch: even.clone(),
114            odd_end_epoch: odd.clone(),
115        };
116
117        WriterReaderPhaser {
118            start_epoch: start,
119            even_end_epoch: even,
120            odd_end_epoch: odd,
121            read_lock: Mutex::new(read_lock),
122        }
123    }
124
125    /// Enters a writer critical section, returning a guard object that signals the end of the
126    /// critical section upon drop.
127    pub fn writer_critical_section(&self) -> PhaserCriticalSection {
128        let flag = self.start_epoch.fetch_add(1, Ordering::Release);
129
130        if flag < 0 {
131            PhaserCriticalSection {
132                end_epoch: self.odd_end_epoch.clone(),
133            }
134        } else {
135            PhaserCriticalSection {
136                end_epoch: self.even_end_epoch.clone(),
137            }
138        }
139    }
140
141    /// Enter a reader criticial section, potentially blocking until a currently active read
142    /// section finishes. Returns a guard object that allows the user to flip the phase of the
143    /// `WriterReaderPhaser`, and unlocks the read lock upon drop.
144    ///
145    /// # Errors
146    ///
147    /// If another reader critical section panicked while holding the read lock, this call will
148    /// return an error once the lock is acquired. See the documentation for
149    /// `std::sync::Mutex::lock` for details.
150    pub fn read_lock(&self) -> LockResult<MutexGuard<PhaserReadLock>> {
151        self.read_lock.lock()
152    }
153}
154
155impl PhaserReadLock {
156    /// Wait until all currently-active writer critical sections have completed.
157    pub fn flip_phase(&self) {
158        self.flip_with_sleep(Duration::default());
159    }
160
161    /// Wait until all currently-active writer critical sections have completed. While waiting,
162    /// sleep with the given duration.
163    pub fn flip_with_sleep(&self, sleep_time: Duration) {
164        let next_phase_even = self.start_epoch.load(Ordering::Relaxed) < 0;
165
166        let start_value = if next_phase_even {
167            let tmp = 0;
168            self.even_end_epoch.store(tmp, Ordering::Relaxed);
169            tmp
170        } else {
171            let tmp = ISIZE_MIN;
172            self.odd_end_epoch.store(tmp, Ordering::Relaxed);
173            tmp
174        };
175
176        let value_at_flip = self.start_epoch.swap(start_value, Ordering::AcqRel);
177
178        let end_epoch = if next_phase_even {
179            self.odd_end_epoch.clone()
180        } else {
181            self.even_end_epoch.clone()
182        };
183
184        while end_epoch.load(Ordering::Relaxed) != value_at_flip {
185            if sleep_time == Duration::default() {
186                thread::yield_now();
187            } else {
188                thread::sleep(sleep_time);
189            }
190        }
191    }
192}
193