Trait Stream 

pub trait Stream {
    type Item;

    // Required method
    fn poll_recv(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> PollRecv<Self::Item>;

    // Provided methods
    fn recv(&mut self) -> RecvFuture<'_, Self> 
       where Self: Unpin { ... }
    fn try_recv(&mut self) -> Result<Self::Item, TryRecvError>
       where Self: Unpin { ... }
    fn map<Map, Into>(self, map: Map) -> MapStream<Self, Map, Into>
       where Map: Fn(Self::Item) -> Into,
             Self: Sized { ... }
    fn filter<Filter>(self, filter: Filter) -> FilterStream<Self, Filter>
       where Self: Sized + Unpin,
             Filter: FnMut(&Self::Item) -> bool + Unpin { ... }
    fn merge<Other>(self, other: Other) -> MergeStream<Self, Other>
       where Other: Stream<Item = Self::Item>,
             Self: Sized { ... }
    fn chain<Other>(self, other: Other) -> ChainStream<Self, Other>
       where Other: Stream<Item = Self::Item>,
             Self: Sized { ... }
    fn find<Condition>(
        self,
        condition: Condition,
    ) -> FindStream<Self, Condition>
       where Self: Sized + Unpin,
             Condition: Fn(&Self::Item) -> bool + Unpin { ... }
}

An asynchronous stream, which produces a series of messages until closed.

Streams implement poll_recv, a poll-based method very similar to the poll method of std::future::Future.

Streams can be used in async code with stream.recv().await, or checked without blocking with stream.try_recv().

use postage::mpsc::channel;
use postage::sink::Sink;
use postage::stream::Stream;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = channel(16);
    // send returns a Result; it is discarded here for brevity.
    tx.send(true).await.ok();
    tx.send(true).await.ok();
    drop(tx);

    while let Some(_v) = rx.recv().await {
        println!("Value received!");
        if let Ok(_v) = rx.try_recv() {
            println!("Extra value received!");
        }
    }
}

Streams also support combinators, such as map, filter, find, and (when the logging feature is enabled) log.

use postage::mpsc::channel;
use postage::sink::Sink;
use postage::stream::{Stream, TryRecvError};

#[tokio::main]
async fn main() {
    let (mut tx, rx) = channel(16);

    // send returns a Result; it is discarded here for brevity.
    tx.send(1usize).await.ok();
    tx.send(2usize).await.ok();
    tx.send(3usize).await.ok();
    drop(tx);

    let mut rx = rx
        .map(|i| i * 2)
        .filter(|i| *i >= 4)
        .find(|i| *i == 6);

    // The `logging` feature enables a combinator that logs values using the Debug trait.
    #[cfg(feature = "logging")]
    let mut rx = rx
        .log(log::Level::Info);

    assert_eq!(Ok(6), rx.try_recv());
    assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
}

Required Associated Types§

type Item

Required Methods§

fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item>

Attempts to retrieve an item from the stream, without blocking.

Returns:

  • PollRecv::Ready(value) if a message is ready.
  • PollRecv::Pending if the stream is open, but no message is currently available.
  • PollRecv::Closed if the stream is closed, and no further messages are expected.
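
The three outcomes above can be modeled without any async machinery. The following is a minimal sketch that does not use postage: `PollRecv` here is a local stand-in enum that mirrors the variants listed above, `QueueSource` is a hypothetical in-memory source, and the `Pin` and `Context` arguments of the real method are left out for brevity.

```rust
use std::collections::VecDeque;

/// Local stand-in for the three outcomes listed above (not postage's own type).
#[derive(Debug, PartialEq)]
enum PollRecv<T> {
    Ready(T),
    Pending,
    Closed,
}

/// Hypothetical in-memory source: queued messages plus a flag that records
/// whether the sending side has gone away.
struct QueueSource<T> {
    items: VecDeque<T>,
    sender_dropped: bool,
}

impl<T> QueueSource<T> {
    /// Never blocks: every call reports exactly one of the three states.
    fn poll_recv(&mut self) -> PollRecv<T> {
        match self.items.pop_front() {
            Some(value) => PollRecv::Ready(value),
            None if self.sender_dropped => PollRecv::Closed,
            None => PollRecv::Pending,
        }
    }
}

fn main() {
    let mut source = QueueSource {
        items: VecDeque::from(vec![1, 2]),
        sender_dropped: false,
    };

    assert_eq!(source.poll_recv(), PollRecv::Ready(1));
    assert_eq!(source.poll_recv(), PollRecv::Ready(2));
    // The queue is empty but the sender still exists: more may arrive later.
    assert_eq!(source.poll_recv(), PollRecv::Pending);

    source.sender_dropped = true;
    // The queue is empty and nothing can refill it.
    assert_eq!(source.poll_recv(), PollRecv::Closed);
}
```

In this model, messages that were queued before the sender went away are still delivered; Closed is reported only once the queue has been drained.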

Provided Methods§

fn recv(&mut self) -> RecvFuture<'_, Self>
where Self: Unpin,

Returns a future that resolves to the next message from the stream.

The future resolves to:

  • Some(value) when a message is received.
  • None if the stream is closed, and no further messages are expected.

fn try_recv(&mut self) -> Result<Self::Item, TryRecvError>
where Self: Unpin,

Attempts to retrieve a message from the stream, without blocking.

Returns:

  • Ok(value) if a message is ready.
  • Err(TryRecvError::Pending) if the stream is open, but no messages are available.
  • Err(TryRecvError::Closed) if the stream has been closed, and no items are expected.
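
Because the two error cases mean different things, callers usually want to tell them apart. The sketch below is one possible way to drain everything that is immediately available and then report why the loop stopped. It is std-only: `TryRecvError` is a local stand-in with the two variants named above, and `drain_ready` is a hypothetical helper that accepts any closure with a try_recv-like return type.

```rust
/// Local stand-in for the two error cases listed above (not postage's own type).
#[derive(Debug, PartialEq, Clone, Copy)]
enum TryRecvError {
    Pending,
    Closed,
}

/// Hypothetical helper: pulls every immediately available message, then
/// returns them together with the reason the loop stopped.
fn drain_ready<T>(
    mut try_recv: impl FnMut() -> Result<T, TryRecvError>,
) -> (Vec<T>, TryRecvError) {
    let mut received = Vec::new();
    loop {
        match try_recv() {
            Ok(value) => received.push(value),
            // Pending: nothing right now, retry later. Closed: stop for good.
            Err(reason) => return (received, reason),
        }
    }
}

fn main() {
    // A scripted sequence of results stands in for a real receiver.
    let mut script = vec![Ok(1), Ok(2), Err(TryRecvError::Pending)].into_iter();
    let (received, reason) =
        drain_ready(|| script.next().unwrap_or(Err(TryRecvError::Closed)));

    assert_eq!(received, vec![1, 2]);
    assert_eq!(reason, TryRecvError::Pending);
}
```

A caller that sees Pending can come back later, while Closed means no retry will ever succeed.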

fn map<Map, Into>(self, map: Map) -> MapStream<Self, Map, Into>
where Map: Fn(Self::Item) -> Into, Self: Sized,

Transforms each message with the map function, producing a stream of the results.

fn filter<Filter>(self, filter: Filter) -> FilterStream<Self, Filter>
where Self: Sized + Unpin, Filter: FnMut(&Self::Item) -> bool + Unpin,

Filters messages returned by the stream, ignoring messages where filter returns false.

fn merge<Other>(self, other: Other) -> MergeStream<Self, Other>
where Other: Stream<Item = Self::Item>, Self: Sized,

Merges two streams, returning values from either stream as they become available, until both are closed.

fn chain<Other>(self, other: Other) -> ChainStream<Self, Other>
where Other: Stream<Item = Self::Item>, Self: Sized,

Chains two streams, returning values from self until it is closed, and then returning values from other.
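
The difference between chain and merge shows up when the first stream has nothing ready yet. The sketch below models both on a simplified poll interface. It is std-only and makes several assumptions: `PollRecv`, `Source`, `Scripted`, `Chain`, and `Merge` are local stand-ins rather than postage's types, `Pin` and `Context` are omitted, and this `Merge` always polls its left side first, which may differ from how the library orders its polls.

```rust
/// Local stand-in for the poll result (not postage's own type).
#[derive(Debug, PartialEq)]
enum PollRecv<T> {
    Ready(T),
    Pending,
    Closed,
}

/// Simplified stand-in for a stream: no `Pin`, no `Context`.
trait Source {
    type Item;
    fn poll(&mut self) -> PollRecv<Self::Item>;
}

/// Replays a fixed script of poll results, then reports `Closed`.
struct Scripted<T>(std::vec::IntoIter<PollRecv<T>>);

fn scripted<T>(script: Vec<PollRecv<T>>) -> Scripted<T> {
    Scripted(script.into_iter())
}

impl<T> Source for Scripted<T> {
    type Item = T;
    fn poll(&mut self) -> PollRecv<T> {
        self.0.next().unwrap_or(PollRecv::Closed)
    }
}

/// Polls `first` until it closes, and only then touches `second`.
struct Chain<A, B> {
    first: A,
    second: B,
    first_closed: bool,
}

impl<A: Source, B: Source<Item = A::Item>> Source for Chain<A, B> {
    type Item = A::Item;
    fn poll(&mut self) -> PollRecv<A::Item> {
        if !self.first_closed {
            match self.first.poll() {
                PollRecv::Closed => self.first_closed = true,
                other => return other,
            }
        }
        self.second.poll()
    }
}

/// Takes a message from whichever side has one; closed only when both are.
struct Merge<A, B> {
    left: A,
    right: B,
}

impl<A: Source, B: Source<Item = A::Item>> Source for Merge<A, B> {
    type Item = A::Item;
    fn poll(&mut self) -> PollRecv<A::Item> {
        // Assumption of this sketch: the left side is always tried first.
        let left_closed = match self.left.poll() {
            PollRecv::Ready(value) => return PollRecv::Ready(value),
            PollRecv::Pending => false,
            PollRecv::Closed => true,
        };
        match self.right.poll() {
            PollRecv::Ready(value) => PollRecv::Ready(value),
            PollRecv::Closed if left_closed => PollRecv::Closed,
            _ => PollRecv::Pending,
        }
    }
}

fn main() {
    let mut chained = Chain {
        first: scripted(vec![PollRecv::Pending, PollRecv::Ready(1)]),
        second: scripted(vec![PollRecv::Ready(2)]),
        first_closed: false,
    };
    // `first` is not ready, so chain waits even though `second` has a message.
    assert_eq!(chained.poll(), PollRecv::Pending);
    assert_eq!(chained.poll(), PollRecv::Ready(1));
    assert_eq!(chained.poll(), PollRecv::Ready(2));
    assert_eq!(chained.poll(), PollRecv::Closed);

    let mut merged = Merge {
        left: scripted(vec![PollRecv::Pending, PollRecv::Ready(1)]),
        right: scripted(vec![PollRecv::Ready(2)]),
    };
    // `left` is not ready, so merge hands out the message from `right`.
    assert_eq!(merged.poll(), PollRecv::Ready(2));
    assert_eq!(merged.poll(), PollRecv::Ready(1));
    assert_eq!(merged.poll(), PollRecv::Closed);
}
```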

fn find<Condition>(self, condition: Condition) -> FindStream<Self, Condition>
where Self: Sized + Unpin, Condition: Fn(&Self::Item) -> bool + Unpin,

Finds the first message matching a condition. When a message matches, that single value is returned, and the stream is then closed.
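
The "one value, then closed" behavior can be illustrated with a small state machine. This is a std-only sketch under stated assumptions, not the library's implementation: `PollRecv` is a local stand-in enum, `Find` is a hypothetical wrapper, and the inner stream is represented by a plain closure instead of a pinned stream with a `Context`.

```rust
/// Local stand-in for the poll result (not postage's own type).
#[derive(Debug, PartialEq)]
enum PollRecv<T> {
    Ready(T),
    Pending,
    Closed,
}

/// Hypothetical wrapper: `poll_inner` plays the role of the inner stream.
struct Find<P, C> {
    poll_inner: P,
    condition: C,
    done: bool,
}

impl<T, P, C> Find<P, C>
where
    P: FnMut() -> PollRecv<T>,
    C: Fn(&T) -> bool,
{
    fn poll(&mut self) -> PollRecv<T> {
        // Once a match was returned (or the inner source closed), stay closed.
        if self.done {
            return PollRecv::Closed;
        }
        loop {
            match (self.poll_inner)() {
                PollRecv::Ready(value) if (self.condition)(&value) => {
                    self.done = true;
                    return PollRecv::Ready(value);
                }
                // Not a match: drop it and look at the next message.
                PollRecv::Ready(_) => continue,
                PollRecv::Pending => return PollRecv::Pending,
                PollRecv::Closed => {
                    self.done = true;
                    return PollRecv::Closed;
                }
            }
        }
    }
}

fn main() {
    let mut items = vec![2, 4, 6, 8].into_iter();
    let mut find = Find {
        poll_inner: move || match items.next() {
            Some(value) => PollRecv::Ready(value),
            None => PollRecv::Closed,
        },
        condition: |value: &i32| *value == 6,
        done: false,
    };

    // 2 and 4 are skipped, 6 matches.
    assert_eq!(find.poll(), PollRecv::Ready(6));
    // 8 is never requested: the wrapper is closed after the first match.
    assert_eq!(find.poll(), PollRecv::Closed);
}
```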

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementations on Foreign Types§

impl<P, S> Stream for Pin<P>
where P: DerefMut<Target = S> + Unpin, S: Stream + Unpin + ?Sized,

type Item = <S as Stream>::Item

fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item>

impl<S> Stream for &mut S
where S: Stream + Unpin + ?Sized,

type Item = <S as Stream>::Item

fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item>

Implementors§

impl Stream for postage::barrier::Receiver

impl<T> Stream for postage::broadcast::Receiver<T>
where T: Clone,

type Item = T

impl<T> Stream for postage::dispatch::Receiver<T>

type Item = T

impl<T> Stream for postage::mpsc::Receiver<T>

type Item = T

impl<T> Stream for postage::oneshot::Receiver<T>

type Item = T

impl<T> Stream for postage::watch::Receiver<T>
where T: Clone,

type Item = T