futures_util/stream/stream/
mod.rs

1//! Streams
2//!
3//! This module contains a number of functions for working with `Stream`s,
4//! including the `StreamExt` trait which adds methods to `Stream` types.
5
6use crate::future::{assert_future, Either};
7use crate::stream::assert_stream;
8#[cfg(feature = "alloc")]
9use alloc::boxed::Box;
10#[cfg(feature = "alloc")]
11use alloc::vec::Vec;
12use core::pin::Pin;
13#[cfg(feature = "sink")]
14use futures_core::stream::TryStream;
15#[cfg(feature = "alloc")]
16use futures_core::stream::{BoxStream, LocalBoxStream};
17use futures_core::{
18    future::Future,
19    stream::{FusedStream, Stream},
20    task::{Context, Poll},
21};
22#[cfg(feature = "sink")]
23use futures_sink::Sink;
24
25use crate::fns::{inspect_fn, InspectFn};
26
27mod chain;
28#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
29pub use self::chain::Chain;
30
31mod collect;
32#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
33pub use self::collect::Collect;
34
35mod unzip;
36#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
37pub use self::unzip::Unzip;
38
39mod concat;
40#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
41pub use self::concat::Concat;
42
43mod count;
44#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
45pub use self::count::Count;
46
47mod cycle;
48#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
49pub use self::cycle::Cycle;
50
51mod enumerate;
52#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
53pub use self::enumerate::Enumerate;
54
55mod filter;
56#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
57pub use self::filter::Filter;
58
59mod filter_map;
60#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
61pub use self::filter_map::FilterMap;
62
63mod flatten;
64
65delegate_all!(
66    /// Stream for the [`flatten`](StreamExt::flatten) method.
67    Flatten<St>(
68        flatten::Flatten<St, St::Item>
69    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
70    where St: Stream
71);
72
73mod fold;
74#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
75pub use self::fold::Fold;
76
77mod any;
78#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
79pub use self::any::Any;
80
81mod all;
82#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
83pub use self::all::All;
84
85#[cfg(feature = "sink")]
86mod forward;
87
88#[cfg(feature = "sink")]
89delegate_all!(
90    /// Future for the [`forward`](super::StreamExt::forward) method.
91    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
92    Forward<St, Si>(
93        forward::Forward<St, Si, St::Ok>
94    ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
95    where St: TryStream
96);
97
98mod for_each;
99#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
100pub use self::for_each::ForEach;
101
102mod fuse;
103#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
104pub use self::fuse::Fuse;
105
106mod into_future;
107#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
108pub use self::into_future::StreamFuture;
109
110delegate_all!(
111    /// Stream for the [`inspect`](StreamExt::inspect) method.
112    Inspect<St, F>(
113        map::Map<St, InspectFn<F>>
114    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
115);
116
117mod map;
118#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
119pub use self::map::Map;
120
121delegate_all!(
122    /// Stream for the [`flat_map`](StreamExt::flat_map) method.
123    FlatMap<St, U, F>(
124        flatten::Flatten<Map<St, F>, U>
125    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
126);
127
128mod next;
129#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130pub use self::next::Next;
131
132mod select_next_some;
133#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134pub use self::select_next_some::SelectNextSome;
135
136mod peek;
137#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
138pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};
139
140mod skip;
141#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142pub use self::skip::Skip;
143
144mod skip_while;
145#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
146pub use self::skip_while::SkipWhile;
147
148mod take;
149#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150pub use self::take::Take;
151
152mod take_while;
153#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
154pub use self::take_while::TakeWhile;
155
156mod take_until;
157#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158pub use self::take_until::TakeUntil;
159
160mod then;
161#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
162pub use self::then::Then;
163
164mod zip;
165#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
166pub use self::zip::Zip;
167
168#[cfg(feature = "alloc")]
169mod chunks;
170#[cfg(feature = "alloc")]
171#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
172pub use self::chunks::Chunks;
173
174#[cfg(feature = "alloc")]
175mod ready_chunks;
176#[cfg(feature = "alloc")]
177#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
178pub use self::ready_chunks::ReadyChunks;
179
180mod scan;
181#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
182pub use self::scan::Scan;
183
184#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
185#[cfg(feature = "alloc")]
186mod buffer_unordered;
187#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
188#[cfg(feature = "alloc")]
189#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
190pub use self::buffer_unordered::BufferUnordered;
191
192#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
193#[cfg(feature = "alloc")]
194mod buffered;
195#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
196#[cfg(feature = "alloc")]
197#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
198pub use self::buffered::Buffered;
199
200#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
201#[cfg(feature = "alloc")]
202pub(crate) mod flatten_unordered;
203
204#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
205#[cfg(feature = "alloc")]
206#[allow(unreachable_pub)]
207pub use self::flatten_unordered::FlattenUnordered;
208
209#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
210#[cfg(feature = "alloc")]
211delegate_all!(
212    /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
213    FlatMapUnordered<St, U, F>(
214        FlattenUnordered<Map<St, F>>
215    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
216    where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
217);
218
219#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
220#[cfg(feature = "alloc")]
221mod for_each_concurrent;
222#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
223#[cfg(feature = "alloc")]
224#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
225pub use self::for_each_concurrent::ForEachConcurrent;
226
227#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
228#[cfg(feature = "sink")]
229#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
230#[cfg(feature = "alloc")]
231mod split;
232#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
233#[cfg(feature = "sink")]
234#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
235#[cfg(feature = "alloc")]
236#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
237pub use self::split::{ReuniteError, SplitSink, SplitStream};
238
239#[cfg(feature = "std")]
240mod catch_unwind;
241#[cfg(feature = "std")]
242#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
243pub use self::catch_unwind::CatchUnwind;
244
245impl<T: ?Sized> StreamExt for T where T: Stream {}
246
247/// An extension trait for `Stream`s that provides a variety of convenient
248/// combinator functions.
249pub trait StreamExt: Stream {
250    /// Creates a future that resolves to the next item in the stream.
251    ///
252    /// Note that because `next` doesn't take ownership over the stream,
253    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
254    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
255    /// be done by boxing the stream using [`Box::pin`] or
256    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
257    /// crate.
258    ///
259    /// # Examples
260    ///
261    /// ```
262    /// # futures::executor::block_on(async {
263    /// use futures::stream::{self, StreamExt};
264    ///
265    /// let mut stream = stream::iter(1..=3);
266    ///
267    /// assert_eq!(stream.next().await, Some(1));
268    /// assert_eq!(stream.next().await, Some(2));
269    /// assert_eq!(stream.next().await, Some(3));
270    /// assert_eq!(stream.next().await, None);
271    /// # });
272    /// ```
273    fn next(&mut self) -> Next<'_, Self>
274    where
275        Self: Unpin,
276    {
277        assert_future::<Option<Self::Item>, _>(Next::new(self))
278    }
279
280    /// Converts this stream into a future of `(next_item, tail_of_stream)`.
281    /// If the stream terminates, then the next item is [`None`].
282    ///
283    /// The returned future can be used to compose streams and futures together
284    /// by placing everything into the "world of futures".
285    ///
286    /// Note that because `into_future` moves the stream, the [`Stream`] type
287    /// must be [`Unpin`]. If you want to use `into_future` with a
288    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
289    /// be done by boxing the stream using [`Box::pin`] or
290    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
291    /// crate.
292    ///
293    /// # Examples
294    ///
295    /// ```
296    /// # futures::executor::block_on(async {
297    /// use futures::stream::{self, StreamExt};
298    ///
299    /// let stream = stream::iter(1..=3);
300    ///
301    /// let (item, stream) = stream.into_future().await;
302    /// assert_eq!(Some(1), item);
303    ///
304    /// let (item, stream) = stream.into_future().await;
305    /// assert_eq!(Some(2), item);
306    /// # });
307    /// ```
308    fn into_future(self) -> StreamFuture<Self>
309    where
310        Self: Sized + Unpin,
311    {
312        assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self))
313    }
314
315    /// Maps this stream's items to a different type, returning a new stream of
316    /// the resulting type.
317    ///
318    /// The provided closure is executed over all elements of this stream as
319    /// they are made available. It is executed inline with calls to
320    /// [`poll_next`](Stream::poll_next).
321    ///
322    /// Note that this function consumes the stream passed into it and returns a
323    /// wrapped version of it, similar to the existing `map` methods in the
324    /// standard library.
325    ///
326    /// See [`StreamExt::then`](Self::then) if you want to use a closure that
327    /// returns a future instead of a value.
328    ///
329    /// # Examples
330    ///
331    /// ```
332    /// # futures::executor::block_on(async {
333    /// use futures::stream::{self, StreamExt};
334    ///
335    /// let stream = stream::iter(1..=3);
336    /// let stream = stream.map(|x| x + 3);
337    ///
338    /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
339    /// # });
340    /// ```
341    fn map<T, F>(self, f: F) -> Map<Self, F>
342    where
343        F: FnMut(Self::Item) -> T,
344        Self: Sized,
345    {
346        assert_stream::<T, _>(Map::new(self, f))
347    }
348
349    /// Creates a stream which gives the current iteration count as well as
350    /// the next value.
351    ///
352    /// The stream returned yields pairs `(i, val)`, where `i` is the
353    /// current index of iteration and `val` is the value returned by the
354    /// stream.
355    ///
356    /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a
357    /// different sized integer, the [`zip`](StreamExt::zip) function provides similar
358    /// functionality.
359    ///
360    /// # Overflow Behavior
361    ///
362    /// The method does no guarding against overflows, so enumerating more than
363    /// [`usize::MAX`] elements either produces the wrong result or panics. If
364    /// debug assertions are enabled, a panic is guaranteed.
365    ///
366    /// # Panics
367    ///
368    /// The returned stream might panic if the to-be-returned index would
369    /// overflow a [`usize`].
370    ///
371    /// # Examples
372    ///
373    /// ```
374    /// # futures::executor::block_on(async {
375    /// use futures::stream::{self, StreamExt};
376    ///
377    /// let stream = stream::iter(vec!['a', 'b', 'c']);
378    ///
379    /// let mut stream = stream.enumerate();
380    ///
381    /// assert_eq!(stream.next().await, Some((0, 'a')));
382    /// assert_eq!(stream.next().await, Some((1, 'b')));
383    /// assert_eq!(stream.next().await, Some((2, 'c')));
384    /// assert_eq!(stream.next().await, None);
385    /// # });
386    /// ```
387    fn enumerate(self) -> Enumerate<Self>
388    where
389        Self: Sized,
390    {
391        assert_stream::<(usize, Self::Item), _>(Enumerate::new(self))
392    }
393
394    /// Filters the values produced by this stream according to the provided
395    /// asynchronous predicate.
396    ///
397    /// As values of this stream are made available, the provided predicate `f`
398    /// will be run against them. If the predicate returns a `Future` which
399    /// resolves to `true`, then the stream will yield the value, but if the
400    /// predicate returns a `Future` which resolves to `false`, then the value
401    /// will be discarded and the next value will be produced.
402    ///
403    /// Note that this function consumes the stream passed into it and returns a
404    /// wrapped version of it, similar to the existing `filter` methods in the
405    /// standard library.
406    ///
407    /// # Examples
408    ///
409    /// ```
410    /// # futures::executor::block_on(async {
411    /// use futures::future;
412    /// use futures::stream::{self, StreamExt};
413    ///
414    /// let stream = stream::iter(1..=10);
415    /// let events = stream.filter(|x| future::ready(x % 2 == 0));
416    ///
417    /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await);
418    /// # });
419    /// ```
420    fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
421    where
422        F: FnMut(&Self::Item) -> Fut,
423        Fut: Future<Output = bool>,
424        Self: Sized,
425    {
426        assert_stream::<Self::Item, _>(Filter::new(self, f))
427    }
428
429    /// Filters the values produced by this stream while simultaneously mapping
430    /// them to a different type according to the provided asynchronous closure.
431    ///
432    /// As values of this stream are made available, the provided function will
433    /// be run on them. If the future returned by the predicate `f` resolves to
434    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
435    /// it resolves to [`None`] then the next value will be produced.
436    ///
437    /// Note that this function consumes the stream passed into it and returns a
438    /// wrapped version of it, similar to the existing `filter_map` methods in
439    /// the standard library.
440    ///
441    /// # Examples
442    /// ```
443    /// # futures::executor::block_on(async {
444    /// use futures::stream::{self, StreamExt};
445    ///
446    /// let stream = stream::iter(1..=10);
447    /// let events = stream.filter_map(|x| async move {
448    ///     if x % 2 == 0 { Some(x + 1) } else { None }
449    /// });
450    ///
451    /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await);
452    /// # });
453    /// ```
454    fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
455    where
456        F: FnMut(Self::Item) -> Fut,
457        Fut: Future<Output = Option<T>>,
458        Self: Sized,
459    {
460        assert_stream::<T, _>(FilterMap::new(self, f))
461    }
462
463    /// Computes from this stream's items new items of a different type using
464    /// an asynchronous closure.
465    ///
466    /// The provided closure `f` will be called with an `Item` once a value is
467    /// ready, it returns a future which will then be run to completion
468    /// to produce the next value on this stream.
469    ///
470    /// Note that this function consumes the stream passed into it and returns a
471    /// wrapped version of it.
472    ///
473    /// See [`StreamExt::map`](Self::map) if you want to use a closure that
474    /// returns a value instead of a future.
475    ///
476    /// # Examples
477    ///
478    /// ```
479    /// # futures::executor::block_on(async {
480    /// use futures::stream::{self, StreamExt};
481    ///
482    /// let stream = stream::iter(1..=3);
483    /// let stream = stream.then(|x| async move { x + 3 });
484    ///
485    /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
486    /// # });
487    /// ```
488    fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
489    where
490        F: FnMut(Self::Item) -> Fut,
491        Fut: Future,
492        Self: Sized,
493    {
494        assert_stream::<Fut::Output, _>(Then::new(self, f))
495    }
496
497    /// Transforms a stream into a collection, returning a
498    /// future representing the result of that computation.
499    ///
500    /// The returned future will be resolved when the stream terminates.
501    ///
502    /// # Examples
503    ///
504    /// ```
505    /// # futures::executor::block_on(async {
506    /// use futures::channel::mpsc;
507    /// use futures::stream::StreamExt;
508    /// use std::thread;
509    ///
510    /// let (tx, rx) = mpsc::unbounded();
511    ///
512    /// thread::spawn(move || {
513    ///     for i in 1..=5 {
514    ///         tx.unbounded_send(i).unwrap();
515    ///     }
516    /// });
517    ///
518    /// let output = rx.collect::<Vec<i32>>().await;
519    /// assert_eq!(output, vec![1, 2, 3, 4, 5]);
520    /// # });
521    /// ```
522    fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
523    where
524        Self: Sized,
525    {
526        assert_future::<C, _>(Collect::new(self))
527    }
528
529    /// Converts a stream of pairs into a future, which
530    /// resolves to pair of containers.
531    ///
532    /// `unzip()` produces a future, which resolves to two
533    /// collections: one from the left elements of the pairs,
534    /// and one from the right elements.
535    ///
536    /// The returned future will be resolved when the stream terminates.
537    ///
538    /// # Examples
539    ///
540    /// ```
541    /// # futures::executor::block_on(async {
542    /// use futures::channel::mpsc;
543    /// use futures::stream::StreamExt;
544    /// use std::thread;
545    ///
546    /// let (tx, rx) = mpsc::unbounded();
547    ///
548    /// thread::spawn(move || {
549    ///     tx.unbounded_send((1, 2)).unwrap();
550    ///     tx.unbounded_send((3, 4)).unwrap();
551    ///     tx.unbounded_send((5, 6)).unwrap();
552    /// });
553    ///
554    /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await;
555    /// assert_eq!(o1, vec![1, 3, 5]);
556    /// assert_eq!(o2, vec![2, 4, 6]);
557    /// # });
558    /// ```
559    fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
560    where
561        FromA: Default + Extend<A>,
562        FromB: Default + Extend<B>,
563        Self: Sized + Stream<Item = (A, B)>,
564    {
565        assert_future::<(FromA, FromB), _>(Unzip::new(self))
566    }
567
568    /// Concatenate all items of a stream into a single extendable
569    /// destination, returning a future representing the end result.
570    ///
571    /// This combinator will extend the first item with the contents
572    /// of all the subsequent results of the stream. If the stream is
573    /// empty, the default value will be returned.
574    ///
575    /// Works with all collections that implement the
576    /// [`Extend`](std::iter::Extend) trait.
577    ///
578    /// # Examples
579    ///
580    /// ```
581    /// # futures::executor::block_on(async {
582    /// use futures::channel::mpsc;
583    /// use futures::stream::StreamExt;
584    /// use std::thread;
585    ///
586    /// let (tx, rx) = mpsc::unbounded();
587    ///
588    /// thread::spawn(move || {
589    ///     for i in (0..3).rev() {
590    ///         let n = i * 3;
591    ///         tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap();
592    ///     }
593    /// });
594    ///
595    /// let result = rx.concat().await;
596    ///
597    /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
598    /// # });
599    /// ```
600    fn concat(self) -> Concat<Self>
601    where
602        Self: Sized,
603        Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
604    {
605        assert_future::<Self::Item, _>(Concat::new(self))
606    }
607
608    /// Drives the stream to completion, counting the number of items.
609    ///
610    /// # Overflow Behavior
611    ///
612    /// The method does no guarding against overflows, so counting elements of a
613    /// stream with more than [`usize::MAX`] elements either produces the wrong
614    /// result or panics. If debug assertions are enabled, a panic is guaranteed.
615    ///
616    /// # Panics
617    ///
618    /// This function might panic if the iterator has more than [`usize::MAX`]
619    /// elements.
620    ///
621    /// # Examples
622    ///
623    /// ```
624    /// # futures::executor::block_on(async {
625    /// use futures::stream::{self, StreamExt};
626    ///
627    /// let stream = stream::iter(1..=10);
628    /// let count = stream.count().await;
629    ///
630    /// assert_eq!(count, 10);
631    /// # });
632    /// ```
633    fn count(self) -> Count<Self>
634    where
635        Self: Sized,
636    {
637        assert_future::<usize, _>(Count::new(self))
638    }
639
640    /// Repeats a stream endlessly.
641    ///
642    /// The stream never terminates. Note that you likely want to avoid
643    /// usage of `collect` or such on the returned stream as it will exhaust
644    /// available memory as it tries to just fill up all RAM.
645    ///
646    /// # Examples
647    ///
648    /// ```
649    /// # futures::executor::block_on(async {
650    /// use futures::stream::{self, StreamExt};
651    /// let a = [1, 2, 3];
652    /// let mut s = stream::iter(a.iter()).cycle();
653    ///
654    /// assert_eq!(s.next().await, Some(&1));
655    /// assert_eq!(s.next().await, Some(&2));
656    /// assert_eq!(s.next().await, Some(&3));
657    /// assert_eq!(s.next().await, Some(&1));
658    /// assert_eq!(s.next().await, Some(&2));
659    /// assert_eq!(s.next().await, Some(&3));
660    /// assert_eq!(s.next().await, Some(&1));
661    /// # });
662    /// ```
663    fn cycle(self) -> Cycle<Self>
664    where
665        Self: Sized + Clone,
666    {
667        assert_stream::<Self::Item, _>(Cycle::new(self))
668    }
669
670    /// Execute an accumulating asynchronous computation over a stream,
671    /// collecting all the values into one final result.
672    ///
673    /// This combinator will accumulate all values returned by this stream
674    /// according to the closure provided. The initial state is also provided to
675    /// this method and then is returned again by each execution of the closure.
676    /// Once the entire stream has been exhausted the returned future will
677    /// resolve to this value.
678    ///
679    /// # Examples
680    ///
681    /// ```
682    /// # futures::executor::block_on(async {
683    /// use futures::stream::{self, StreamExt};
684    ///
685    /// let number_stream = stream::iter(0..6);
686    /// let sum = number_stream.fold(0, |acc, x| async move { acc + x });
687    /// assert_eq!(sum.await, 15);
688    /// # });
689    /// ```
690    fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
691    where
692        F: FnMut(T, Self::Item) -> Fut,
693        Fut: Future<Output = T>,
694        Self: Sized,
695    {
696        assert_future::<T, _>(Fold::new(self, f, init))
697    }
698
699    /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate.
700    ///
701    /// # Examples
702    ///
703    /// ```
704    /// # futures::executor::block_on(async {
705    /// use futures::stream::{self, StreamExt};
706    ///
707    /// let number_stream = stream::iter(0..10);
708    /// let contain_three = number_stream.any(|i| async move { i == 3 });
709    /// assert_eq!(contain_three.await, true);
710    /// # });
711    /// ```
712    fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
713    where
714        F: FnMut(Self::Item) -> Fut,
715        Fut: Future<Output = bool>,
716        Self: Sized,
717    {
718        assert_future::<bool, _>(Any::new(self, f))
719    }
720
721    /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate.
722    ///
723    /// # Examples
724    ///
725    /// ```
726    /// # futures::executor::block_on(async {
727    /// use futures::stream::{self, StreamExt};
728    ///
729    /// let number_stream = stream::iter(0..10);
730    /// let less_then_twenty = number_stream.all(|i| async move { i < 20 });
731    /// assert_eq!(less_then_twenty.await, true);
732    /// # });
733    /// ```
734    fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
735    where
736        F: FnMut(Self::Item) -> Fut,
737        Fut: Future<Output = bool>,
738        Self: Sized,
739    {
740        assert_future::<bool, _>(All::new(self, f))
741    }
742
743    /// Flattens a stream of streams into just one continuous stream.
744    ///
745    /// # Examples
746    ///
747    /// ```
748    /// # futures::executor::block_on(async {
749    /// use futures::channel::mpsc;
750    /// use futures::stream::StreamExt;
751    /// use std::thread;
752    ///
753    /// let (tx1, rx1) = mpsc::unbounded();
754    /// let (tx2, rx2) = mpsc::unbounded();
755    /// let (tx3, rx3) = mpsc::unbounded();
756    ///
757    /// thread::spawn(move || {
758    ///     tx1.unbounded_send(1).unwrap();
759    ///     tx1.unbounded_send(2).unwrap();
760    /// });
761    /// thread::spawn(move || {
762    ///     tx2.unbounded_send(3).unwrap();
763    ///     tx2.unbounded_send(4).unwrap();
764    /// });
765    /// thread::spawn(move || {
766    ///     tx3.unbounded_send(rx1).unwrap();
767    ///     tx3.unbounded_send(rx2).unwrap();
768    /// });
769    ///
770    /// let output = rx3.flatten().collect::<Vec<i32>>().await;
771    /// assert_eq!(output, vec![1, 2, 3, 4]);
772    /// # });
773    /// ```
774    fn flatten(self) -> Flatten<Self>
775    where
776        Self::Item: Stream,
777        Self: Sized,
778    {
779        assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
780    }
781
782    /// Flattens a stream of streams into just one continuous stream. Polls
783    /// inner streams produced by the base stream concurrently.
784    ///
785    /// The only argument is an optional limit on the number of concurrently
786    /// polled streams. If this limit is not `None`, no more than `limit` streams
787    /// will be polled at the same time. The `limit` argument is of type
788    /// `Into<Option<usize>>`, and so can be provided as either `None`,
789    /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
790    /// no limit at all, and will have the same result as passing in `None`.
791    ///
792    /// # Examples
793    ///
794    /// ```
795    /// # futures::executor::block_on(async {
796    /// use futures::channel::mpsc;
797    /// use futures::stream::StreamExt;
798    /// use std::thread;
799    ///
800    /// let (tx1, rx1) = mpsc::unbounded();
801    /// let (tx2, rx2) = mpsc::unbounded();
802    /// let (tx3, rx3) = mpsc::unbounded();
803    ///
804    /// thread::spawn(move || {
805    ///     tx1.unbounded_send(1).unwrap();
806    ///     tx1.unbounded_send(2).unwrap();
807    /// });
808    /// thread::spawn(move || {
809    ///     tx2.unbounded_send(3).unwrap();
810    ///     tx2.unbounded_send(4).unwrap();
811    /// });
812    /// thread::spawn(move || {
813    ///     tx3.unbounded_send(rx1).unwrap();
814    ///     tx3.unbounded_send(rx2).unwrap();
815    /// });
816    ///
817    /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await;
818    /// output.sort();
819    ///
820    /// assert_eq!(output, vec![1, 2, 3, 4]);
821    /// # });
822    /// ```
823    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
824    #[cfg(feature = "alloc")]
825    fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
826    where
827        Self::Item: Stream + Unpin,
828        Self: Sized,
829    {
830        assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into()))
831    }
832
833    /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
834    ///
835    /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
836    /// you would have to chain combinators like `.map(f).flatten()` while this
837    /// combinator provides ability to write `.flat_map(f)` instead of chaining.
838    ///
839    /// The provided closure which produces inner streams is executed over all elements
840    /// of stream as last inner stream is terminated and next stream item is available.
841    ///
842    /// Note that this function consumes the stream passed into it and returns a
843    /// wrapped version of it, similar to the existing `flat_map` methods in the
844    /// standard library.
845    ///
846    /// # Examples
847    ///
848    /// ```
849    /// # futures::executor::block_on(async {
850    /// use futures::stream::{self, StreamExt};
851    ///
852    /// let stream = stream::iter(1..=3);
853    /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x]));
854    ///
855    /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await);
856    /// # });
857    /// ```
858    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
859    where
860        F: FnMut(Self::Item) -> U,
861        U: Stream,
862        Self: Sized,
863    {
864        assert_stream::<U::Item, _>(FlatMap::new(self, f))
865    }
866
867    /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s
868    /// and polls them concurrently, yielding items in any order, as they made
869    /// available.
870    ///
871    /// [`StreamExt::map`] is very useful, but if it produces `Stream`s
872    /// instead, and you need to poll all of them concurrently, you would
873    /// have to use something like `for_each_concurrent` and merge values
874    /// by hand. This combinator provides ability to collect all values
875    /// from concurrently polled streams into one stream.
876    ///
877    /// The first argument is an optional limit on the number of concurrently
878    /// polled streams. If this limit is not `None`, no more than `limit` streams
879    /// will be polled at the same time. The `limit` argument is of type
880    /// `Into<Option<usize>>`, and so can be provided as either `None`,
881    /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
882    /// no limit at all, and will have the same result as passing in `None`.
883    ///
884    /// The provided closure which produces inner streams is executed over
885    /// all elements of stream as next stream item is available and limit
886    /// of concurrently processed streams isn't exceeded.
887    ///
888    /// Note that this function consumes the stream passed into it and
889    /// returns a wrapped version of it.
890    ///
891    /// # Examples
892    ///
893    /// ```
894    /// # futures::executor::block_on(async {
895    /// use futures::stream::{self, StreamExt};
896    ///
897    /// let stream = stream::iter(1..5);
898    /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x]));
899    /// let mut values = stream.collect::<Vec<_>>().await;
900    /// values.sort();
901    ///
902    /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
903    /// # });
904    /// ```
905    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
906    #[cfg(feature = "alloc")]
907    fn flat_map_unordered<U, F>(
908        self,
909        limit: impl Into<Option<usize>>,
910        f: F,
911    ) -> FlatMapUnordered<Self, U, F>
912    where
913        U: Stream + Unpin,
914        F: FnMut(Self::Item) -> U,
915        Self: Sized,
916    {
917        assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f))
918    }
919
920    /// Combinator similar to [`StreamExt::fold`] that holds internal state
921    /// and produces a new stream.
922    ///
923    /// Accepts initial state and closure which will be applied to each element
924    /// of the stream until provided closure returns `None`. Once `None` is
925    /// returned, stream will be terminated.
926    ///
927    /// # Examples
928    ///
929    /// ```
930    /// # futures::executor::block_on(async {
931    /// use futures::future;
932    /// use futures::stream::{self, StreamExt};
933    ///
934    /// let stream = stream::iter(1..=10);
935    ///
936    /// let stream = stream.scan(0, |state, x| {
937    ///     *state += x;
938    ///     future::ready(if *state < 10 { Some(x) } else { None })
939    /// });
940    ///
941    /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
942    /// # });
943    /// ```
944    fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
945    where
946        F: FnMut(&mut S, Self::Item) -> Fut,
947        Fut: Future<Output = Option<B>>,
948        Self: Sized,
949    {
950        assert_stream::<B, _>(Scan::new(self, initial_state, f))
951    }
952
953    /// Skip elements on this stream while the provided asynchronous predicate
954    /// resolves to `true`.
955    ///
956    /// This function, like `Iterator::skip_while`, will skip elements on the
957    /// stream until the predicate `f` resolves to `false`. Once one element
958    /// returns `false`, all future elements will be returned from the underlying
959    /// stream.
960    ///
961    /// # Examples
962    ///
963    /// ```
964    /// # futures::executor::block_on(async {
965    /// use futures::future;
966    /// use futures::stream::{self, StreamExt};
967    ///
968    /// let stream = stream::iter(1..=10);
969    ///
970    /// let stream = stream.skip_while(|x| future::ready(*x <= 5));
971    ///
972    /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
973    /// # });
974    /// ```
975    fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
976    where
977        F: FnMut(&Self::Item) -> Fut,
978        Fut: Future<Output = bool>,
979        Self: Sized,
980    {
981        assert_stream::<Self::Item, _>(SkipWhile::new(self, f))
982    }
983
984    /// Take elements from this stream while the provided asynchronous predicate
985    /// resolves to `true`.
986    ///
987    /// This function, like `Iterator::take_while`, will take elements from the
988    /// stream until the predicate `f` resolves to `false`. Once one element
989    /// returns `false`, it will always return that the stream is done.
990    ///
991    /// # Examples
992    ///
993    /// ```
994    /// # futures::executor::block_on(async {
995    /// use futures::future;
996    /// use futures::stream::{self, StreamExt};
997    ///
998    /// let stream = stream::iter(1..=10);
999    ///
1000    /// let stream = stream.take_while(|x| future::ready(*x <= 5));
1001    ///
1002    /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
1003    /// # });
1004    /// ```
1005    fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
1006    where
1007        F: FnMut(&Self::Item) -> Fut,
1008        Fut: Future<Output = bool>,
1009        Self: Sized,
1010    {
1011        assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
1012    }
1013
1014    /// Take elements from this stream until the provided future resolves.
1015    ///
1016    /// This function will take elements from the stream until the provided
1017    /// stopping future `fut` resolves. Once the `fut` future becomes ready,
1018    /// this stream combinator will always return that the stream is done.
1019    ///
1020    /// The stopping future may return any type. Once the stream is stopped
1021    /// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
1022    /// The stream may also be resumed with `TakeUntil::take_future()`.
1023    /// See the documentation of [`TakeUntil`] for more information.
1024    ///
1025    /// # Examples
1026    ///
1027    /// ```
1028    /// # futures::executor::block_on(async {
1029    /// use futures::future;
1030    /// use futures::stream::{self, StreamExt};
1031    /// use futures::task::Poll;
1032    ///
1033    /// let stream = stream::iter(1..=10);
1034    ///
1035    /// let mut i = 0;
1036    /// let stop_fut = future::poll_fn(|_cx| {
1037    ///     i += 1;
1038    ///     if i <= 5 {
1039    ///         Poll::Pending
1040    ///     } else {
1041    ///         Poll::Ready(())
1042    ///     }
1043    /// });
1044    ///
1045    /// let stream = stream.take_until(stop_fut);
1046    ///
1047    /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
1048    /// # });
1049    /// ```
1050    fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
1051    where
1052        Fut: Future,
1053        Self: Sized,
1054    {
1055        assert_stream::<Self::Item, _>(TakeUntil::new(self, fut))
1056    }
1057
1058    /// Runs this stream to completion, executing the provided asynchronous
1059    /// closure for each element on the stream.
1060    ///
1061    /// The closure provided will be called for each item this stream produces,
1062    /// yielding a future. That future will then be executed to completion
1063    /// before moving on to the next item.
1064    ///
1065    /// The returned value is a `Future` where the `Output` type is `()`; it is
1066    /// executed entirely for its side effects.
1067    ///
1068    /// To process each item in the stream and produce another stream instead
1069    /// of a single future, use `then` instead.
1070    ///
1071    /// # Examples
1072    ///
1073    /// ```
1074    /// # futures::executor::block_on(async {
1075    /// use futures::future;
1076    /// use futures::stream::{self, StreamExt};
1077    ///
1078    /// let mut x = 0;
1079    ///
1080    /// {
1081    ///     let fut = stream::repeat(1).take(3).for_each(|item| {
1082    ///         x += item;
1083    ///         future::ready(())
1084    ///     });
1085    ///     fut.await;
1086    /// }
1087    ///
1088    /// assert_eq!(x, 3);
1089    /// # });
1090    /// ```
1091    fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
1092    where
1093        F: FnMut(Self::Item) -> Fut,
1094        Fut: Future<Output = ()>,
1095        Self: Sized,
1096    {
1097        assert_future::<(), _>(ForEach::new(self, f))
1098    }
1099
1100    /// Runs this stream to completion, executing the provided asynchronous
1101    /// closure for each element on the stream concurrently as elements become
1102    /// available.
1103    ///
1104    /// This is similar to [`StreamExt::for_each`], but the futures
1105    /// produced by the closure are run concurrently (but not in parallel--
1106    /// this combinator does not introduce any threads).
1107    ///
1108    /// The closure provided will be called for each item this stream produces,
1109    /// yielding a future. That future will then be executed to completion
1110    /// concurrently with the other futures produced by the closure.
1111    ///
1112    /// The first argument is an optional limit on the number of concurrent
1113    /// futures. If this limit is not `None`, no more than `limit` futures
1114    /// will be run concurrently. The `limit` argument is of type
1115    /// `Into<Option<usize>>`, and so can be provided as either `None`,
1116    /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
1117    /// no limit at all, and will have the same result as passing in `None`.
1118    ///
1119    /// This method is only available when the `std` or `alloc` feature of this
1120    /// library is activated, and it is activated by default.
1121    ///
1122    /// # Examples
1123    ///
1124    /// ```
1125    /// # futures::executor::block_on(async {
1126    /// use futures::channel::oneshot;
1127    /// use futures::stream::{self, StreamExt};
1128    ///
1129    /// let (tx1, rx1) = oneshot::channel();
1130    /// let (tx2, rx2) = oneshot::channel();
1131    /// let (tx3, rx3) = oneshot::channel();
1132    ///
1133    /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
1134    ///     /* limit */ 2,
1135    ///     |rx| async move {
1136    ///         rx.await.unwrap();
1137    ///     }
1138    /// );
1139    /// tx1.send(()).unwrap();
1140    /// tx2.send(()).unwrap();
1141    /// tx3.send(()).unwrap();
1142    /// fut.await;
1143    /// # })
1144    /// ```
1145    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1146    #[cfg(feature = "alloc")]
1147    fn for_each_concurrent<Fut, F>(
1148        self,
1149        limit: impl Into<Option<usize>>,
1150        f: F,
1151    ) -> ForEachConcurrent<Self, Fut, F>
1152    where
1153        F: FnMut(Self::Item) -> Fut,
1154        Fut: Future<Output = ()>,
1155        Self: Sized,
1156    {
1157        assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f))
1158    }
1159
1160    /// Creates a new stream of at most `n` items of the underlying stream.
1161    ///
1162    /// Once `n` items have been yielded from this stream then it will always
1163    /// return that the stream is done.
1164    ///
1165    /// # Examples
1166    ///
1167    /// ```
1168    /// # futures::executor::block_on(async {
1169    /// use futures::stream::{self, StreamExt};
1170    ///
1171    /// let stream = stream::iter(1..=10).take(3);
1172    ///
1173    /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
1174    /// # });
1175    /// ```
1176    fn take(self, n: usize) -> Take<Self>
1177    where
1178        Self: Sized,
1179    {
1180        assert_stream::<Self::Item, _>(Take::new(self, n))
1181    }
1182
1183    /// Creates a new stream which skips `n` items of the underlying stream.
1184    ///
1185    /// Once `n` items have been skipped from this stream then it will always
1186    /// return the remaining items on this stream.
1187    ///
1188    /// # Examples
1189    ///
1190    /// ```
1191    /// # futures::executor::block_on(async {
1192    /// use futures::stream::{self, StreamExt};
1193    ///
1194    /// let stream = stream::iter(1..=10).skip(5);
1195    ///
1196    /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
1197    /// # });
1198    /// ```
1199    fn skip(self, n: usize) -> Skip<Self>
1200    where
1201        Self: Sized,
1202    {
1203        assert_stream::<Self::Item, _>(Skip::new(self, n))
1204    }
1205
1206    /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never
1207    /// again be called once it has finished. This method can be used to turn
1208    /// any `Stream` into a `FusedStream`.
1209    ///
1210    /// Normally, once a stream has returned [`None`] from
1211    /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad
1212    /// behavior such as block forever, panic, never return, etc. If it is known
1213    /// that [`poll_next`](Stream::poll_next) may be called after stream
1214    /// has already finished, then this method can be used to ensure that it has
1215    /// defined semantics.
1216    ///
1217    /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream
1218    /// is guaranteed to return [`None`] after the underlying stream has
1219    /// finished.
1220    ///
1221    /// # Examples
1222    ///
1223    /// ```
1224    /// use futures::executor::block_on_stream;
1225    /// use futures::stream::{self, StreamExt};
1226    /// use futures::task::Poll;
1227    ///
1228    /// let mut x = 0;
1229    /// let stream = stream::poll_fn(|_| {
1230    ///     x += 1;
1231    ///     match x {
1232    ///         0..=2 => Poll::Ready(Some(x)),
1233    ///         3 => Poll::Ready(None),
1234    ///         _ => panic!("should not happen")
1235    ///     }
1236    /// }).fuse();
1237    ///
1238    /// let mut iter = block_on_stream(stream);
1239    /// assert_eq!(Some(1), iter.next());
1240    /// assert_eq!(Some(2), iter.next());
1241    /// assert_eq!(None, iter.next());
1242    /// assert_eq!(None, iter.next());
1243    /// // ...
1244    /// ```
1245    fn fuse(self) -> Fuse<Self>
1246    where
1247        Self: Sized,
1248    {
1249        assert_stream::<Self::Item, _>(Fuse::new(self))
1250    }
1251
1252    /// Borrows a stream, rather than consuming it.
1253    ///
1254    /// This is useful to allow applying stream adaptors while still retaining
1255    /// ownership of the original stream.
1256    ///
1257    /// # Examples
1258    ///
1259    /// ```
1260    /// # futures::executor::block_on(async {
1261    /// use futures::stream::{self, StreamExt};
1262    ///
1263    /// let mut stream = stream::iter(1..5);
1264    ///
1265    /// let sum = stream.by_ref()
1266    ///                 .take(2)
1267    ///                 .fold(0, |a, b| async move { a + b })
1268    ///                 .await;
1269    /// assert_eq!(sum, 3);
1270    ///
1271    /// // You can use the stream again
1272    /// let sum = stream.take(2)
1273    ///                 .fold(0, |a, b| async move { a + b })
1274    ///                 .await;
1275    /// assert_eq!(sum, 7);
1276    /// # });
1277    /// ```
1278    fn by_ref(&mut self) -> &mut Self {
1279        self
1280    }
1281
1282    /// Catches unwinding panics while polling the stream.
1283    ///
1284    /// Caught panic (if any) will be the last element of the resulting stream.
1285    ///
1286    /// In general, panics within a stream can propagate all the way out to the
1287    /// task level. This combinator makes it possible to halt unwinding within
1288    /// the stream itself. It's most commonly used within task executors. This
1289    /// method should not be used for error handling.
1290    ///
1291    /// Note that this method requires the `UnwindSafe` bound from the standard
1292    /// library. This isn't always applied automatically, and the standard
1293    /// library provides an `AssertUnwindSafe` wrapper type to apply it
1294    /// after-the fact. To assist using this method, the [`Stream`] trait is
1295    /// also implemented for `AssertUnwindSafe<St>` where `St` implements
1296    /// [`Stream`].
1297    ///
1298    /// This method is only available when the `std` feature of this
1299    /// library is activated, and it is activated by default.
1300    ///
1301    /// # Examples
1302    ///
1303    /// ```
1304    /// # futures::executor::block_on(async {
1305    /// use futures::stream::{self, StreamExt};
1306    ///
1307    /// let stream = stream::iter(vec![Some(10), None, Some(11)]);
1308    /// // Panic on second element
1309    /// let stream_panicking = stream.map(|o| o.unwrap());
1310    /// // Collect all the results
1311    /// let stream = stream_panicking.catch_unwind();
1312    ///
1313    /// let results: Vec<Result<i32, _>> = stream.collect().await;
1314    /// match results[0] {
1315    ///     Ok(10) => {}
1316    ///     _ => panic!("unexpected result!"),
1317    /// }
1318    /// assert!(results[1].is_err());
1319    /// assert_eq!(results.len(), 2);
1320    /// # });
1321    /// ```
1322    #[cfg(feature = "std")]
1323    fn catch_unwind(self) -> CatchUnwind<Self>
1324    where
1325        Self: Sized + std::panic::UnwindSafe,
1326    {
1327        assert_stream(CatchUnwind::new(self))
1328    }
1329
1330    /// Wrap the stream in a Box, pinning it.
1331    ///
1332    /// This method is only available when the `std` or `alloc` feature of this
1333    /// library is activated, and it is activated by default.
1334    #[cfg(feature = "alloc")]
1335    fn boxed<'a>(self) -> BoxStream<'a, Self::Item>
1336    where
1337        Self: Sized + Send + 'a,
1338    {
1339        assert_stream::<Self::Item, _>(Box::pin(self))
1340    }
1341
1342    /// Wrap the stream in a Box, pinning it.
1343    ///
1344    /// Similar to `boxed`, but without the `Send` requirement.
1345    ///
1346    /// This method is only available when the `std` or `alloc` feature of this
1347    /// library is activated, and it is activated by default.
1348    #[cfg(feature = "alloc")]
1349    fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item>
1350    where
1351        Self: Sized + 'a,
1352    {
1353        assert_stream::<Self::Item, _>(Box::pin(self))
1354    }
1355
1356    /// An adaptor for creating a buffered list of pending futures.
1357    ///
1358    /// If this stream's item can be converted into a future, then this adaptor
1359    /// will buffer up to at most `n` futures and then return the outputs in the
1360    /// same order as the underlying stream. No more than `n` futures will be
1361    /// buffered at any point in time, and less than `n` may also be buffered
1362    /// depending on the state of each future.
1363    ///
1364    /// The returned stream will be a stream of each future's output.
1365    ///
1366    /// This method is only available when the `std` or `alloc` feature of this
1367    /// library is activated, and it is activated by default.
1368    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1369    #[cfg(feature = "alloc")]
1370    fn buffered(self, n: usize) -> Buffered<Self>
1371    where
1372        Self::Item: Future,
1373        Self: Sized,
1374    {
1375        assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n))
1376    }
1377
1378    /// An adaptor for creating a buffered list of pending futures (unordered).
1379    ///
1380    /// If this stream's item can be converted into a future, then this adaptor
1381    /// will buffer up to `n` futures and then return the outputs in the order
1382    /// in which they complete. No more than `n` futures will be buffered at
1383    /// any point in time, and less than `n` may also be buffered depending on
1384    /// the state of each future.
1385    ///
1386    /// The returned stream will be a stream of each future's output.
1387    ///
1388    /// This method is only available when the `std` or `alloc` feature of this
1389    /// library is activated, and it is activated by default.
1390    ///
1391    /// # Examples
1392    ///
1393    /// ```
1394    /// # futures::executor::block_on(async {
1395    /// use futures::channel::oneshot;
1396    /// use futures::stream::{self, StreamExt};
1397    ///
1398    /// let (send_one, recv_one) = oneshot::channel();
1399    /// let (send_two, recv_two) = oneshot::channel();
1400    ///
1401    /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
1402    /// let mut buffered = stream_of_futures.buffer_unordered(10);
1403    ///
1404    /// send_two.send(2i32)?;
1405    /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1406    ///
1407    /// send_one.send(1i32)?;
1408    /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1409    ///
1410    /// assert_eq!(buffered.next().await, None);
1411    /// # Ok::<(), i32>(()) }).unwrap();
1412    /// ```
1413    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1414    #[cfg(feature = "alloc")]
1415    fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
1416    where
1417        Self::Item: Future,
1418        Self: Sized,
1419    {
1420        assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n))
1421    }
1422
1423    /// An adapter for zipping two streams together.
1424    ///
1425    /// The zipped stream waits for both streams to produce an item, and then
1426    /// returns that pair. If either stream ends then the zipped stream will
1427    /// also end.
1428    ///
1429    /// # Examples
1430    ///
1431    /// ```
1432    /// # futures::executor::block_on(async {
1433    /// use futures::stream::{self, StreamExt};
1434    ///
1435    /// let stream1 = stream::iter(1..=3);
1436    /// let stream2 = stream::iter(5..=10);
1437    ///
1438    /// let vec = stream1.zip(stream2)
1439    ///                  .collect::<Vec<_>>()
1440    ///                  .await;
1441    /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec);
1442    /// # });
1443    /// ```
1444    ///
1445    fn zip<St>(self, other: St) -> Zip<Self, St>
1446    where
1447        St: Stream,
1448        Self: Sized,
1449    {
1450        assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other))
1451    }
1452
1453    /// Adapter for chaining two streams.
1454    ///
1455    /// The resulting stream emits elements from the first stream, and when
1456    /// first stream reaches the end, emits the elements from the second stream.
1457    ///
1458    /// ```
1459    /// # futures::executor::block_on(async {
1460    /// use futures::stream::{self, StreamExt};
1461    ///
1462    /// let stream1 = stream::iter(vec![Ok(10), Err(false)]);
1463    /// let stream2 = stream::iter(vec![Err(true), Ok(20)]);
1464    ///
1465    /// let stream = stream1.chain(stream2);
1466    ///
1467    /// let result: Vec<_> = stream.collect().await;
1468    /// assert_eq!(result, vec![
1469    ///     Ok(10),
1470    ///     Err(false),
1471    ///     Err(true),
1472    ///     Ok(20),
1473    /// ]);
1474    /// # });
1475    /// ```
1476    fn chain<St>(self, other: St) -> Chain<Self, St>
1477    where
1478        St: Stream<Item = Self::Item>,
1479        Self: Sized,
1480    {
1481        assert_stream::<Self::Item, _>(Chain::new(self, other))
1482    }
1483
1484    /// Creates a new stream which exposes a `peek` method.
1485    ///
1486    /// Calling `peek` returns a reference to the next item in the stream.
1487    fn peekable(self) -> Peekable<Self>
1488    where
1489        Self: Sized,
1490    {
1491        assert_stream::<Self::Item, _>(Peekable::new(self))
1492    }
1493
1494    /// An adaptor for chunking up items of the stream inside a vector.
1495    ///
1496    /// This combinator will attempt to pull items from this stream and buffer
1497    /// them into a local vector. At most `capacity` items will get buffered
1498    /// before they're yielded from the returned stream.
1499    ///
1500    /// Note that the vectors returned from this iterator may not always have
1501    /// `capacity` elements. If the underlying stream ended and only a partial
1502    /// vector was created, it'll be returned. Additionally if an error happens
1503    /// from the underlying stream then the currently buffered items will be
1504    /// yielded.
1505    ///
1506    /// This method is only available when the `std` or `alloc` feature of this
1507    /// library is activated, and it is activated by default.
1508    ///
1509    /// # Panics
1510    ///
1511    /// This method will panic if `capacity` is zero.
1512    #[cfg(feature = "alloc")]
1513    fn chunks(self, capacity: usize) -> Chunks<Self>
1514    where
1515        Self: Sized,
1516    {
1517        assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity))
1518    }
1519
1520    /// An adaptor for chunking up ready items of the stream inside a vector.
1521    ///
1522    /// This combinator will attempt to pull ready items from this stream and
1523    /// buffer them into a local vector. At most `capacity` items will get
1524    /// buffered before they're yielded from the returned stream. If underlying
1525    /// stream returns `Poll::Pending`, and collected chunk is not empty, it will
1526    /// be immediately returned.
1527    ///
1528    /// If the underlying stream ended and only a partial vector was created,
1529    /// it will be returned.
1530    ///
1531    /// This method is only available when the `std` or `alloc` feature of this
1532    /// library is activated, and it is activated by default.
1533    ///
1534    /// # Panics
1535    ///
1536    /// This method will panic if `capacity` is zero.
1537    #[cfg(feature = "alloc")]
1538    fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
1539    where
1540        Self: Sized,
1541    {
1542        assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity))
1543    }
1544
1545    /// A future that completes after the given stream has been fully processed
1546    /// into the sink and the sink has been flushed and closed.
1547    ///
1548    /// This future will drive the stream to keep producing items until it is
1549    /// exhausted, sending each item to the sink. It will complete once the
1550    /// stream is exhausted, the sink has received and flushed all items, and
1551    /// the sink is closed. Note that neither the original stream nor provided
1552    /// sink will be output by this future. Pass the sink by `Pin<&mut S>`
1553    /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
1554    /// order to preserve access to the `Sink`. If the stream produces an error,
1555    /// that error will be returned by this future without flushing/closing the sink.
1556    #[cfg(feature = "sink")]
1557    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
1558    fn forward<S>(self, sink: S) -> Forward<Self, S>
1559    where
1560        S: Sink<Self::Ok, Error = Self::Error>,
1561        Self: TryStream + Sized,
1562        // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>,
1563    {
1564        // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>`
1565        // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink))
1566        Forward::new(self, sink)
1567    }
1568
1569    /// Splits this `Stream + Sink` object into separate `Sink` and `Stream`
1570    /// objects.
1571    ///
1572    /// This can be useful when you want to split ownership between tasks, or
1573    /// allow direct interaction between the two objects (e.g. via
1574    /// `Sink::send_all`).
1575    ///
1576    /// This method is only available when the `std` or `alloc` feature of this
1577    /// library is activated, and it is activated by default.
1578    #[cfg(feature = "sink")]
1579    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
1580    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1581    #[cfg(feature = "alloc")]
1582    fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
1583    where
1584        Self: Sink<Item> + Sized,
1585    {
1586        let (sink, stream) = split::split(self);
1587        (
1588            crate::sink::assert_sink::<Item, Self::Error, _>(sink),
1589            assert_stream::<Self::Item, _>(stream),
1590        )
1591    }
1592
1593    /// Do something with each item of this stream, afterwards passing it on.
1594    ///
1595    /// This is similar to the `Iterator::inspect` method in the standard
1596    /// library where it allows easily inspecting each value as it passes
1597    /// through the stream, for example to debug what's going on.
1598    fn inspect<F>(self, f: F) -> Inspect<Self, F>
1599    where
1600        F: FnMut(&Self::Item),
1601        Self: Sized,
1602    {
1603        assert_stream::<Self::Item, _>(Inspect::new(self, f))
1604    }
1605
1606    /// Wrap this stream in an `Either` stream, making it the left-hand variant
1607    /// of that `Either`.
1608    ///
1609    /// This can be used in combination with the `right_stream` method to write `if`
1610    /// statements that evaluate to different streams in different branches.
1611    fn left_stream<B>(self) -> Either<Self, B>
1612    where
1613        B: Stream<Item = Self::Item>,
1614        Self: Sized,
1615    {
1616        assert_stream::<Self::Item, _>(Either::Left(self))
1617    }
1618
1619    /// Wrap this stream in an `Either` stream, making it the right-hand variant
1620    /// of that `Either`.
1621    ///
1622    /// This can be used in combination with the `left_stream` method to write `if`
1623    /// statements that evaluate to different streams in different branches.
1624    fn right_stream<B>(self) -> Either<B, Self>
1625    where
1626        B: Stream<Item = Self::Item>,
1627        Self: Sized,
1628    {
1629        assert_stream::<Self::Item, _>(Either::Right(self))
1630    }
1631
1632    /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
1633    /// stream types.
1634    fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
1635    where
1636        Self: Unpin,
1637    {
1638        Pin::new(self).poll_next(cx)
1639    }
1640
1641    /// Returns a [`Future`] that resolves when the next item in this stream is
1642    /// ready.
1643    ///
1644    /// This is similar to the [`next`][StreamExt::next] method, but it won't
1645    /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the
1646    /// returned future type will return `true` from
1647    /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing
1648    /// [`select_next_some`][StreamExt::select_next_some] to be easily used with
1649    /// the [`select!`] macro.
1650    ///
1651    /// If the future is polled after this [`Stream`] is empty it will panic.
1652    /// Using the future with a [`FusedFuture`][]-aware primitive like the
1653    /// [`select!`] macro will prevent this.
1654    ///
1655    /// [`FusedFuture`]: futures_core::future::FusedFuture
1656    /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated
1657    ///
1658    /// # Examples
1659    ///
1660    /// ```
1661    /// # futures::executor::block_on(async {
1662    /// use futures::{future, select};
1663    /// use futures::stream::{StreamExt, FuturesUnordered};
1664    ///
1665    /// let mut fut = future::ready(1);
1666    /// let mut async_tasks = FuturesUnordered::new();
1667    /// let mut total = 0;
1668    /// loop {
1669    ///     select! {
1670    ///         num = fut => {
1671    ///             // First, the `ready` future completes.
1672    ///             total += num;
1673    ///             // Then we spawn a new task onto `async_tasks`,
1674    ///             async_tasks.push(async { 5 });
1675    ///         },
1676    ///         // On the next iteration of the loop, the task we spawned
1677    ///         // completes.
1678    ///         num = async_tasks.select_next_some() => {
1679    ///             total += num;
1680    ///         }
1681    ///         // Finally, both the `ready` future and `async_tasks` have
1682    ///         // finished, so we enter the `complete` branch.
1683    ///         complete => break,
1684    ///     }
1685    /// }
1686    /// assert_eq!(total, 6);
1687    /// # });
1688    /// ```
1689    ///
1690    /// [`select!`]: crate::select
1691    fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
1692    where
1693        Self: Unpin + FusedStream,
1694    {
1695        assert_future::<Self::Item, _>(SelectNextSome::new(self))
1696    }
1697}