tokio/sync/rwlock.rs
1use crate::sync::batch_semaphore::{Semaphore, TryAcquireError};
2use crate::sync::mutex::TryLockError;
3#[cfg(all(tokio_unstable, feature = "tracing"))]
4use crate::util::trace;
5use std::cell::UnsafeCell;
6use std::marker;
7use std::marker::PhantomData;
8use std::sync::Arc;
9
10pub(crate) mod owned_read_guard;
11pub(crate) mod owned_write_guard;
12pub(crate) mod owned_write_guard_mapped;
13pub(crate) mod read_guard;
14pub(crate) mod write_guard;
15pub(crate) mod write_guard_mapped;
16pub(crate) use owned_read_guard::OwnedRwLockReadGuard;
17pub(crate) use owned_write_guard::OwnedRwLockWriteGuard;
18pub(crate) use owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
19pub(crate) use read_guard::RwLockReadGuard;
20pub(crate) use write_guard::RwLockWriteGuard;
21pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;
22
// Upper bound on the number of concurrent readers of an `RwLock`. It is also
// the number of semaphore permits given to locks built with `new`/`const_new`,
// and the largest value accepted by the `*_with_max_readers` constructors.
// NOTE(review): the `>> 3` presumably mirrors the permit limit of the
// underlying semaphore — confirm against `batch_semaphore`.
#[cfg(not(loom))]
const MAX_READS: u32 = u32::MAX >> 3;

// Much smaller bound used when building under `loom`.
// NOTE(review): presumably chosen to keep loom's model checking tractable — confirm.
#[cfg(loom)]
const MAX_READS: u32 = 10;
28
29/// An asynchronous reader-writer lock.
30///
31/// This type of lock allows a number of readers or at most one writer at any
32/// point in time. The write portion of this lock typically allows modification
33/// of the underlying data (exclusive access) and the read portion of this lock
34/// typically allows for read-only access (shared access).
35///
36/// In comparison, a [`Mutex`] does not distinguish between readers or writers
37/// that acquire the lock, therefore causing any tasks waiting for the lock to
38/// become available to yield. An `RwLock` will allow any number of readers to
39/// acquire the lock as long as a writer is not holding the lock.
40///
41/// The priority policy of Tokio's read-write lock is _fair_ (or
42/// [_write-preferring_]), in order to ensure that readers cannot starve
43/// writers. Fairness is ensured using a first-in, first-out queue for the tasks
44/// awaiting the lock; if a task that wishes to acquire the write lock is at the
45/// head of the queue, read locks will not be given out until the write lock has
46/// been released. This is in contrast to the Rust standard library's
47/// `std::sync::RwLock`, where the priority policy is dependent on the
48/// operating system's implementation.
49///
/// The type parameter `T` represents the data that this lock protects. `T`
/// must satisfy [`Send`] for the lock to be sent to another thread, and both
/// [`Send`] and `Sync` for the lock to be shared across threads. The RAII
/// guards returned from the locking methods implement
/// [`Deref`](trait@std::ops::Deref) (and [`DerefMut`](trait@std::ops::DerefMut)
/// for the `write` methods) to allow access to the content of the lock.
55///
56/// # Examples
57///
58/// ```
59/// use tokio::sync::RwLock;
60///
61/// #[tokio::main]
62/// async fn main() {
63/// let lock = RwLock::new(5);
64///
65/// // many reader locks can be held at once
66/// {
67/// let r1 = lock.read().await;
68/// let r2 = lock.read().await;
69/// assert_eq!(*r1, 5);
70/// assert_eq!(*r2, 5);
71/// } // read locks are dropped at this point
72///
73/// // only one write lock may be held, however
74/// {
75/// let mut w = lock.write().await;
76/// *w += 1;
77/// assert_eq!(*w, 6);
78/// } // write lock is dropped here
79/// }
80/// ```
81///
82/// [`Mutex`]: struct@super::Mutex
83/// [`RwLock`]: struct@RwLock
84/// [`RwLockReadGuard`]: struct@RwLockReadGuard
85/// [`RwLockWriteGuard`]: struct@RwLockWriteGuard
86/// [`Send`]: trait@std::marker::Send
87/// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies
pub struct RwLock<T: ?Sized> {
    // Tracing span describing this lock as a runtime resource. Only present
    // when built with `tokio_unstable` and the `tracing` feature.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,

    // Maximum number of concurrent readers. The semaphore is created with this
    // many permits, and a writer acquires all of them at once.
    mr: u32,

    // Semaphore coordinating read and write access to `T`: each reader
    // acquires one permit, each writer acquires `mr` permits.
    s: Semaphore,

