tower/util/
mod.rs

1//! Various utility types and functions that are generally used with Tower.
2
3mod and_then;
4mod boxed;
5mod boxed_clone;
6mod boxed_clone_sync;
7mod call_all;
8mod either;
9
10mod future_service;
11mod map_err;
12mod map_request;
13mod map_response;
14mod map_result;
15
16mod map_future;
17mod oneshot;
18mod optional;
19mod ready;
20mod service_fn;
21mod then;
22
23pub mod rng;
24
25pub use self::{
26    and_then::{AndThen, AndThenLayer},
27    boxed::{
28        BoxCloneServiceLayer, BoxCloneSyncServiceLayer, BoxLayer, BoxService, UnsyncBoxService,
29    },
30    boxed_clone::BoxCloneService,
31    boxed_clone_sync::BoxCloneSyncService,
32    either::Either,
33    future_service::{future_service, FutureService},
34    map_err::{MapErr, MapErrLayer},
35    map_future::{MapFuture, MapFutureLayer},
36    map_request::{MapRequest, MapRequestLayer},
37    map_response::{MapResponse, MapResponseLayer},
38    map_result::{MapResult, MapResultLayer},
39    oneshot::Oneshot,
40    optional::Optional,
41    ready::{Ready, ReadyOneshot},
42    service_fn::{service_fn, ServiceFn},
43    then::{Then, ThenLayer},
44};
45
46pub use self::call_all::{CallAll, CallAllUnordered};
47use std::future::Future;
48
49use crate::layer::util::Identity;
50
51pub mod error {
52    //! Error types
53
54    pub use super::optional::error as optional;
55}
56
57pub mod future {
58    //! Future types
59
60    pub use super::and_then::AndThenFuture;
61    pub use super::either::EitherResponseFuture;
62    pub use super::map_err::MapErrFuture;
63    pub use super::map_response::MapResponseFuture;
64    pub use super::map_result::MapResultFuture;
65    pub use super::optional::future as optional;
66    pub use super::then::ThenFuture;
67}
68
69/// An extension trait for `Service`s that provides a variety of convenient
70/// adapters
71pub trait ServiceExt<Request>: tower_service::Service<Request> {
72    /// Yields a mutable reference to the service when it is ready to accept a request.
73    fn ready(&mut self) -> Ready<'_, Self, Request>
74    where
75        Self: Sized,
76    {
77        Ready::new(self)
78    }
79
80    /// Yields the service when it is ready to accept a request.
81    fn ready_oneshot(self) -> ReadyOneshot<Self, Request>
82    where
83        Self: Sized,
84    {
85        ReadyOneshot::new(self)
86    }
87
88    /// Consume this `Service`, calling it with the provided request once it is ready.
89    fn oneshot(self, req: Request) -> Oneshot<Self, Request>
90    where
91        Self: Sized,
92    {
93        Oneshot::new(self, req)
94    }
95
96    /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses.
97    ///
98    /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item =
99    /// Response>`][stream]. See the documentation for [`CallAll`] for
100    /// details.
101    ///
102    /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
103    /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
104    fn call_all<S>(self, reqs: S) -> CallAll<Self, S>
105    where
106        Self: Sized,
107        S: futures_core::Stream<Item = Request>,
108    {
109        CallAll::new(self, reqs)
110    }
111
112    /// Executes a new future after this service's future resolves. This does
113    /// not alter the behaviour of the [`poll_ready`] method.
114    ///
115    /// This method can be used to change the [`Response`] type of the service
116    /// into a different type. You can use this method to chain along a computation once the
117    /// service's response has been resolved.
118    ///
119    /// [`Response`]: crate::Service::Response
120    /// [`poll_ready`]: crate::Service::poll_ready
121    ///
122    /// # Example
123    /// ```
124    /// # use std::task::{Poll, Context};
125    /// # use tower::{Service, ServiceExt};
126    /// #
127    /// # struct DatabaseService;
128    /// # impl DatabaseService {
129    /// #   fn new(address: &str) -> Self {
130    /// #       DatabaseService
131    /// #   }
132    /// # }
133    /// #
134    /// # struct Record {
135    /// #   pub name: String,
136    /// #   pub age: u16
137    /// # }
138    /// #
139    /// # impl Service<u32> for DatabaseService {
140    /// #   type Response = Record;
141    /// #   type Error = u8;
142    /// #   type Future = futures_util::future::Ready<Result<Record, u8>>;
143    /// #
144    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145    /// #       Poll::Ready(Ok(()))
146    /// #   }
147    /// #
148    /// #   fn call(&mut self, request: u32) -> Self::Future {
149    /// #       futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
150    /// #   }
151    /// # }
152    /// #
153    /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
154    /// #
155    /// # fn main() {
156    /// #    async {
157    /// // A service returning Result<Record, _>
158    /// let service = DatabaseService::new("127.0.0.1:8080");
159    ///
160    /// // Map the response into a new response
161    /// let mut new_service = service.and_then(|record: Record| async move {
162    ///     let name = record.name;
163    ///     avatar_lookup(name).await
164    /// });
165    ///
166    /// // Call the new service
167    /// let id = 13;
168    /// let avatar = new_service.call(id).await.unwrap();
169    /// #    };
170    /// # }
171    /// ```
172    fn and_then<F>(self, f: F) -> AndThen<Self, F>
173    where
174        Self: Sized,
175        F: Clone,
176    {
177        AndThen::new(self, f)
178    }
179
180    /// Maps this service's response value to a different value. This does not
181    /// alter the behaviour of the [`poll_ready`] method.
182    ///
183    /// This method can be used to change the [`Response`] type of the service
184    /// into a different type. It is similar to the [`Result::map`]
185    /// method. You can use this method to chain along a computation once the
186    /// service's response has been resolved.
187    ///
188    /// [`Response`]: crate::Service::Response
189    /// [`poll_ready`]: crate::Service::poll_ready
190    ///
191    /// # Example
192    /// ```
193    /// # use std::task::{Poll, Context};
194    /// # use tower::{Service, ServiceExt};
195    /// #
196    /// # struct DatabaseService;
197    /// # impl DatabaseService {
198    /// #   fn new(address: &str) -> Self {
199    /// #       DatabaseService
200    /// #   }
201    /// # }
202    /// #
203    /// # struct Record {
204    /// #   pub name: String,
205    /// #   pub age: u16
206    /// # }
207    /// #
208    /// # impl Service<u32> for DatabaseService {
209    /// #   type Response = Record;
210    /// #   type Error = u8;
211    /// #   type Future = futures_util::future::Ready<Result<Record, u8>>;
212    /// #
213    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
214    /// #       Poll::Ready(Ok(()))
215    /// #   }
216    /// #
217    /// #   fn call(&mut self, request: u32) -> Self::Future {
218    /// #       futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
219    /// #   }
220    /// # }
221    /// #
222    /// # fn main() {
223    /// #    async {
224    /// // A service returning Result<Record, _>
225    /// let service = DatabaseService::new("127.0.0.1:8080");
226    ///
227    /// // Map the response into a new response
228    /// let mut new_service = service.map_response(|record| record.name);
229    ///
230    /// // Call the new service
231    /// let id = 13;
232    /// let name = new_service
233    ///     .ready()
234    ///     .await?
235    ///     .call(id)
236    ///     .await?;
237    /// # Ok::<(), u8>(())
238    /// #    };
239    /// # }
240    /// ```
241    fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
242    where
243        Self: Sized,
244        F: FnOnce(Self::Response) -> Response + Clone,
245    {
246        MapResponse::new(self, f)
247    }
248
249    /// Maps this service's error value to a different value. This does not
250    /// alter the behaviour of the [`poll_ready`] method.
251    ///
252    /// This method can be used to change the [`Error`] type of the service
253    /// into a different type. It is similar to the [`Result::map_err`] method.
254    ///
255    /// [`Error`]: crate::Service::Error
256    /// [`poll_ready`]: crate::Service::poll_ready
257    ///
258    /// # Example
259    /// ```
260    /// # use std::task::{Poll, Context};
261    /// # use tower::{Service, ServiceExt};
262    /// #
263    /// # struct DatabaseService;
264    /// # impl DatabaseService {
265    /// #   fn new(address: &str) -> Self {
266    /// #       DatabaseService
267    /// #   }
268    /// # }
269    /// #
270    /// # struct Error {
271    /// #   pub code: u32,
272    /// #   pub message: String
273    /// # }
274    /// #
275    /// # impl Service<u32> for DatabaseService {
276    /// #   type Response = String;
277    /// #   type Error = Error;
278    /// #   type Future = futures_util::future::Ready<Result<String, Error>>;
279    /// #
280    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
281    /// #       Poll::Ready(Ok(()))
282    /// #   }
283    /// #
284    /// #   fn call(&mut self, request: u32) -> Self::Future {
285    /// #       futures_util::future::ready(Ok(String::new()))
286    /// #   }
287    /// # }
288    /// #
289    /// # fn main() {
290    /// #   async {
291    /// // A service returning Result<_, Error>
292    /// let service = DatabaseService::new("127.0.0.1:8080");
293    ///
294    /// // Map the error to a new error
295    /// let mut new_service = service.map_err(|err| err.code);
296    ///
297    /// // Call the new service
298    /// let id = 13;
299    /// let code = new_service
300    ///     .ready()
301    ///     .await?
302    ///     .call(id)
303    ///     .await
304    ///     .unwrap_err();
305    /// # Ok::<(), u32>(())
306    /// #   };
307    /// # }
308    /// ```
309    fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
310    where
311        Self: Sized,
312        F: FnOnce(Self::Error) -> Error + Clone,
313    {
314        MapErr::new(self, f)
315    }
316
317    /// Maps this service's result type (`Result<Self::Response, Self::Error>`)
318    /// to a different value, regardless of whether the future succeeds or
319    /// fails.
320    ///
321    /// This is similar to the [`map_response`] and [`map_err`] combinators,
322    /// except that the *same* function is invoked when the service's future
323    /// completes, whether it completes successfully or fails. This function
324    /// takes the [`Result`] returned by the service's future, and returns a
325    /// [`Result`].
326    ///
327    /// Like the standard library's [`Result::and_then`], this method can be
328    /// used to implement control flow based on `Result` values. For example, it
329    /// may be used to implement error recovery, by turning some [`Err`]
330    /// responses from the service into [`Ok`] responses. Similarly, some
331    /// successful responses from the service could be rejected, by returning an
332    /// [`Err`] conditionally, depending on the value inside the [`Ok`.] Finally,
333    /// this method can also be used to implement behaviors that must run when a
334    /// service's future completes, regardless of whether it succeeded or failed.
335    ///
336    /// This method can be used to change the [`Response`] type of the service
337    /// into a different type. It can also be used to change the [`Error`] type
338    /// of the service. However, because the [`map_result`] function is not applied
339    /// to the errors returned by the service's [`poll_ready`] method, it must
340    /// be possible to convert the service's [`Error`] type into the error type
341    /// returned by the [`map_result`] function. This is trivial when the function
342    /// returns the same error type as the service, but in other cases, it can
343    /// be useful to use [`BoxError`] to erase differing error types.
344    ///
345    /// # Examples
346    ///
347    /// Recovering from certain errors:
348    ///
349    /// ```
350    /// # use std::task::{Poll, Context};
351    /// # use tower::{Service, ServiceExt};
352    /// #
353    /// # struct DatabaseService;
354    /// # impl DatabaseService {
355    /// #   fn new(address: &str) -> Self {
356    /// #       DatabaseService
357    /// #   }
358    /// # }
359    /// #
360    /// # struct Record {
361    /// #   pub name: String,
362    /// #   pub age: u16
363    /// # }
364    /// # #[derive(Debug)]
365    /// # enum DbError {
366    /// #   Parse(std::num::ParseIntError),
367    /// #   NoRecordsFound,
368    /// # }
369    /// #
370    /// # impl Service<u32> for DatabaseService {
371    /// #   type Response = Vec<Record>;
372    /// #   type Error = DbError;
373    /// #   type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>;
374    /// #
375    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
376    /// #       Poll::Ready(Ok(()))
377    /// #   }
378    /// #
379    /// #   fn call(&mut self, request: u32) -> Self::Future {
380    /// #       futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
381    /// #   }
382    /// # }
383    /// #
384    /// # fn main() {
385    /// #    async {
386    /// // A service returning Result<Vec<Record>, DbError>
387    /// let service = DatabaseService::new("127.0.0.1:8080");
388    ///
389    /// // If the database returns no records for the query, we just want an empty `Vec`.
390    /// let mut new_service = service.map_result(|result| match result {
391    ///     // If the error indicates that no records matched the query, return an empty
392    ///     // `Vec` instead.
393    ///     Err(DbError::NoRecordsFound) => Ok(Vec::new()),
394    ///     // Propagate all other responses (`Ok` and `Err`) unchanged
395    ///     x => x,
396    /// });
397    ///
398    /// // Call the new service
399    /// let id = 13;
400    /// let name = new_service
401    ///     .ready()
402    ///     .await?
403    ///     .call(id)
404    ///     .await?;
405    /// # Ok::<(), DbError>(())
406    /// #    };
407    /// # }
408    /// ```
409    ///
410    /// Rejecting some `Ok` responses:
411    ///
412    /// ```
413    /// # use std::task::{Poll, Context};
414    /// # use tower::{Service, ServiceExt};
415    /// #
416    /// # struct DatabaseService;
417    /// # impl DatabaseService {
418    /// #   fn new(address: &str) -> Self {
419    /// #       DatabaseService
420    /// #   }
421    /// # }
422    /// #
423    /// # struct Record {
424    /// #   pub name: String,
425    /// #   pub age: u16
426    /// # }
427    /// # type DbError = String;
428    /// # type AppError = String;
429    /// #
430    /// # impl Service<u32> for DatabaseService {
431    /// #   type Response = Record;
432    /// #   type Error = DbError;
433    /// #   type Future = futures_util::future::Ready<Result<Record, DbError>>;
434    /// #
435    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
436    /// #       Poll::Ready(Ok(()))
437    /// #   }
438    /// #
439    /// #   fn call(&mut self, request: u32) -> Self::Future {
440    /// #       futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
441    /// #   }
442    /// # }
443    /// #
444    /// # fn main() {
445    /// #    async {
446    /// use tower::BoxError;
447    ///
448    /// // A service returning Result<Record, DbError>
449    /// let service = DatabaseService::new("127.0.0.1:8080");
450    ///
451    /// // If the user is zero years old, return an error.
452    /// let mut new_service = service.map_result(|result| {
453    ///    let record = result?;
454    ///
455    ///    if record.age == 0 {
456    ///         // Users must have been born to use our app!
457    ///         let app_error = AppError::from("users cannot be 0 years old!");
458    ///
459    ///         // Box the error to erase its type (as it can be an `AppError`
460    ///         // *or* the inner service's `DbError`).
461    ///         return Err(BoxError::from(app_error));
462    ///     }
463    ///
464    ///     // Otherwise, return the record.
465    ///     Ok(record)
466    /// });
467    ///
468    /// // Call the new service
469    /// let id = 13;
470    /// let record = new_service
471    ///     .ready()
472    ///     .await?
473    ///     .call(id)
474    ///     .await?;
475    /// # Ok::<(), BoxError>(())
476    /// #    };
477    /// # }
478    /// ```
479    ///
480    /// Performing an action that must be run for both successes and failures:
481    ///
482    /// ```
483    /// # use std::convert::TryFrom;
484    /// # use std::task::{Poll, Context};
485    /// # use tower::{Service, ServiceExt};
486    /// #
487    /// # struct DatabaseService;
488    /// # impl DatabaseService {
489    /// #   fn new(address: &str) -> Self {
490    /// #       DatabaseService
491    /// #   }
492    /// # }
493    /// #
494    /// # impl Service<u32> for DatabaseService {
495    /// #   type Response = String;
496    /// #   type Error = u8;
497    /// #   type Future = futures_util::future::Ready<Result<String, u8>>;
498    /// #
499    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
500    /// #       Poll::Ready(Ok(()))
501    /// #   }
502    /// #
503    /// #   fn call(&mut self, request: u32) -> Self::Future {
504    /// #       futures_util::future::ready(Ok(String::new()))
505    /// #   }
506    /// # }
507    /// #
508    /// # fn main() {
509    /// #   async {
510    /// // A service returning Result<Record, DbError>
511    /// let service = DatabaseService::new("127.0.0.1:8080");
512    ///
513    /// // Print a message whenever a query completes.
514    /// let mut new_service = service.map_result(|result| {
515    ///     println!("query completed; success={}", result.is_ok());
516    ///     result
517    /// });
518    ///
519    /// // Call the new service
520    /// let id = 13;
521    /// let response = new_service
522    ///     .ready()
523    ///     .await?
524    ///     .call(id)
525    ///     .await;
526    /// # response
527    /// #    };
528    /// # }
529    /// ```
530    ///
531    /// [`map_response`]: ServiceExt::map_response
532    /// [`map_err`]: ServiceExt::map_err
533    /// [`map_result`]: ServiceExt::map_result
534    /// [`Error`]: crate::Service::Error
535    /// [`Response`]: crate::Service::Response
536    /// [`poll_ready`]: crate::Service::poll_ready
537    /// [`BoxError`]: crate::BoxError
538    fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
539    where
540        Self: Sized,
541        Error: From<Self::Error>,
542        F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone,
543    {
544        MapResult::new(self, f)
545    }
546
547    /// Composes a function *in front of* the service.
548    ///
549    /// This adapter produces a new service that passes each value through the
550    /// given function `f` before sending it to `self`.
551    ///
552    /// # Example
553    /// ```
554    /// # use std::convert::TryFrom;
555    /// # use std::task::{Poll, Context};
556    /// # use tower::{Service, ServiceExt};
557    /// #
558    /// # struct DatabaseService;
559    /// # impl DatabaseService {
560    /// #   fn new(address: &str) -> Self {
561    /// #       DatabaseService
562    /// #   }
563    /// # }
564    /// #
565    /// # impl Service<String> for DatabaseService {
566    /// #   type Response = String;
567    /// #   type Error = u8;
568    /// #   type Future = futures_util::future::Ready<Result<String, u8>>;
569    /// #
570    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
571    /// #       Poll::Ready(Ok(()))
572    /// #   }
573    /// #
574    /// #   fn call(&mut self, request: String) -> Self::Future {
575    /// #       futures_util::future::ready(Ok(String::new()))
576    /// #   }
577    /// # }
578    /// #
579    /// # fn main() {
580    /// #   async {
581    /// // A service taking a String as a request
582    /// let service = DatabaseService::new("127.0.0.1:8080");
583    ///
584    /// // Map the request to a new request
585    /// let mut new_service = service.map_request(|id: u32| id.to_string());
586    ///
587    /// // Call the new service
588    /// let id = 13;
589    /// let response = new_service
590    ///     .ready()
591    ///     .await?
592    ///     .call(id)
593    ///     .await;
594    /// # response
595    /// #    };
596    /// # }
597    /// ```
598    fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
599    where
600        Self: Sized,
601        F: FnMut(NewRequest) -> Request,
602    {
603        MapRequest::new(self, f)
604    }
605
606    /// Composes this service with a [`Filter`] that conditionally accepts or
607    /// rejects requests based on a [predicate].
608    ///
609    /// This adapter produces a new service that passes each value through the
610    /// given function `predicate` before sending it to `self`.
611    ///
612    /// # Example
613    /// ```
614    /// # use std::convert::TryFrom;
615    /// # use std::task::{Poll, Context};
616    /// # use tower::{Service, ServiceExt};
617    /// #
618    /// # struct DatabaseService;
619    /// # impl DatabaseService {
620    /// #   fn new(address: &str) -> Self {
621    /// #       DatabaseService
622    /// #   }
623    /// # }
624    /// #
625    /// # #[derive(Debug)] enum DbError {
626    /// #   Parse(std::num::ParseIntError)
627    /// # }
628    /// #
629    /// # impl std::fmt::Display for DbError {
630    /// #    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
631    /// # }
632    /// # impl std::error::Error for DbError {}
633    /// # impl Service<u32> for DatabaseService {
634    /// #   type Response = String;
635    /// #   type Error = DbError;
636    /// #   type Future = futures_util::future::Ready<Result<String, DbError>>;
637    /// #
638    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
639    /// #       Poll::Ready(Ok(()))
640    /// #   }
641    /// #
642    /// #   fn call(&mut self, request: u32) -> Self::Future {
643    /// #       futures_util::future::ready(Ok(String::new()))
644    /// #   }
645    /// # }
646    /// #
647    /// # fn main() {
648    /// #    async {
649    /// // A service taking a u32 as a request and returning Result<_, DbError>
650    /// let service = DatabaseService::new("127.0.0.1:8080");
651    ///
652    /// // Fallibly map the request to a new request
653    /// let mut new_service = service
654    ///     .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
655    ///
656    /// // Call the new service
657    /// let id = "13";
658    /// let response = new_service
659    ///     .ready()
660    ///     .await?
661    ///     .call(id)
662    ///     .await;
663    /// # response
664    /// #    };
665    /// # }
666    /// ```
667    ///
668    /// [`Filter`]: crate::filter::Filter
669    /// [predicate]: crate::filter::Predicate
670    #[cfg(feature = "filter")]
671    fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
672    where
673        Self: Sized,
674        F: crate::filter::Predicate<NewRequest>,
675    {
676        crate::filter::Filter::new(self, filter)
677    }
678
679    /// Composes this service with an [`AsyncFilter`] that conditionally accepts or
680    /// rejects requests based on an [async predicate].
681    ///
682    /// This adapter produces a new service that passes each value through the
683    /// given function `predicate` before sending it to `self`.
684    ///
685    /// # Example
686    /// ```
687    /// # use std::convert::TryFrom;
688    /// # use std::task::{Poll, Context};
689    /// # use tower::{Service, ServiceExt};
690    /// #
691    /// # #[derive(Clone)] struct DatabaseService;
692    /// # impl DatabaseService {
693    /// #   fn new(address: &str) -> Self {
694    /// #       DatabaseService
695    /// #   }
696    /// # }
697    /// # #[derive(Debug)]
698    /// # enum DbError {
699    /// #   Rejected
700    /// # }
701    /// # impl std::fmt::Display for DbError {
702    /// #    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
703    /// # }
704    /// # impl std::error::Error for DbError {}
705    /// #
706    /// # impl Service<u32> for DatabaseService {
707    /// #   type Response = String;
708    /// #   type Error = DbError;
709    /// #   type Future = futures_util::future::Ready<Result<String, DbError>>;
710    /// #
711    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
712    /// #       Poll::Ready(Ok(()))
713    /// #   }
714    /// #
715    /// #   fn call(&mut self, request: u32) -> Self::Future {
716    /// #       futures_util::future::ready(Ok(String::new()))
717    /// #   }
718    /// # }
719    /// #
720    /// # fn main() {
721    /// #    async {
722    /// // A service taking a u32 as a request and returning Result<_, DbError>
723    /// let service = DatabaseService::new("127.0.0.1:8080");
724    ///
725    /// /// Returns `true` if we should query the database for an ID.
726    /// async fn should_query(id: u32) -> bool {
727    ///     // ...
728    ///     # true
729    /// }
730    ///
731    /// // Filter requests based on `should_query`.
732    /// let mut new_service = service
733    ///     .filter_async(|id: u32| async move {
734    ///         if should_query(id).await {
735    ///             return Ok(id);
736    ///         }
737    ///
738    ///         Err(DbError::Rejected)
739    ///     });
740    ///
741    /// // Call the new service
742    /// let id = 13;
743    /// # let id: u32 = id;
744    /// let response = new_service
745    ///     .ready()
746    ///     .await?
747    ///     .call(id)
748    ///     .await;
749    /// # response
750    /// #    };
751    /// # }
752    /// ```
753    ///
754    /// [`AsyncFilter`]: crate::filter::AsyncFilter
755    /// [asynchronous predicate]: crate::filter::AsyncPredicate
756    #[cfg(feature = "filter")]
757    fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
758    where
759        Self: Sized,
760        F: crate::filter::AsyncPredicate<NewRequest>,
761    {
762        crate::filter::AsyncFilter::new(self, filter)
763    }
764
765    /// Composes an asynchronous function *after* this service.
766    ///
767    /// This takes a function or closure returning a future, and returns a new
768    /// `Service` that chains that function after this service's [`Future`]. The
769    /// new `Service`'s future will consist of this service's future, followed
770    /// by the future returned by calling the chained function with the future's
771    /// [`Output`] type. The chained function is called regardless of whether
772    /// this service's future completes with a successful response or with an
773    /// error.
774    ///
775    /// This method can be thought of as an equivalent to the [`futures`
776    /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that
777    /// _return_ futures, rather than on an individual future. Similarly to that
778    /// combinator, [`ServiceExt::then`] can be used to implement asynchronous
779    /// error recovery, by calling some asynchronous function with errors
780    /// returned by this service. Alternatively, it may also be used to call a
781    /// fallible async function with the successful response of this service.
782    ///
783    /// This method can be used to change the [`Response`] type of the service
784    /// into a different type. It can also be used to change the [`Error`] type
785    /// of the service. However, because the `then` function is not applied
786    /// to the errors returned by the service's [`poll_ready`] method, it must
787    /// be possible to convert the service's [`Error`] type into the error type
788    /// returned by the `then` future. This is trivial when the function
789    /// returns the same error type as the service, but in other cases, it can
790    /// be useful to use [`BoxError`] to erase differing error types.
791    ///
792    /// # Examples
793    ///
794    /// ```
795    /// # use std::task::{Poll, Context};
796    /// # use tower::{Service, ServiceExt};
797    /// #
798    /// # struct DatabaseService;
799    /// # impl DatabaseService {
800    /// #   fn new(address: &str) -> Self {
801    /// #       DatabaseService
802    /// #   }
803    /// # }
804    /// #
805    /// # type Record = ();
806    /// # type DbError = ();
807    /// #
808    /// # impl Service<u32> for DatabaseService {
809    /// #   type Response = Record;
810    /// #   type Error = DbError;
811    /// #   type Future = futures_util::future::Ready<Result<Record, DbError>>;
812    /// #
813    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
814    /// #       Poll::Ready(Ok(()))
815    /// #   }
816    /// #
817    /// #   fn call(&mut self, request: u32) -> Self::Future {
818    /// #       futures_util::future::ready(Ok(()))
819    /// #   }
820    /// # }
821    /// #
822    /// # fn main() {
823    /// // A service returning Result<Record, DbError>
824    /// let service = DatabaseService::new("127.0.0.1:8080");
825    ///
826    /// // An async function that attempts to recover from errors returned by the
827    /// // database.
828    /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> {
829    ///     // ...
830    ///     # Ok(())
831    /// }
832    /// #    async {
833    ///
834    /// // If the database service returns an error, attempt to recover by
835    /// // calling `recover_from_error`. Otherwise, return the successful response.
836    /// let mut new_service = service.then(|result| async move {
837    ///     match result {
838    ///         Ok(record) => Ok(record),
839    ///         Err(e) => recover_from_error(e).await,
840    ///     }
841    /// });
842    ///
843    /// // Call the new service
844    /// let id = 13;
845    /// let record = new_service
846    ///     .ready()
847    ///     .await?
848    ///     .call(id)
849    ///     .await?;
850    /// # Ok::<(), DbError>(())
851    /// #    };
852    /// # }
853    /// ```
854    ///
855    /// [`Future`]: crate::Service::Future
856    /// [`Output`]: std::future::Future::Output
857    /// [`futures` crate]: https://docs.rs/futures
858    /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
859    /// [`Error`]: crate::Service::Error
860    /// [`Response`]: crate::Service::Response
861    /// [`poll_ready`]: crate::Service::poll_ready
862    /// [`BoxError`]: crate::BoxError
863    fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
864    where
865        Self: Sized,
866        Error: From<Self::Error>,
867        F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone,
868        Fut: Future<Output = Result<Response, Error>>,
869    {
870        Then::new(self, f)
871    }
872
873    /// Composes a function that transforms futures produced by the service.
874    ///
875    /// This takes a function or closure returning a future computed from the future returned by
876    /// the service's [`call`] method, as opposed to the responses produced by the future.
877    ///
878    /// # Examples
879    ///
880    /// ```
881    /// # use std::task::{Poll, Context};
882    /// # use tower::{Service, ServiceExt, BoxError};
883    /// #
884    /// # struct DatabaseService;
885    /// # impl DatabaseService {
886    /// #   fn new(address: &str) -> Self {
887    /// #       DatabaseService
888    /// #   }
889    /// # }
890    /// #
891    /// # type Record = ();
892    /// # type DbError = crate::BoxError;
893    /// #
894    /// # impl Service<u32> for DatabaseService {
895    /// #   type Response = Record;
896    /// #   type Error = DbError;
897    /// #   type Future = futures_util::future::Ready<Result<Record, DbError>>;
898    /// #
899    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
900    /// #       Poll::Ready(Ok(()))
901    /// #   }
902    /// #
903    /// #   fn call(&mut self, request: u32) -> Self::Future {
904    /// #       futures_util::future::ready(Ok(()))
905    /// #   }
906    /// # }
907    /// #
908    /// # fn main() {
909    /// use std::time::Duration;
910    /// use tokio::time::timeout;
911    ///
912    /// // A service returning Result<Record, DbError>
913    /// let service = DatabaseService::new("127.0.0.1:8080");
914    /// #    async {
915    ///
916    /// let mut new_service = service.map_future(|future| async move {
917    ///     let res = timeout(Duration::from_secs(1), future).await?;
918    ///     Ok::<_, BoxError>(res)
919    /// });
920    ///
921    /// // Call the new service
922    /// let id = 13;
923    /// let record = new_service
924    ///     .ready()
925    ///     .await?
926    ///     .call(id)
927    ///     .await?;
928    /// # Ok::<(), BoxError>(())
929    /// #    };
930    /// # }
931    /// ```
932    ///
933    /// Note that normally you wouldn't implement timeouts like this and instead use [`Timeout`].
934    ///
935    /// [`call`]: crate::Service::call
936    /// [`Timeout`]: crate::timeout::Timeout
937    fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
938    where
939        Self: Sized,
940        F: FnMut(Self::Future) -> Fut,
941        Error: From<Self::Error>,
942        Fut: Future<Output = Result<Response, Error>>,
943    {
944        MapFuture::new(self, f)
945    }
946
947    /// Convert the service into a [`Service`] + [`Send`] trait object.
948    ///
949    /// See [`BoxService`] for more details.
950    ///
951    /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method
952    /// can be used instead, to produce a boxed service which will also
953    /// implement [`Clone`].
954    ///
955    /// # Example
956    ///
957    /// ```
958    /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService};
959    /// #
960    /// # struct Request;
961    /// # struct Response;
962    /// # impl Response {
963    /// #     fn new() -> Self { Self }
964    /// # }
965    ///
966    /// let service = service_fn(|req: Request| async {
967    ///     Ok::<_, BoxError>(Response::new())
968    /// });
969    ///
970    /// let service: BoxService<Request, Response, BoxError> = service
971    ///     .map_request(|req| {
972    ///         println!("received request");
973    ///         req
974    ///     })
975    ///     .map_response(|res| {
976    ///         println!("response produced");
977    ///         res
978    ///     })
979    ///     .boxed();
980    /// # let service = assert_service(service);
981    /// # fn assert_service<S, R>(svc: S) -> S
982    /// # where S: Service<R> { svc }
983    /// ```
984    ///
985    /// [`Service`]: crate::Service
986    /// [`boxed_clone`]: Self::boxed_clone
987    fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
988    where
989        Self: Sized + Send + 'static,
990        Self::Future: Send + 'static,
991    {
992        BoxService::new(self)
993    }
994
995    /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object.
996    ///
997    /// This is similar to the [`boxed`] method, but it requires that `Self` implement
998    /// [`Clone`], and the returned boxed service implements [`Clone`].
999    /// See [`BoxCloneService`] for more details.
1000    ///
1001    /// # Example
1002    ///
1003    /// ```
1004    /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService};
1005    /// #
1006    /// # struct Request;
1007    /// # struct Response;
1008    /// # impl Response {
1009    /// #     fn new() -> Self { Self }
1010    /// # }
1011    ///
1012    /// let service = service_fn(|req: Request| async {
1013    ///     Ok::<_, BoxError>(Response::new())
1014    /// });
1015    ///
1016    /// let service: BoxCloneService<Request, Response, BoxError> = service
1017    ///     .map_request(|req| {
1018    ///         println!("received request");
1019    ///         req
1020    ///     })
1021    ///     .map_response(|res| {
1022    ///         println!("response produced");
1023    ///         res
1024    ///     })
1025    ///     .boxed_clone();
1026    ///
1027    /// // The boxed service can still be cloned.
1028    /// service.clone();
1029    /// # let service = assert_service(service);
1030    /// # fn assert_service<S, R>(svc: S) -> S
1031    /// # where S: Service<R> { svc }
1032    /// ```
1033    ///
1034    /// [`Service`]: crate::Service
1035    /// [`boxed`]: Self::boxed
1036    fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error>
1037    where
1038        Self: Clone + Sized + Send + 'static,
1039        Self::Future: Send + 'static,
1040    {
1041        BoxCloneService::new(self)
1042    }
1043}
1044
1045impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
1046
1047/// Convert an `Option<Layer>` into a [`Layer`].
1048///
1049/// ```
1050/// # use std::time::Duration;
1051/// # use tower::Service;
1052/// # use tower::builder::ServiceBuilder;
1053/// use tower::util::option_layer;
1054/// # use tower::timeout::TimeoutLayer;
1055/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
1056/// # let timeout = Some(Duration::new(10, 0));
1057/// // Layer to apply a timeout if configured
1058/// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new));
1059///
1060/// ServiceBuilder::new()
1061///     .layer(maybe_timeout)
1062///     .service(svc);
1063/// # }
1064/// ```
1065///
1066/// [`Layer`]: crate::layer::Layer
1067pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> {
1068    if let Some(layer) = layer {
1069        Either::Left(layer)
1070    } else {
1071        Either::Right(Identity::new())
1072    }
1073}