want/
lib.rs

1#![doc(html_root_url = "https://docs.rs/want/0.3.1")]
2#![deny(warnings)]
3#![deny(missing_docs)]
4#![deny(missing_debug_implementations)]
5
6//! A Futures channel-like utility to signal when a value is wanted.
7//!
8//! Futures are supposed to be lazy, and only starting work if `Future::poll`
9//! is called. The same is true of `Stream`s, but when using a channel as
10//! a `Stream`, it can be hard to know if the receiver is ready for the next
11//! value.
12//!
13//! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`,
14//! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
15//! work to be produced? Just because there is room in the channel buffer
16//! doesn't mean the work would be used by the receiver.
17//!
18//! This is where something like `want` comes in. Added to a channel, you can
19//! make sure that the `tx` only creates the message and sends it when the `rx`
20//! has `poll()` for it, and the buffer was empty.
21//!
22//! # Example
23//!
24//! ```nightly
25//! # //#![feature(async_await)]
26//! extern crate want;
27//!
28//! # fn spawn<T>(_t: T) {}
29//! # fn we_still_want_message() -> bool { true }
30//! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) }
31//! # struct Tx;
32//! # impl Tx { fn send<T>(&mut self, _: T) {} }
33//! # struct Rx;
34//! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } }
35//!
36//! // Some message that is expensive to produce.
37//! struct Expensive;
38//!
39//! // Some futures-aware MPSC channel...
40//! let (mut tx, mut rx) = mpsc_channel();
41//!
42//! // And our `want` channel!
43//! let (mut gv, mut tk) = want::new();
44//!
45//!
46//! // Our receiving task...
47//! spawn(async move {
48//!     // Maybe something comes up that prevents us from ever
49//!     // using the expensive message.
50//!     //
51//!     // Without `want`, the "send" task may have started to
52//!     // produce the expensive message even though we wouldn't
53//!     // be able to use it.
54//!     if !we_still_want_message() {
55//!         return;
56//!     }
57//!
58//!     // But we can use it! So tell the `want` channel.
59//!     tk.want();
60//!
61//!     match rx.recv().await {
62//!         Some(_msg) => println!("got a message"),
63//!         None => println!("DONE"),
64//!     }
65//! });
66//!
67//! // Our sending task
68//! spawn(async move {
69//!     // It's expensive to create a new message, so we wait until the
70//!     // receiving end truly *wants* the message.
71//!     if let Err(_closed) = gv.want().await {
72//!         // Looks like they will never want it...
73//!         return;
74//!     }
75//!
76//!     // They want it, let's go!
77//!     tx.send(Expensive);
78//! });
79//!
80//! # fn main() {}
81//! ```
82
83use std::fmt;
84use std::future::Future;
85use std::mem;
86use std::pin::Pin;
87use std::sync::Arc;
88use std::sync::atomic::AtomicUsize;
89// SeqCst is the only ordering used to ensure accessing the state and
90// TryLock are never re-ordered.
91use std::sync::atomic::Ordering::SeqCst;
92use std::task::{self, Poll, Waker};
93
94
95use try_lock::TryLock;
96
97/// Create a new `want` channel.
98pub fn new() -> (Giver, Taker) {
99    let inner = Arc::new(Inner {
100        state: AtomicUsize::new(State::Idle.into()),
101        task: TryLock::new(None),
102    });
103    let inner2 = inner.clone();
104    (
105        Giver {
106            inner,
107        },
108        Taker {
109            inner: inner2,
110        },
111    )
112}
113
114/// An entity that gives a value when wanted.
115pub struct Giver {
116    inner: Arc<Inner>,
117}
118
119/// An entity that wants a value.
120pub struct Taker {
121    inner: Arc<Inner>,
122}
123
124/// A cloneable `Giver`.
125///
126/// It differs from `Giver` in that you cannot poll for `want`. It's only
127/// usable as a cancellation watcher.
128#[derive(Clone)]
129pub struct SharedGiver {
130    inner: Arc<Inner>,
131}
132
133/// The `Taker` has canceled its interest in a value.
134pub struct Closed {
135    _inner: (),
136}
137
138#[derive(Clone, Copy, Debug)]
139enum State {
140    Idle,
141    Want,
142    Give,
143    Closed,
144}
145
146impl From<State> for usize {
147    fn from(s: State) -> usize {
148        match s {
149            State::Idle => 0,
150            State::Want => 1,
151            State::Give => 2,
152            State::Closed => 3,
153        }
154    }
155}
156
157impl From<usize> for State {
158    fn from(num: usize) -> State {
159        match num {
160            0 => State::Idle,
161            1 => State::Want,
162            2 => State::Give,
163            3 => State::Closed,
164            _ => unreachable!("unknown state: {}", num),
165        }
166    }
167}
168
169struct Inner {
170    state: AtomicUsize,
171    task: TryLock<Option<Waker>>,
172}
173
174// ===== impl Giver ======
175
176impl Giver {
177    /// Returns a `Future` that fulfills when the `Taker` has done some action.
178    pub fn want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_ {
179        Want(self)
180    }
181
182    /// Poll whether the `Taker` has registered interest in another value.
183    ///
184    /// - If the `Taker` has called `want()`, this returns `Async::Ready(())`.
185    /// - If the `Taker` has not called `want()` since last poll, this
186    ///   returns `Async::NotReady`, and parks the current task to be notified
187    ///   when the `Taker` does call `want()`.
188    /// - If the `Taker` has canceled (or dropped), this returns `Closed`.
189    ///
190    /// After knowing that the Taker is wanting, the state can be reset by
191    /// calling [`give`](Giver::give).
192    pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> {
193        loop {
194            let state = self.inner.state.load(SeqCst).into();
195            match state {
196                State::Want => {
197                    return Poll::Ready(Ok(()));
198                },
199                State::Closed => {
200                    return Poll::Ready(Err(Closed { _inner: () }));
201                },
202                State::Idle | State::Give => {
203                    // Taker doesn't want anything yet, so park.
204                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
205
206                        // While we have the lock, try to set to GIVE.
207                        let old = self.inner.state.compare_exchange(
208                            state.into(),
209                            State::Give.into(),
210                            SeqCst,
211                            SeqCst,
212                        );
213                        // If it's still the first state (Idle or Give), park current task.
214                        if old == Ok(state.into()) {
215                            let park = locked.as_ref()
216                                .map(|w| !w.will_wake(cx.waker()))
217                                .unwrap_or(true);
218                            if park {
219                                let old = mem::replace(&mut *locked, Some(cx.waker().clone()));
220                                drop(locked);
221                                if let Some(prev_task) = old {
222                                    // there was an old task parked here.
223                                    // it might be waiting to be notified,
224                                    // so poke it before dropping.
225                                    prev_task.wake();
226                                };
227                            }
228                            return Poll::Pending;
229                        }
230                        // Otherwise, something happened! Go around the loop again.
231                    } else {
232                        // if we couldn't take the lock, then a Taker has it.
233                        // The *ONLY* reason is because it is in the process of notifying us
234                        // of its want.
235                        //
236                        // We need to loop again to see what state it was changed to.
237                    }
238                },
239            }
240        }
241    }
242
243    /// Mark the state as idle, if the Taker currently is wanting.
244    ///
245    /// Returns true if Taker was wanting, false otherwise.
246    #[inline]
247    pub fn give(&self) -> bool {
248        // only set to IDLE if it is still Want
249        let old = self.inner.state.compare_exchange(
250            State::Want.into(),
251            State::Idle.into(),
252            SeqCst,
253            SeqCst);
254        old == Ok(State::Want.into())
255    }
256
257    /// Check if the `Taker` has called `want()` without parking a task.
258    ///
259    /// This is safe to call outside of a futures task context, but other
260    /// means of being notified is left to the user.
261    #[inline]
262    pub fn is_wanting(&self) -> bool {
263        self.inner.state.load(SeqCst) == State::Want.into()
264    }
265
266
267    /// Check if the `Taker` has canceled interest without parking a task.
268    #[inline]
269    pub fn is_canceled(&self) -> bool {
270        self.inner.state.load(SeqCst) == State::Closed.into()
271    }
272
273    /// Converts this into a `SharedGiver`.
274    #[inline]
275    pub fn shared(self) -> SharedGiver {
276        SharedGiver {
277            inner: self.inner,
278        }
279    }
280}
281
282impl fmt::Debug for Giver {
283    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284        f.debug_struct("Giver")
285            .field("state", &self.inner.state())
286            .finish()
287    }
288}
289
290// ===== impl SharedGiver ======
291
292impl SharedGiver {
293    /// Check if the `Taker` has called `want()` without parking a task.
294    ///
295    /// This is safe to call outside of a futures task context, but other
296    /// means of being notified is left to the user.
297    #[inline]
298    pub fn is_wanting(&self) -> bool {
299        self.inner.state.load(SeqCst) == State::Want.into()
300    }
301
302
303    /// Check if the `Taker` has canceled interest without parking a task.
304    #[inline]
305    pub fn is_canceled(&self) -> bool {
306        self.inner.state.load(SeqCst) == State::Closed.into()
307    }
308}
309
310impl fmt::Debug for SharedGiver {
311    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
312        f.debug_struct("SharedGiver")
313            .field("state", &self.inner.state())
314            .finish()
315    }
316}
317
318// ===== impl Taker ======
319
320impl Taker {
321    /// Signal to the `Giver` that the want is canceled.
322    ///
323    /// This is useful to tell that the channel is closed if you cannot
324    /// drop the value yet.
325    #[inline]
326    pub fn cancel(&mut self) {
327        self.signal(State::Closed)
328    }
329
330    /// Signal to the `Giver` that a value is wanted.
331    #[inline]
332    pub fn want(&mut self) {
333        debug_assert!(
334            self.inner.state.load(SeqCst) != State::Closed.into(),
335            "want called after cancel"
336        );
337        self.signal(State::Want)
338    }
339
340    #[inline]
341    fn signal(&mut self, state: State) {
342        let old_state = self.inner.state.swap(state.into(), SeqCst).into();
343        match old_state {
344            State::Idle | State::Want | State::Closed => (),
345            State::Give => {
346                loop {
347                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
348                        if let Some(task) = locked.take() {
349                            drop(locked);
350                            task.wake();
351                        }
352                        return;
353                    } else {
354                        // if we couldn't take the lock, then a Giver has it.
355                        // The *ONLY* reason is because it is in the process of parking.
356                        //
357                        // We need to loop and take the lock so we can notify this task.
358                    }
359                }
360            },
361        }
362    }
363}
364
365impl Drop for Taker {
366    #[inline]
367    fn drop(&mut self) {
368        self.signal(State::Closed);
369    }
370}
371
372impl fmt::Debug for Taker {
373    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374        f.debug_struct("Taker")
375            .field("state", &self.inner.state())
376            .finish()
377    }
378}
379
380// ===== impl Closed ======
381
382impl fmt::Debug for Closed {
383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384        f.debug_struct("Closed")
385            .finish()
386    }
387}
388
389// ===== impl Inner ======
390
391impl Inner {
392    #[inline]
393    fn state(&self) -> State {
394        self.state.load(SeqCst).into()
395    }
396}
397
398// ===== impl PollFn ======
399
400struct Want<'a>(&'a mut Giver);
401
402
403impl Future for Want<'_> {
404    type Output = Result<(), Closed>;
405
406    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
407        self.0.poll_want(cx)
408    }
409}
410
411#[cfg(test)]
412mod tests {
413    use std::thread;
414    use tokio_sync::oneshot;
415    use super::*;
416
417    fn block_on<F: Future>(f: F) -> F::Output {
418        tokio_executor::enter()
419            .expect("block_on enter")
420            .block_on(f)
421    }
422
423    #[test]
424    fn want_ready() {
425        let (mut gv, mut tk) = new();
426
427        tk.want();
428
429        block_on(gv.want()).unwrap();
430    }
431
432    #[test]
433    fn want_notify_0() {
434        let (mut gv, mut tk) = new();
435        let (tx, rx) = oneshot::channel();
436
437        thread::spawn(move || {
438            tk.want();
439            // use a oneshot to keep this thread alive
440            // until other thread was notified of want
441            block_on(rx).expect("rx");
442        });
443
444        block_on(gv.want()).expect("want");
445
446        assert!(gv.is_wanting(), "still wanting after poll_want success");
447        assert!(gv.give(), "give is true when wanting");
448
449        assert!(!gv.is_wanting(), "no longer wanting after give");
450        assert!(!gv.is_canceled(), "give doesn't cancel");
451
452        assert!(!gv.give(), "give is false if not wanting");
453
454        tx.send(()).expect("tx");
455    }
456
457    /*
458    /// This tests that if the Giver moves tasks after parking,
459    /// it will still wake up the correct task.
460    #[test]
461    fn want_notify_moving_tasks() {
462        use std::sync::Arc;
463        use futures::executor::{spawn, Notify, NotifyHandle};
464
465        struct WantNotify;
466
467        impl Notify for WantNotify {
468            fn notify(&self, _id: usize) {
469            }
470        }
471
472        fn n() -> NotifyHandle {
473            Arc::new(WantNotify).into()
474        }
475
476        let (mut gv, mut tk) = new();
477
478        let mut s = spawn(poll_fn(move || {
479            gv.poll_want()
480        }));
481
482        // Register with t1 as the task::current()
483        let t1 = n();
484        assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());
485
486        thread::spawn(move || {
487            thread::sleep(::std::time::Duration::from_millis(100));
488            tk.want();
489        });
490
491        // And now, move to a ThreadNotify task.
492        s.into_inner().wait().expect("poll_want");
493    }
494    */
495
496    #[test]
497    fn cancel() {
498        // explicit
499        let (mut gv, mut tk) = new();
500
501        assert!(!gv.is_canceled());
502
503        tk.cancel();
504
505        assert!(gv.is_canceled());
506        block_on(gv.want()).unwrap_err();
507
508        // implicit
509        let (mut gv, tk) = new();
510
511        assert!(!gv.is_canceled());
512
513        drop(tk);
514
515        assert!(gv.is_canceled());
516        block_on(gv.want()).unwrap_err();
517
518        // notifies
519        let (mut gv, tk) = new();
520
521        thread::spawn(move || {
522            let _tk = tk;
523            // and dropped
524        });
525
526        block_on(gv.want()).unwrap_err();
527    }
528
529    /*
530    #[test]
531    fn stress() {
532        let nthreads = 5;
533        let nwants = 100;
534
535        for _ in 0..nthreads {
536            let (mut gv, mut tk) = new();
537            let (mut tx, mut rx) = mpsc::channel(0);
538
539            // rx thread
540            thread::spawn(move || {
541                let mut cnt = 0;
542                poll_fn(move || {
543                    while cnt < nwants {
544                        let n = match rx.poll().expect("rx poll") {
545                            Async::Ready(n) => n.expect("rx opt"),
546                            Async::NotReady => {
547                                tk.want();
548                                return Ok(Async::NotReady);
549                            },
550                        };
551                        assert_eq!(cnt, n);
552                        cnt += 1;
553                    }
554                    Ok::<_, ()>(Async::Ready(()))
555                }).wait().expect("rx wait");
556            });
557
558            // tx thread
559            thread::spawn(move || {
560                let mut cnt = 0;
561                let nsent = poll_fn(move || {
562                    loop {
563                        while let Ok(()) = tx.try_send(cnt) {
564                            cnt += 1;
565                        }
566                        match gv.poll_want() {
567                            Ok(Async::Ready(_)) => (),
568                            Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
569                            Err(_) => return Ok(Async::Ready(cnt)),
570                        }
571                    }
572                }).wait().expect("tx wait");
573
574                assert_eq!(nsent, nwants);
575            }).join().expect("thread join");
576        }
577    }
578    */
579}