parking_lot/
raw_rwlock.rs

1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::elision::{have_elision, AtomicElisionExt};
9use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::util;
11use core::{
12    cell::Cell,
13    sync::atomic::{AtomicUsize, Ordering},
14};
15use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
16use parking_lot_core::{
17    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
18};
19use std::time::{Duration, Instant};
20
21// This reader-writer lock implementation is based on Boost's upgrade_mutex:
22// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
23//
24// This implementation uses 2 wait queues, one at key [addr] and one at key
25// [addr + 1]. The primary queue is used for all new waiting threads, and the
26// secondary queue is used by the thread which has acquired WRITER_BIT but is
27// waiting for the remaining readers to exit the lock.
28//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read phases
// and write phases. In particular, it is not vulnerable to write starvation
// since readers will block if there is a pending writer.
33
34// There is at least one thread in the main queue.
35const PARKED_BIT: usize = 0b0001;
36// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
37const WRITER_PARKED_BIT: usize = 0b0010;
38// A reader is holding an upgradable lock. The reader count must be non-zero and
39// WRITER_BIT must not be set.
40const UPGRADABLE_BIT: usize = 0b0100;
41// If the reader count is zero: a writer is currently holding an exclusive lock.
42// Otherwise: a writer is waiting for the remaining readers to exit the lock.
43const WRITER_BIT: usize = 0b1000;
44// Mask of bits used to count readers.
45const READERS_MASK: usize = !0b1111;
46// Base unit for counting readers.
47const ONE_READER: usize = 0b10000;
48
49// Token indicating what type of lock a queued thread is trying to acquire
50const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
51const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
52const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
53
/// Raw reader-writer lock type backed by the parking lot.
///
/// The whole lock is one atomic word: the low bits hold the `PARKED_BIT`,
/// `WRITER_PARKED_BIT`, `UPGRADABLE_BIT` and `WRITER_BIT` flags, and the
/// remaining bits (`READERS_MASK`) count the readers holding the lock.
pub struct RawRwLock {
    // Packed flags and reader count. The initial (unlocked) value is zero.
    state: AtomicUsize,
}
58
59unsafe impl lock_api::RawRwLock for RawRwLock {
60    const INIT: RawRwLock = RawRwLock {
61        state: AtomicUsize::new(0),
62    };
63
64    type GuardMarker = crate::GuardMarker;
65
66    #[inline]
67    fn lock_exclusive(&self) {
68        if self
69            .state
70            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
71            .is_err()
72        {
73            let result = self.lock_exclusive_slow(None);
74            debug_assert!(result);
75        }
76        self.deadlock_acquire();
77    }
78
79    #[inline]
80    fn try_lock_exclusive(&self) -> bool {
81        if self
82            .state
83            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
84            .is_ok()
85        {
86            self.deadlock_acquire();
87            true
88        } else {
89            false
90        }
91    }
92
93    #[inline]
94    unsafe fn unlock_exclusive(&self) {
95        self.deadlock_release();
96        if self
97            .state
98            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
99            .is_ok()
100        {
101            return;
102        }
103        self.unlock_exclusive_slow(false);
104    }
105
106    #[inline]
107    fn lock_shared(&self) {
108        if !self.try_lock_shared_fast(false) {
109            let result = self.lock_shared_slow(false, None);
110            debug_assert!(result);
111        }
112        self.deadlock_acquire();
113    }
114
115    #[inline]
116    fn try_lock_shared(&self) -> bool {
117        let result = if self.try_lock_shared_fast(false) {
118            true
119        } else {
120            self.try_lock_shared_slow(false)
121        };
122        if result {
123            self.deadlock_acquire();
124        }
125        result
126    }
127
128    #[inline]
129    unsafe fn unlock_shared(&self) {
130        self.deadlock_release();
131        let state = if have_elision() {
132            self.state.elision_fetch_sub_release(ONE_READER)
133        } else {
134            self.state.fetch_sub(ONE_READER, Ordering::Release)
135        };
136        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
137            self.unlock_shared_slow();
138        }
139    }
140
141    #[inline]
142    fn is_locked(&self) -> bool {
143        let state = self.state.load(Ordering::Relaxed);
144        state & (WRITER_BIT | READERS_MASK) != 0
145    }
146
147    #[inline]
148    fn is_locked_exclusive(&self) -> bool {
149        let state = self.state.load(Ordering::Relaxed);
150        state & (WRITER_BIT) != 0
151    }
152}
153
154unsafe impl lock_api::RawRwLockFair for RawRwLock {
155    #[inline]
156    unsafe fn unlock_shared_fair(&self) {
157        // Shared unlocking is always fair in this implementation.
158        self.unlock_shared();
159    }
160
161    #[inline]
162    unsafe fn unlock_exclusive_fair(&self) {
163        self.deadlock_release();
164        if self
165            .state
166            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
167            .is_ok()
168        {
169            return;
170        }
171        self.unlock_exclusive_slow(true);
172    }
173
174    #[inline]
175    unsafe fn bump_shared(&self) {
176        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
177            == ONE_READER | WRITER_BIT
178        {
179            self.bump_shared_slow();
180        }
181    }
182
183    #[inline]
184    unsafe fn bump_exclusive(&self) {
185        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
186            self.bump_exclusive_slow();
187        }
188    }
189}
190
191unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
192    #[inline]
193    unsafe fn downgrade(&self) {
194        let state = self
195            .state
196            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);
197
198        // Wake up parked shared and upgradable threads if there are any
199        if state & PARKED_BIT != 0 {
200            self.downgrade_slow();
201        }
202    }
203}
204
205unsafe impl lock_api::RawRwLockTimed for RawRwLock {
206    type Duration = Duration;
207    type Instant = Instant;
208
209    #[inline]
210    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
211        let result = if self.try_lock_shared_fast(false) {
212            true
213        } else {
214            self.lock_shared_slow(false, util::to_deadline(timeout))
215        };
216        if result {
217            self.deadlock_acquire();
218        }
219        result
220    }
221
222    #[inline]
223    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
224        let result = if self.try_lock_shared_fast(false) {
225            true
226        } else {
227            self.lock_shared_slow(false, Some(timeout))
228        };
229        if result {
230            self.deadlock_acquire();
231        }
232        result
233    }
234
235    #[inline]
236    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
237        let result = if self
238            .state
239            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
240            .is_ok()
241        {
242            true
243        } else {
244            self.lock_exclusive_slow(util::to_deadline(timeout))
245        };
246        if result {
247            self.deadlock_acquire();
248        }
249        result
250    }
251
252    #[inline]
253    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
254        let result = if self
255            .state
256            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
257            .is_ok()
258        {
259            true
260        } else {
261            self.lock_exclusive_slow(Some(timeout))
262        };
263        if result {
264            self.deadlock_acquire();
265        }
266        result
267    }
268}
269
270unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
271    #[inline]
272    fn lock_shared_recursive(&self) {
273        if !self.try_lock_shared_fast(true) {
274            let result = self.lock_shared_slow(true, None);
275            debug_assert!(result);
276        }
277        self.deadlock_acquire();
278    }
279
280    #[inline]
281    fn try_lock_shared_recursive(&self) -> bool {
282        let result = if self.try_lock_shared_fast(true) {
283            true
284        } else {
285            self.try_lock_shared_slow(true)
286        };
287        if result {
288            self.deadlock_acquire();
289        }
290        result
291    }
292}
293
294unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
295    #[inline]
296    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
297        let result = if self.try_lock_shared_fast(true) {
298            true
299        } else {
300            self.lock_shared_slow(true, util::to_deadline(timeout))
301        };
302        if result {
303            self.deadlock_acquire();
304        }
305        result
306    }
307
308    #[inline]
309    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
310        let result = if self.try_lock_shared_fast(true) {
311            true
312        } else {
313            self.lock_shared_slow(true, Some(timeout))
314        };
315        if result {
316            self.deadlock_acquire();
317        }
318        result
319    }
320}
321
322unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
323    #[inline]
324    fn lock_upgradable(&self) {
325        if !self.try_lock_upgradable_fast() {
326            let result = self.lock_upgradable_slow(None);
327            debug_assert!(result);
328        }
329        self.deadlock_acquire();
330    }
331
332    #[inline]
333    fn try_lock_upgradable(&self) -> bool {
334        let result = if self.try_lock_upgradable_fast() {
335            true
336        } else {
337            self.try_lock_upgradable_slow()
338        };
339        if result {
340            self.deadlock_acquire();
341        }
342        result
343    }
344
345    #[inline]
346    unsafe fn unlock_upgradable(&self) {
347        self.deadlock_release();
348        let state = self.state.load(Ordering::Relaxed);
349        #[allow(clippy::collapsible_if)]
350        if state & PARKED_BIT == 0 {
351            if self
352                .state
353                .compare_exchange_weak(
354                    state,
355                    state - (ONE_READER | UPGRADABLE_BIT),
356                    Ordering::Release,
357                    Ordering::Relaxed,
358                )
359                .is_ok()
360            {
361                return;
362            }
363        }
364        self.unlock_upgradable_slow(false);
365    }
366
367    #[inline]
368    unsafe fn upgrade(&self) {
369        let state = self.state.fetch_sub(
370            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
371            Ordering::Acquire,
372        );
373        if state & READERS_MASK != ONE_READER {
374            let result = self.upgrade_slow(None);
375            debug_assert!(result);
376        }
377    }
378
379    #[inline]
380    unsafe fn try_upgrade(&self) -> bool {
381        if self
382            .state
383            .compare_exchange_weak(
384                ONE_READER | UPGRADABLE_BIT,
385                WRITER_BIT,
386                Ordering::Acquire,
387                Ordering::Relaxed,
388            )
389            .is_ok()
390        {
391            true
392        } else {
393            self.try_upgrade_slow()
394        }
395    }
396}
397
398unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
399    #[inline]
400    unsafe fn unlock_upgradable_fair(&self) {
401        self.deadlock_release();
402        let state = self.state.load(Ordering::Relaxed);
403        #[allow(clippy::collapsible_if)]
404        if state & PARKED_BIT == 0 {
405            if self
406                .state
407                .compare_exchange_weak(
408                    state,
409                    state - (ONE_READER | UPGRADABLE_BIT),
410                    Ordering::Release,
411                    Ordering::Relaxed,
412                )
413                .is_ok()
414            {
415                return;
416            }
417        }
418        self.unlock_upgradable_slow(false);
419    }
420
421    #[inline]
422    unsafe fn bump_upgradable(&self) {
423        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
424            self.bump_upgradable_slow();
425        }
426    }
427}
428
429unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
430    #[inline]
431    unsafe fn downgrade_upgradable(&self) {
432        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);
433
434        // Wake up parked upgradable threads if there are any
435        if state & PARKED_BIT != 0 {
436            self.downgrade_slow();
437        }
438    }
439
440    #[inline]
441    unsafe fn downgrade_to_upgradable(&self) {
442        let state = self.state.fetch_add(
443            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
444            Ordering::Release,
445        );
446
447        // Wake up parked shared threads if there are any
448        if state & PARKED_BIT != 0 {
449            self.downgrade_to_upgradable_slow();
450        }
451    }
452}
453
454unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
455    #[inline]
456    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
457        let result = if self.try_lock_upgradable_fast() {
458            true
459        } else {
460            self.lock_upgradable_slow(Some(timeout))
461        };
462        if result {
463            self.deadlock_acquire();
464        }
465        result
466    }
467
468    #[inline]
469    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
470        let result = if self.try_lock_upgradable_fast() {
471            true
472        } else {
473            self.lock_upgradable_slow(util::to_deadline(timeout))
474        };
475        if result {
476            self.deadlock_acquire();
477        }
478        result
479    }
480
481    #[inline]
482    unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
483        let state = self.state.fetch_sub(
484            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
485            Ordering::Relaxed,
486        );
487        if state & READERS_MASK == ONE_READER {
488            true
489        } else {
490            self.upgrade_slow(Some(timeout))
491        }
492    }
493
494    #[inline]
495    unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
496        let state = self.state.fetch_sub(
497            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
498            Ordering::Relaxed,
499        );
500        if state & READERS_MASK == ONE_READER {
501            true
502        } else {
503            self.upgrade_slow(util::to_deadline(timeout))
504        }
505    }
506}
507
508impl RawRwLock {
509    #[inline(always)]
510    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
511        let state = self.state.load(Ordering::Relaxed);
512
513        // We can't allow grabbing a shared lock if there is a writer, even if
514        // the writer is still waiting for the remaining readers to exit.
515        if state & WRITER_BIT != 0 {
516            // To allow recursive locks, we make an exception and allow readers
517            // to skip ahead of a pending writer to avoid deadlocking, at the
518            // cost of breaking the fairness guarantees.
519            if !recursive || state & READERS_MASK == 0 {
520                return false;
521            }
522        }
523
524        // Use hardware lock elision to avoid cache conflicts when multiple
525        // readers try to acquire the lock. We only do this if the lock is
526        // completely empty since elision handles conflicts poorly.
527        if have_elision() && state == 0 {
528            self.state
529                .elision_compare_exchange_acquire(0, ONE_READER)
530                .is_ok()
531        } else if let Some(new_state) = state.checked_add(ONE_READER) {
532            self.state
533                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
534                .is_ok()
535        } else {
536            false
537        }
538    }
539
540    #[cold]
541    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
542        let mut state = self.state.load(Ordering::Relaxed);
543        loop {
544            // This mirrors the condition in try_lock_shared_fast
545            #[allow(clippy::collapsible_if)]
546            if state & WRITER_BIT != 0 {
547                if !recursive || state & READERS_MASK == 0 {
548                    return false;
549                }
550            }
551            if have_elision() && state == 0 {
552                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
553                    Ok(_) => return true,
554                    Err(x) => state = x,
555                }
556            } else {
557                match self.state.compare_exchange_weak(
558                    state,
559                    state
560                        .checked_add(ONE_READER)
561                        .expect("RwLock reader count overflow"),
562                    Ordering::Acquire,
563                    Ordering::Relaxed,
564                ) {
565                    Ok(_) => return true,
566                    Err(x) => state = x,
567                }
568            }
569        }
570    }
571
572    #[inline(always)]
573    fn try_lock_upgradable_fast(&self) -> bool {
574        let state = self.state.load(Ordering::Relaxed);
575
576        // We can't grab an upgradable lock if there is already a writer or
577        // upgradable reader.
578        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
579            return false;
580        }
581
582        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
583            self.state
584                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
585                .is_ok()
586        } else {
587            false
588        }
589    }
590
591    #[cold]
592    fn try_lock_upgradable_slow(&self) -> bool {
593        let mut state = self.state.load(Ordering::Relaxed);
594        loop {
595            // This mirrors the condition in try_lock_upgradable_fast
596            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
597                return false;
598            }
599
600            match self.state.compare_exchange_weak(
601                state,
602                state
603                    .checked_add(ONE_READER | UPGRADABLE_BIT)
604                    .expect("RwLock reader count overflow"),
605                Ordering::Acquire,
606                Ordering::Relaxed,
607            ) {
608                Ok(_) => return true,
609                Err(x) => state = x,
610            }
611        }
612    }
613
614    #[cold]
615    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
616        let try_lock = |state: &mut usize| {
617            loop {
618                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
619                    return false;
620                }
621
622                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
623                match self.state.compare_exchange_weak(
624                    *state,
625                    *state | WRITER_BIT,
626                    Ordering::Acquire,
627                    Ordering::Relaxed,
628                ) {
629                    Ok(_) => return true,
630                    Err(x) => *state = x,
631                }
632            }
633        };
634
635        // Step 1: grab exclusive ownership of WRITER_BIT
636        let timed_out = !self.lock_common(
637            timeout,
638            TOKEN_EXCLUSIVE,
639            try_lock,
640            WRITER_BIT | UPGRADABLE_BIT,
641        );
642        if timed_out {
643            return false;
644        }
645
646        // Step 2: wait for all remaining readers to exit the lock.
647        self.wait_for_readers(timeout, 0)
648    }
649
650    #[cold]
651    fn unlock_exclusive_slow(&self, force_fair: bool) {
652        // There are threads to unpark. Try to unpark as many as we can.
653        let callback = |mut new_state, result: UnparkResult| {
654            // If we are using a fair unlock then we should keep the
655            // rwlock locked and hand it off to the unparked threads.
656            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
657                if result.have_more_threads {
658                    new_state |= PARKED_BIT;
659                }
660                self.state.store(new_state, Ordering::Release);
661                TOKEN_HANDOFF
662            } else {
663                // Clear the parked bit if there are no more parked threads.
664                if result.have_more_threads {
665                    self.state.store(PARKED_BIT, Ordering::Release);
666                } else {
667                    self.state.store(0, Ordering::Release);
668                }
669                TOKEN_NORMAL
670            }
671        };
672        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
673        unsafe {
674            self.wake_parked_threads(0, callback);
675        }
676    }
677
678    #[cold]
679    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
680        let try_lock = |state: &mut usize| {
681            let mut spinwait_shared = SpinWait::new();
682            loop {
683                // Use hardware lock elision to avoid cache conflicts when multiple
684                // readers try to acquire the lock. We only do this if the lock is
685                // completely empty since elision handles conflicts poorly.
686                if have_elision() && *state == 0 {
687                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
688                        Ok(_) => return true,
689                        Err(x) => *state = x,
690                    }
691                }
692
693                // This is the same condition as try_lock_shared_fast
694                #[allow(clippy::collapsible_if)]
695                if *state & WRITER_BIT != 0 {
696                    if !recursive || *state & READERS_MASK == 0 {
697                        return false;
698                    }
699                }
700
701                if self
702                    .state
703                    .compare_exchange_weak(
704                        *state,
705                        state
706                            .checked_add(ONE_READER)
707                            .expect("RwLock reader count overflow"),
708                        Ordering::Acquire,
709                        Ordering::Relaxed,
710                    )
711                    .is_ok()
712                {
713                    return true;
714                }
715
716                // If there is high contention on the reader count then we want
717                // to leave some time between attempts to acquire the lock to
718                // let other threads make progress.
719                spinwait_shared.spin_no_yield();
720                *state = self.state.load(Ordering::Relaxed);
721            }
722        };
723        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
724    }
725
726    #[cold]
727    fn unlock_shared_slow(&self) {
728        // At this point WRITER_PARKED_BIT is set and READER_MASK is empty. We
729        // just need to wake up a potentially sleeping pending writer.
730        // Using the 2nd key at addr + 1
731        let addr = self as *const _ as usize + 1;
732        let callback = |_result: UnparkResult| {
733            // Clear the WRITER_PARKED_BIT here since there can only be one
734            // parked writer thread.
735            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
736            TOKEN_NORMAL
737        };
738        // SAFETY:
739        //   * `addr` is an address we control.
740        //   * `callback` does not panic or call into any function of `parking_lot`.
741        unsafe {
742            parking_lot_core::unpark_one(addr, callback);
743        }
744    }
745
746    #[cold]
747    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
748        let try_lock = |state: &mut usize| {
749            let mut spinwait_shared = SpinWait::new();
750            loop {
751                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
752                    return false;
753                }
754
755                if self
756                    .state
757                    .compare_exchange_weak(
758                        *state,
759                        state
760                            .checked_add(ONE_READER | UPGRADABLE_BIT)
761                            .expect("RwLock reader count overflow"),
762                        Ordering::Acquire,
763                        Ordering::Relaxed,
764                    )
765                    .is_ok()
766                {
767                    return true;
768                }
769
770                // If there is high contention on the reader count then we want
771                // to leave some time between attempts to acquire the lock to
772                // let other threads make progress.
773                spinwait_shared.spin_no_yield();
774                *state = self.state.load(Ordering::Relaxed);
775            }
776        };
777        self.lock_common(
778            timeout,
779            TOKEN_UPGRADABLE,
780            try_lock,
781            WRITER_BIT | UPGRADABLE_BIT,
782        )
783    }
784
785    #[cold]
786    fn unlock_upgradable_slow(&self, force_fair: bool) {
787        // Just release the lock if there are no parked threads.
788        let mut state = self.state.load(Ordering::Relaxed);
789        while state & PARKED_BIT == 0 {
790            match self.state.compare_exchange_weak(
791                state,
792                state - (ONE_READER | UPGRADABLE_BIT),
793                Ordering::Release,
794                Ordering::Relaxed,
795            ) {
796                Ok(_) => return,
797                Err(x) => state = x,
798            }
799        }
800
801        // There are threads to unpark. Try to unpark as many as we can.
802        let callback = |new_state, result: UnparkResult| {
803            // If we are using a fair unlock then we should keep the
804            // rwlock locked and hand it off to the unparked threads.
805            let mut state = self.state.load(Ordering::Relaxed);
806            if force_fair || result.be_fair {
807                // Fall back to normal unpark on overflow. Panicking is
808                // not allowed in parking_lot callbacks.
809                while let Some(mut new_state) =
810                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
811                {
812                    if result.have_more_threads {
813                        new_state |= PARKED_BIT;
814                    } else {
815                        new_state &= !PARKED_BIT;
816                    }
817                    match self.state.compare_exchange_weak(
818                        state,
819                        new_state,
820                        Ordering::Relaxed,
821                        Ordering::Relaxed,
822                    ) {
823                        Ok(_) => return TOKEN_HANDOFF,
824                        Err(x) => state = x,
825                    }
826                }
827            }
828
829            // Otherwise just release the upgradable lock and update PARKED_BIT.
830            loop {
831                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
832                if result.have_more_threads {
833                    new_state |= PARKED_BIT;
834                } else {
835                    new_state &= !PARKED_BIT;
836                }
837                match self.state.compare_exchange_weak(
838                    state,
839                    new_state,
840                    Ordering::Relaxed,
841                    Ordering::Relaxed,
842                ) {
843                    Ok(_) => return TOKEN_NORMAL,
844                    Err(x) => state = x,
845                }
846            }
847        };
848        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
849        unsafe {
850            self.wake_parked_threads(0, callback);
851        }
852    }
853
854    #[cold]
855    fn try_upgrade_slow(&self) -> bool {
856        let mut state = self.state.load(Ordering::Relaxed);
857        loop {
858            if state & READERS_MASK != ONE_READER {
859                return false;
860            }
861            match self.state.compare_exchange_weak(
862                state,
863                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
864                Ordering::Relaxed,
865                Ordering::Relaxed,
866            ) {
867                Ok(_) => return true,
868                Err(x) => state = x,
869            }
870        }
871    }
872
873    #[cold]
874    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
875        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
876    }
877
878    #[cold]
879    fn downgrade_slow(&self) {
880        // We only reach this point if PARKED_BIT is set.
881        let callback = |_, result: UnparkResult| {
882            // Clear the parked bit if there no more parked threads
883            if !result.have_more_threads {
884                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
885            }
886            TOKEN_NORMAL
887        };
888        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
889        unsafe {
890            self.wake_parked_threads(ONE_READER, callback);
891        }
892    }
893
894    #[cold]
895    fn downgrade_to_upgradable_slow(&self) {
896        // We only reach this point if PARKED_BIT is set.
897        let callback = |_, result: UnparkResult| {
898            // Clear the parked bit if there no more parked threads
899            if !result.have_more_threads {
900                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
901            }
902            TOKEN_NORMAL
903        };
904        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
905        unsafe {
906            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
907        }
908    }
909
    /// Slow path of `bump_shared`: releases this thread's shared lock and
    /// then acquires a new one.
    ///
    /// `bump_shared` only gets here when this is the sole reader and
    /// `WRITER_BIT` is set. The non-recursive `lock_shared` refuses to take
    /// the fast path while `WRITER_BIT` is set, so the re-acquisition goes
    /// through the slow path instead of jumping ahead of the writer.
    ///
    /// # Safety
    ///
    /// Calls `unlock_shared`, so the caller must hold a shared lock.
    #[cold]
    unsafe fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }
