// Source: futures_util/lib.rs

//! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
//! and the `AsyncRead` and `AsyncWrite` traits.

#![no_std]
// Doctest configuration: no automatic `extern crate` injection, and every
// doctest is compiled with the `deny`/`allow` lint attributes listed below.
#![doc(test(
    no_crate_inject,
    attr(
        deny(warnings, rust_2018_idioms, single_use_lifetimes),
        allow(dead_code, unused_assignments, unused_variables)
    )
))]
#![warn(missing_docs, unsafe_op_in_unsafe_fn)]
// `doc_cfg` is only requested when building with `--cfg docsrs`.
#![cfg_attr(docsrs, feature(doc_cfg))]

// Enabling `bilock` without `unstable` is rejected at compile time.
#[cfg(all(feature = "bilock", not(feature = "unstable")))]
compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
17
18#[cfg(feature = "alloc")]
19extern crate alloc;
20#[cfg(feature = "std")]
21extern crate std;
22
23// Macro re-exports
24pub use futures_core::ready;
25pub use pin_utils::pin_mut;
26
27#[cfg(feature = "async-await")]
28#[macro_use]
29mod async_await;
30#[cfg(feature = "async-await")]
31#[doc(hidden)]
32pub use self::async_await::*;
33
// Not public API.
//
// Hidden module, compiled only with the `async-await` feature. It gathers
// under a single path: everything at the crate root, a handful of `core`
// items (`Option` and its variants, `Pin`, and the `Result` variants), and
// the contents of `crate::async_await`.
// NOTE(review): presumably this is the path the crate's exported macros
// expand to — confirm against the `async_await` module.
#[cfg(feature = "async-await")]
#[doc(hidden)]
pub mod __private {
    pub use crate::*;
    pub use core::{
        option::Option::{self, None, Some},
        pin::Pin,
        result::Result::{Err, Ok},
    };

