tower/util/call_all/
ordered.rs

1//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream].
2//!
3//! [`Service<Request>`]: crate::Service
4//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
5
6use super::common;
7use futures_core::Stream;
8use futures_util::stream::FuturesOrdered;
9use pin_project_lite::pin_project;
10use std::{
11    future::Future,
12    pin::Pin,
13    task::{Context, Poll},
14};
15use tower_service::Service;
16
pin_project! {
    /// A [`Stream`] of responses produced by calling the wrapped [`Service`] once for each
    /// request received on the wrapped [`Stream`].
    ///
    /// Every item yielded is a `Result<Response, Error>` using the wrapped [`Service`]'s own
    /// response and error types.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use std::task::{Poll, Context};
    /// # use std::error::Error;
    /// #
    /// use futures::future::{ready, Ready};
    /// use futures::StreamExt;
    /// use futures::channel::mpsc;
    /// use tower_service::Service;
    /// use tower::util::ServiceExt;
    ///
    /// // First, we need to have a Service to process our requests.
    /// #[derive(Debug, Eq, PartialEq)]
    /// struct FirstLetter;
    /// impl Service<&'static str> for FirstLetter {
    ///      type Response = &'static str;
    ///      type Error = Box<dyn Error + Send + Sync>;
    ///      type Future = Ready<Result<Self::Response, Self::Error>>;
    ///
    ///      fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    ///          Poll::Ready(Ok(()))
    ///      }
    ///
    ///      fn call(&mut self, req: &'static str) -> Self::Future {
    ///          ready(Ok(&req[..1]))
    ///      }
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     // Next, we need a Stream of requests.
    // TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams,
    //              consider using `tokio::sync::mpsc` here again.
    ///     let (mut reqs, rx) = mpsc::unbounded();
    ///     // Hand the request stream to the service; the result is a stream of responses.
    ///     let mut rsps = FirstLetter.call_all(rx);
    ///
    ///     // Now, let's send a few requests and then check that we get the corresponding responses.
    ///     reqs.unbounded_send("one").unwrap();
    ///     reqs.unbounded_send("two").unwrap();
    ///     reqs.unbounded_send("three").unwrap();
    ///     drop(reqs);
    ///
    ///     // We then loop over the response `Stream` that we get back from call_all.
    ///     let mut i = 0usize;
    ///     while let Some(rsp) = rsps.next().await {
    ///         // Each response is a Result (we could also have used TryStream::try_next)
    ///         match (i + 1, rsp.unwrap()) {
    ///             (1, "o") |
    ///             (2, "t") |
    ///             (3, "t") => {}
    ///             (n, letter) => {
    ///                 unreachable!("{}. response was '{}'", n, letter);
    ///             }
    ///         }
    ///         i += 1;
    ///     }
    ///
    ///     // And at the end, we can get the Service back when there are no more requests.
    ///     assert_eq!(rsps.into_inner(), FirstLetter);
    /// }
    /// ```
    ///
    /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
    #[derive(Debug)]
    pub struct CallAll<Svc, S>
    where
        Svc: Service<S::Item>,
        S: Stream,
    {
        // Shared `common::CallAll` implementation, parameterized with a `FuturesOrdered`
        // queue of the service's futures. It is pinned because it is polled through
        // `Pin<&mut Self>` projections.
        #[pin]
        inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
    }
}
97
98impl<Svc, S> CallAll<Svc, S>
99where
100    Svc: Service<S::Item>,
101    S: Stream,
102{
103    /// Create new [`CallAll`] combinator.
104    ///
105    /// Each request yielded by `stream` is passed to `svc`, and the resulting responses are
106    /// yielded in the same order by the implementation of [`Stream`] for [`CallAll`].
107    ///
108    /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
109    pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> {
110        CallAll {
111            inner: common::CallAll::new(service, stream, FuturesOrdered::new()),
112        }
113    }
114
115    /// Extract the wrapped [`Service`].
116    ///
117    /// # Panics
118    ///
119    /// Panics if [`take_service`] was already called.
120    ///
121    /// [`take_service`]: crate::util::CallAll::take_service
122    pub fn into_inner(self) -> Svc {
123        self.inner.into_inner()
124    }
125
126    /// Extract the wrapped [`Service`].
127    ///
128    /// This [`CallAll`] can no longer be used after this function has been called.
129    ///
130    /// # Panics
131    ///
132    /// Panics if [`take_service`] was already called.
133    ///
134    /// [`take_service`]: crate::util::CallAll::take_service
135    pub fn take_service(self: Pin<&mut Self>) -> Svc {
136        self.project().inner.take_service()
137    }
138
139    /// Return responses as they are ready, regardless of the initial order.
140    ///
141    /// This function must be called before the stream is polled.
142    ///
143    /// # Panics
144    ///
145    /// Panics if [`poll`] was called.
146    ///
147    /// [`poll`]: std::future::Future::poll
148    pub fn unordered(self) -> super::CallAllUnordered<Svc, S> {
149        self.inner.unordered()
150    }
151}
152
153impl<Svc, S> Stream for CallAll<Svc, S>
154where
155    Svc: Service<S::Item>,
156    S: Stream,
157{
158    type Item = Result<Svc::Response, Svc::Error>;
159
160    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161        self.project().inner.poll_next(cx)
162    }
163}
164
165impl<F: Future> common::Drive<F> for FuturesOrdered<F> {
166    fn is_empty(&self) -> bool {
167        FuturesOrdered::is_empty(self)
168    }
169
170    fn push(&mut self, future: F) {
171        FuturesOrdered::push_back(self, future)
172    }
173
174    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
175        Stream::poll_next(Pin::new(self), cx)
176    }
177}