spin/mutex/ticket.rs
1//! A ticket-based mutex.
2//!
3//! Waiting threads take a 'ticket' from the lock in the order they arrive and gain access to the lock when their
4//! ticket is next in the queue. Best-case latency is slightly worse than a regular spinning mutex, but worse-case
5//! latency is infinitely better. Waiting threads simply need to wait for all threads that come before them in the
6//! queue to finish.
7
8use crate::{
9 atomic::{AtomicUsize, Ordering},
10 RelaxStrategy, Spin,
11};
12use core::{
13 cell::UnsafeCell,
14 fmt,
15 marker::PhantomData,
16 ops::{Deref, DerefMut},
17};
18
19/// A spin-based [ticket lock](https://en.wikipedia.org/wiki/Ticket_lock) providing mutually exclusive access to data.
20///
21/// A ticket lock is analogous to a queue management system for lock requests. When a thread tries to take a lock, it
22/// is assigned a 'ticket'. It then spins until its ticket becomes next in line. When the lock guard is released, the
23/// next ticket will be processed.
24///
25/// Ticket locks significantly reduce the worse-case performance of locking at the cost of slightly higher average-time
26/// overhead.
27///
28/// # Example
29///
30/// ```
31/// use spin;
32///
33/// let lock = spin::mutex::TicketMutex::<_>::new(0);
34///
35/// // Modify the data
36/// *lock.lock() = 2;
37///
38/// // Read the data
39/// let answer = *lock.lock();
40/// assert_eq!(answer, 2);
41/// ```
42///
43/// # Thread safety example
44///
45/// ```
46/// use spin;
47/// use std::sync::{Arc, Barrier};
48///
49/// let thread_count = 1000;
50/// let spin_mutex = Arc::new(spin::mutex::TicketMutex::<_>::new(0));
51///
52/// // We use a barrier to ensure the readout happens after all writing
53/// let barrier = Arc::new(Barrier::new(thread_count + 1));
54///
55/// for _ in (0..thread_count) {
56/// let my_barrier = barrier.clone();
57/// let my_lock = spin_mutex.clone();
58/// std::thread::spawn(move || {
59/// let mut guard = my_lock.lock();
60/// *guard += 1;
61///
62/// // Release the lock to prevent a deadlock
63/// drop(guard);
64/// my_barrier.wait();
65/// });
66/// }
67///
68/// barrier.wait();
69///
70/// let answer = { *spin_mutex.lock() };
71/// assert_eq!(answer, thread_count);
72/// ```
73pub struct TicketMutex<T: ?Sized, R = Spin> {
74 phantom: PhantomData<R>,
75 next_ticket: AtomicUsize,
76 next_serving: AtomicUsize,
77 data: UnsafeCell<T>,
78}
79
80/// A guard that protects some data.
81///
82/// When the guard is dropped, the next ticket will be processed.
83pub struct TicketMutexGuard<'a, T: ?Sized + 'a> {
84 next_serving: &'a AtomicUsize,
85 ticket: usize,
86 data: &'a mut T,
87}
88
89unsafe impl<T: ?Sized + Send, R> Sync for TicketMutex<T, R> {}
90unsafe impl<T: ?Sized + Send, R> Send for TicketMutex<T, R> {}
91
92impl<T, R> TicketMutex<T, R> {
93 /// Creates a new [`TicketMutex`] wrapping the supplied data.
94 ///
95 /// # Example
96 ///
97 /// ```
98 /// use spin::mutex::TicketMutex;
99 ///
100 /// static MUTEX: TicketMutex<()> = TicketMutex::<_>::new(());
101 ///
102 /// fn demo() {
103 /// let lock = MUTEX.lock();
104 /// // do something with lock
105 /// drop(lock);
106 /// }
107 /// ```
108 #[inline(always)]
109 pub const fn new(data: T) -> Self {
110 Self {
111 phantom: PhantomData,
112 next_ticket: AtomicUsize::new(0),
113 next_serving: AtomicUsize::new(0),
114 data: UnsafeCell::new(data),
115 }
116 }
117
118 /// Consumes this [`TicketMutex`] and unwraps the underlying data.
119 ///
120 /// # Example
121 ///
122 /// ```
123 /// let lock = spin::mutex::TicketMutex::<_>::new(42);
124 /// assert_eq!(42, lock.into_inner());
125 /// ```
126 #[inline(always)]
127 pub fn into_inner(self) -> T {
128 self.data.into_inner()
129 }
130 /// Returns a mutable pointer to the underying data.
131 ///
132 /// This is mostly meant to be used for applications which require manual unlocking, but where
133 /// storing both the lock and the pointer to the inner data gets inefficient.
134 ///
135 /// # Example
136 /// ```
137 /// let lock = spin::mutex::SpinMutex::<_>::new(42);
138 ///
139 /// unsafe {
140 /// core::mem::forget(lock.lock());
141 ///
142 /// assert_eq!(lock.as_mut_ptr().read(), 42);
143 /// lock.as_mut_ptr().write(58);
144 ///
145 /// lock.force_unlock();
146 /// }
147 ///
148 /// assert_eq!(*lock.lock(), 58);
149 ///
150 /// ```
151 #[inline(always)]
152 pub fn as_mut_ptr(&self) -> *mut T {
153 self.data.get()
154 }
155}
156
157impl<T: ?Sized + fmt::Debug, R> fmt::Debug for TicketMutex<T, R> {
158 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
159 match self.try_lock() {
160 Some(guard) => write!(f, "Mutex {{ data: ")
161 .and_then(|()| (&*guard).fmt(f))
162 .and_then(|()| write!(f, "}}")),
163 None => write!(f, "Mutex {{ <locked> }}"),
164 }
165 }
166}
167
168impl<T: ?Sized, R: RelaxStrategy> TicketMutex<T, R> {
169 /// Locks the [`TicketMutex`] and returns a guard that permits access to the inner data.
170 ///
171 /// The returned data may be dereferenced for data access
172 /// and the lock will be dropped when the guard falls out of scope.
173 ///
174 /// ```
175 /// let lock = spin::mutex::TicketMutex::<_>::new(0);
176 /// {
177 /// let mut data = lock.lock();
178 /// // The lock is now locked and the data can be accessed
179 /// *data += 1;
180 /// // The lock is implicitly dropped at the end of the scope
181 /// }
182 /// ```
183 #[inline(always)]
184 pub fn lock(&self) -> TicketMutexGuard<T> {
185 let ticket = self.next_ticket.fetch_add(1, Ordering::Relaxed);
186
187 while self.next_serving.load(Ordering::Acquire) != ticket {
188 R::relax();
189 }
190
191 TicketMutexGuard {
192 next_serving: &self.next_serving,
193 ticket,
194 // Safety
195 // We know that we are the next ticket to be served,
196 // so there's no other thread accessing the data.
197 //
198 // Every other thread has another ticket number so it's
199 // definitely stuck in the spin loop above.
200 data: unsafe { &mut *self.data.get() },
201 }
202 }
203}
204
205impl<T: ?Sized, R> TicketMutex<T, R> {
206 /// Returns `true` if the lock is currently held.
207 ///
208 /// # Safety
209 ///
210 /// This function provides no synchronization guarantees and so its result should be considered 'out of date'
211 /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic.
212 #[inline(always)]
213 pub fn is_locked(&self) -> bool {
214 let ticket = self.next_ticket.load(Ordering::Relaxed);
215 self.next_serving.load(Ordering::Relaxed) != ticket
216 }
217
218 /// Force unlock this [`TicketMutex`], by serving the next ticket.
219 ///
220 /// # Safety
221 ///
222 /// This is *extremely* unsafe if the lock is not held by the current
223 /// thread. However, this can be useful in some instances for exposing the
224 /// lock to FFI that doesn't know how to deal with RAII.
225 #[inline(always)]
226 pub unsafe fn force_unlock(&self) {
227 self.next_serving.fetch_add(1, Ordering::Release);
228 }
229
230 /// Try to lock this [`TicketMutex`], returning a lock guard if successful.
231 ///
232 /// # Example
233 ///
234 /// ```
235 /// let lock = spin::mutex::TicketMutex::<_>::new(42);
236 ///
237 /// let maybe_guard = lock.try_lock();
238 /// assert!(maybe_guard.is_some());
239 ///
240 /// // `maybe_guard` is still held, so the second call fails
241 /// let maybe_guard2 = lock.try_lock();
242 /// assert!(maybe_guard2.is_none());
243 /// ```
244 #[inline(always)]
245 pub fn try_lock(&self) -> Option<TicketMutexGuard<T>> {
246 let ticket = self
247 .next_ticket
248 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |ticket| {
249 if self.next_serving.load(Ordering::Acquire) == ticket {
250 Some(ticket + 1)
251 } else {
252 None
253 }
254 });
255
256 ticket.ok().map(|ticket| TicketMutexGuard {
257 next_serving: &self.next_serving,
258 ticket,
259 // Safety
260 // We have a ticket that is equal to the next_serving ticket, so we know:
261 // - that no other thread can have the same ticket id as this thread
262 // - that we are the next one to be served so we have exclusive access to the data
263 data: unsafe { &mut *self.data.get() },
264 })
265 }
266
267 /// Returns a mutable reference to the underlying data.
268 ///
269 /// Since this call borrows the [`TicketMutex`] mutably, and a mutable reference is guaranteed to be exclusive in
270 /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As
271 /// such, this is a 'zero-cost' operation.
272 ///
273 /// # Example
274 ///
275 /// ```
276 /// let mut lock = spin::mutex::TicketMutex::<_>::new(0);
277 /// *lock.get_mut() = 10;
278 /// assert_eq!(*lock.lock(), 10);
279 /// ```
280 #[inline(always)]
281 pub fn get_mut(&mut self) -> &mut T {
282 // Safety:
283 // We know that there are no other references to `self`,
284 // so it's safe to return a exclusive reference to the data.
285 unsafe { &mut *self.data.get() }
286 }
287}
288
289impl<T: ?Sized + Default, R> Default for TicketMutex<T, R> {
290 fn default() -> Self {
291 Self::new(Default::default())
292 }
293}
294
295impl<T, R> From<T> for TicketMutex<T, R> {
296 fn from(data: T) -> Self {
297 Self::new(data)
298 }
299}
300
301impl<'a, T: ?Sized> TicketMutexGuard<'a, T> {
302 /// Leak the lock guard, yielding a mutable reference to the underlying data.
303 ///
304 /// Note that this function will permanently lock the original [`TicketMutex`].
305 ///
306 /// ```
307 /// let mylock = spin::mutex::TicketMutex::<_>::new(0);
308 ///
309 /// let data: &mut i32 = spin::mutex::TicketMutexGuard::leak(mylock.lock());
310 ///
311 /// *data = 1;
312 /// assert_eq!(*data, 1);
313 /// ```
314 #[inline(always)]
315 pub fn leak(this: Self) -> &'a mut T {
316 let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing
317 core::mem::forget(this);
318 unsafe { &mut *data }
319 }
320}
321
322impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for TicketMutexGuard<'a, T> {
323 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
324 fmt::Debug::fmt(&**self, f)
325 }
326}
327
328impl<'a, T: ?Sized + fmt::Display> fmt::Display for TicketMutexGuard<'a, T> {
329 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
330 fmt::Display::fmt(&**self, f)
331 }
332}
333
334impl<'a, T: ?Sized> Deref for TicketMutexGuard<'a, T> {
335 type Target = T;
336 fn deref(&self) -> &T {
337 self.data
338 }
339}
340
341impl<'a, T: ?Sized> DerefMut for TicketMutexGuard<'a, T> {
342 fn deref_mut(&mut self) -> &mut T {
343 self.data
344 }
345}
346
347impl<'a, T: ?Sized> Drop for TicketMutexGuard<'a, T> {
348 fn drop(&mut self) {
349 let new_ticket = self.ticket + 1;
350 self.next_serving.store(new_ticket, Ordering::Release);
351 }
352}
353
354#[cfg(feature = "lock_api")]
355unsafe impl<R: RelaxStrategy> lock_api_crate::RawMutex for TicketMutex<(), R> {
356 type GuardMarker = lock_api_crate::GuardSend;
357
358 const INIT: Self = Self::new(());
359
360 fn lock(&self) {
361 // Prevent guard destructor running
362 core::mem::forget(Self::lock(self));
363 }
364
365 fn try_lock(&self) -> bool {
366 // Prevent guard destructor running
367 Self::try_lock(self).map(core::mem::forget).is_some()
368 }
369
370 unsafe fn unlock(&self) {
371 self.force_unlock();
372 }
373
374 fn is_locked(&self) -> bool {
375 Self::is_locked(self)
376 }
377}
378
379#[cfg(test)]
380mod tests {
381 use std::prelude::v1::*;
382
383 use std::sync::atomic::{AtomicUsize, Ordering};
384 use std::sync::mpsc::channel;
385 use std::sync::Arc;
386 use std::thread;
387
388 type TicketMutex<T> = super::TicketMutex<T>;
389
390 #[derive(Eq, PartialEq, Debug)]
391 struct NonCopy(i32);
392
393 #[test]
394 fn smoke() {
395 let m = TicketMutex::<_>::new(());
396 drop(m.lock());
397 drop(m.lock());
398 }
399
400 #[test]
401 fn lots_and_lots() {
402 static M: TicketMutex<()> = TicketMutex::<_>::new(());
403 static mut CNT: u32 = 0;
404 const J: u32 = 1000;
405 const K: u32 = 3;
406
407 fn inc() {
408 for _ in 0..J {
409 unsafe {
410 let _g = M.lock();
411 CNT += 1;
412 }
413 }
414 }
415
416 let (tx, rx) = channel();
417 for _ in 0..K {
418 let tx2 = tx.clone();
419 thread::spawn(move || {
420 inc();
421 tx2.send(()).unwrap();
422 });
423 let tx2 = tx.clone();
424 thread::spawn(move || {
425 inc();
426 tx2.send(()).unwrap();
427 });
428 }
429
430 drop(tx);
431 for _ in 0..2 * K {
432 rx.recv().unwrap();
433 }
434 assert_eq!(unsafe { CNT }, J * K * 2);
435 }
436
437 #[test]
438 fn try_lock() {
439 let mutex = TicketMutex::<_>::new(42);
440
441 // First lock succeeds
442 let a = mutex.try_lock();
443 assert_eq!(a.as_ref().map(|r| **r), Some(42));
444
445 // Additional lock fails
446 let b = mutex.try_lock();
447 assert!(b.is_none());
448
449 // After dropping lock, it succeeds again
450 ::core::mem::drop(a);
451 let c = mutex.try_lock();
452 assert_eq!(c.as_ref().map(|r| **r), Some(42));
453 }
454
455 #[test]
456 fn test_into_inner() {
457 let m = TicketMutex::<_>::new(NonCopy(10));
458 assert_eq!(m.into_inner(), NonCopy(10));
459 }
460
461 #[test]
462 fn test_into_inner_drop() {
463 struct Foo(Arc<AtomicUsize>);
464 impl Drop for Foo {
465 fn drop(&mut self) {
466 self.0.fetch_add(1, Ordering::SeqCst);
467 }
468 }
469 let num_drops = Arc::new(AtomicUsize::new(0));
470 let m = TicketMutex::<_>::new(Foo(num_drops.clone()));
471 assert_eq!(num_drops.load(Ordering::SeqCst), 0);
472 {
473 let _inner = m.into_inner();
474 assert_eq!(num_drops.load(Ordering::SeqCst), 0);
475 }
476 assert_eq!(num_drops.load(Ordering::SeqCst), 1);
477 }
478
479 #[test]
480 fn test_mutex_arc_nested() {
481 // Tests nested mutexes and access
482 // to underlying data.
483 let arc = Arc::new(TicketMutex::<_>::new(1));
484 let arc2 = Arc::new(TicketMutex::<_>::new(arc));
485 let (tx, rx) = channel();
486 let _t = thread::spawn(move || {
487 let lock = arc2.lock();
488 let lock2 = lock.lock();
489 assert_eq!(*lock2, 1);
490 tx.send(()).unwrap();
491 });
492 rx.recv().unwrap();
493 }
494
495 #[test]
496 fn test_mutex_arc_access_in_unwind() {
497 let arc = Arc::new(TicketMutex::<_>::new(1));
498 let arc2 = arc.clone();
499 let _ = thread::spawn(move || -> () {
500 struct Unwinder {
501 i: Arc<TicketMutex<i32>>,
502 }
503 impl Drop for Unwinder {
504 fn drop(&mut self) {
505 *self.i.lock() += 1;
506 }
507 }
508 let _u = Unwinder { i: arc2 };
509 panic!();
510 })
511 .join();
512 let lock = arc.lock();
513 assert_eq!(*lock, 2);
514 }
515
516 #[test]
517 fn test_mutex_unsized() {
518 let mutex: &TicketMutex<[i32]> = &TicketMutex::<_>::new([1, 2, 3]);
519 {
520 let b = &mut *mutex.lock();
521 b[0] = 4;
522 b[2] = 5;
523 }
524 let comp: &[i32] = &[4, 2, 5];
525 assert_eq!(&*mutex.lock(), comp);
526 }
527
528 #[test]
529 fn is_locked() {
530 let mutex = TicketMutex::<_>::new(());
531 assert!(!mutex.is_locked());
532 let lock = mutex.lock();
533 assert!(mutex.is_locked());
534 drop(lock);
535 assert!(!mutex.is_locked());
536 }
537}