    // The protected data. Guards hand out access through the raw pointer
    // obtained from this cell.
    c: UnsafeCell<T>,
}
101
#[test]
#[cfg(not(loom))]
fn bounds() {
    // Compile-time probe: instantiating it only type-checks when `T` is
    // `Send`, `Sync` and `Unpin` all at once.
    fn assert_send_sync_unpin<T: Send + Sync + Unpin>() {}
    // The futures returned by the async methods have unnameable types, so they
    // are probed by value rather than through a type parameter.
    fn assert_future_send_sync<F: Send + Sync>(_future: F) {}

    // The lock itself.
    assert_send_sync_unpin::<RwLock<u32>>();

    // Read guards, borrowed and owned.
    assert_send_sync_unpin::<RwLockReadGuard<'_, u32>>();
    assert_send_sync_unpin::<OwnedRwLockReadGuard<u32, i32>>();

    // Borrowed write guards, plain and mapped.
    assert_send_sync_unpin::<RwLockWriteGuard<'_, u32>>();
    assert_send_sync_unpin::<RwLockMappedWriteGuard<'_, u32>>();

    // Owned write guards, plain and mapped.
    assert_send_sync_unpin::<OwnedRwLockWriteGuard<u32>>();
    assert_send_sync_unpin::<OwnedRwLockMappedWriteGuard<u32, i32>>();

    // The futures produced by the four async locking methods.
    let shared = Arc::new(RwLock::new(0));
    assert_future_send_sync(shared.read());
    assert_future_send_sync(Arc::clone(&shared).read_owned());
    assert_future_send_sync(shared.write());
    assert_future_send_sync(Arc::clone(&shared).write_owned());
}
145
// Safety: `RwLock<T>` owns its `T`, so sending the lock to another thread
// sends the value along with it; that is fine as long as `T: Send`.
// Sharing a `RwLock<T>` lets other threads reach `T` through the guards it
// hands out, so `Sync` additionally requires `T: Sync`. If `T` were not
// `Send`, sending and sharing a `RwLock<T>` would be bad, since you can access
// `T` through `RwLock<T>`.
unsafe impl<T> Send for RwLock<T> where T: ?Sized + Send {}
unsafe impl<T> Sync for RwLock<T> where T: ?Sized + Send + Sync {}
// NB: These impls need to be explicit since the guards store a raw pointer,
// which suppresses the automatic `Send`/`Sync` impls.
// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
// `T` is `Send`.
unsafe impl<T> Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {}
unsafe impl<T> Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {}
// T is required to be `Send` because an OwnedRwLockReadGuard can be used to drop the value held in
// the RwLock, unlike RwLockReadGuard.
unsafe impl<T, U> Send for OwnedRwLockReadGuard<T, U>
where
    T: ?Sized + Send + Sync,
    U: ?Sized + Sync,
{
}
unsafe impl<T, U> Sync for OwnedRwLockReadGuard<T, U>
where
    T: ?Sized + Send + Sync,
    U: ?Sized + Send + Sync,
{
}
// Sharing any of the write guards across threads requires `T: Send + Sync`.
// For the mapped owned guard the same bound is also required of the mapped
// type `U`.
unsafe impl<T> Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Sync for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T, U> Sync for OwnedRwLockMappedWriteGuard<T, U>
where
    T: ?Sized + Send + Sync,
    U: ?Sized + Send + Sync,
{
}
// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
// `T` is `Send` - but since this also provides mutable access, we need to
// make sure that `T` is `Send` since its value can be sent across thread
// boundaries.
unsafe impl<T> Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Send for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<T> Send for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
unsafe impl<T, U> Send for OwnedRwLockMappedWriteGuard<T, U>
where
    T: ?Sized + Send + Sync,
    U: ?Sized + Send + Sync,
{
}
192
193impl<T: ?Sized> RwLock<T> {
194 /// Creates a new instance of an `RwLock<T>` which is unlocked.
195 ///
196 /// # Examples
197 ///
198 /// ```
199 /// use tokio::sync::RwLock;
200 ///
201 /// let lock = RwLock::new(5);
202 /// ```
203 #[track_caller]
204 pub fn new(value: T) -> RwLock<T>
205 where
206 T: Sized,
207 {
208 #[cfg(all(tokio_unstable, feature = "tracing"))]
209 let resource_span = {
210 let location = std::panic::Location::caller();
211 let resource_span = tracing::trace_span!(
212 parent: None,
213 "runtime.resource",
214 concrete_type = "RwLock",
215 kind = "Sync",
216 loc.file = location.file(),
217 loc.line = location.line(),
218 loc.col = location.column(),
219 );
220
221 resource_span.in_scope(|| {
222 tracing::trace!(
223 target: "runtime::resource::state_update",
224 max_readers = MAX_READS,
225 );
226
227 tracing::trace!(
228 target: "runtime::resource::state_update",
229 write_locked = false,
230 );
231
232 tracing::trace!(
233 target: "runtime::resource::state_update",
234 current_readers = 0,
235 );
236 });
237
238 resource_span
239 };
240
241 #[cfg(all(tokio_unstable, feature = "tracing"))]
242 let s = resource_span.in_scope(|| Semaphore::new(MAX_READS as usize));
243
244 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
245 let s = Semaphore::new(MAX_READS as usize);
246
247 RwLock {
248 mr: MAX_READS,
249 c: UnsafeCell::new(value),
250 s,
251 #[cfg(all(tokio_unstable, feature = "tracing"))]
252 resource_span,
253 }
254 }
255
256 /// Creates a new instance of an `RwLock<T>` which is unlocked
257 /// and allows a maximum of `max_reads` concurrent readers.
258 ///
259 /// # Examples
260 ///
261 /// ```
262 /// use tokio::sync::RwLock;
263 ///
264 /// let lock = RwLock::with_max_readers(5, 1024);
265 /// ```
266 ///
267 /// # Panics
268 ///
269 /// Panics if `max_reads` is more than `u32::MAX >> 3`.
270 #[track_caller]
271 pub fn with_max_readers(value: T, max_reads: u32) -> RwLock<T>
272 where
273 T: Sized,
274 {
275 assert!(
276 max_reads <= MAX_READS,
277 "a RwLock may not be created with more than {MAX_READS} readers"
278 );
279
280 #[cfg(all(tokio_unstable, feature = "tracing"))]
281 let resource_span = {
282 let location = std::panic::Location::caller();
283
284 let resource_span = tracing::trace_span!(
285 parent: None,
286 "runtime.resource",
287 concrete_type = "RwLock",
288 kind = "Sync",
289 loc.file = location.file(),
290 loc.line = location.line(),
291 loc.col = location.column(),
292 );
293
294 resource_span.in_scope(|| {
295 tracing::trace!(
296 target: "runtime::resource::state_update",
297 max_readers = max_reads,
298 );
299
300 tracing::trace!(
301 target: "runtime::resource::state_update",
302 write_locked = false,
303 );
304
305 tracing::trace!(
306 target: "runtime::resource::state_update",
307 current_readers = 0,
308 );
309 });
310
311 resource_span
312 };
313
314 #[cfg(all(tokio_unstable, feature = "tracing"))]
315 let s = resource_span.in_scope(|| Semaphore::new(max_reads as usize));
316
317 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
318 let s = Semaphore::new(max_reads as usize);
319
320 RwLock {
321 mr: max_reads,
322 c: UnsafeCell::new(value),
323 s,
324 #[cfg(all(tokio_unstable, feature = "tracing"))]
325 resource_span,
326 }
327 }
328
329 /// Creates a new instance of an `RwLock<T>` which is unlocked.
330 ///
331 /// When using the `tracing` [unstable feature], a `RwLock` created with
332 /// `const_new` will not be instrumented. As such, it will not be visible
333 /// in [`tokio-console`]. Instead, [`RwLock::new`] should be used to create
334 /// an instrumented object if that is needed.
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// use tokio::sync::RwLock;
340 ///
341 /// static LOCK: RwLock<i32> = RwLock::const_new(5);
342 /// ```
343 ///
344 /// [`tokio-console`]: https://github.com/tokio-rs/console
345 /// [unstable feature]: crate#unstable-features
346 #[cfg(not(all(loom, test)))]
347 pub const fn const_new(value: T) -> RwLock<T>
348 where
349 T: Sized,
350 {
351 RwLock {
352 mr: MAX_READS,
353 c: UnsafeCell::new(value),
354 s: Semaphore::const_new(MAX_READS as usize),
355 #[cfg(all(tokio_unstable, feature = "tracing"))]
356 resource_span: tracing::Span::none(),
357 }
358 }
359
360 /// Creates a new instance of an `RwLock<T>` which is unlocked
361 /// and allows a maximum of `max_reads` concurrent readers.
362 ///
363 /// # Examples
364 ///
365 /// ```
366 /// use tokio::sync::RwLock;
367 ///
368 /// static LOCK: RwLock<i32> = RwLock::const_with_max_readers(5, 1024);
369 /// ```
370 #[cfg(not(all(loom, test)))]
371 pub const fn const_with_max_readers(value: T, max_reads: u32) -> RwLock<T>
372 where
373 T: Sized,
374 {
375 assert!(max_reads <= MAX_READS);
376
377 RwLock {
378 mr: max_reads,
379 c: UnsafeCell::new(value),
380 s: Semaphore::const_new(max_reads as usize),
381 #[cfg(all(tokio_unstable, feature = "tracing"))]
382 resource_span: tracing::Span::none(),
383 }
384 }
385
386 /// Locks this `RwLock` with shared read access, causing the current task
387 /// to yield until the lock has been acquired.
388 ///
389 /// The calling task will yield until there are no writers which hold the
390 /// lock. There may be other readers inside the lock when the task resumes.
391 ///
392 /// Note that under the priority policy of [`RwLock`], read locks are not
393 /// granted until prior write locks, to prevent starvation. Therefore
394 /// deadlock may occur if a read lock is held by the current task, a write
395 /// lock attempt is made, and then a subsequent read lock attempt is made
396 /// by the current task.
397 ///
398 /// Returns an RAII guard which will drop this read access of the `RwLock`
399 /// when dropped.
400 ///
401 /// # Cancel safety
402 ///
403 /// This method uses a queue to fairly distribute locks in the order they
404 /// were requested. Cancelling a call to `read` makes you lose your place in
405 /// the queue.
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// use std::sync::Arc;
411 /// use tokio::sync::RwLock;
412 ///
413 /// #[tokio::main]
414 /// async fn main() {
415 /// let lock = Arc::new(RwLock::new(1));
416 /// let c_lock = lock.clone();
417 ///
418 /// let n = lock.read().await;
419 /// assert_eq!(*n, 1);
420 ///
421 /// tokio::spawn(async move {
422 /// // While main has an active read lock, we acquire one too.
423 /// let r = c_lock.read().await;
424 /// assert_eq!(*r, 1);
425 /// }).await.expect("The spawned task has panicked");
426 ///
427 /// // Drop the guard after the spawned task finishes.
428 /// drop(n);
429 /// }
430 /// ```
431 pub async fn read(&self) -> RwLockReadGuard<'_, T> {
432 let acquire_fut = async {
433 self.s.acquire(1).await.unwrap_or_else(|_| {
434 // The semaphore was closed. but, we never explicitly close it, and we have a
435 // handle to it through the Arc, which means that this can never happen.
436 unreachable!()
437 });
438
439 RwLockReadGuard {
440 s: &self.s,
441 data: self.c.get(),
442 marker: PhantomData,
443 #[cfg(all(tokio_unstable, feature = "tracing"))]
444 resource_span: self.resource_span.clone(),
445 }
446 };
447
448 #[cfg(all(tokio_unstable, feature = "tracing"))]
449 let acquire_fut = trace::async_op(
450 move || acquire_fut,
451 self.resource_span.clone(),
452 "RwLock::read",
453 "poll",
454 false,
455 );
456
457 #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
458 let guard = acquire_fut.await;
459
460 #[cfg(all(tokio_unstable, feature = "tracing"))]
461 self.resource_span.in_scope(|| {
462 tracing::trace!(
463 target: "runtime::resource::state_update",
464 current_readers = 1,
465 current_readers.op = "add",
466 )
467 });
468
469 guard
470 }
471
472 /// Blockingly locks this `RwLock` with shared read access.
473 ///
474 /// This method is intended for use cases where you
475 /// need to use this rwlock in asynchronous code as well as in synchronous code.
476 ///
477 /// Returns an RAII guard which will drop the read access of this `RwLock` when dropped.
478 ///
479 /// # Panics
480 ///
481 /// This function panics if called within an asynchronous execution context.
482 ///
483 /// - If you find yourself in an asynchronous execution context and needing
484 /// to call some (synchronous) function which performs one of these
485 /// `blocking_` operations, then consider wrapping that call inside
486 /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
487 /// (or [`block_in_place()`][crate::task::block_in_place]).
488 ///
489 /// # Examples
490 ///
491 /// ```
492 /// use std::sync::Arc;
493 /// use tokio::sync::RwLock;
494 ///
495 /// #[tokio::main]
496 /// async fn main() {
497 /// let rwlock = Arc::new(RwLock::new(1));
498 /// let mut write_lock = rwlock.write().await;
499 ///
500 /// let blocking_task = tokio::task::spawn_blocking({
501 /// let rwlock = Arc::clone(&rwlock);
502 /// move || {
503 /// // This shall block until the `write_lock` is released.
504 /// let read_lock = rwlock.blocking_read();
505 /// assert_eq!(*read_lock, 0);
506 /// }
507 /// });
508 ///
509 /// *write_lock -= 1;
510 /// drop(write_lock); // release the lock.
511 ///
512 /// // Await the completion of the blocking task.
513 /// blocking_task.await.unwrap();
514 ///
515 /// // Assert uncontended.
516 /// assert!(rwlock.try_write().is_ok());
517 /// }
518 /// ```
#[track_caller]
#[cfg(feature = "sync")]
pub fn blocking_read(&self) -> RwLockReadGuard<'_, T> {
    // Build the async `read` future, then hand it to the crate's `block_on`
    // helper, which returns the guard once the future completes.
    let pending_read = self.read();
    crate::future::block_on(pending_read)
}
524
525 /// Locks this `RwLock` with shared read access, causing the current task
526 /// to yield until the lock has been acquired.
527 ///
528 /// The calling task will yield until there are no writers which hold the
529 /// lock. There may be other readers inside the lock when the task resumes.
530 ///
531 /// This method is identical to [`RwLock::read`], except that the returned
532 /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
533 /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
534 /// method, and the guard will live for the `'static` lifetime, as it keeps
535 /// the `RwLock` alive by holding an `Arc`.
536 ///
537 /// Note that under the priority policy of [`RwLock`], read locks are not
538 /// granted until prior write locks, to prevent starvation. Therefore
539 /// deadlock may occur if a read lock is held by the current task, a write
540 /// lock attempt is made, and then a subsequent read lock attempt is made
541 /// by the current task.
542 ///
543 /// Returns an RAII guard which will drop this read access of the `RwLock`
544 /// when dropped.
545 ///
546 /// # Cancel safety
547 ///
548 /// This method uses a queue to fairly distribute locks in the order they
549 /// were requested. Cancelling a call to `read_owned` makes you lose your
550 /// place in the queue.
551 ///
552 /// # Examples
553 ///
554 /// ```
555 /// use std::sync::Arc;
556 /// use tokio::sync::RwLock;
557 ///
558 /// #[tokio::main]
559 /// async fn main() {
560 /// let lock = Arc::new(RwLock::new(1));
561 /// let c_lock = lock.clone();
562 ///
563 /// let n = lock.read_owned().await;
564 /// assert_eq!(*n, 1);
565 ///
566 /// tokio::spawn(async move {
567 /// // While main has an active read lock, we acquire one too.
568 /// let r = c_lock.read_owned().await;
569 /// assert_eq!(*r, 1);
570 /// }).await.expect("The spawned task has panicked");
571 ///
572 /// // Drop the guard after the spawned task finishes.
573 /// drop(n);
574 ///}
575 /// ```
576 pub async fn read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T> {
577 #[cfg(all(tokio_unstable, feature = "tracing"))]
578 let resource_span = self.resource_span.clone();
579
580 let acquire_fut = async {
581 self.s.acquire(1).await.unwrap_or_else(|_| {
582 // The semaphore was closed. but, we never explicitly close it, and we have a
583 // handle to it through the Arc, which means that this can never happen.
584 unreachable!()
585 });
586
587 OwnedRwLockReadGuard {
588 #[cfg(all(tokio_unstable, feature = "tracing"))]
589 resource_span: self.resource_span.clone(),
590 data: self.c.get(),
591 lock: self,
592 _p: PhantomData,
593 }
594 };
595
596 #[cfg(all(tokio_unstable, feature = "tracing"))]
597 let acquire_fut = trace::async_op(
598 move || acquire_fut,
599 resource_span,
600 "RwLock::read_owned",
601 "poll",
602 false,
603 );
604
605 #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
606 let guard = acquire_fut.await;
607
608 #[cfg(all(tokio_unstable, feature = "tracing"))]
609 guard.resource_span.in_scope(|| {
610 tracing::trace!(
611 target: "runtime::resource::state_update",
612 current_readers = 1,
613 current_readers.op = "add",
614 )
615 });
616
617 guard
618 }
619
620 /// Attempts to acquire this `RwLock` with shared read access.
621 ///
622 /// If the access couldn't be acquired immediately, returns [`TryLockError`].
623 /// Otherwise, an RAII guard is returned which will release read access
624 /// when dropped.
625 ///
626 /// [`TryLockError`]: TryLockError
627 ///
628 /// # Examples
629 ///
630 /// ```
631 /// use std::sync::Arc;
632 /// use tokio::sync::RwLock;
633 ///
634 /// #[tokio::main]
635 /// async fn main() {
636 /// let lock = Arc::new(RwLock::new(1));
637 /// let c_lock = lock.clone();
638 ///
639 /// let v = lock.try_read().unwrap();
640 /// assert_eq!(*v, 1);
641 ///
642 /// tokio::spawn(async move {
643 /// // While main has an active read lock, we acquire one too.
644 /// let n = c_lock.read().await;
645 /// assert_eq!(*n, 1);
646 /// }).await.expect("The spawned task has panicked");
647 ///
648 /// // Drop the guard when spawned task finishes.
649 /// drop(v);
650 /// }
651 /// ```
652 pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryLockError> {
653 match self.s.try_acquire(1) {
654 Ok(permit) => permit,
655 Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
656 Err(TryAcquireError::Closed) => unreachable!(),
657 }
658
659 let guard = RwLockReadGuard {
660 s: &self.s,
661 data: self.c.get(),
662 marker: marker::PhantomData,
663 #[cfg(all(tokio_unstable, feature = "tracing"))]
664 resource_span: self.resource_span.clone(),
665 };
666
667 #[cfg(all(tokio_unstable, feature = "tracing"))]
668 self.resource_span.in_scope(|| {
669 tracing::trace!(
670 target: "runtime::resource::state_update",
671 current_readers = 1,
672 current_readers.op = "add",
673 )
674 });
675
676 Ok(guard)
677 }
678
679 /// Attempts to acquire this `RwLock` with shared read access.
680 ///
681 /// If the access couldn't be acquired immediately, returns [`TryLockError`].
682 /// Otherwise, an RAII guard is returned which will release read access
683 /// when dropped.
684 ///
685 /// This method is identical to [`RwLock::try_read`], except that the
686 /// returned guard references the `RwLock` with an [`Arc`] rather than by
687 /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
688 /// call this method, and the guard will live for the `'static` lifetime,
689 /// as it keeps the `RwLock` alive by holding an `Arc`.
690 ///
691 /// [`TryLockError`]: TryLockError
692 ///
693 /// # Examples
694 ///
695 /// ```
696 /// use std::sync::Arc;
697 /// use tokio::sync::RwLock;
698 ///
699 /// #[tokio::main]
700 /// async fn main() {
701 /// let lock = Arc::new(RwLock::new(1));
702 /// let c_lock = lock.clone();
703 ///
704 /// let v = lock.try_read_owned().unwrap();
705 /// assert_eq!(*v, 1);
706 ///
707 /// tokio::spawn(async move {
708 /// // While main has an active read lock, we acquire one too.
709 /// let n = c_lock.read_owned().await;
710 /// assert_eq!(*n, 1);
711 /// }).await.expect("The spawned task has panicked");
712 ///
713 /// // Drop the guard when spawned task finishes.
714 /// drop(v);
715 /// }
716 /// ```
717 pub fn try_read_owned(self: Arc<Self>) -> Result<OwnedRwLockReadGuard<T>, TryLockError> {
718 match self.s.try_acquire(1) {
719 Ok(permit) => permit,
720 Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
721 Err(TryAcquireError::Closed) => unreachable!(),
722 }
723
724 let guard = OwnedRwLockReadGuard {
725 #[cfg(all(tokio_unstable, feature = "tracing"))]
726 resource_span: self.resource_span.clone(),
727 data: self.c.get(),
728 lock: self,
729 _p: PhantomData,
730 };
731
732 #[cfg(all(tokio_unstable, feature = "tracing"))]
733 guard.resource_span.in_scope(|| {
734 tracing::trace!(
735 target: "runtime::resource::state_update",
736 current_readers = 1,
737 current_readers.op = "add",
738 )
739 });
740
741 Ok(guard)
742 }
743
744 /// Locks this `RwLock` with exclusive write access, causing the current
745 /// task to yield until the lock has been acquired.
746 ///
747 /// The calling task will yield while other writers or readers currently
748 /// have access to the lock.
749 ///
750 /// Returns an RAII guard which will drop the write access of this `RwLock`
751 /// when dropped.
752 ///
753 /// # Cancel safety
754 ///
755 /// This method uses a queue to fairly distribute locks in the order they
756 /// were requested. Cancelling a call to `write` makes you lose your place
757 /// in the queue.
758 ///
759 /// # Examples
760 ///
761 /// ```
762 /// use tokio::sync::RwLock;
763 ///
764 /// #[tokio::main]
765 /// async fn main() {
766 /// let lock = RwLock::new(1);
767 ///
768 /// let mut n = lock.write().await;
769 /// *n = 2;
770 ///}
771 /// ```
772 pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
773 let acquire_fut = async {
774 self.s.acquire(self.mr as usize).await.unwrap_or_else(|_| {
775 // The semaphore was closed. but, we never explicitly close it, and we have a
776 // handle to it through the Arc, which means that this can never happen.
777 unreachable!()
778 });
779
780 RwLockWriteGuard {
781 permits_acquired: self.mr,
782 s: &self.s,
783 data: self.c.get(),
784 marker: marker::PhantomData,
785 #[cfg(all(tokio_unstable, feature = "tracing"))]
786 resource_span: self.resource_span.clone(),
787 }
788 };
789
790 #[cfg(all(tokio_unstable, feature = "tracing"))]
791 let acquire_fut = trace::async_op(
792 move || acquire_fut,
793 self.resource_span.clone(),
794 "RwLock::write",
795 "poll",
796 false,
797 );
798
799 #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
800 let guard = acquire_fut.await;
801
802 #[cfg(all(tokio_unstable, feature = "tracing"))]
803 self.resource_span.in_scope(|| {
804 tracing::trace!(
805 target: "runtime::resource::state_update",
806 write_locked = true,
807 write_locked.op = "override",
808 )
809 });
810
811 guard
812 }
813
814 /// Blockingly locks this `RwLock` with exclusive write access.
815 ///
816 /// This method is intended for use cases where you
817 /// need to use this rwlock in asynchronous code as well as in synchronous code.
818 ///
819 /// Returns an RAII guard which will drop the write access of this `RwLock` when dropped.
820 ///
821 /// # Panics
822 ///
823 /// This function panics if called within an asynchronous execution context.
824 ///
825 /// - If you find yourself in an asynchronous execution context and needing
826 /// to call some (synchronous) function which performs one of these
827 /// `blocking_` operations, then consider wrapping that call inside
828 /// [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
829 /// (or [`block_in_place()`][crate::task::block_in_place]).
830 ///
831 /// # Examples
832 ///
833 /// ```
834 /// use std::sync::Arc;
835 /// use tokio::{sync::RwLock};
836 ///
837 /// #[tokio::main]
838 /// async fn main() {
839 /// let rwlock = Arc::new(RwLock::new(1));
840 /// let read_lock = rwlock.read().await;
841 ///
842 /// let blocking_task = tokio::task::spawn_blocking({
843 /// let rwlock = Arc::clone(&rwlock);
844 /// move || {
845 /// // This shall block until the `read_lock` is released.
846 /// let mut write_lock = rwlock.blocking_write();
847 /// *write_lock = 2;
848 /// }
849 /// });
850 ///
851 /// assert_eq!(*read_lock, 1);
852 /// // Release the last outstanding read lock.
853 /// drop(read_lock);
854 ///
855 /// // Await the completion of the blocking task.
856 /// blocking_task.await.unwrap();
857 ///
858 /// // Assert uncontended.
859 /// let read_lock = rwlock.try_read().unwrap();
860 /// assert_eq!(*read_lock, 2);
861 /// }
862 /// ```
#[track_caller]
#[cfg(feature = "sync")]
pub fn blocking_write(&self) -> RwLockWriteGuard<'_, T> {
    // Build the async `write` future, then hand it to the crate's `block_on`
    // helper, which returns the guard once the future completes.
    let pending_write = self.write();
    crate::future::block_on(pending_write)
}
868
869 /// Locks this `RwLock` with exclusive write access, causing the current
870 /// task to yield until the lock has been acquired.
871 ///
872 /// The calling task will yield while other writers or readers currently
873 /// have access to the lock.
874 ///
875 /// This method is identical to [`RwLock::write`], except that the returned
876 /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
877 /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
878 /// method, and the guard will live for the `'static` lifetime, as it keeps
879 /// the `RwLock` alive by holding an `Arc`.
880 ///
881 /// Returns an RAII guard which will drop the write access of this `RwLock`
882 /// when dropped.
883 ///
884 /// # Cancel safety
885 ///
886 /// This method uses a queue to fairly distribute locks in the order they
887 /// were requested. Cancelling a call to `write_owned` makes you lose your
888 /// place in the queue.
889 ///
890 /// # Examples
891 ///
892 /// ```
893 /// use std::sync::Arc;
894 /// use tokio::sync::RwLock;
895 ///
896 /// #[tokio::main]
897 /// async fn main() {
898 /// let lock = Arc::new(RwLock::new(1));
899 ///
900 /// let mut n = lock.write_owned().await;
901 /// *n = 2;
902 ///}
903 /// ```
904 pub async fn write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T> {
905 #[cfg(all(tokio_unstable, feature = "tracing"))]
906 let resource_span = self.resource_span.clone();
907
908 let acquire_fut = async {
909 self.s.acquire(self.mr as usize).await.unwrap_or_else(|_| {
910 // The semaphore was closed. but, we never explicitly close it, and we have a
911 // handle to it through the Arc, which means that this can never happen.
912 unreachable!()
913 });
914
915 OwnedRwLockWriteGuard {
916 #[cfg(all(tokio_unstable, feature = "tracing"))]
917 resource_span: self.resource_span.clone(),
918 permits_acquired: self.mr,
919 data: self.c.get(),
920 lock: self,
921 _p: PhantomData,
922 }
923 };
924
925 #[cfg(all(tokio_unstable, feature = "tracing"))]
926 let acquire_fut = trace::async_op(
927 move || acquire_fut,
928 resource_span,
929 "RwLock::write_owned",
930 "poll",
931 false,
932 );
933
934 #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
935 let guard = acquire_fut.await;
936
937 #[cfg(all(tokio_unstable, feature = "tracing"))]
938 guard.resource_span.in_scope(|| {
939 tracing::trace!(
940 target: "runtime::resource::state_update",
941 write_locked = true,
942 write_locked.op = "override",
943 )
944 });
945
946 guard
947 }
948
949 /// Attempts to acquire this `RwLock` with exclusive write access.
950 ///
951 /// If the access couldn't be acquired immediately, returns [`TryLockError`].
952 /// Otherwise, an RAII guard is returned which will release write access
953 /// when dropped.
954 ///
955 /// [`TryLockError`]: TryLockError
956 ///
957 /// # Examples
958 ///
959 /// ```
960 /// use tokio::sync::RwLock;
961 ///
962 /// #[tokio::main]
963 /// async fn main() {
964 /// let rw = RwLock::new(1);
965 ///
966 /// let v = rw.read().await;
967 /// assert_eq!(*v, 1);
968 ///
969 /// assert!(rw.try_write().is_err());
970 /// }
971 /// ```
972 pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError> {
973 match self.s.try_acquire(self.mr as usize) {
974 Ok(permit) => permit,
975 Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
976 Err(TryAcquireError::Closed) => unreachable!(),
977 }
978
979 let guard = RwLockWriteGuard {
980 permits_acquired: self.mr,
981 s: &self.s,
982 data: self.c.get(),
983 marker: marker::PhantomData,
984 #[cfg(all(tokio_unstable, feature = "tracing"))]
985 resource_span: self.resource_span.clone(),
986 };
987
988 #[cfg(all(tokio_unstable, feature = "tracing"))]
989 self.resource_span.in_scope(|| {
990 tracing::trace!(
991 target: "runtime::resource::state_update",
992 write_locked = true,
993 write_locked.op = "override",
994 )
995 });
996
997 Ok(guard)
998 }
999
1000 /// Attempts to acquire this `RwLock` with exclusive write access.
1001 ///
1002 /// If the access couldn't be acquired immediately, returns [`TryLockError`].
1003 /// Otherwise, an RAII guard is returned which will release write access
1004 /// when dropped.
1005 ///
1006 /// This method is identical to [`RwLock::try_write`], except that the
1007 /// returned guard references the `RwLock` with an [`Arc`] rather than by
1008 /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
1009 /// call this method, and the guard will live for the `'static` lifetime,
1010 /// as it keeps the `RwLock` alive by holding an `Arc`.
1011 ///
1012 /// [`TryLockError`]: TryLockError
1013 ///
1014 /// # Examples
1015 ///
1016 /// ```
1017 /// use std::sync::Arc;
1018 /// use tokio::sync::RwLock;
1019 ///
1020 /// #[tokio::main]
1021 /// async fn main() {
1022 /// let rw = Arc::new(RwLock::new(1));
1023 ///
1024 /// let v = Arc::clone(&rw).read_owned().await;
1025 /// assert_eq!(*v, 1);
1026 ///
1027 /// assert!(rw.try_write_owned().is_err());
1028 /// }
1029 /// ```
1030 pub fn try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError> {
1031 match self.s.try_acquire(self.mr as usize) {
1032 Ok(permit) => permit,
1033 Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
1034 Err(TryAcquireError::Closed) => unreachable!(),
1035 }
1036
1037 let guard = OwnedRwLockWriteGuard {
1038 #[cfg(all(tokio_unstable, feature = "tracing"))]
1039 resource_span: self.resource_span.clone(),
1040 permits_acquired: self.mr,
1041 data: self.c.get(),
1042 lock: self,
1043 _p: PhantomData,
1044 };
1045
1046 #[cfg(all(tokio_unstable, feature = "tracing"))]
1047 guard.resource_span.in_scope(|| {
1048 tracing::trace!(
1049 target: "runtime::resource::state_update",
1050 write_locked = true,
1051 write_locked.op = "override",
1052 )
1053 });
1054
1055 Ok(guard)
1056 }
1057
1058 /// Returns a mutable reference to the underlying data.
1059 ///
1060 /// Since this call borrows the `RwLock` mutably, no actual locking needs to
1061 /// take place -- the mutable borrow statically guarantees no locks exist.
1062 ///
1063 /// # Examples
1064 ///
1065 /// ```
1066 /// use tokio::sync::RwLock;
1067 ///
1068 /// fn main() {
1069 /// let mut lock = RwLock::new(1);
1070 ///
1071 /// let n = lock.get_mut();
1072 /// *n = 2;
1073 /// }
1074 /// ```
1075 pub fn get_mut(&mut self) -> &mut T {
1076 unsafe {
1077 // Safety: This is https://github.com/rust-lang/rust/pull/76936
1078 &mut *self.c.get()
1079 }
1080 }
1081
1082 /// Consumes the lock, returning the underlying data.
1083 pub fn into_inner(self) -> T
1084 where
1085 T: Sized,
1086 {
1087 self.c.into_inner()
1088 }
1089}
1090
1091impl<T> From<T> for RwLock<T> {
1092 fn from(s: T) -> Self {
1093 Self::new(s)
1094 }
1095}
1096
1097impl<T: ?Sized> Default for RwLock<T>
1098where
1099 T: Default,
1100{
1101 fn default() -> Self {
1102 Self::new(T::default())
1103 }
1104}
1105
1106impl<T: ?Sized> std::fmt::Debug for RwLock<T>
1107where
1108 T: std::fmt::Debug,
1109{
1110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1111 let mut d = f.debug_struct("RwLock");
1112 match self.try_read() {
1113 Ok(inner) => d.field("data", &&*inner),
1114 Err(_) => d.field("data", &format_args!("<locked>")),
1115 };
1116 d.finish()
1117 }
1118}