915
    /// Slow path of `bump_exclusive`: releases the exclusive lock with a
    /// forced fair hand-off and then acquires it again.
    #[cold]
    fn bump_exclusive_slow(&self) {
        // Balanced by the `deadlock_acquire` call at the end of
        // `lock_exclusive` below.
        self.deadlock_release();
        // `true` forces the lock to be handed to the unparked threads.
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }
922
923    #[cold]
924    fn bump_upgradable_slow(&self) {
925        self.deadlock_release();
926        self.unlock_upgradable_slow(true);
927        self.lock_upgradable();
928    }
929
930    /// Common code for waking up parked threads after releasing `WRITER_BIT` or
931    /// `UPGRADABLE_BIT`.
932    ///
933    /// # Safety
934    ///
935    /// `callback` must uphold the requirements of the `callback` parameter to
936    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
937    /// `parking_lot`.
938    #[inline]
939    unsafe fn wake_parked_threads(
940        &self,
941        new_state: usize,
942        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
943    ) {
944        // We must wake up at least one upgrader or writer if there is one,
945        // otherwise they may end up parked indefinitely since unlock_shared
946        // does not call wake_parked_threads.
947        let new_state = Cell::new(new_state);
948        let addr = self as *const _ as usize;
949        let filter = |ParkToken(token)| {
950            let s = new_state.get();
951
952            // If we are waking up a writer, don't wake anything else.
953            if s & WRITER_BIT != 0 {
954                return FilterOp::Stop;
955            }
956
957            // Otherwise wake *all* readers and one upgrader/writer.
958            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
959                // Skip writers and upgradable readers if we already have
960                // a writer/upgradable reader.
961                FilterOp::Skip
962            } else {
963                new_state.set(s + token);
964                FilterOp::Unpark
965            }
966        };
967        let callback = |result| callback(new_state.get(), result);
968        // SAFETY:
969        // * `addr` is an address we control.
970        // * `filter` does not panic or call into any function of `parking_lot`.
971        // * `callback` safety responsibility is on caller
972        parking_lot_core::unpark_filter(addr, filter, callback);
973    }
974
975    // Common code for waiting for readers to exit the lock after acquiring
976    // WRITER_BIT.
977    #[inline]
978    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
979        // At this point WRITER_BIT is already set, we just need to wait for the
980        // remaining readers to exit the lock.
981        let mut spinwait = SpinWait::new();
982        let mut state = self.state.load(Ordering::Acquire);
983        while state & READERS_MASK != 0 {
984            // Spin a few times to wait for readers to exit
985            if spinwait.spin() {
986                state = self.state.load(Ordering::Acquire);
987                continue;
988            }
989
990            // Set the parked bit
991            if state & WRITER_PARKED_BIT == 0 {
992                if let Err(x) = self.state.compare_exchange_weak(
993                    state,
994                    state | WRITER_PARKED_BIT,
995                    Ordering::Acquire,
996                    Ordering::Acquire,
997                ) {
998                    state = x;
999                    continue;
1000                }
1001            }
1002
1003            // Park our thread until we are woken up by an unlock
1004            // Using the 2nd key at addr + 1
1005            let addr = self as *const _ as usize + 1;
1006            let validate = || {
1007                let state = self.state.load(Ordering::Relaxed);
1008                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
1009            };
1010            let before_sleep = || {};
1011            let timed_out = |_, _| {};
1012            // SAFETY:
1013            //   * `addr` is an address we control.
1014            //   * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
1015            //   * `before_sleep` does not call `park`, nor does it panic.
1016            let park_result = unsafe {
1017                parking_lot_core::park(
1018                    addr,
1019                    validate,
1020                    before_sleep,
1021                    timed_out,
1022                    TOKEN_EXCLUSIVE,
1023                    timeout,
1024                )
1025            };
1026            match park_result {
1027                // We still need to re-check the state if we are unparked
1028                // since a previous writer timing-out could have allowed
1029                // another reader to sneak in before we parked.
1030                ParkResult::Unparked(_) | ParkResult::Invalid => {
1031                    state = self.state.load(Ordering::Acquire);
1032                    continue;
1033                }
1034
1035                // Timeout expired
1036                ParkResult::TimedOut => {
1037                    // We need to release WRITER_BIT and revert back to
1038                    // our previous value. We also wake up any threads that
1039                    // might be waiting on WRITER_BIT.
1040                    let state = self.state.fetch_add(
1041                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
1042                        Ordering::Relaxed,
1043                    );
1044                    if state & PARKED_BIT != 0 {
1045                        let callback = |_, result: UnparkResult| {
1046                            // Clear the parked bit if there no more parked threads
1047                            if !result.have_more_threads {
1048                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1049                            }
1050                            TOKEN_NORMAL
1051                        };
1052                        // SAFETY: `callback` does not panic or call any function of `parking_lot`.
1053                        unsafe {
1054                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
1055                        }
1056                    }
1057                    return false;
1058                }
1059            }
1060        }
1061        true
1062    }
1063
1064    /// Common code for acquiring a lock
1065    #[inline]
1066    fn lock_common(
1067        &self,
1068        timeout: Option<Instant>,
1069        token: ParkToken,
1070        mut try_lock: impl FnMut(&mut usize) -> bool,
1071        validate_flags: usize,
1072    ) -> bool {
1073        let mut spinwait = SpinWait::new();
1074        let mut state = self.state.load(Ordering::Relaxed);
1075        loop {
1076            // Attempt to grab the lock
1077            if try_lock(&mut state) {
1078                return true;
1079            }
1080
1081            // If there are no parked threads, try spinning a few times.
1082            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
1083                state = self.state.load(Ordering::Relaxed);
1084                continue;
1085            }
1086
1087            // Set the parked bit
1088            if state & PARKED_BIT == 0 {
1089                if let Err(x) = self.state.compare_exchange_weak(
1090                    state,
1091                    state | PARKED_BIT,
1092                    Ordering::Relaxed,
1093                    Ordering::Relaxed,
1094                ) {
1095                    state = x;
1096                    continue;
1097                }
1098            }
1099
1100            // Park our thread until we are woken up by an unlock
1101            let addr = self as *const _ as usize;
1102            let validate = || {
1103                let state = self.state.load(Ordering::Relaxed);
1104                state & PARKED_BIT != 0 && (state & validate_flags != 0)
1105            };
1106            let before_sleep = || {};
1107            let timed_out = |_, was_last_thread| {
1108                // Clear the parked bit if we were the last parked thread
1109                if was_last_thread {
1110                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1111                }
1112            };
1113
1114            // SAFETY:
1115            // * `addr` is an address we control.
1116            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
1117            // * `before_sleep` does not call `park`, nor does it panic.
1118            let park_result = unsafe {
1119                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
1120            };
1121            match park_result {
1122                // The thread that unparked us passed the lock on to us
1123                // directly without unlocking it.
1124                ParkResult::Unparked(TOKEN_HANDOFF) => return true,
1125
1126                // We were unparked normally, try acquiring the lock again
1127                ParkResult::Unparked(_) => (),
1128
1129                // The validation function failed, try locking again
1130                ParkResult::Invalid => (),
1131
1132                // Timeout expired
1133                ParkResult::TimedOut => return false,
1134            }
1135
1136            // Loop back and try locking again
1137            spinwait.reset();
1138            state = self.state.load(Ordering::Relaxed);
1139        }
1140    }
1141
1142    #[inline]
1143    fn deadlock_acquire(&self) {
1144        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
1145        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
1146    }
1147
1148    #[inline]
1149    fn deadlock_release(&self) {
1150        unsafe { deadlock::release_resource(self as *const _ as usize) };
1151        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
1152    }
1153}