tower/util/mod.rs
1//! Various utility types and functions that are generally used with Tower.
2
3mod and_then;
4mod boxed;
5mod boxed_clone;
6mod boxed_clone_sync;
7mod call_all;
8mod either;
9
10mod future_service;
11mod map_err;
12mod map_request;
13mod map_response;
14mod map_result;
15
16mod map_future;
17mod oneshot;
18mod optional;
19mod ready;
20mod service_fn;
21mod then;
22
23pub mod rng;
24
25pub use self::{
26 and_then::{AndThen, AndThenLayer},
27 boxed::{
28 BoxCloneServiceLayer, BoxCloneSyncServiceLayer, BoxLayer, BoxService, UnsyncBoxService,
29 },
30 boxed_clone::BoxCloneService,
31 boxed_clone_sync::BoxCloneSyncService,
32 either::Either,
33 future_service::{future_service, FutureService},
34 map_err::{MapErr, MapErrLayer},
35 map_future::{MapFuture, MapFutureLayer},
36 map_request::{MapRequest, MapRequestLayer},
37 map_response::{MapResponse, MapResponseLayer},
38 map_result::{MapResult, MapResultLayer},
39 oneshot::Oneshot,
40 optional::Optional,
41 ready::{Ready, ReadyOneshot},
42 service_fn::{service_fn, ServiceFn},
43 then::{Then, ThenLayer},
44};
45
46pub use self::call_all::{CallAll, CallAllUnordered};
47use std::future::Future;
48
49use crate::layer::util::Identity;
50
51pub mod error {
52 //! Error types
53
54 pub use super::optional::error as optional;
55}
56
57pub mod future {
58 //! Future types
59
60 pub use super::and_then::AndThenFuture;
61 pub use super::either::EitherResponseFuture;
62 pub use super::map_err::MapErrFuture;
63 pub use super::map_response::MapResponseFuture;
64 pub use super::map_result::MapResultFuture;
65 pub use super::optional::future as optional;
66 pub use super::then::ThenFuture;
67}
68
69/// An extension trait for `Service`s that provides a variety of convenient
70/// adapters.
71pub trait ServiceExt<Request>: tower_service::Service<Request> {
72 /// Yields a mutable reference to the service when it is ready to accept a request.
73 fn ready(&mut self) -> Ready<'_, Self, Request>
74 where
75 Self: Sized,
76 {
77 Ready::new(self)
78 }
79
80 /// Yields the service when it is ready to accept a request.
81 fn ready_oneshot(self) -> ReadyOneshot<Self, Request>
82 where
83 Self: Sized,
84 {
85 ReadyOneshot::new(self)
86 }
87
88 /// Consume this `Service`, calling it with the provided request once it is ready.
89 fn oneshot(self, req: Request) -> Oneshot<Self, Request>
90 where
91 Self: Sized,
92 {
93 Oneshot::new(self, req)
94 }
95
96 /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses.
97 ///
98 /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item =
99 /// Response>`][stream]. See the documentation for [`CallAll`] for
100 /// details.
101 ///
102 /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
103 /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
104 fn call_all<S>(self, reqs: S) -> CallAll<Self, S>
105 where
106 Self: Sized,
107 S: futures_core::Stream<Item = Request>,
108 {
109 CallAll::new(self, reqs)
110 }
111
112 /// Executes a new future after this service's future resolves. This does
113 /// not alter the behaviour of the [`poll_ready`] method.
114 ///
115 /// This method can be used to change the [`Response`] type of the service
116 /// into a different type. You can use this method to chain along a computation once the
117 /// service's response has been resolved.
118 ///
119 /// [`Response`]: crate::Service::Response
120 /// [`poll_ready`]: crate::Service::poll_ready
121 ///
122 /// # Example
123 /// ```
124 /// # use std::task::{Poll, Context};
125 /// # use tower::{Service, ServiceExt};
126 /// #
127 /// # struct DatabaseService;
128 /// # impl DatabaseService {
129 /// # fn new(address: &str) -> Self {
130 /// # DatabaseService
131 /// # }
132 /// # }
133 /// #
134 /// # struct Record {
135 /// # pub name: String,
136 /// # pub age: u16
137 /// # }
138 /// #
139 /// # impl Service<u32> for DatabaseService {
140 /// # type Response = Record;
141 /// # type Error = u8;
142 /// # type Future = futures_util::future::Ready<Result<Record, u8>>;
143 /// #
144 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145 /// # Poll::Ready(Ok(()))
146 /// # }
147 /// #
148 /// # fn call(&mut self, request: u32) -> Self::Future {
149 /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
150 /// # }
151 /// # }
152 /// #
153 /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
154 /// #
155 /// # fn main() {
156 /// # async {
157 /// // A service returning Result<Record, _>
158 /// let service = DatabaseService::new("127.0.0.1:8080");
159 ///
160 /// // Map the response into a new response
161 /// let mut new_service = service.and_then(|record: Record| async move {
162 /// let name = record.name;
163 /// avatar_lookup(name).await
164 /// });
165 ///
166 /// // Call the new service
167 /// let id = 13;
168 /// let avatar = new_service.call(id).await.unwrap();
169 /// # };
170 /// # }
171 /// ```
172 fn and_then<F>(self, f: F) -> AndThen<Self, F>
173 where
174 Self: Sized,
175 F: Clone,
176 {
177 AndThen::new(self, f)
178 }
179
180 /// Maps this service's response value to a different value. This does not
181 /// alter the behaviour of the [`poll_ready`] method.
182 ///
183 /// This method can be used to change the [`Response`] type of the service
184 /// into a different type. It is similar to the [`Result::map`]
185 /// method. You can use this method to chain along a computation once the
186 /// service's response has been resolved.
187 ///
188 /// [`Response`]: crate::Service::Response
189 /// [`poll_ready`]: crate::Service::poll_ready
190 ///
191 /// # Example
192 /// ```
193 /// # use std::task::{Poll, Context};
194 /// # use tower::{Service, ServiceExt};
195 /// #
196 /// # struct DatabaseService;
197 /// # impl DatabaseService {
198 /// # fn new(address: &str) -> Self {
199 /// # DatabaseService
200 /// # }
201 /// # }
202 /// #
203 /// # struct Record {
204 /// # pub name: String,
205 /// # pub age: u16
206 /// # }
207 /// #
208 /// # impl Service<u32> for DatabaseService {
209 /// # type Response = Record;
210 /// # type Error = u8;
211 /// # type Future = futures_util::future::Ready<Result<Record, u8>>;
212 /// #
213 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
214 /// # Poll::Ready(Ok(()))
215 /// # }
216 /// #
217 /// # fn call(&mut self, request: u32) -> Self::Future {
218 /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
219 /// # }
220 /// # }
221 /// #
222 /// # fn main() {
223 /// # async {
224 /// // A service returning Result<Record, _>
225 /// let service = DatabaseService::new("127.0.0.1:8080");
226 ///
227 /// // Map the response into a new response
228 /// let mut new_service = service.map_response(|record| record.name);
229 ///
230 /// // Call the new service
231 /// let id = 13;
232 /// let name = new_service
233 /// .ready()
234 /// .await?
235 /// .call(id)
236 /// .await?;
237 /// # Ok::<(), u8>(())
238 /// # };
239 /// # }
240 /// ```
241 fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
242 where
243 Self: Sized,
244 F: FnOnce(Self::Response) -> Response + Clone,
245 {
246 MapResponse::new(self, f)
247 }
248
249 /// Maps this service's error value to a different value. This does not
250 /// alter the behaviour of the [`poll_ready`] method.
251 ///
252 /// This method can be used to change the [`Error`] type of the service
253 /// into a different type. It is similar to the [`Result::map_err`] method.
254 ///
255 /// [`Error`]: crate::Service::Error
256 /// [`poll_ready`]: crate::Service::poll_ready
257 ///
258 /// # Example
259 /// ```
260 /// # use std::task::{Poll, Context};
261 /// # use tower::{Service, ServiceExt};
262 /// #
263 /// # struct DatabaseService;
264 /// # impl DatabaseService {
265 /// # fn new(address: &str) -> Self {
266 /// # DatabaseService
267 /// # }
268 /// # }
269 /// #
270 /// # struct Error {
271 /// # pub code: u32,
272 /// # pub message: String
273 /// # }
274 /// #
275 /// # impl Service<u32> for DatabaseService {
276 /// # type Response = String;
277 /// # type Error = Error;
278 /// # type Future = futures_util::future::Ready<Result<String, Error>>;
279 /// #
280 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
281 /// # Poll::Ready(Ok(()))
282 /// # }
283 /// #
284 /// # fn call(&mut self, request: u32) -> Self::Future {
285 /// # futures_util::future::ready(Ok(String::new()))
286 /// # }
287 /// # }
288 /// #
289 /// # fn main() {
290 /// # async {
291 /// // A service returning Result<_, Error>
292 /// let service = DatabaseService::new("127.0.0.1:8080");
293 ///
294 /// // Map the error to a new error
295 /// let mut new_service = service.map_err(|err| err.code);
296 ///
297 /// // Call the new service
298 /// let id = 13;
299 /// let code = new_service
300 /// .ready()
301 /// .await?
302 /// .call(id)
303 /// .await
304 /// .unwrap_err();
305 /// # Ok::<(), u32>(())
306 /// # };
307 /// # }
308 /// ```
309 fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
310 where
311 Self: Sized,
312 F: FnOnce(Self::Error) -> Error + Clone,
313 {
314 MapErr::new(self, f)
315 }
316
317 /// Maps this service's result type (`Result<Self::Response, Self::Error>`)
318 /// to a different value, regardless of whether the future succeeds or
319 /// fails.
320 ///
321 /// This is similar to the [`map_response`] and [`map_err`] combinators,
322 /// except that the *same* function is invoked when the service's future
323 /// completes, whether it completes successfully or fails. This function
324 /// takes the [`Result`] returned by the service's future, and returns a
325 /// [`Result`].
326 ///
327 /// Like the standard library's [`Result::and_then`], this method can be
328 /// used to implement control flow based on `Result` values. For example, it
329 /// may be used to implement error recovery, by turning some [`Err`]
330 /// responses from the service into [`Ok`] responses. Similarly, some
331 /// successful responses from the service could be rejected, by returning an
332 /// [`Err`] conditionally, depending on the value inside the [`Ok`]. Finally,
333 /// this method can also be used to implement behaviors that must run when a
334 /// service's future completes, regardless of whether it succeeded or failed.
335 ///
336 /// This method can be used to change the [`Response`] type of the service
337 /// into a different type. It can also be used to change the [`Error`] type
338 /// of the service. However, because the [`map_result`] function is not applied
339 /// to the errors returned by the service's [`poll_ready`] method, it must
340 /// be possible to convert the service's [`Error`] type into the error type
341 /// returned by the [`map_result`] function. This is trivial when the function
342 /// returns the same error type as the service, but in other cases, it can
343 /// be useful to use [`BoxError`] to erase differing error types.
344 ///
345 /// # Examples
346 ///
347 /// Recovering from certain errors:
348 ///
349 /// ```
350 /// # use std::task::{Poll, Context};
351 /// # use tower::{Service, ServiceExt};
352 /// #
353 /// # struct DatabaseService;
354 /// # impl DatabaseService {
355 /// # fn new(address: &str) -> Self {
356 /// # DatabaseService
357 /// # }
358 /// # }
359 /// #
360 /// # struct Record {
361 /// # pub name: String,
362 /// # pub age: u16
363 /// # }
364 /// # #[derive(Debug)]
365 /// # enum DbError {
366 /// # Parse(std::num::ParseIntError),
367 /// # NoRecordsFound,
368 /// # }
369 /// #
370 /// # impl Service<u32> for DatabaseService {
371 /// # type Response = Vec<Record>;
372 /// # type Error = DbError;
373 /// # type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>;
374 /// #
375 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
376 /// # Poll::Ready(Ok(()))
377 /// # }
378 /// #
379 /// # fn call(&mut self, request: u32) -> Self::Future {
380 /// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
381 /// # }
382 /// # }
383 /// #
384 /// # fn main() {
385 /// # async {
386 /// // A service returning Result<Vec<Record>, DbError>
387 /// let service = DatabaseService::new("127.0.0.1:8080");
388 ///
389 /// // If the database returns no records for the query, we just want an empty `Vec`.
390 /// let mut new_service = service.map_result(|result| match result {
391 /// // If the error indicates that no records matched the query, return an empty
392 /// // `Vec` instead.
393 /// Err(DbError::NoRecordsFound) => Ok(Vec::new()),
394 /// // Propagate all other responses (`Ok` and `Err`) unchanged
395 /// x => x,
396 /// });
397 ///
398 /// // Call the new service
399 /// let id = 13;
400 /// let name = new_service
401 /// .ready()
402 /// .await?
403 /// .call(id)
404 /// .await?;
405 /// # Ok::<(), DbError>(())
406 /// # };
407 /// # }
408 /// ```
409 ///
410 /// Rejecting some `Ok` responses:
411 ///
412 /// ```
413 /// # use std::task::{Poll, Context};
414 /// # use tower::{Service, ServiceExt};
415 /// #
416 /// # struct DatabaseService;
417 /// # impl DatabaseService {
418 /// # fn new(address: &str) -> Self {
419 /// # DatabaseService
420 /// # }
421 /// # }
422 /// #
423 /// # struct Record {
424 /// # pub name: String,
425 /// # pub age: u16
426 /// # }
427 /// # type DbError = String;
428 /// # type AppError = String;
429 /// #
430 /// # impl Service<u32> for DatabaseService {
431 /// # type Response = Record;
432 /// # type Error = DbError;
433 /// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
434 /// #
435 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
436 /// # Poll::Ready(Ok(()))
437 /// # }
438 /// #
439 /// # fn call(&mut self, request: u32) -> Self::Future {
440 /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
441 /// # }
442 /// # }
443 /// #
444 /// # fn main() {
445 /// # async {
446 /// use tower::BoxError;
447 ///
448 /// // A service returning Result<Record, DbError>
449 /// let service = DatabaseService::new("127.0.0.1:8080");
450 ///
451 /// // If the user is zero years old, return an error.
452 /// let mut new_service = service.map_result(|result| {
453 /// let record = result?;
454 ///
455 /// if record.age == 0 {
456 /// // Users must have been born to use our app!
457 /// let app_error = AppError::from("users cannot be 0 years old!");
458 ///
459 /// // Box the error to erase its type (as it can be an `AppError`
460 /// // *or* the inner service's `DbError`).
461 /// return Err(BoxError::from(app_error));
462 /// }
463 ///
464 /// // Otherwise, return the record.
465 /// Ok(record)
466 /// });
467 ///
468 /// // Call the new service
469 /// let id = 13;
470 /// let record = new_service
471 /// .ready()
472 /// .await?
473 /// .call(id)
474 /// .await?;
475 /// # Ok::<(), BoxError>(())
476 /// # };
477 /// # }
478 /// ```
479 ///
480 /// Performing an action that must be run for both successes and failures:
481 ///
482 /// ```
483 /// # use std::convert::TryFrom;
484 /// # use std::task::{Poll, Context};
485 /// # use tower::{Service, ServiceExt};
486 /// #
487 /// # struct DatabaseService;
488 /// # impl DatabaseService {
489 /// # fn new(address: &str) -> Self {
490 /// # DatabaseService
491 /// # }
492 /// # }
493 /// #
494 /// # impl Service<u32> for DatabaseService {
495 /// # type Response = String;
496 /// # type Error = u8;
497 /// # type Future = futures_util::future::Ready<Result<String, u8>>;
498 /// #
499 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
500 /// # Poll::Ready(Ok(()))
501 /// # }
502 /// #
503 /// # fn call(&mut self, request: u32) -> Self::Future {
504 /// # futures_util::future::ready(Ok(String::new()))
505 /// # }
506 /// # }
507 /// #
508 /// # fn main() {
509 /// # async {
510 /// // A service returning Result<String, u8>
511 /// let service = DatabaseService::new("127.0.0.1:8080");
512 ///
513 /// // Print a message whenever a query completes.
514 /// let mut new_service = service.map_result(|result| {
515 /// println!("query completed; success={}", result.is_ok());
516 /// result
517 /// });
518 ///
519 /// // Call the new service
520 /// let id = 13;
521 /// let response = new_service
522 /// .ready()
523 /// .await?
524 /// .call(id)
525 /// .await;
526 /// # response
527 /// # };
528 /// # }
529 /// ```
530 ///
531 /// [`map_response`]: ServiceExt::map_response
532 /// [`map_err`]: ServiceExt::map_err
533 /// [`map_result`]: ServiceExt::map_result
534 /// [`Error`]: crate::Service::Error
535 /// [`Response`]: crate::Service::Response
536 /// [`poll_ready`]: crate::Service::poll_ready
537 /// [`BoxError`]: crate::BoxError
538 fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
539 where
540 Self: Sized,
541 Error: From<Self::Error>,
542 F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone,
543 {
544 MapResult::new(self, f)
545 }
546
547 /// Composes a function *in front of* the service.
548 ///
549 /// This adapter produces a new service that passes each value through the
550 /// given function `f` before sending it to `self`.
551 ///
552 /// # Example
553 /// ```
554 /// # use std::convert::TryFrom;
555 /// # use std::task::{Poll, Context};
556 /// # use tower::{Service, ServiceExt};
557 /// #
558 /// # struct DatabaseService;
559 /// # impl DatabaseService {
560 /// # fn new(address: &str) -> Self {
561 /// # DatabaseService
562 /// # }
563 /// # }
564 /// #
565 /// # impl Service<String> for DatabaseService {
566 /// # type Response = String;
567 /// # type Error = u8;
568 /// # type Future = futures_util::future::Ready<Result<String, u8>>;
569 /// #
570 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
571 /// # Poll::Ready(Ok(()))
572 /// # }
573 /// #
574 /// # fn call(&mut self, request: String) -> Self::Future {
575 /// # futures_util::future::ready(Ok(String::new()))
576 /// # }
577 /// # }
578 /// #
579 /// # fn main() {
580 /// # async {
581 /// // A service taking a String as a request
582 /// let service = DatabaseService::new("127.0.0.1:8080");
583 ///
584 /// // Map the request to a new request
585 /// let mut new_service = service.map_request(|id: u32| id.to_string());
586 ///
587 /// // Call the new service
588 /// let id = 13;
589 /// let response = new_service
590 /// .ready()
591 /// .await?
592 /// .call(id)
593 /// .await;
594 /// # response
595 /// # };
596 /// # }
597 /// ```
598 fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
599 where
600 Self: Sized,
601 F: FnMut(NewRequest) -> Request,
602 {
603 MapRequest::new(self, f)
604 }
605
606 /// Composes this service with a [`Filter`] that conditionally accepts or
607 /// rejects requests based on a [predicate].
608 ///
609 /// This adapter produces a new service that passes each value through the
610 /// given predicate `filter` before sending it to `self`.
611 ///
612 /// # Example
613 /// ```
614 /// # use std::convert::TryFrom;
615 /// # use std::task::{Poll, Context};
616 /// # use tower::{Service, ServiceExt};
617 /// #
618 /// # struct DatabaseService;
619 /// # impl DatabaseService {
620 /// # fn new(address: &str) -> Self {
621 /// # DatabaseService
622 /// # }
623 /// # }
624 /// #
625 /// # #[derive(Debug)] enum DbError {
626 /// # Parse(std::num::ParseIntError)
627 /// # }
628 /// #
629 /// # impl std::fmt::Display for DbError {
630 /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
631 /// # }
632 /// # impl std::error::Error for DbError {}
633 /// # impl Service<u32> for DatabaseService {
634 /// # type Response = String;
635 /// # type Error = DbError;
636 /// # type Future = futures_util::future::Ready<Result<String, DbError>>;
637 /// #
638 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
639 /// # Poll::Ready(Ok(()))
640 /// # }
641 /// #
642 /// # fn call(&mut self, request: u32) -> Self::Future {
643 /// # futures_util::future::ready(Ok(String::new()))
644 /// # }
645 /// # }
646 /// #
647 /// # fn main() {
648 /// # async {
649 /// // A service taking a u32 as a request and returning Result<_, DbError>
650 /// let service = DatabaseService::new("127.0.0.1:8080");
651 ///
652 /// // Fallibly map the request to a new request
653 /// let mut new_service = service
654 /// .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
655 ///
656 /// // Call the new service
657 /// let id = "13";
658 /// let response = new_service
659 /// .ready()
660 /// .await?
661 /// .call(id)
662 /// .await;
663 /// # response
664 /// # };
665 /// # }
666 /// ```
667 ///
668 /// [`Filter`]: crate::filter::Filter
669 /// [predicate]: crate::filter::Predicate
670 #[cfg(feature = "filter")]
671 fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
672 where
673 Self: Sized,
674 F: crate::filter::Predicate<NewRequest>,
675 {
676 crate::filter::Filter::new(self, filter)
677 }
678
679 /// Composes this service with an [`AsyncFilter`] that conditionally accepts or
680 /// rejects requests based on an [async predicate].
681 ///
682 /// This adapter produces a new service that passes each value through the
683 /// given asynchronous predicate `filter` before sending it to `self`.
684 ///
685 /// # Example
686 /// ```
687 /// # use std::convert::TryFrom;
688 /// # use std::task::{Poll, Context};
689 /// # use tower::{Service, ServiceExt};
690 /// #
691 /// # #[derive(Clone)] struct DatabaseService;
692 /// # impl DatabaseService {
693 /// # fn new(address: &str) -> Self {
694 /// # DatabaseService
695 /// # }
696 /// # }
697 /// # #[derive(Debug)]
698 /// # enum DbError {
699 /// # Rejected
700 /// # }
701 /// # impl std::fmt::Display for DbError {
702 /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
703 /// # }
704 /// # impl std::error::Error for DbError {}
705 /// #
706 /// # impl Service<u32> for DatabaseService {
707 /// # type Response = String;
708 /// # type Error = DbError;
709 /// # type Future = futures_util::future::Ready<Result<String, DbError>>;
710 /// #
711 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
712 /// # Poll::Ready(Ok(()))
713 /// # }
714 /// #
715 /// # fn call(&mut self, request: u32) -> Self::Future {
716 /// # futures_util::future::ready(Ok(String::new()))
717 /// # }
718 /// # }
719 /// #
720 /// # fn main() {
721 /// # async {
722 /// // A service taking a u32 as a request and returning Result<_, DbError>
723 /// let service = DatabaseService::new("127.0.0.1:8080");
724 ///
725 /// /// Returns `true` if we should query the database for an ID.
726 /// async fn should_query(id: u32) -> bool {
727 /// // ...
728 /// # true
729 /// }
730 ///
731 /// // Filter requests based on `should_query`.
732 /// let mut new_service = service
733 /// .filter_async(|id: u32| async move {
734 /// if should_query(id).await {
735 /// return Ok(id);
736 /// }
737 ///
738 /// Err(DbError::Rejected)
739 /// });
740 ///
741 /// // Call the new service
742 /// let id = 13;
743 /// # let id: u32 = id;
744 /// let response = new_service
745 /// .ready()
746 /// .await?
747 /// .call(id)
748 /// .await;
749 /// # response
750 /// # };
751 /// # }
752 /// ```
753 ///
754 /// [`AsyncFilter`]: crate::filter::AsyncFilter
755 /// [async predicate]: crate::filter::AsyncPredicate
756 #[cfg(feature = "filter")]
757 fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
758 where
759 Self: Sized,
760 F: crate::filter::AsyncPredicate<NewRequest>,
761 {
762 crate::filter::AsyncFilter::new(self, filter)
763 }
764
765 /// Composes an asynchronous function *after* this service.
766 ///
767 /// This takes a function or closure returning a future, and returns a new
768 /// `Service` that chains that function after this service's [`Future`]. The
769 /// new `Service`'s future will consist of this service's future, followed
770 /// by the future returned by calling the chained function with the future's
771 /// [`Output`] type. The chained function is called regardless of whether
772 /// this service's future completes with a successful response or with an
773 /// error.
774 ///
775 /// This method can be thought of as an equivalent to the [`futures`
776 /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that
777 /// _return_ futures, rather than on an individual future. Similarly to that
778 /// combinator, [`ServiceExt::then`] can be used to implement asynchronous
779 /// error recovery, by calling some asynchronous function with errors
780 /// returned by this service. Alternatively, it may also be used to call a
781 /// fallible async function with the successful response of this service.
782 ///
783 /// This method can be used to change the [`Response`] type of the service
784 /// into a different type. It can also be used to change the [`Error`] type
785 /// of the service. However, because the `then` function is not applied
786 /// to the errors returned by the service's [`poll_ready`] method, it must
787 /// be possible to convert the service's [`Error`] type into the error type
788 /// returned by the `then` future. This is trivial when the function
789 /// returns the same error type as the service, but in other cases, it can
790 /// be useful to use [`BoxError`] to erase differing error types.
791 ///
792 /// # Examples
793 ///
794 /// ```
795 /// # use std::task::{Poll, Context};
796 /// # use tower::{Service, ServiceExt};
797 /// #
798 /// # struct DatabaseService;
799 /// # impl DatabaseService {
800 /// # fn new(address: &str) -> Self {
801 /// # DatabaseService
802 /// # }
803 /// # }
804 /// #
805 /// # type Record = ();
806 /// # type DbError = ();
807 /// #
808 /// # impl Service<u32> for DatabaseService {
809 /// # type Response = Record;
810 /// # type Error = DbError;
811 /// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
812 /// #
813 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
814 /// # Poll::Ready(Ok(()))
815 /// # }
816 /// #
817 /// # fn call(&mut self, request: u32) -> Self::Future {
818 /// # futures_util::future::ready(Ok(()))
819 /// # }
820 /// # }
821 /// #
822 /// # fn main() {
823 /// // A service returning Result<Record, DbError>
824 /// let service = DatabaseService::new("127.0.0.1:8080");
825 ///
826 /// // An async function that attempts to recover from errors returned by the
827 /// // database.
828 /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> {
829 /// // ...
830 /// # Ok(())
831 /// }
832 /// # async {
833 ///
834 /// // If the database service returns an error, attempt to recover by
835 /// // calling `recover_from_error`. Otherwise, return the successful response.
836 /// let mut new_service = service.then(|result| async move {
837 /// match result {
838 /// Ok(record) => Ok(record),
839 /// Err(e) => recover_from_error(e).await,
840 /// }
841 /// });
842 ///
843 /// // Call the new service
844 /// let id = 13;
845 /// let record = new_service
846 /// .ready()
847 /// .await?
848 /// .call(id)
849 /// .await?;
850 /// # Ok::<(), DbError>(())
851 /// # };
852 /// # }
853 /// ```
854 ///
855 /// [`Future`]: crate::Service::Future
856 /// [`Output`]: std::future::Future::Output
857 /// [`futures` crate]: https://docs.rs/futures
858 /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
859 /// [`Error`]: crate::Service::Error
860 /// [`Response`]: crate::Service::Response
861 /// [`poll_ready`]: crate::Service::poll_ready
862 /// [`BoxError`]: crate::BoxError
863 fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
864 where
865 Self: Sized,
866 Error: From<Self::Error>,
867 F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone,
868 Fut: Future<Output = Result<Response, Error>>,
869 {
870 Then::new(self, f)
871 }
872
873 /// Composes a function that transforms futures produced by the service.
874 ///
875 /// This takes a function or closure returning a future computed from the future returned by
876 /// the service's [`call`] method, as opposed to the responses produced by the future.
877 ///
878 /// # Examples
879 ///
880 /// ```
881 /// # use std::task::{Poll, Context};
882 /// # use tower::{Service, ServiceExt, BoxError};
883 /// #
884 /// # struct DatabaseService;
885 /// # impl DatabaseService {
886 /// # fn new(address: &str) -> Self {
887 /// # DatabaseService
888 /// # }
889 /// # }
890 /// #
891 /// # type Record = ();
892 /// # type DbError = crate::BoxError;
893 /// #
894 /// # impl Service<u32> for DatabaseService {
895 /// # type Response = Record;
896 /// # type Error = DbError;
897 /// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
898 /// #
899 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
900 /// # Poll::Ready(Ok(()))
901 /// # }
902 /// #
903 /// # fn call(&mut self, request: u32) -> Self::Future {
904 /// # futures_util::future::ready(Ok(()))
905 /// # }
906 /// # }
907 /// #
908 /// # fn main() {
909 /// use std::time::Duration;
910 /// use tokio::time::timeout;
911 ///
912 /// // A service returning Result<Record, DbError>
913 /// let service = DatabaseService::new("127.0.0.1:8080");
914 /// # async {
915 ///
916 /// let mut new_service = service.map_future(|future| async move {
917 /// let res = timeout(Duration::from_secs(1), future).await?;
918 /// Ok::<_, BoxError>(res)
919 /// });
920 ///
921 /// // Call the new service
922 /// let id = 13;
923 /// let record = new_service
924 /// .ready()
925 /// .await?
926 /// .call(id)
927 /// .await?;
928 /// # Ok::<(), BoxError>(())
929 /// # };
930 /// # }
931 /// ```
932 ///
933 /// Note that normally you wouldn't implement timeouts like this and instead use [`Timeout`].
934 ///
935 /// [`call`]: crate::Service::call
936 /// [`Timeout`]: crate::timeout::Timeout
937 fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
938 where
939 Self: Sized,
940 F: FnMut(Self::Future) -> Fut,
941 Error: From<Self::Error>,
942 Fut: Future<Output = Result<Response, Error>>,
943 {
944 MapFuture::new(self, f)
945 }
946
947 /// Convert the service into a [`Service`] + [`Send`] trait object.
948 ///
949 /// See [`BoxService`] for more details.
950 ///
951 /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method
952 /// can be used instead, to produce a boxed service which will also
953 /// implement [`Clone`].
954 ///
955 /// # Example
956 ///
957 /// ```
958 /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService};
959 /// #
960 /// # struct Request;
961 /// # struct Response;
962 /// # impl Response {
963 /// # fn new() -> Self { Self }
964 /// # }
965 ///
966 /// let service = service_fn(|req: Request| async {
967 /// Ok::<_, BoxError>(Response::new())
968 /// });
969 ///
970 /// let service: BoxService<Request, Response, BoxError> = service
971 /// .map_request(|req| {
972 /// println!("received request");
973 /// req
974 /// })
975 /// .map_response(|res| {
976 /// println!("response produced");
977 /// res
978 /// })
979 /// .boxed();
980 /// # let service = assert_service(service);
981 /// # fn assert_service<S, R>(svc: S) -> S
982 /// # where S: Service<R> { svc }
983 /// ```
984 ///
985 /// [`Service`]: crate::Service
986 /// [`boxed_clone`]: Self::boxed_clone
987 fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
988 where
989 Self: Sized + Send + 'static,
990 Self::Future: Send + 'static,
991 {
992 BoxService::new(self)
993 }
994
995 /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object.
996 ///
997 /// This is similar to the [`boxed`] method, but it requires that `Self` implement
998 /// [`Clone`], and the returned boxed service implements [`Clone`].
999 /// See [`BoxCloneService`] for more details.
1000 ///
1001 /// # Example
1002 ///
1003 /// ```
1004 /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService};
1005 /// #
1006 /// # struct Request;
1007 /// # struct Response;
1008 /// # impl Response {
1009 /// # fn new() -> Self { Self }
1010 /// # }
1011 ///
1012 /// let service = service_fn(|req: Request| async {
1013 /// Ok::<_, BoxError>(Response::new())
1014 /// });
1015 ///
1016 /// let service: BoxCloneService<Request, Response, BoxError> = service
1017 /// .map_request(|req| {
1018 /// println!("received request");
1019 /// req
1020 /// })
1021 /// .map_response(|res| {
1022 /// println!("response produced");
1023 /// res
1024 /// })
1025 /// .boxed_clone();
1026 ///
1027 /// // The boxed service can still be cloned.
1028 /// service.clone();
1029 /// # let service = assert_service(service);
1030 /// # fn assert_service<S, R>(svc: S) -> S
1031 /// # where S: Service<R> { svc }
1032 /// ```
1033 ///
1034 /// [`Service`]: crate::Service
1035 /// [`boxed`]: Self::boxed
1036 fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error>
1037 where
1038 Self: Clone + Sized + Send + 'static,
1039 Self::Future: Send + 'static,
1040 {
1041 BoxCloneService::new(self)
1042 }
1043}
1044
1045impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
1046
1047/// Convert an `Option<Layer>` into a [`Layer`].
1048///
1049/// ```
1050/// # use std::time::Duration;
1051/// # use tower::Service;
1052/// # use tower::builder::ServiceBuilder;
1053/// use tower::util::option_layer;
1054/// # use tower::timeout::TimeoutLayer;
1055/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
1056/// # let timeout = Some(Duration::new(10, 0));
1057/// // Layer to apply a timeout if configured
1058/// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new));
1059///
1060/// ServiceBuilder::new()
1061/// .layer(maybe_timeout)
1062/// .service(svc);
1063/// # }
1064/// ```
1065///
1066/// [`Layer`]: crate::layer::Layer
1067pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> {
1068 if let Some(layer) = layer {
1069 Either::Left(layer)
1070 } else {
1071 Either::Right(Identity::new())
1072 }
1073}