    pub mod async_await {
        pub use crate::async_await::*;
    }
}
49
// Expands to the four sink methods (`poll_ready`, `start_send`, `poll_flush`,
// `poll_close`), each forwarding to the field `$field` reached through
// `self.project()`.
//
// * `$field` - name of the field on the projection that receives the calls.
// * `$item`  - item type accepted by `start_send`.
//
// The surrounding `impl` must supply `Self::Error` and a `project()` method
// callable on `Pin<&mut Self>`; both are used by the expansion.
#[cfg(feature = "sink")]
macro_rules! delegate_sink {
    ($field:ident, $item:ty) => {
        fn poll_ready(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_ready(cx)
        }

        fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
            self.project().$field.start_send(item)
        }

        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_flush(cx)
        }

        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_close(cx)
        }
    };
}
79
// Expands to a `poll` method that forwards to the field `$field` reached
// through `self.project()`.
//
// The surrounding `impl` must supply `Self::Output` and a `project()` method
// callable on `Pin<&mut Self>` whose result exposes `$field` as something
// with a matching `poll(cx)` method.
macro_rules! delegate_future {
    ($field:ident) => {
        fn poll(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Self::Output> {
            self.project().$field.poll(cx)
        }
    };
}
90
// Expands to `poll_next` and `size_hint` methods that forward to the field
// `$field`.
//
// `poll_next` goes through `self.project()` because it receives a pinned
// `self`; `size_hint` only has `&self` and therefore reads the field directly.
// The surrounding `impl` must supply `Self::Item` and a `project()` method.
macro_rules! delegate_stream {
    ($field:ident) => {
        fn poll_next(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Option<Self::Item>> {
            self.project().$field.poll_next(cx)
        }
        fn size_hint(&self) -> (usize, Option<usize>) {
            self.$field.size_hint()
        }
    };
}
104
// Expands to `poll_write`, `poll_write_vectored`, `poll_flush` and
// `poll_close`, each forwarding to the field `$field` reached through
// `self.project()`. All four return `std::io::Result` wrapped in `Poll`.
//
// Only defined when both the `io` and `std` features are enabled.
#[cfg(feature = "io")]
#[cfg(feature = "std")]
macro_rules! delegate_async_write {
    ($field:ident) => {
        fn poll_write(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            buf: &[u8],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$field.poll_write(cx, buf)
        }
        fn poll_write_vectored(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            bufs: &[std::io::IoSlice<'_>],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$field.poll_write_vectored(cx, bufs)
        }
        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<()>> {
            self.project().$field.poll_flush(cx)
        }
        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<()>> {
            self.project().$field.poll_close(cx)
        }
    };
}
137
// Expands to `poll_read` and `poll_read_vectored`, each forwarding to the
// field `$field` reached through `self.project()`.
//
// Only defined when both the `io` and `std` features are enabled.
#[cfg(feature = "io")]
#[cfg(feature = "std")]
macro_rules! delegate_async_read {
    ($field:ident) => {
        fn poll_read(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            buf: &mut [u8],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$field.poll_read(cx, buf)
        }

        fn poll_read_vectored(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            bufs: &mut [std::io::IoSliceMut<'_>],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$field.poll_read_vectored(cx, bufs)
        }
    };
}
159
// Expands to `poll_fill_buf` and `consume`, each forwarding to the field
// `$field` reached through `self.project()`.
//
// Only defined when both the `io` and `std` features are enabled.
#[cfg(feature = "io")]
#[cfg(feature = "std")]
macro_rules! delegate_async_buf_read {
    ($field:ident) => {
        fn poll_fill_buf(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<&[u8]>> {
            self.project().$field.poll_fill_buf(cx)
        }

        fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
            self.project().$field.consume(amt)
        }
    };
}
176
// Expands to the four public accessors `get_ref`, `get_mut`, `get_pin_mut`
// and `into_inner` for the wrapped value.
//
// * `$field` - field of `Self` (and of its projection) holding the wrapped value.
// * `$inner` - type handed back by the accessors.
// * `$ind`   - token trees placed before each chained accessor call: every
//              token in the list produces one further `get_ref()` /
//              `get_mut()` / `get_pin_mut()` / `into_inner()` call. An empty
//              list `()` returns the field itself; `(.)` goes one level
//              deeper via `.get_ref()` and friends.
//
// `get_pin_mut` relies on a `project()` method callable on `Pin<&mut Self>`.
macro_rules! delegate_access_inner {
    ($field:ident, $inner:ty, ($($ind:tt)*)) => {
        /// Acquires a reference to the underlying sink or stream that this combinator is
        /// pulling from.
        pub fn get_ref(&self) -> &$inner {
            (&self.$field) $($ind get_ref())*
        }

        /// Acquires a mutable reference to the underlying sink or stream that this
        /// combinator is pulling from.
        ///
        /// Note that care must be taken to avoid tampering with the state of the
        /// sink or stream which may otherwise confuse this combinator.
        pub fn get_mut(&mut self) -> &mut $inner {
            (&mut self.$field) $($ind get_mut())*
        }

        /// Acquires a pinned mutable reference to the underlying sink or stream that this
        /// combinator is pulling from.
        ///
        /// Note that care must be taken to avoid tampering with the state of the
        /// sink or stream which may otherwise confuse this combinator.
        pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> {
            self.project().$field $($ind get_pin_mut())*
        }

        /// Consumes this combinator, returning the underlying sink or stream.
        ///
        /// Note that this may discard intermediate state of this combinator, so
        /// care should be taken to avoid losing resources when this is called.
        pub fn into_inner(self) -> $inner {
            self.$field $($ind into_inner())*
        }
    }
}
212
// Declares a wrapper struct around a single pinned `inner: $t` field and
// implements a `+`-separated list of traits/capabilities for it by forwarding
// to that field.
//
// Entry syntax (the last two arms):
//
//     delegate_all!(
//         #[attrs] Name<A, B>(InnerTy): First[args] + Second + ... { items } where bounds
//     );
//
// The `@trait ...` arms are internal: each one emits a single `impl` for one
// entry of the list. Arms are tried in order, so the `@trait` arms must stay
// ahead of the two entry arms.
macro_rules! delegate_all {
    // `Future`: forwards `poll` to `inner`; `Output` is taken from `$t`.
    (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* {
            type Output = <$t as futures_core::future::Future>::Output;

            delegate_future!(inner);
        }
    };
    // `FusedFuture`: forwards `is_terminated` to `inner`.
    (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* {
            fn is_terminated(&self) -> bool {
                self.inner.is_terminated()
            }
        }
    };
    // `Stream`: forwards `poll_next`/`size_hint` to `inner`; `Item` is taken from `$t`.
    (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* {
            type Item = <$t as futures_core::stream::Stream>::Item;

            delegate_stream!(inner);
        }
    };
    // `FusedStream`: forwards `is_terminated` to `inner`.
    (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* {
            fn is_terminated(&self) -> bool {
                self.inner.is_terminated()
            }
        }
    };
    // `Sink`: generic over an extra `_Item` parameter; the impl is only
    // emitted when the `sink` feature is enabled.
    (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        #[cfg(feature = "sink")]
        impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* {
            type Error = <$t as futures_sink::Sink<_Item>>::Error;

            delegate_sink!(inner, _Item);
        }
    };
    // `Debug`: formats as `inner` does (no wrapper name is printed).
    (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* {
            fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
                core::fmt::Debug::fmt(&self.inner, f)
            }
        }
    };
    // `AccessInner[InnerTy, (chain)]`: inherent `get_ref`/`get_mut`/
    // `get_pin_mut`/`into_inner`, generated by `delegate_access_inner!`.
    (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
            delegate_access_inner!(inner, $inner, ($($ind)*));
        }
    };
    // `New[|param: Ty, ...| expr]`: crate-private constructor whose body
    // stores `expr` in `inner`.
    (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
            pub(crate) fn new($($param: $paramt),*) -> Self {
                Self { inner: $cons }
            }
        }
    };
    // Entry arm, exactly one trait left: defines the pin-projected struct,
    // an inherent impl holding the optional `{ items }`, then that trait's impl.
    ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
        pin_project_lite::pin_project! {
            #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
            $(#[$attr])*
            pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t }
        }

        impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* {
            $($($item)*)*
        }

        delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
    };
    // Entry arm, two or more traits: recurse on everything after the first
    // trait (the recursion bottoms out in the arm above, which defines the
    // struct), then emit the impl for the first trait.
    ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
        delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*);

        delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
    };
}
288
289pub mod future;
290#[doc(no_inline)]
291pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt};
292
293pub mod stream;
294#[doc(no_inline)]
295pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt};
296
297#[cfg(feature = "sink")]
298#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
299pub mod sink;
300#[cfg(feature = "sink")]
301#[doc(no_inline)]
302pub use crate::sink::{Sink, SinkExt};
303
304pub mod task;
305
306pub mod never;
307
308#[cfg(feature = "compat")]
309#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
310pub mod compat;
311
312#[cfg(feature = "io")]
313#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
314#[cfg(feature = "std")]
315pub mod io;
316#[cfg(feature = "io")]
317#[cfg(feature = "std")]
318#[doc(no_inline)]
319pub use crate::io::{
320    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
321    AsyncWriteExt,
322};
323
324#[cfg(feature = "alloc")]
325pub mod lock;
326
327#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
328#[cfg(feature = "alloc")]
329mod abortable;
330
331mod fns;
332mod unfold_state;