use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

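// Lock state layout, packed into a single `AtomicUsize`:
//
// PARKED_BIT        - one or more threads are parked on the main queue.
// WRITER_PARKED_BIT - a writer holding WRITER_BIT is parked waiting for the
//                     remaining readers to release the lock.
// UPGRADABLE_BIT    - an upgradable lock is currently held.
// WRITER_BIT        - an exclusive lock is held, or a writer has claimed the
//                     lock and is waiting for readers to exit.
// READERS_MASK      - the upper bits count shared locks in units of
//                     ONE_READER.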
const PARKED_BIT: usize = 0b0001;
const WRITER_PARKED_BIT: usize = 0b0010;
const UPGRADABLE_BIT: usize = 0b0100;
const WRITER_BIT: usize = 0b1000;
const READERS_MASK: usize = !0b1111;
const ONE_READER: usize = 0b10000;

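// Park tokens describe what a parked thread is waiting to acquire. Each token
// is chosen to equal the state increment that thread needs, which lets
// `wake_parked_threads` compute the new lock state by summing the tokens of
// the threads it unparks.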
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);

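/// Raw reader-writer lock type backed by the parking lot. All lock state is
/// packed into a single `AtomicUsize`, and contended threads are parked
/// through `parking_lot_core` instead of spinning indefinitely.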
pub struct RawRwLock {
    state: AtomicUsize,
}

unsafe impl lock_api::RawRwLock for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: AtomicUsize::new(0),
    };

    type GuardMarker = crate::GuardMarker;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    #[inline]
    unsafe fn unlock_exclusive(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

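    // Releasing a shared lock is a single `fetch_sub`; the slow path is only
    // needed when this was the last reader and a writer is parked waiting for
    // the readers to drain (WRITER_PARKED_BIT is set).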
    #[inline]
    unsafe fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }

    #[inline]
    fn is_locked(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT | READERS_MASK) != 0
    }

    #[inline]
    fn is_locked_exclusive(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT) != 0
    }
}

unsafe impl lock_api::RawRwLockFair for RawRwLock {
    #[inline]
    unsafe fn unlock_shared_fair(&self) {
        self.unlock_shared();
    }

    #[inline]
    unsafe fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }

    #[inline]
    unsafe fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
            == ONE_READER | WRITER_BIT
        {
            self.bump_shared_slow();
        }
    }

    #[inline]
    unsafe fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

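// Downgrading converts an exclusive lock into a shared one in a single
// `fetch_add` that adds a reader and clears WRITER_BIT; parked shared and
// upgradable threads can then be woken up.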
unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade(&self) {
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

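// Upgradable locks are shared locks that additionally reserve the right to
// upgrade: only one upgradable holder may exist at a time (UPGRADABLE_BIT),
// it coexists with plain readers, and it can later be upgraded to an
// exclusive lock or downgraded to an ordinary shared lock.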
unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        #[allow(clippy::collapsible_if)]
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

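    // Upgrading trades ONE_READER | UPGRADABLE_BIT for WRITER_BIT in a single
    // atomic operation; if other readers are still present, the slow path
    // waits for them to release the lock first.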
    #[inline]
    unsafe fn upgrade(&self) {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    #[inline]
    unsafe fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    unsafe fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        #[allow(clippy::collapsible_if)]
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    #[inline]
    unsafe fn downgrade_to_upgradable(&self) {
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
        );

        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    #[inline]
    unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

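// Internal fast and slow paths. The `*_fast` helpers are inlined optimistic
// compare-exchange attempts; the `#[cold]` `*_slow` functions contain the
// spinning and parking logic. The `recursive` flag on the shared paths lets
// readers acquire the lock even while a writer is waiting, as long as other
// readers are still present, which avoids deadlocks for recursive read locks
// at the cost of fairness towards writers.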
impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        if state & WRITER_BIT != 0 {
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        if have_elision() && state == 0 {
            self.state
                .elision_compare_exchange_acquire(0, ONE_READER)
                .is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            #[allow(clippy::collapsible_if)]
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state
                        .checked_add(ONE_READER)
                        .expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                match self.state.compare_exchange_weak(
                    *state,
                    *state | WRITER_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => *state = x,
                }
            }
        };

        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        );
        if timed_out {
            return false;
        }

        self.wait_for_readers(timeout, 0)
    }

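    // Slow path of `unlock_exclusive`: parked threads exist, so either hand
    // the lock off directly to the unparked threads (fair unlock) or drop the
    // state back to zero and let them race for it (normal unlock).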
    #[cold]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        let callback = |mut new_state, result: UnparkResult| {
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }

                #[allow(clippy::collapsible_if)]
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }

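    // Called by the last reader when a writer is parked waiting for readers to
    // exit. The writer parks on `addr + 1` (a separate queue from the main
    // one), so unpark one thread from that queue and clear WRITER_PARKED_BIT.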
    #[cold]
    fn unlock_shared_slow(&self) {
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }

    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        let callback = |new_state, result: UnparkResult| {
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
    }

    #[cold]
    fn downgrade_slow(&self) {
        let callback = |_, result: UnparkResult| {
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        unsafe {
            self.wake_parked_threads(ONE_READER, callback);
        }
    }

    #[cold]
    fn downgrade_to_upgradable_slow(&self) {
        let callback = |_, result: UnparkResult| {
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        unsafe {
            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
        }
    }

    #[cold]
    unsafe fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    #[cold]
    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

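    // Common code for waking up parked threads after releasing WRITER_BIT or
    // UPGRADABLE_BIT. Starting from `new_state`, waiters are unparked and
    // their park tokens accumulated into the state: unparking stops once a
    // writer has been accepted, and writer/upgradable waiters are skipped
    // while the accumulated state already contains UPGRADABLE_BIT. The final
    // accumulated state is handed to `callback` together with the
    // `UnparkResult`.
    //
    // Safety: `callback` must uphold the requirements documented for the
    // `callback` parameter of `parking_lot_core::unpark_filter`.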
    #[inline]
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                FilterOp::Skip
            } else {
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        parking_lot_core::unpark_filter(addr, filter, callback);
    }

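    // Called with WRITER_BIT already set (either freshly acquired or claimed
    // during an upgrade) to wait until all remaining readers have released
    // the lock. The writer parks on `addr + 1` after setting
    // WRITER_PARKED_BIT. On timeout the state is rolled back: WRITER_BIT and
    // WRITER_PARKED_BIT are removed, `prev_value` is added back, and any
    // parked threads are woken so they can retry.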
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Acquire);
        while state & READERS_MASK != 0 {
            if spinwait.spin() {
                state = self.state.load(Ordering::Acquire);
                continue;
            }

            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Acquire,
                    Ordering::Acquire,
                ) {
                    state = x;
                    continue;
                }
            }

            let addr = self as *const _ as usize + 1;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, _| {};
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Acquire);
                    continue;
                }

                ParkResult::TimedOut => {
                    let state = self.state.fetch_add(
                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                        Ordering::Relaxed,
                    );
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        unsafe {
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }

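    // Common acquire loop shared by the shared, upgradable and exclusive slow
    // paths: spin briefly, set PARKED_BIT, then park on the main queue until
    // unparked or the deadline passes. `try_lock` performs the actual
    // acquisition attempt, `token` describes what this thread is waiting for,
    // and `validate_flags` are the state bits that must still be set for
    // parking to remain worthwhile.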
    #[inline]
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if try_lock(&mut state) {
                return true;
            }

            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            let addr = self as *const _ as usize;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                ParkResult::Unparked(_) => (),

                ParkResult::Invalid => (),

                ParkResult::TimedOut => return false,
            }

            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

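    // Deadlock detection registers both parking addresses used by this lock:
    // the base address (the main queue) and `addr + 1` (the queue used by a
    // writer waiting for readers to exit).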
    #[inline]
    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    #[inline]
    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}
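
// The raw lock above is normally consumed through the generic `lock_api`
// wrappers rather than used directly (parking_lot's `RwLock<T>` is
// `lock_api::RwLock<RawRwLock, T>`). The module below is a minimal usage
// sketch, assuming only the public `lock_api` API.
#[cfg(test)]
mod usage_sketch {
    use super::RawRwLock;

    #[test]
    fn shared_then_exclusive() {
        // Build a `lock_api::RwLock` directly on top of this raw lock.
        let lock: lock_api::RwLock<RawRwLock, u32> = lock_api::RwLock::new(0);

        // Multiple shared guards may coexist.
        {
            let r1 = lock.read();
            let r2 = lock.read();
            assert_eq!(*r1 + *r2, 0);
        }

        // An exclusive guard allows mutation once all readers are gone.
        *lock.write() += 1;
        assert_eq!(*lock.read(), 1);
    }
}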