tower/util/call_all/ordered.rs
1//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream].
2//!
3//! [`Service<Request>`]: crate::Service
4//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
5
6use super::common;
7use futures_core::Stream;
8use futures_util::stream::FuturesOrdered;
9use pin_project_lite::pin_project;
10use std::{
11 future::Future,
12 pin::Pin,
13 task::{Context, Poll},
14};
15use tower_service::Service;
16
17pin_project! {
18 /// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each
19 /// request received on the wrapped [`Stream`].
20 ///
21 /// ```rust
22 /// # use std::task::{Poll, Context};
23 /// # use std::cell::Cell;
24 /// # use std::error::Error;
25 /// # use std::rc::Rc;
26 /// #
27 /// use futures::future::{ready, Ready};
28 /// use futures::StreamExt;
29 /// use futures::channel::mpsc;
30 /// use tower_service::Service;
31 /// use tower::util::ServiceExt;
32 ///
33 /// // First, we need to have a Service to process our requests.
34 /// #[derive(Debug, Eq, PartialEq)]
35 /// struct FirstLetter;
36 /// impl Service<&'static str> for FirstLetter {
37 /// type Response = &'static str;
38 /// type Error = Box<dyn Error + Send + Sync>;
39 /// type Future = Ready<Result<Self::Response, Self::Error>>;
40 ///
41 /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42 /// Poll::Ready(Ok(()))
43 /// }
44 ///
45 /// fn call(&mut self, req: &'static str) -> Self::Future {
46 /// ready(Ok(&req[..1]))
47 /// }
48 /// }
49 ///
50 /// #[tokio::main]
51 /// async fn main() {
52 /// // Next, we need a Stream of requests.
53 // TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams,
54 // tokio::sync::mpsc again?
55 /// let (mut reqs, rx) = mpsc::unbounded();
56 /// // Note that we have to help Rust out here by telling it what error type to use.
57 /// // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
58 /// let mut rsps = FirstLetter.call_all(rx);
59 ///
60 /// // Now, let's send a few requests and then check that we get the corresponding responses.
61 /// reqs.unbounded_send("one").unwrap();
62 /// reqs.unbounded_send("two").unwrap();
63 /// reqs.unbounded_send("three").unwrap();
64 /// drop(reqs);
65 ///
66 /// // We then loop over the response `Stream` that we get back from call_all.
67 /// let mut i = 0usize;
68 /// while let Some(rsp) = rsps.next().await {
69 /// // Each response is a Result (we could also have used TryStream::try_next)
70 /// match (i + 1, rsp.unwrap()) {
71 /// (1, "o") |
72 /// (2, "t") |
73 /// (3, "t") => {}
74 /// (n, i) => {
75 /// unreachable!("{}. response was '{}'", n, i);
76 /// }
77 /// }
78 /// i += 1;
79 /// }
80 ///
81 /// // And at the end, we can get the Service back when there are no more requests.
82 /// assert_eq!(rsps.into_inner(), FirstLetter);
83 /// }
84 /// ```
85 ///
86 /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
87 #[derive(Debug)]
88 pub struct CallAll<Svc, S>
89 where
90 Svc: Service<S::Item>,
91 S: Stream,
92 {
93 #[pin]
94 inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
95 }
96}
97
98impl<Svc, S> CallAll<Svc, S>
99where
100 Svc: Service<S::Item>,
101 S: Stream,
102{
103 /// Create new [`CallAll`] combinator.
104 ///
105 /// Each request yielded by `stream` is passed to `svc`, and the resulting responses are
106 /// yielded in the same order by the implementation of [`Stream`] for [`CallAll`].
107 ///
108 /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
109 pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> {
110 CallAll {
111 inner: common::CallAll::new(service, stream, FuturesOrdered::new()),
112 }
113 }
114
115 /// Extract the wrapped [`Service`].
116 ///
117 /// # Panics
118 ///
119 /// Panics if [`take_service`] was already called.
120 ///
121 /// [`take_service`]: crate::util::CallAll::take_service
122 pub fn into_inner(self) -> Svc {
123 self.inner.into_inner()
124 }
125
126 /// Extract the wrapped [`Service`].
127 ///
128 /// This [`CallAll`] can no longer be used after this function has been called.
129 ///
130 /// # Panics
131 ///
132 /// Panics if [`take_service`] was already called.
133 ///
134 /// [`take_service`]: crate::util::CallAll::take_service
135 pub fn take_service(self: Pin<&mut Self>) -> Svc {
136 self.project().inner.take_service()
137 }
138
139 /// Return responses as they are ready, regardless of the initial order.
140 ///
141 /// This function must be called before the stream is polled.
142 ///
143 /// # Panics
144 ///
145 /// Panics if [`poll`] was called.
146 ///
147 /// [`poll`]: std::future::Future::poll
148 pub fn unordered(self) -> super::CallAllUnordered<Svc, S> {
149 self.inner.unordered()
150 }
151}
152
153impl<Svc, S> Stream for CallAll<Svc, S>
154where
155 Svc: Service<S::Item>,
156 S: Stream,
157{
158 type Item = Result<Svc::Response, Svc::Error>;
159
160 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161 self.project().inner.poll_next(cx)
162 }
163}
164
165impl<F: Future> common::Drive<F> for FuturesOrdered<F> {
166 fn is_empty(&self) -> bool {
167 FuturesOrdered::is_empty(self)
168 }
169
170 fn push(&mut self, future: F) {
171 FuturesOrdered::push_back(self, future)
172 }
173
174 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
175 Stream::poll_next(Pin::new(self), cx)
176 }
177}