spin/mutex/ticket.rs
//! A ticket-based mutex.
//!
//! Waiting threads take a 'ticket' from the lock in the order they arrive and gain access to the lock when their
//! ticket is next in the queue. Best-case latency is slightly worse than that of a regular spinning mutex, but
//! worst-case latency is bounded: a waiting thread only needs to wait for the threads ahead of it in the queue
//! to finish.
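//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; it mirrors the [`TicketMutex`] examples below):
//!
//! ```
//! let lock = spin::mutex::TicketMutex::<_>::new(0);
//! *lock.lock() += 1;
//! assert_eq!(*lock.lock(), 1);
//! ```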

use crate::{
    atomic::{AtomicUsize, Ordering},
    RelaxStrategy, Spin,
};
use core::{
    cell::UnsafeCell,
    fmt,
    marker::PhantomData,
    ops::{Deref, DerefMut},
};

/// A spin-based [ticket lock](https://en.wikipedia.org/wiki/Ticket_lock) providing mutually exclusive access to data.
///
/// A ticket lock is analogous to a queue management system for lock requests. When a thread tries to take a lock, it
/// is assigned a 'ticket'. It then spins until its ticket becomes next in line. When the lock guard is released, the
/// next ticket will be processed.
///
/// Ticket locks significantly reduce the worst-case latency of locking at the cost of slightly higher average-case
/// overhead.
///
/// # Example
///
/// ```
/// use spin;
///
/// let lock = spin::mutex::TicketMutex::<_>::new(0);
///
/// // Modify the data
/// *lock.lock() = 2;
///
/// // Read the data
/// let answer = *lock.lock();
/// assert_eq!(answer, 2);
/// ```
///
/// # Thread safety example
///
/// ```
/// use spin;
/// use std::sync::{Arc, Barrier};
///
/// let thread_count = 1000;
/// let spin_mutex = Arc::new(spin::mutex::TicketMutex::<_>::new(0));
///
/// // We use a barrier to ensure the readout happens after all writing
/// let barrier = Arc::new(Barrier::new(thread_count + 1));
///
/// for _ in 0..thread_count {
///     let my_barrier = barrier.clone();
///     let my_lock = spin_mutex.clone();
///     std::thread::spawn(move || {
///         let mut guard = my_lock.lock();
///         *guard += 1;
///
///         // Release the lock to prevent a deadlock
///         drop(guard);
///         my_barrier.wait();
///     });
/// }
///
/// barrier.wait();
///
/// let answer = { *spin_mutex.lock() };
/// assert_eq!(answer, thread_count);
/// ```
pub struct TicketMutex<T: ?Sized, R = Spin> {
    phantom: PhantomData<R>,
    next_ticket: AtomicUsize,
    next_serving: AtomicUsize,
    data: UnsafeCell<T>,
}

/// A guard that protects some data.
///
/// When the guard is dropped, the next ticket will be processed.
pub struct TicketMutexGuard<'a, T: ?Sized + 'a> {
    next_serving: &'a AtomicUsize,
    ticket: usize,
    data: &'a mut T,
}

unsafe impl<T: ?Sized + Send, R> Sync for TicketMutex<T, R> {}
unsafe impl<T: ?Sized + Send, R> Send for TicketMutex<T, R> {}

impl<T, R> TicketMutex<T, R> {
    /// Creates a new [`TicketMutex`] wrapping the supplied data.
    ///
    /// # Example
    ///
    /// ```
    /// use spin::mutex::TicketMutex;
    ///
    /// static MUTEX: TicketMutex<()> = TicketMutex::<_>::new(());
    ///
    /// fn demo() {
    ///     let lock = MUTEX.lock();
    ///     // do something with lock
    ///     drop(lock);
    /// }
    /// ```
    #[inline(always)]
    pub const fn new(data: T) -> Self {
        Self {
            phantom: PhantomData,
            next_ticket: AtomicUsize::new(0),
            next_serving: AtomicUsize::new(0),
            data: UnsafeCell::new(data),
        }
    }

    /// Consumes this [`TicketMutex`] and unwraps the underlying data.
    ///
    /// # Example
    ///
    /// ```
    /// let lock = spin::mutex::TicketMutex::<_>::new(42);
    /// assert_eq!(42, lock.into_inner());
    /// ```
    #[inline(always)]
    pub fn into_inner(self) -> T {
        self.data.into_inner()
    }

    /// Returns a mutable pointer to the underlying data.
    ///
    /// This is mostly meant to be used for applications which require manual unlocking, but where
    /// storing both the lock and the pointer to the inner data gets inefficient.
    ///
    /// # Example
    /// ```
    /// let lock = spin::mutex::TicketMutex::<_>::new(42);
    ///
    /// unsafe {
    ///     core::mem::forget(lock.lock());
    ///
    ///     assert_eq!(lock.as_mut_ptr().read(), 42);
    ///     lock.as_mut_ptr().write(58);
    ///
    ///     lock.force_unlock();
    /// }
    ///
    /// assert_eq!(*lock.lock(), 58);
    /// ```
    #[inline(always)]
    pub fn as_mut_ptr(&self) -> *mut T {
        self.data.get()
    }
}

impl<T: ?Sized + fmt::Debug, R> fmt::Debug for TicketMutex<T, R> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self.try_lock() {
            Some(guard) => write!(f, "Mutex {{ data: ")
                .and_then(|()| (&*guard).fmt(f))
                .and_then(|()| write!(f, " }}")),
            None => write!(f, "Mutex {{ <locked> }}"),
        }
    }
}

impl<T: ?Sized, R: RelaxStrategy> TicketMutex<T, R> {
    /// Locks the [`TicketMutex`] and returns a guard that permits access to the inner data.
    ///
    /// The returned guard may be dereferenced to access the data,
    /// and the lock will be released when the guard falls out of scope.
    ///
    /// ```
    /// let lock = spin::mutex::TicketMutex::<_>::new(0);
    /// {
    ///     let mut data = lock.lock();
    ///     // The lock is now locked and the data can be accessed
    ///     *data += 1;
    ///     // The guard is implicitly dropped at the end of the scope, releasing the lock
    /// }
    /// ```
    #[inline(always)]
    pub fn lock(&self) -> TicketMutexGuard<T> {
        let ticket = self.next_ticket.fetch_add(1, Ordering::Relaxed);

        while self.next_serving.load(Ordering::Acquire) != ticket {
            R::relax();
        }

        TicketMutexGuard {
            next_serving: &self.next_serving,
            ticket,
            // Safety
            // We know that we are the next ticket to be served,
            // so there's no other thread accessing the data.
            //
            // Every other thread has a different ticket number, so it is
            // definitely stuck in the spin loop above.
            data: unsafe { &mut *self.data.get() },
        }
    }
}

impl<T: ?Sized, R> TicketMutex<T, R> {
    /// Returns `true` if the lock is currently held.
    ///
    /// # Safety
    ///
    /// This function provides no synchronization guarantees and so its result should be considered 'out of date'
    /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic.
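    ///
    /// # Example
    ///
    /// A small usage sketch (illustrative; it mirrors the `is_locked` test at the bottom of this file):
    ///
    /// ```
    /// let lock = spin::mutex::TicketMutex::<_>::new(42);
    /// assert!(!lock.is_locked());
    ///
    /// let guard = lock.lock();
    /// assert!(lock.is_locked());
    ///
    /// drop(guard);
    /// assert!(!lock.is_locked());
    /// ```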
    #[inline(always)]
    pub fn is_locked(&self) -> bool {
        let ticket = self.next_ticket.load(Ordering::Relaxed);
        self.next_serving.load(Ordering::Relaxed) != ticket
    }

    /// Force unlock this [`TicketMutex`], by serving the next ticket.
    ///
    /// # Safety
    ///
    /// This is *extremely* unsafe if the lock is not held by the current
    /// thread. However, this can be useful in some instances for exposing the
    /// lock to FFI that doesn't know how to deal with RAII.
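    ///
    /// # Example
    ///
    /// A sketch of manual unlocking after leaking the guard (adapted from the `as_mut_ptr` example above):
    ///
    /// ```
    /// let lock = spin::mutex::TicketMutex::<_>::new(());
    ///
    /// core::mem::forget(lock.lock());
    /// assert!(lock.is_locked());
    ///
    /// // Safety: the (forgotten) guard for this lock is held by the current thread.
    /// unsafe { lock.force_unlock(); }
    /// assert!(!lock.is_locked());
    /// ```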
    #[inline(always)]
    pub unsafe fn force_unlock(&self) {
        self.next_serving.fetch_add(1, Ordering::Release);
    }

    /// Try to lock this [`TicketMutex`], returning a lock guard if successful.
    ///
    /// # Example
    ///
    /// ```
    /// let lock = spin::mutex::TicketMutex::<_>::new(42);
    ///
    /// let maybe_guard = lock.try_lock();
    /// assert!(maybe_guard.is_some());
    ///
    /// // `maybe_guard` is still held, so the second call fails
    /// let maybe_guard2 = lock.try_lock();
    /// assert!(maybe_guard2.is_none());
    /// ```
    #[inline(always)]
    pub fn try_lock(&self) -> Option<TicketMutexGuard<T>> {
        // TODO: Replace with `fetch_update` to avoid manual CAS when upgrading MSRV
        let ticket = {
            let mut prev = self.next_ticket.load(Ordering::SeqCst);
            loop {
                if self.next_serving.load(Ordering::Acquire) == prev {
                    match self.next_ticket.compare_exchange_weak(
                        prev,
                        prev + 1,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    ) {
                        Ok(x) => break Some(x),
                        Err(next_prev) => prev = next_prev,
                    }
                } else {
                    break None;
                }
            }
        };

        ticket.map(|ticket| TicketMutexGuard {
            next_serving: &self.next_serving,
            ticket,
            // Safety
            // We have a ticket that is equal to the next_serving ticket, so we know:
            // - that no other thread can have the same ticket id as this thread
            // - that we are the next one to be served so we have exclusive access to the data
            data: unsafe { &mut *self.data.get() },
        })
    }

    /// Returns a mutable reference to the underlying data.
    ///
    /// Since this call borrows the [`TicketMutex`] mutably, and a mutable reference is guaranteed to be exclusive in
    /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As
    /// such, this is a 'zero-cost' operation.
    ///
    /// # Example
    ///
    /// ```
    /// let mut lock = spin::mutex::TicketMutex::<_>::new(0);
    /// *lock.get_mut() = 10;
    /// assert_eq!(*lock.lock(), 10);
    /// ```
    #[inline(always)]
    pub fn get_mut(&mut self) -> &mut T {
        // Safety:
        // We know that there are no other references to `self`,
        // so it's safe to return an exclusive reference to the data.
        unsafe { &mut *self.data.get() }
    }
}

impl<T: ?Sized + Default, R> Default for TicketMutex<T, R> {
    fn default() -> Self {
        Self::new(Default::default())
    }
}

impl<T, R> From<T> for TicketMutex<T, R> {
    fn from(data: T) -> Self {
        Self::new(data)
    }
}

impl<'a, T: ?Sized> TicketMutexGuard<'a, T> {
    /// Leak the lock guard, yielding a mutable reference to the underlying data.
    ///
    /// Note that this function will permanently lock the original [`TicketMutex`].
    ///
    /// ```
    /// let mylock = spin::mutex::TicketMutex::<_>::new(0);
    ///
    /// let data: &mut i32 = spin::mutex::TicketMutexGuard::leak(mylock.lock());
    ///
    /// *data = 1;
    /// assert_eq!(*data, 1);
    /// ```
    #[inline(always)]
    pub fn leak(this: Self) -> &'a mut T {
        let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing
        core::mem::forget(this);
        unsafe { &mut *data }
    }
}

impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for TicketMutexGuard<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&**self, f)
    }
}

impl<'a, T: ?Sized + fmt::Display> fmt::Display for TicketMutexGuard<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&**self, f)
    }
}

impl<'a, T: ?Sized> Deref for TicketMutexGuard<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.data
    }
}

impl<'a, T: ?Sized> DerefMut for TicketMutexGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        self.data
    }
}

impl<'a, T: ?Sized> Drop for TicketMutexGuard<'a, T> {
    fn drop(&mut self) {
        // Serve the next ticket, releasing the lock for the next waiting thread.
        let new_ticket = self.ticket + 1;
        self.next_serving.store(new_ticket, Ordering::Release);
    }
}

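// When the `lock_api` feature is enabled, `TicketMutex<()>` implements `lock_api::RawMutex` and can
// therefore back a `lock_api::Mutex`, making the ticket-based fairness available through that API.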
#[cfg(feature = "lock_api")]
unsafe impl<R: RelaxStrategy> lock_api_crate::RawMutex for TicketMutex<(), R> {
    type GuardMarker = lock_api_crate::GuardSend;

    const INIT: Self = Self::new(());

    fn lock(&self) {
        // Prevent the guard's destructor from running, leaving the lock held until `unlock` is called
        core::mem::forget(Self::lock(self));
    }

    fn try_lock(&self) -> bool {
        // Prevent the guard's destructor from running, leaving the lock held on success
        Self::try_lock(self).map(core::mem::forget).is_some()
    }

    unsafe fn unlock(&self) {
        self.force_unlock();
    }

    fn is_locked(&self) -> bool {
        Self::is_locked(self)
    }
}

#[cfg(test)]
mod tests {
    use std::prelude::v1::*;

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;

    type TicketMutex<T> = super::TicketMutex<T>;

    #[derive(Eq, PartialEq, Debug)]
    struct NonCopy(i32);

    #[test]
    fn smoke() {
        let m = TicketMutex::<_>::new(());
        drop(m.lock());
        drop(m.lock());
    }

    #[test]
    fn lots_and_lots() {
        static M: TicketMutex<()> = TicketMutex::<_>::new(());
        static mut CNT: u32 = 0;
        const J: u32 = 1000;
        const K: u32 = 3;

        fn inc() {
            for _ in 0..J {
                unsafe {
                    let _g = M.lock();
                    CNT += 1;
                }
            }
        }

        let (tx, rx) = channel();
        for _ in 0..K {
            let tx2 = tx.clone();
            thread::spawn(move || {
                inc();
                tx2.send(()).unwrap();
            });
            let tx2 = tx.clone();
            thread::spawn(move || {
                inc();
                tx2.send(()).unwrap();
            });
        }

        drop(tx);
        for _ in 0..2 * K {
            rx.recv().unwrap();
        }
        assert_eq!(unsafe { CNT }, J * K * 2);
    }

    #[test]
    fn try_lock() {
        let mutex = TicketMutex::<_>::new(42);

        // First lock succeeds
        let a = mutex.try_lock();
        assert_eq!(a.as_ref().map(|r| **r), Some(42));

        // Additional lock fails
        let b = mutex.try_lock();
        assert!(b.is_none());

        // After dropping lock, it succeeds again
        ::core::mem::drop(a);
        let c = mutex.try_lock();
        assert_eq!(c.as_ref().map(|r| **r), Some(42));
    }

    #[test]
    fn test_into_inner() {
        let m = TicketMutex::<_>::new(NonCopy(10));
        assert_eq!(m.into_inner(), NonCopy(10));
    }

    #[test]
    fn test_into_inner_drop() {
        struct Foo(Arc<AtomicUsize>);
        impl Drop for Foo {
            fn drop(&mut self) {
                self.0.fetch_add(1, Ordering::SeqCst);
            }
        }
        let num_drops = Arc::new(AtomicUsize::new(0));
        let m = TicketMutex::<_>::new(Foo(num_drops.clone()));
        assert_eq!(num_drops.load(Ordering::SeqCst), 0);
        {
            let _inner = m.into_inner();
            assert_eq!(num_drops.load(Ordering::SeqCst), 0);
        }
        assert_eq!(num_drops.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_mutex_arc_nested() {
        // Tests nested mutexes and access
        // to underlying data.
        let arc = Arc::new(TicketMutex::<_>::new(1));
        let arc2 = Arc::new(TicketMutex::<_>::new(arc));
        let (tx, rx) = channel();
        let _t = thread::spawn(move || {
            let lock = arc2.lock();
            let lock2 = lock.lock();
            assert_eq!(*lock2, 1);
            tx.send(()).unwrap();
        });
        rx.recv().unwrap();
    }

    #[test]
    fn test_mutex_arc_access_in_unwind() {
        let arc = Arc::new(TicketMutex::<_>::new(1));
        let arc2 = arc.clone();
        let _ = thread::spawn(move || -> () {
            struct Unwinder {
                i: Arc<TicketMutex<i32>>,
            }
            impl Drop for Unwinder {
                fn drop(&mut self) {
                    *self.i.lock() += 1;
                }
            }
            let _u = Unwinder { i: arc2 };
            panic!();
        })
        .join();
        let lock = arc.lock();
        assert_eq!(*lock, 2);
    }

    #[test]
    fn test_mutex_unsized() {
        let mutex: &TicketMutex<[i32]> = &TicketMutex::<_>::new([1, 2, 3]);
        {
            let b = &mut *mutex.lock();
            b[0] = 4;
            b[2] = 5;
        }
        let comp: &[i32] = &[4, 2, 5];
        assert_eq!(&*mutex.lock(), comp);
    }

    #[test]
    fn is_locked() {
        let mutex = TicketMutex::<_>::new(());
        assert!(!mutex.is_locked());
        let lock = mutex.lock();
        assert!(mutex.is_locked());
        drop(lock);
        assert!(!mutex.is_locked());
    }
547}