tokio/sync/watch.rs
1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A multi-producer, multi-consumer channel that only retains the *last* sent
4//! value.
5//!
6//! This channel is useful for watching for changes to a value from multiple
7//! points in the code base, for example, changes to configuration values.
8//!
9//! # Usage
10//!
11//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12//! and consumer halves of the channel. The channel is created with an initial
13//! value.
14//!
15//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
16//!
17//! To access the **current** value stored in the channel and mark it as *seen*
18//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
19//!
20//! To access the current value **without** marking it as *seen*, use
21//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
22//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
23//!
24//! For more information on when to use these methods, see
25//! [here](#borrow_and_update-versus-borrow).
26//!
27//! ## Change notifications
28//!
29//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
30//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
31//!
32//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
33//! `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
34//! * If the current value is *unseen* when calling [`changed`], then
35//! [`changed`] will return immediately. If the current value is *seen*, then
36//! it will sleep until either a new message is sent via the [`Sender`] half,
37//! or the [`Sender`] is dropped.
38//! * On completion, the [`changed`] method marks the new value as *seen*.
39//! * At creation, the initial value is considered *seen*. In other words,
40//! [`Receiver::changed()`] will not return until a subsequent value is sent.
41//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
42//! The current value at the time the [`Receiver`] is created is considered
43//! *seen*.
44//!
45//! ## `borrow_and_update` versus `borrow`
46//!
47//! If the receiver intends to await notifications from [`changed`] in a loop,
48//! [`Receiver::borrow_and_update()`] should be preferred over
49//! [`Receiver::borrow()`]. This avoids a potential race where a new value is
50//! sent between [`changed`] being ready and the value being read. (If
51//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
52//!
53//! If the receiver is only interested in the current value, and does not intend
54//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
55//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
56//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
57//! self`.
58//!
59//! # Examples
60//!
61//! The following example prints `hello! world! `.
62//!
63//! ```
64//! use tokio::sync::watch;
65//! use tokio::time::{Duration, sleep};
66//!
67//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
68//! let (tx, mut rx) = watch::channel("hello");
69//!
70//! tokio::spawn(async move {
71//! // Use the equivalent of a "do-while" loop so the initial value is
72//! // processed before awaiting the `changed()` future.
73//! loop {
74//! println!("{}! ", *rx.borrow_and_update());
75//! if rx.changed().await.is_err() {
76//! break;
77//! }
78//! }
79//! });
80//!
81//! sleep(Duration::from_millis(100)).await;
82//! tx.send("world")?;
83//! # Ok(())
84//! # }
85//! ```
86//!
87//! # Closing
88//!
89//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
90//! when all [`Receiver`] handles have been dropped. This indicates that there
91//! is no further interest in the values being produced and work can be stopped.
92//!
93//! The value in the channel will not be dropped until the sender and all
94//! receivers have been dropped.
95//!
96//! # Thread safety
97//!
98//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
99//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
100//! handles may be moved to separate threads and also used concurrently.
101//!
102//! [`Sender`]: crate::sync::watch::Sender
103//! [`Receiver`]: crate::sync::watch::Receiver
104//! [`changed`]: crate::sync::watch::Receiver::changed
105//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
106//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
107//! [`Receiver::borrow_and_update()`]:
108//! crate::sync::watch::Receiver::borrow_and_update
109//! [`channel`]: crate::sync::watch::channel
110//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
111//! [`Sender::closed`]: crate::sync::watch::Sender::closed
112//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
113
114use crate::runtime::coop::cooperative;
115use crate::sync::notify::Notify;
116
117use crate::loom::sync::atomic::AtomicUsize;
118use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
119use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
120use std::fmt;
121use std::mem;
122use std::ops;
123use std::panic;
124
125/// Receives values from the associated [`Sender`](struct@Sender).
126///
127/// Instances are created by the [`channel`](fn@channel) function.
128///
129/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
130/// wrapper.
131///
132/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
133#[derive(Debug)]
134pub struct Receiver<T> {
135 /// Pointer to the shared state
136 shared: Arc<Shared<T>>,
137
138 /// Last observed version
139 version: Version,
140}
141
142/// Sends values to the associated [`Receiver`](struct@Receiver).
143///
144/// Instances are created by the [`channel`](fn@channel) function.
145#[derive(Debug)]
146pub struct Sender<T> {
147 shared: Arc<Shared<T>>,
148}
149
150impl<T> Clone for Sender<T> {
151 fn clone(&self) -> Self {
152 self.shared.ref_count_tx.fetch_add(1, Relaxed);
153
154 Self {
155 shared: self.shared.clone(),
156 }
157 }
158}
159
160impl<T: Default> Default for Sender<T> {
161 fn default() -> Self {
162 Self::new(T::default())
163 }
164}
165
166/// Returns a reference to the inner value.
167///
168/// Outstanding borrows hold a read lock on the inner value. This means that
169/// long-lived borrows could cause the producer half to block. It is recommended
170/// to keep the borrow as short-lived as possible. Additionally, if you are
171/// running in an environment that allows `!Send` futures, you must ensure that
172/// the returned `Ref` type is never held alive across an `.await` point,
173/// otherwise, it can lead to a deadlock.
174///
175/// The priority policy of the lock is dependent on the underlying lock
176/// implementation, and this type does not guarantee that any particular policy
177/// will be used. In particular, a producer which is waiting to acquire the lock
178/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
179///
180/// <details><summary>Potential deadlock example</summary>
181///
182/// ```text
183/// // Task 1 (on thread A) | // Task 2 (on thread B)
184/// let _ref1 = rx.borrow(); |
185/// | // will block
186/// | let _ = tx.send(());
187/// // may deadlock |
188/// let _ref2 = rx.borrow(); |
189/// ```
190/// </details>
191#[derive(Debug)]
192pub struct Ref<'a, T> {
193 inner: RwLockReadGuard<'a, T>,
194 has_changed: bool,
195}
196
197impl<'a, T> Ref<'a, T> {
198 /// Indicates if the borrowed value is considered as _changed_ since the last
199 /// time it has been marked as seen.
200 ///
201 /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
202 ///
203 /// When borrowed from the [`Sender`] this function will always return `false`.
204 ///
205 /// # Examples
206 ///
207 /// ```
208 /// use tokio::sync::watch;
209 ///
210 /// #[tokio::main]
211 /// async fn main() {
212 /// let (tx, mut rx) = watch::channel("hello");
213 ///
214 /// tx.send("goodbye").unwrap();
215 /// // The sender does never consider the value as changed.
216 /// assert!(!tx.borrow().has_changed());
217 ///
218 /// // Drop the sender immediately, just for testing purposes.
219 /// drop(tx);
220 ///
221 /// // Even if the sender has already been dropped...
222 /// assert!(rx.has_changed().is_err());
223 /// // ...the modified value is still readable and detected as changed.
224 /// assert_eq!(*rx.borrow(), "goodbye");
225 /// assert!(rx.borrow().has_changed());
226 ///
227 /// // Read the changed value and mark it as seen.
228 /// {
229 /// let received = rx.borrow_and_update();
230 /// assert_eq!(*received, "goodbye");
231 /// assert!(received.has_changed());
232 /// // Release the read lock when leaving this scope.
233 /// }
234 ///
235 /// // Now the value has already been marked as seen and could
236 /// // never be modified again (after the sender has been dropped).
237 /// assert!(!rx.borrow().has_changed());
238 /// }
239 /// ```
240 pub fn has_changed(&self) -> bool {
241 self.has_changed
242 }
243}
244
245struct Shared<T> {
246 /// The most recent value.
247 value: RwLock<T>,
248
249 /// The current version.
250 ///
251 /// The lowest bit represents a "closed" state. The rest of the bits
252 /// represent the current version.
253 state: AtomicState,
254
255 /// Tracks the number of `Receiver` instances.
256 ref_count_rx: AtomicUsize,
257
258 /// Tracks the number of `Sender` instances.
259 ref_count_tx: AtomicUsize,
260
261 /// Notifies waiting receivers that the value changed.
262 notify_rx: big_notify::BigNotify,
263
264 /// Notifies any task listening for `Receiver` dropped events.
265 notify_tx: Notify,
266}
267
268impl<T: fmt::Debug> fmt::Debug for Shared<T> {
269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270 let state = self.state.load();
271 f.debug_struct("Shared")
272 .field("value", &self.value)
273 .field("version", &state.version())
274 .field("is_closed", &state.is_closed())
275 .field("ref_count_rx", &self.ref_count_rx)
276 .finish()
277 }
278}
279
280pub mod error {
281 //! Watch error types.
282
283 use std::error::Error;
284 use std::fmt;
285
286 /// Error produced when sending a value fails.
287 #[derive(PartialEq, Eq, Clone, Copy)]
288 pub struct SendError<T>(pub T);
289
290 // ===== impl SendError =====
291
292 impl<T> fmt::Debug for SendError<T> {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 f.debug_struct("SendError").finish_non_exhaustive()
295 }
296 }
297
298 impl<T> fmt::Display for SendError<T> {
299 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
300 write!(fmt, "channel closed")
301 }
302 }
303
304 impl<T> Error for SendError<T> {}
305
306 /// Error produced when receiving a change notification.
307 #[derive(Debug, Clone)]
308 pub struct RecvError(pub(super) ());
309
310 // ===== impl RecvError =====
311
312 impl fmt::Display for RecvError {
313 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
314 write!(fmt, "channel closed")
315 }
316 }
317
318 impl Error for RecvError {}
319}
320
321mod big_notify {
322 use super::Notify;
323 use crate::sync::notify::Notified;
324
325 // To avoid contention on the lock inside the `Notify`, we store multiple
326 // copies of it. Then, we use either circular access or randomness to spread
327 // out threads over different `Notify` objects.
328 //
329 // Some simple benchmarks show that randomness performs slightly better than
330 // circular access (probably due to contention on `next`), so we prefer to
331 // use randomness when Tokio is compiled with a random number generator.
332 //
333 // When the random number generator is not available, we fall back to
334 // circular access.
335
336 pub(super) struct BigNotify {
337 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
338 next: std::sync::atomic::AtomicUsize,
339 inner: [Notify; 8],
340 }
341
342 impl BigNotify {
343 pub(super) fn new() -> Self {
344 Self {
345 #[cfg(not(all(
346 not(loom),
347 feature = "sync",
348 any(feature = "rt", feature = "macros")
349 )))]
350 next: std::sync::atomic::AtomicUsize::new(0),
351 inner: Default::default(),
352 }
353 }
354
355 pub(super) fn notify_waiters(&self) {
356 for notify in &self.inner {
357 notify.notify_waiters();
358 }
359 }
360
361 /// This function implements the case where randomness is not available.
362 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
363 pub(super) fn notified(&self) -> Notified<'_> {
364 let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
365 self.inner[i].notified()
366 }
367
368 /// This function implements the case where randomness is available.
369 #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
370 pub(super) fn notified(&self) -> Notified<'_> {
371 let i = crate::runtime::context::thread_rng_n(8) as usize;
372 self.inner[i].notified()
373 }
374 }
375}
376
377use self::state::{AtomicState, Version};
378mod state {
379 use crate::loom::sync::atomic::AtomicUsize;
380 use crate::loom::sync::atomic::Ordering;
381
382 const CLOSED_BIT: usize = 1;
383
384 // Using 2 as the step size preserves the `CLOSED_BIT`.
385 const STEP_SIZE: usize = 2;
386
387 /// The version part of the state. The lowest bit is always zero.
388 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
389 pub(super) struct Version(usize);
390
391 /// Snapshot of the state. The first bit is used as the CLOSED bit.
392 /// The remaining bits are used as the version.
393 ///
394 /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
395 /// receivers does not set it.
396 #[derive(Copy, Clone, Debug)]
397 pub(super) struct StateSnapshot(usize);
398
399 /// The state stored in an atomic integer.
400 ///
401 /// The `Sender` uses `Release` ordering for storing a new state
402 /// and the `Receiver`s use `Acquire` ordering for loading the
403 /// current state. This ensures that written values are seen by
404 /// the `Receiver`s for a proper handover.
405 #[derive(Debug)]
406 pub(super) struct AtomicState(AtomicUsize);
407
408 impl Version {
409 /// Decrements the version.
410 pub(super) fn decrement(&mut self) {
411 // Using a wrapping decrement here is required to ensure that the
412 // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
413 // which wraps on overflow.
414 self.0 = self.0.wrapping_sub(STEP_SIZE);
415 }
416
417 pub(super) const INITIAL: Self = Version(0);
418 }
419
420 impl StateSnapshot {
421 /// Extract the version from the state.
422 pub(super) fn version(self) -> Version {
423 Version(self.0 & !CLOSED_BIT)
424 }
425
426 /// Is the closed bit set?
427 pub(super) fn is_closed(self) -> bool {
428 (self.0 & CLOSED_BIT) == CLOSED_BIT
429 }
430 }
431
432 impl AtomicState {
433 /// Create a new `AtomicState` that is not closed and which has the
434 /// version set to `Version::INITIAL`.
435 pub(super) fn new() -> Self {
436 AtomicState(AtomicUsize::new(Version::INITIAL.0))
437 }
438
439 /// Load the current value of the state.
440 ///
441 /// Only used by the receiver and for debugging purposes.
442 ///
443 /// The receiver side (read-only) uses `Acquire` ordering for a proper handover
444 /// of the shared value with the sender side (single writer). The state is always
445 /// updated after modifying and before releasing the (exclusive) lock on the
446 /// shared value.
447 pub(super) fn load(&self) -> StateSnapshot {
448 StateSnapshot(self.0.load(Ordering::Acquire))
449 }
450
451 /// Increment the version counter.
452 pub(super) fn increment_version_while_locked(&self) {
453 // Use `Release` ordering to ensure that the shared value
454 // has been written before updating the version. The shared
455 // value is still protected by an exclusive lock during this
456 // method.
457 self.0.fetch_add(STEP_SIZE, Ordering::Release);
458 }
459
460 /// Set the closed bit in the state.
461 pub(super) fn set_closed(&self) {
462 self.0.fetch_or(CLOSED_BIT, Ordering::Release);
463 }
464 }
465}
466
467/// Creates a new watch channel, returning the "send" and "receive" handles.
468///
469/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
470/// Only the last value sent is made available to the [`Receiver`] half. All
471/// intermediate values are dropped.
472///
473/// # Examples
474///
475/// The following example prints `hello! world! `.
476///
477/// ```
478/// use tokio::sync::watch;
479/// use tokio::time::{Duration, sleep};
480///
481/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
482/// let (tx, mut rx) = watch::channel("hello");
483///
484/// tokio::spawn(async move {
485/// // Use the equivalent of a "do-while" loop so the initial value is
486/// // processed before awaiting the `changed()` future.
487/// loop {
488/// println!("{}! ", *rx.borrow_and_update());
489/// if rx.changed().await.is_err() {
490/// break;
491/// }
492/// }
493/// });
494///
495/// sleep(Duration::from_millis(100)).await;
496/// tx.send("world")?;
497/// # Ok(())
498/// # }
499/// ```
500///
501/// [`Sender`]: struct@Sender
502/// [`Receiver`]: struct@Receiver
503pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
504 let shared = Arc::new(Shared {
505 value: RwLock::new(init),
506 state: AtomicState::new(),
507 ref_count_rx: AtomicUsize::new(1),
508 ref_count_tx: AtomicUsize::new(1),
509 notify_rx: big_notify::BigNotify::new(),
510 notify_tx: Notify::new(),
511 });
512
513 let tx = Sender {
514 shared: shared.clone(),
515 };
516
517 let rx = Receiver {
518 shared,
519 version: Version::INITIAL,
520 };
521
522 (tx, rx)
523}
524
525impl<T> Receiver<T> {
526 fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
527 // No synchronization necessary as this is only used as a counter and
528 // not memory access.
529 shared.ref_count_rx.fetch_add(1, Relaxed);
530
531 Self { shared, version }
532 }
533
534 /// Returns a reference to the most recently sent value.
535 ///
536 /// This method does not mark the returned value as seen, so future calls to
537 /// [`changed`] may return immediately even if you have already seen the
538 /// value with a call to `borrow`.
539 ///
540 /// Outstanding borrows hold a read lock on the inner value. This means that
541 /// long-lived borrows could cause the producer half to block. It is recommended
542 /// to keep the borrow as short-lived as possible. Additionally, if you are
543 /// running in an environment that allows `!Send` futures, you must ensure that
544 /// the returned `Ref` type is never held alive across an `.await` point,
545 /// otherwise, it can lead to a deadlock.
546 ///
547 /// The priority policy of the lock is dependent on the underlying lock
548 /// implementation, and this type does not guarantee that any particular policy
549 /// will be used. In particular, a producer which is waiting to acquire the lock
550 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
551 ///
552 /// <details><summary>Potential deadlock example</summary>
553 ///
554 /// ```text
555 /// // Task 1 (on thread A) | // Task 2 (on thread B)
556 /// let _ref1 = rx.borrow(); |
557 /// | // will block
558 /// | let _ = tx.send(());
559 /// // may deadlock |
560 /// let _ref2 = rx.borrow(); |
561 /// ```
562 /// </details>
563 ///
564 /// For more information on when to use this method versus
565 /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
566 ///
567 /// [`changed`]: Receiver::changed
568 /// [`borrow_and_update`]: Receiver::borrow_and_update
569 ///
570 /// # Examples
571 ///
572 /// ```
573 /// use tokio::sync::watch;
574 ///
575 /// let (_, rx) = watch::channel("hello");
576 /// assert_eq!(*rx.borrow(), "hello");
577 /// ```
578 pub fn borrow(&self) -> Ref<'_, T> {
579 let inner = self.shared.value.read();
580
581 // After obtaining a read-lock no concurrent writes could occur
582 // and the loaded version matches that of the borrowed reference.
583 let new_version = self.shared.state.load().version();
584 let has_changed = self.version != new_version;
585
586 Ref { inner, has_changed }
587 }
588
589 /// Returns a reference to the most recently sent value and marks that value
590 /// as seen.
591 ///
592 /// This method marks the current value as seen. Subsequent calls to [`changed`]
593 /// will not return immediately until the [`Sender`] has modified the shared
594 /// value again.
595 ///
596 /// Outstanding borrows hold a read lock on the inner value. This means that
597 /// long-lived borrows could cause the producer half to block. It is recommended
598 /// to keep the borrow as short-lived as possible. Additionally, if you are
599 /// running in an environment that allows `!Send` futures, you must ensure that
600 /// the returned `Ref` type is never held alive across an `.await` point,
601 /// otherwise, it can lead to a deadlock.
602 ///
603 /// The priority policy of the lock is dependent on the underlying lock
604 /// implementation, and this type does not guarantee that any particular policy
605 /// will be used. In particular, a producer which is waiting to acquire the lock
606 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
607 ///
608 /// <details><summary>Potential deadlock example</summary>
609 ///
610 /// ```text
611 /// // Task 1 (on thread A) | // Task 2 (on thread B)
612 /// let _ref1 = rx1.borrow_and_update(); |
613 /// | // will block
614 /// | let _ = tx.send(());
615 /// // may deadlock |
616 /// let _ref2 = rx2.borrow_and_update(); |
617 /// ```
618 /// </details>
619 ///
620 /// For more information on when to use this method versus [`borrow`], see
621 /// [here](self#borrow_and_update-versus-borrow).
622 ///
623 /// [`changed`]: Receiver::changed
624 /// [`borrow`]: Receiver::borrow
625 pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
626 let inner = self.shared.value.read();
627
628 // After obtaining a read-lock no concurrent writes could occur
629 // and the loaded version matches that of the borrowed reference.
630 let new_version = self.shared.state.load().version();
631 let has_changed = self.version != new_version;
632
633 // Mark the shared value as seen by updating the version
634 self.version = new_version;
635
636 Ref { inner, has_changed }
637 }
638
639 /// Checks if this channel contains a message that this receiver has not yet
640 /// seen. The new value is not marked as seen.
641 ///
642 /// Although this method is called `has_changed`, it does not check new
643 /// messages for equality, so this call will return true even if the new
644 /// message is equal to the old message.
645 ///
646 /// Returns an error if the channel has been closed.
647 /// # Examples
648 ///
649 /// ```
650 /// use tokio::sync::watch;
651 ///
652 /// #[tokio::main]
653 /// async fn main() {
654 /// let (tx, mut rx) = watch::channel("hello");
655 ///
656 /// tx.send("goodbye").unwrap();
657 ///
658 /// assert!(rx.has_changed().unwrap());
659 /// assert_eq!(*rx.borrow_and_update(), "goodbye");
660 ///
661 /// // The value has been marked as seen
662 /// assert!(!rx.has_changed().unwrap());
663 ///
664 /// drop(tx);
665 /// // The `tx` handle has been dropped
666 /// assert!(rx.has_changed().is_err());
667 /// }
668 /// ```
669 pub fn has_changed(&self) -> Result<bool, error::RecvError> {
670 // Load the version from the state
671 let state = self.shared.state.load();
672 if state.is_closed() {
673 // The sender has dropped.
674 return Err(error::RecvError(()));
675 }
676 let new_version = state.version();
677
678 Ok(self.version != new_version)
679 }
680
681 /// Marks the state as changed.
682 ///
683 /// After invoking this method [`has_changed()`](Self::has_changed)
684 /// returns `true` and [`changed()`](Self::changed) returns
685 /// immediately, regardless of whether a new value has been sent.
686 ///
687 /// This is useful for triggering an initial change notification after
688 /// subscribing to synchronize new receivers.
689 pub fn mark_changed(&mut self) {
690 self.version.decrement();
691 }
692
693 /// Marks the state as unchanged.
694 ///
695 /// The current value will be considered seen by the receiver.
696 ///
697 /// This is useful if you are not interested in the current value
698 /// visible in the receiver.
699 pub fn mark_unchanged(&mut self) {
700 let current_version = self.shared.state.load().version();
701 self.version = current_version;
702 }
703
704 /// Waits for a change notification, then marks the newest value as seen.
705 ///
706 /// If the newest value in the channel has not yet been marked seen when
707 /// this method is called, the method marks that value seen and returns
708 /// immediately. If the newest value has already been marked seen, then the
709 /// method sleeps until a new message is sent by the [`Sender`] connected to
710 /// this `Receiver`, or until the [`Sender`] is dropped.
711 ///
712 /// This method returns an error if and only if the [`Sender`] is dropped.
713 ///
714 /// For more information, see
715 /// [*Change notifications*](self#change-notifications) in the module-level documentation.
716 ///
717 /// # Cancel safety
718 ///
719 /// This method is cancel safe. If you use it as the event in a
720 /// [`tokio::select!`](crate::select) statement and some other branch
721 /// completes first, then it is guaranteed that no values have been marked
722 /// seen by this call to `changed`.
723 ///
724 /// [`Sender`]: struct@Sender
725 ///
726 /// # Examples
727 ///
728 /// ```
729 /// use tokio::sync::watch;
730 ///
731 /// #[tokio::main]
732 /// async fn main() {
733 /// let (tx, mut rx) = watch::channel("hello");
734 ///
735 /// tokio::spawn(async move {
736 /// tx.send("goodbye").unwrap();
737 /// });
738 ///
739 /// assert!(rx.changed().await.is_ok());
740 /// assert_eq!(*rx.borrow_and_update(), "goodbye");
741 ///
742 /// // The `tx` handle has been dropped
743 /// assert!(rx.changed().await.is_err());
744 /// }
745 /// ```
746 pub async fn changed(&mut self) -> Result<(), error::RecvError> {
747 cooperative(changed_impl(&self.shared, &mut self.version)).await
748 }
749
750 /// Waits for a value that satisfies the provided condition.
751 ///
752 /// This method will call the provided closure whenever something is sent on
753 /// the channel. Once the closure returns `true`, this method will return a
754 /// reference to the value that was passed to the closure.
755 ///
756 /// Before `wait_for` starts waiting for changes, it will call the closure
757 /// on the current value. If the closure returns `true` when given the
758 /// current value, then `wait_for` will immediately return a reference to
759 /// the current value. This is the case even if the current value is already
760 /// considered seen.
761 ///
762 /// The watch channel only keeps track of the most recent value, so if
763 /// several messages are sent faster than `wait_for` is able to call the
764 /// closure, then it may skip some updates. Whenever the closure is called,
765 /// it will be called with the most recent value.
766 ///
767 /// When this function returns, the value that was passed to the closure
768 /// when it returned `true` will be considered seen.
769 ///
770 /// If the channel is closed, then `wait_for` will return a `RecvError`.
771 /// Once this happens, no more messages can ever be sent on the channel.
772 /// When an error is returned, it is guaranteed that the closure has been
773 /// called on the last value, and that it returned `false` for that value.
774 /// (If the closure returned `true`, then the last value would have been
775 /// returned instead of the error.)
776 ///
777 /// Like the `borrow` method, the returned borrow holds a read lock on the
778 /// inner value. This means that long-lived borrows could cause the producer
779 /// half to block. It is recommended to keep the borrow as short-lived as
780 /// possible. See the documentation of `borrow` for more information on
781 /// this.
782 ///
783 /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
784 ///
785 /// # Examples
786 ///
787 /// ```
788 /// use tokio::sync::watch;
789 ///
790 /// #[tokio::main]
791 ///
792 /// async fn main() {
793 /// let (tx, _rx) = watch::channel("hello");
794 ///
795 /// tx.send("goodbye").unwrap();
796 ///
797 /// // here we subscribe to a second receiver
798 /// // now in case of using `changed` we would have
799 /// // to first check the current value and then wait
800 /// // for changes or else `changed` would hang.
801 /// let mut rx2 = tx.subscribe();
802 ///
803 /// // in place of changed we have use `wait_for`
804 /// // which would automatically check the current value
805 /// // and wait for changes until the closure returns true.
806 /// assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok());
807 /// assert_eq!(*rx2.borrow(), "goodbye");
808 /// }
809 /// ```
810 pub async fn wait_for(
811 &mut self,
812 f: impl FnMut(&T) -> bool,
813 ) -> Result<Ref<'_, T>, error::RecvError> {
814 cooperative(self.wait_for_inner(f)).await
815 }
816
817 async fn wait_for_inner(
818 &mut self,
819 mut f: impl FnMut(&T) -> bool,
820 ) -> Result<Ref<'_, T>, error::RecvError> {
821 let mut closed = false;
822 loop {
823 {
824 let inner = self.shared.value.read();
825
826 let new_version = self.shared.state.load().version();
827 let has_changed = self.version != new_version;
828 self.version = new_version;
829
830 if !closed || has_changed {
831 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
832 match result {
833 Ok(true) => {
834 return Ok(Ref { inner, has_changed });
835 }
836 Ok(false) => {
837 // Skip the value.
838 }
839 Err(panicked) => {
840 // Drop the read-lock to avoid poisoning it.
841 drop(inner);
842 // Forward the panic to the caller.
843 panic::resume_unwind(panicked);
844 // Unreachable
845 }
846 };
847 }
848 }
849
850 if closed {
851 return Err(error::RecvError(()));
852 }
853
854 // Wait for the value to change.
855 closed = changed_impl(&self.shared, &mut self.version).await.is_err();
856 }
857 }
858
859 /// Returns `true` if receivers belong to the same channel.
860 ///
861 /// # Examples
862 ///
863 /// ```
864 /// let (tx, rx) = tokio::sync::watch::channel(true);
865 /// let rx2 = rx.clone();
866 /// assert!(rx.same_channel(&rx2));
867 ///
868 /// let (tx3, rx3) = tokio::sync::watch::channel(true);
869 /// assert!(!rx3.same_channel(&rx2));
870 /// ```
871 pub fn same_channel(&self, other: &Self) -> bool {
872 Arc::ptr_eq(&self.shared, &other.shared)
873 }
874
875 cfg_process_driver! {
876 pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
877 maybe_changed(&self.shared, &mut self.version)
878 }
879 }
880}
881
882fn maybe_changed<T>(
883 shared: &Shared<T>,
884 version: &mut Version,
885) -> Option<Result<(), error::RecvError>> {
886 // Load the version from the state
887 let state = shared.state.load();
888 let new_version = state.version();
889
890 if *version != new_version {
891 // Observe the new version and return
892 *version = new_version;
893 return Some(Ok(()));
894 }
895
896 if state.is_closed() {
897 // The sender has been dropped.
898 return Some(Err(error::RecvError(())));
899 }
900
901 None
902}
903
904async fn changed_impl<T>(
905 shared: &Shared<T>,
906 version: &mut Version,
907) -> Result<(), error::RecvError> {
908 crate::trace::async_trace_leaf().await;
909
910 loop {
911 // In order to avoid a race condition, we first request a notification,
912 // **then** check the current value's version. If a new version exists,
913 // the notification request is dropped.
914 let notified = shared.notify_rx.notified();
915
916 if let Some(ret) = maybe_changed(shared, version) {
917 return ret;
918 }
919
920 notified.await;
921 // loop around again in case the wake-up was spurious
922 }
923}
924
925impl<T> Clone for Receiver<T> {
926 fn clone(&self) -> Self {
927 let version = self.version;
928 let shared = self.shared.clone();
929
930 Self::from_shared(version, shared)
931 }
932}
933
934impl<T> Drop for Receiver<T> {
935 fn drop(&mut self) {
936 // No synchronization necessary as this is only used as a counter and
937 // not memory access.
938 if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
939 // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
940 self.shared.notify_tx.notify_waiters();
941 }
942 }
943}
944
945impl<T> Sender<T> {
946 /// Creates the sending-half of the [`watch`] channel.
947 ///
948 /// See documentation of [`watch::channel`] for errors when calling this function.
949 /// Beware that attempting to send a value when there are no receivers will
950 /// return an error.
951 ///
952 /// [`watch`]: crate::sync::watch
953 /// [`watch::channel`]: crate::sync::watch
954 ///
955 /// # Examples
956 /// ```
957 /// let sender = tokio::sync::watch::Sender::new(0u8);
958 /// assert!(sender.send(3).is_err());
959 /// let _rec = sender.subscribe();
960 /// assert!(sender.send(4).is_ok());
961 /// ```
962 pub fn new(init: T) -> Self {
963 let (tx, _) = channel(init);
964 tx
965 }
966
967 /// Sends a new value via the channel, notifying all receivers.
968 ///
969 /// This method fails if the channel is closed, which is the case when
970 /// every receiver has been dropped. It is possible to reopen the channel
971 /// using the [`subscribe`] method. However, when `send` fails, the value
972 /// isn't made available for future receivers (but returned with the
973 /// [`SendError`]).
974 ///
975 /// To always make a new value available for future receivers, even if no
976 /// receiver currently exists, one of the other send methods
977 /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
978 /// used instead.
979 ///
980 /// [`subscribe`]: Sender::subscribe
981 /// [`SendError`]: error::SendError
982 /// [`send_if_modified`]: Sender::send_if_modified
983 /// [`send_modify`]: Sender::send_modify
984 /// [`send_replace`]: Sender::send_replace
985 pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
986 // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
987 if 0 == self.receiver_count() {
988 return Err(error::SendError(value));
989 }
990
991 self.send_replace(value);
992 Ok(())
993 }
994
995 /// Modifies the watched value **unconditionally** in-place,
996 /// notifying all receivers.
997 ///
998 /// This can be useful for modifying the watched value, without
999 /// having to allocate a new instance. Additionally, this
1000 /// method permits sending values even when there are no receivers.
1001 ///
1002 /// Prefer to use the more versatile function [`Self::send_if_modified()`]
1003 /// if the value is only modified conditionally during the mutable borrow
1004 /// to prevent unneeded change notifications for unmodified values.
1005 ///
1006 /// # Panics
1007 ///
1008 /// This function panics when the invocation of the `modify` closure panics.
1009 /// No receivers are notified when panicking. All changes of the watched
1010 /// value applied by the closure before panicking will be visible in
1011 /// subsequent calls to `borrow`.
1012 ///
1013 /// # Examples
1014 ///
1015 /// ```
1016 /// use tokio::sync::watch;
1017 ///
1018 /// struct State {
1019 /// counter: usize,
1020 /// }
1021 /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
1022 /// state_tx.send_modify(|state| state.counter += 1);
1023 /// assert_eq!(state_rx.borrow().counter, 1);
1024 /// ```
1025 pub fn send_modify<F>(&self, modify: F)
1026 where
1027 F: FnOnce(&mut T),
1028 {
1029 self.send_if_modified(|value| {
1030 modify(value);
1031 true
1032 });
1033 }
1034
1035 /// Modifies the watched value **conditionally** in-place,
1036 /// notifying all receivers only if modified.
1037 ///
1038 /// This can be useful for modifying the watched value, without
1039 /// having to allocate a new instance. Additionally, this
1040 /// method permits sending values even when there are no receivers.
1041 ///
1042 /// The `modify` closure must return `true` if the value has actually
1043 /// been modified during the mutable borrow. It should only return `false`
1044 /// if the value is guaranteed to be unmodified despite the mutable
1045 /// borrow.
1046 ///
1047 /// Receivers are only notified if the closure returned `true`. If the
1048 /// closure has modified the value but returned `false` this results
1049 /// in a *silent modification*, i.e. the modified value will be visible
1050 /// in subsequent calls to `borrow`, but receivers will not receive
1051 /// a change notification.
1052 ///
1053 /// Returns the result of the closure, i.e. `true` if the value has
1054 /// been modified and `false` otherwise.
1055 ///
1056 /// # Panics
1057 ///
1058 /// This function panics when the invocation of the `modify` closure panics.
1059 /// No receivers are notified when panicking. All changes of the watched
1060 /// value applied by the closure before panicking will be visible in
1061 /// subsequent calls to `borrow`.
1062 ///
1063 /// # Examples
1064 ///
1065 /// ```
1066 /// use tokio::sync::watch;
1067 ///
1068 /// struct State {
1069 /// counter: usize,
1070 /// }
1071 /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
1072 /// let inc_counter_if_odd = |state: &mut State| {
1073 /// if state.counter % 2 == 1 {
1074 /// state.counter += 1;
1075 /// return true;
1076 /// }
1077 /// false
1078 /// };
1079 ///
1080 /// assert_eq!(state_rx.borrow().counter, 1);
1081 ///
1082 /// assert!(!state_rx.has_changed().unwrap());
1083 /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
1084 /// assert!(state_rx.has_changed().unwrap());
1085 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1086 ///
1087 /// assert!(!state_rx.has_changed().unwrap());
1088 /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
1089 /// assert!(!state_rx.has_changed().unwrap());
1090 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1091 /// ```
1092 pub fn send_if_modified<F>(&self, modify: F) -> bool
1093 where
1094 F: FnOnce(&mut T) -> bool,
1095 {
1096 {
1097 // Acquire the write lock and update the value.
1098 let mut lock = self.shared.value.write();
1099
1100 // Update the value and catch possible panic inside func.
1101 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
1102 match result {
1103 Ok(modified) => {
1104 if !modified {
1105 // Abort, i.e. don't notify receivers if unmodified
1106 return false;
1107 }
1108 // Continue if modified
1109 }
1110 Err(panicked) => {
1111 // Drop the lock to avoid poisoning it.
1112 drop(lock);
1113 // Forward the panic to the caller.
1114 panic::resume_unwind(panicked);
1115 // Unreachable
1116 }
1117 };
1118
1119 self.shared.state.increment_version_while_locked();
1120
1121 // Release the write lock.
1122 //
1123 // Incrementing the version counter while holding the lock ensures
1124 // that receivers are able to figure out the version number of the
1125 // value they are currently looking at.
1126 drop(lock);
1127 }
1128
1129 self.shared.notify_rx.notify_waiters();
1130
1131 true
1132 }
1133
1134 /// Sends a new value via the channel, notifying all receivers and returning
1135 /// the previous value in the channel.
1136 ///
1137 /// This can be useful for reusing the buffers inside a watched value.
1138 /// Additionally, this method permits sending values even when there are no
1139 /// receivers.
1140 ///
1141 /// # Examples
1142 ///
1143 /// ```
1144 /// use tokio::sync::watch;
1145 ///
1146 /// let (tx, _rx) = watch::channel(1);
1147 /// assert_eq!(tx.send_replace(2), 1);
1148 /// assert_eq!(tx.send_replace(3), 2);
1149 /// ```
1150 pub fn send_replace(&self, mut value: T) -> T {
1151 // swap old watched value with the new one
1152 self.send_modify(|old| mem::swap(old, &mut value));
1153
1154 value
1155 }
1156
1157 /// Returns a reference to the most recently sent value
1158 ///
1159 /// Outstanding borrows hold a read lock on the inner value. This means that
1160 /// long-lived borrows could cause the producer half to block. It is recommended
1161 /// to keep the borrow as short-lived as possible. Additionally, if you are
1162 /// running in an environment that allows `!Send` futures, you must ensure that
1163 /// the returned `Ref` type is never held alive across an `.await` point,
1164 /// otherwise, it can lead to a deadlock.
1165 ///
1166 /// # Examples
1167 ///
1168 /// ```
1169 /// use tokio::sync::watch;
1170 ///
1171 /// let (tx, _) = watch::channel("hello");
1172 /// assert_eq!(*tx.borrow(), "hello");
1173 /// ```
1174 pub fn borrow(&self) -> Ref<'_, T> {
1175 let inner = self.shared.value.read();
1176
1177 // The sender/producer always sees the current version
1178 let has_changed = false;
1179
1180 Ref { inner, has_changed }
1181 }
1182
1183 /// Checks if the channel has been closed. This happens when all receivers
1184 /// have dropped.
1185 ///
1186 /// # Examples
1187 ///
1188 /// ```
1189 /// let (tx, rx) = tokio::sync::watch::channel(());
1190 /// assert!(!tx.is_closed());
1191 ///
1192 /// drop(rx);
1193 /// assert!(tx.is_closed());
1194 /// ```
1195 pub fn is_closed(&self) -> bool {
1196 self.receiver_count() == 0
1197 }
1198
1199 /// Completes when all receivers have dropped.
1200 ///
1201 /// This allows the producer to get notified when interest in the produced
1202 /// values is canceled and immediately stop doing work. Once a channel is
1203 /// closed, the only way to reopen it is to call [`Sender::subscribe`] to
1204 /// get a new receiver.
1205 ///
1206 /// If the channel becomes closed for a brief amount of time (e.g., the last
1207 /// receiver is dropped and then `subscribe` is called), then this call to
1208 /// `closed` might return, but it is also possible that it does not "notice"
1209 /// that the channel was closed for a brief amount of time.
1210 ///
1211 /// # Cancel safety
1212 ///
1213 /// This method is cancel safe.
1214 ///
1215 /// # Examples
1216 ///
1217 /// ```
1218 /// use tokio::sync::watch;
1219 ///
1220 /// #[tokio::main]
1221 /// async fn main() {
1222 /// let (tx, rx) = watch::channel("hello");
1223 ///
1224 /// tokio::spawn(async move {
1225 /// // use `rx`
1226 /// drop(rx);
1227 /// });
1228 ///
1229 /// // Waits for `rx` to drop
1230 /// tx.closed().await;
1231 /// println!("the `rx` handles dropped")
1232 /// }
1233 /// ```
1234 pub async fn closed(&self) {
1235 cooperative(async {
1236 crate::trace::async_trace_leaf().await;
1237
1238 while self.receiver_count() > 0 {
1239 let notified = self.shared.notify_tx.notified();
1240
1241 if self.receiver_count() == 0 {
1242 return;
1243 }
1244
1245 notified.await;
1246 // The channel could have been reopened in the meantime by calling
1247 // `subscribe`, so we loop again.
1248 }
1249 })
1250 .await;
1251 }
1252
1253 /// Creates a new [`Receiver`] connected to this `Sender`.
1254 ///
1255 /// All messages sent before this call to `subscribe` are initially marked
1256 /// as seen by the new `Receiver`.
1257 ///
1258 /// This method can be called even if there are no other receivers. In this
1259 /// case, the channel is reopened.
1260 ///
1261 /// # Examples
1262 ///
1263 /// The new channel will receive messages sent on this `Sender`.
1264 ///
1265 /// ```
1266 /// use tokio::sync::watch;
1267 ///
1268 /// #[tokio::main]
1269 /// async fn main() {
1270 /// let (tx, _rx) = watch::channel(0u64);
1271 ///
1272 /// tx.send(5).unwrap();
1273 ///
1274 /// let rx = tx.subscribe();
1275 /// assert_eq!(5, *rx.borrow());
1276 ///
1277 /// tx.send(10).unwrap();
1278 /// assert_eq!(10, *rx.borrow());
1279 /// }
1280 /// ```
1281 ///
1282 /// The most recent message is considered seen by the channel, so this test
1283 /// is guaranteed to pass.
1284 ///
1285 /// ```
1286 /// use tokio::sync::watch;
1287 /// use tokio::time::Duration;
1288 ///
1289 /// #[tokio::main]
1290 /// async fn main() {
1291 /// let (tx, _rx) = watch::channel(0u64);
1292 /// tx.send(5).unwrap();
1293 /// let mut rx = tx.subscribe();
1294 ///
1295 /// tokio::spawn(async move {
1296 /// // by spawning and sleeping, the message is sent after `main`
1297 /// // hits the call to `changed`.
1298 /// # if false {
1299 /// tokio::time::sleep(Duration::from_millis(10)).await;
1300 /// # }
1301 /// tx.send(100).unwrap();
1302 /// });
1303 ///
1304 /// rx.changed().await.unwrap();
1305 /// assert_eq!(100, *rx.borrow());
1306 /// }
1307 /// ```
1308 pub fn subscribe(&self) -> Receiver<T> {
1309 let shared = self.shared.clone();
1310 let version = shared.state.load().version();
1311
1312 // The CLOSED bit in the state tracks only whether the sender is
1313 // dropped, so we do not need to unset it if this reopens the channel.
1314 Receiver::from_shared(version, shared)
1315 }
1316
1317 /// Returns the number of receivers that currently exist.
1318 ///
1319 /// # Examples
1320 ///
1321 /// ```
1322 /// use tokio::sync::watch;
1323 ///
1324 /// #[tokio::main]
1325 /// async fn main() {
1326 /// let (tx, rx1) = watch::channel("hello");
1327 ///
1328 /// assert_eq!(1, tx.receiver_count());
1329 ///
1330 /// let mut _rx2 = rx1.clone();
1331 ///
1332 /// assert_eq!(2, tx.receiver_count());
1333 /// }
1334 /// ```
1335 pub fn receiver_count(&self) -> usize {
1336 self.shared.ref_count_rx.load(Relaxed)
1337 }
1338
1339 /// Returns the number of senders that currently exist.
1340 ///
1341 /// # Examples
1342 ///
1343 /// ```
1344 /// use tokio::sync::watch;
1345 ///
1346 /// #[tokio::main]
1347 /// async fn main() {
1348 /// let (tx1, rx) = watch::channel("hello");
1349 ///
1350 /// assert_eq!(1, tx1.sender_count());
1351 ///
1352 /// let tx2 = tx1.clone();
1353 ///
1354 /// assert_eq!(2, tx1.sender_count());
1355 /// assert_eq!(2, tx2.sender_count());
1356 /// }
1357 /// ```
1358 pub fn sender_count(&self) -> usize {
1359 self.shared.ref_count_tx.load(Relaxed)
1360 }
1361
1362 /// Returns `true` if senders belong to the same channel.
1363 ///
1364 /// # Examples
1365 ///
1366 /// ```
1367 /// let (tx, rx) = tokio::sync::watch::channel(true);
1368 /// let tx2 = tx.clone();
1369 /// assert!(tx.same_channel(&tx2));
1370 ///
1371 /// let (tx3, rx3) = tokio::sync::watch::channel(true);
1372 /// assert!(!tx3.same_channel(&tx2));
1373 /// ```
1374 pub fn same_channel(&self, other: &Self) -> bool {
1375 Arc::ptr_eq(&self.shared, &other.shared)
1376 }
1377}
1378
1379impl<T> Drop for Sender<T> {
1380 fn drop(&mut self) {
1381 if self.shared.ref_count_tx.fetch_sub(1, AcqRel) == 1 {
1382 self.shared.state.set_closed();
1383 self.shared.notify_rx.notify_waiters();
1384 }
1385 }
1386}
1387
1388// ===== impl Ref =====
1389
1390impl<T> ops::Deref for Ref<'_, T> {
1391 type Target = T;
1392
1393 fn deref(&self) -> &T {
1394 self.inner.deref()
1395 }
1396}
1397
1398#[cfg(all(test, loom))]
1399mod tests {
1400 use futures::future::FutureExt;
1401 use loom::thread;
1402
1403 // test for https://github.com/tokio-rs/tokio/issues/3168
1404 #[test]
1405 fn watch_spurious_wakeup() {
1406 loom::model(|| {
1407 let (send, mut recv) = crate::sync::watch::channel(0i32);
1408
1409 send.send(1).unwrap();
1410
1411 let send_thread = thread::spawn(move || {
1412 send.send(2).unwrap();
1413 send
1414 });
1415
1416 recv.changed().now_or_never();
1417
1418 let send = send_thread.join().unwrap();
1419 let recv_thread = thread::spawn(move || {
1420 recv.changed().now_or_never();
1421 recv.changed().now_or_never();
1422 recv
1423 });
1424
1425 send.send(3).unwrap();
1426
1427 let mut recv = recv_thread.join().unwrap();
1428 let send_thread = thread::spawn(move || {
1429 send.send(2).unwrap();
1430 });
1431
1432 recv.changed().now_or_never();
1433
1434 send_thread.join().unwrap();
1435 });
1436 }
1437
1438 #[test]
1439 fn watch_borrow() {
1440 loom::model(|| {
1441 let (send, mut recv) = crate::sync::watch::channel(0i32);
1442
1443 assert!(send.borrow().eq(&0));
1444 assert!(recv.borrow().eq(&0));
1445
1446 send.send(1).unwrap();
1447 assert!(send.borrow().eq(&1));
1448
1449 let send_thread = thread::spawn(move || {
1450 send.send(2).unwrap();
1451 send
1452 });
1453
1454 recv.changed().now_or_never();
1455
1456 let send = send_thread.join().unwrap();
1457 let recv_thread = thread::spawn(move || {
1458 recv.changed().now_or_never();
1459 recv.changed().now_or_never();
1460 recv
1461 });
1462
1463 send.send(3).unwrap();
1464
1465 let recv = recv_thread.join().unwrap();
1466 assert!(recv.borrow().eq(&3));
1467 assert!(send.borrow().eq(&3));
1468
1469 send.send(2).unwrap();
1470
1471 thread::spawn(move || {
1472 assert!(recv.borrow().eq(&2));
1473 });
1474 assert!(send.borrow().eq(&2));
1475 });
1476 }
1477}