postage/
sink.rs

1//! A sink for values which are asynchronously accepted, until the target is closed.
2//!
3//! Postage channel senders implement Sink:
4//! ```rust
5//! use postage::mpsc::channel;
6//! use postage::sink::Sink;
7//!
8//! #[tokio::main]
9//! async fn main() {
10//!     let (mut tx, rx) = channel(16);
11//!     assert_eq!(Ok(()), tx.send(true).await);
12//! }
13//! ```
14//!
15//! Sinks return an error if the channel is closed, and the message cannot be accepted by the receiver:
16//! ```rust
17//! use postage::mpsc::channel;
18//! use postage::sink::{SendError, Sink};
19//!
20//! #[tokio::main]
21//! async fn main() {
22//!     let (mut tx, rx) = channel(16);
23//!     drop(rx);
24//!     assert_eq!(Err(SendError(true)), tx.send(true).await);
25//! }
26//! ```
27//!
28//! Note that `Sink::send` returns an `Err` type, unlike `Stream::recv` which returns an option.
29//! This is because the failure to send a message sometimes needs to be interpreted as an application error:
30//! ```rust
31//! use postage::mpsc::channel;
32//! use postage::sink::{SendError, Sink};
33//!
34//! #[tokio::main]
35//! async fn main() -> Result<(), SendError<bool>> {
36//!     let (mut tx, rx) = channel(16);
37//!     tx.send(true).await?;
38//!     Ok(())
39//! }
40//! ```
41//!
42//! Tasks can ignore send errors by using `Result::ok`:
43//! ```rust
44//! use postage::mpsc::channel;
45//! use postage::sink::Sink;
46//!
47//! #[tokio::main]
48//! async fn main() {
49//!     let (mut tx, rx) = channel(16);
50//!     tx.send(true).await.ok();
51//! }
52//! ```
53use std::marker::PhantomPinned;
54use std::{future::Future, ops::DerefMut, pin::Pin, task::Poll};
55
56use crate::Context;
57use pin_project::pin_project;
58
59mod chain;
60mod errors;
61mod filter;
62
63#[cfg(feature = "logging")]
64mod sink_log;
65
66pub use errors::*;
67
68/// A sink which can asynchronously accept messages, and at some point may refuse to accept any further messages.
69///
70/// Sinks implement `poll_send`, a poll-based method very similar to `std::future::Future`.
71///
72/// Sinks can be used in async code with `stream.send(value).await`, or with `stream.try_send(value)`.  Note that
73/// `send` returns an error if the sink has been closed.  And `try_send` returns an error if the sink is full, or it is closed.
74///
75/// Send errors can be ignored using `Result::ok`.
76///
77/// ```rust
78/// use postage::mpsc::channel;
79/// use postage::sink::{Sink, TrySendError};
80///
81/// #[tokio::main]
82/// async fn main() -> Result<(), TrySendError<bool>> {
83///     let (mut tx, mut rx) = channel(16);
84///     tx.send(true).await.ok();
85///     tx.try_send(true)?;
86///     drop(tx);
87///     Ok(())
88/// }
89/// ```
90///
91/// Sinks also support combinators, such as map, filter, chain, and log.
92/// ```rust
93/// use postage::mpsc::channel;
94/// use postage::sink::{Sink, SendError, TrySendError};
95/// use postage::stream::Stream;
96///
97/// #[tokio::main]
98/// async fn main() {
99///     let (mut tx, mut rx) = channel(16);
100///     let (tx2, mut rx2) = channel(16);
101///
102///     let mut combo = tx2
103///         .after(tx)
104///         .filter(|i| *i >= 2);
105///     
106///     // The `logging` feature enables a combinator that logs values using the Debug trait.
107///     #[cfg(feature = "logging")]
108///     let mut combo = combo
109///         .log(log::Level::Info);
110///
111///     combo.send(1usize).await.ok();
112///     combo.send(2usize).await.ok();
113
114///     assert_eq!(Some(2usize), rx.recv().await);
115///     drop(rx);
116///
117///     combo.send(3usize).await.ok();
118///     combo.send(4usize).await.ok();
119///     assert_eq!(Some(3usize), rx2.recv().await);
120///     assert_eq!(Some(4usize), rx2.recv().await);
121///
122///     drop(rx2);
123///     assert_eq!(Err(SendError(5usize)), combo.send(5usize).await);
124/// }
125/// ```
126pub trait Sink {
127    type Item;
128
129    /// Attempts to accept the message, without blocking.  
130    ///
131    /// Returns:
132    /// - `PollSend::Ready` if the value was sent
133    /// - `PollSend::Pending(value)` if the channel is full.  The channel will call the waker in `cx` when the item may be accepted in the future.
134    /// - `PollSend::Rejected(value)` if the channel is closed, and will never accept the item.
135    fn poll_send(
136        self: Pin<&mut Self>,
137        cx: &mut Context<'_>,
138        value: Self::Item,
139    ) -> PollSend<Self::Item>;
140
141    /// Attempts to send a message into the sink.  
142    ///
143    /// Returns:
144    /// - `Ok(())` if the value was accepted.
145    /// - `Err(SendError(value))` if the sink rejected the message.
146    fn send(&mut self, value: Self::Item) -> SendFuture<Self> {
147        SendFuture::new(self, value)
148    }
149
150    /// Attempts to send a message over the sink, without blocking.
151    ///
152    /// Returns:
153    /// - `Ok(())` if the value was accepted.
154    /// - `Err(TrySendError::Pending(value))` if the channel is full, and cannot accept the item at this time.
155    /// - `Err(TrySendError::Rejected(value))` if the channel is closed, and will never accept the item.
156    fn try_send(&mut self, value: Self::Item) -> Result<(), TrySendError<Self::Item>>
157    where
158        Self: Unpin,
159    {
160        let pin = Pin::new(self);
161
162        match pin.poll_send(&mut Context::empty(), value) {
163            PollSend::Ready => Ok(()),
164            PollSend::Pending(value) => Err(TrySendError::Pending(value)),
165            PollSend::Rejected(value) => Err(TrySendError::Rejected(value)),
166        }
167    }
168
169    /// Sends a message over the channel, blocking the current thread until the message is sent.
170    ///
171    /// Requires the `blocking` feature (enabled by default).
172    #[cfg(feature = "blocking")]
173    fn blocking_send(&mut self, value: Self::Item) -> Result<(), SendError<Self::Item>>
174    where
175        Self: Unpin,
176    {
177        pollster::block_on(self.send(value))
178    }
179
180    /// Chains two sink implementations.  Messages will be transmitted to the argument until it rejects a message.
181    /// Then messages will be transmitted to self.
182    fn after<Before>(self, before: Before) -> chain::ChainSink<Before, Self>
183    where
184        Before: Sink<Item = Self::Item>,
185        Self: Sized,
186    {
187        chain::ChainSink::new(before, self)
188    }
189
190    /// Filters messages, forwarding them to the sink if the filter returns true
191    fn filter<Filter>(self, filter: Filter) -> filter::FilterSink<Filter, Self>
192    where
193        Filter: FnMut(&Self::Item) -> bool,
194        Self: Sized,
195    {
196        filter::FilterSink::new(filter, self)
197    }
198
199    /// Logs messages that are accepted by the sink using the Debug trait, at the provided log level.
200    ///
201    /// Requires the `logging` feature
202    #[cfg(feature = "logging")]
203    fn log(self, level: log::Level) -> sink_log::SinkLog<Self>
204    where
205        Self: Sized,
206        Self::Item: std::fmt::Debug,
207    {
208        sink_log::SinkLog::new(self, level)
209    }
210}
211
212impl<S> Sink for &mut S
213where
214    S: Sink + Unpin + ?Sized,
215{
216    type Item = S::Item;
217
218    fn poll_send(
219        mut self: Pin<&mut Self>,
220        cx: &mut Context<'_>,
221        value: Self::Item,
222    ) -> PollSend<Self::Item> {
223        S::poll_send(Pin::new(&mut **self), cx, value)
224    }
225}
226
227impl<P, S> Sink for Pin<P>
228where
229    P: DerefMut<Target = S> + Unpin,
230    S: Sink + Unpin + ?Sized,
231{
232    type Item = <S as Sink>::Item;
233
234    fn poll_send(
235        self: Pin<&mut Self>,
236        cx: &mut Context<'_>,
237        value: Self::Item,
238    ) -> PollSend<Self::Item> {
239        Pin::get_mut(self).as_mut().poll_send(cx, value)
240    }
241}
242
243/// An enum of poll responses that are produced by Sink implementations.
244#[derive(Debug, Clone, PartialEq, Eq)]
245pub enum PollSend<T> {
246    /// The item was accepted and sent
247    Ready,
248    /// The sender is pending, and has registered with the waker context
249    Pending(T),
250    /// The sender has been closed, and will never accept the item
251    Rejected(T),
252}
253
254/// A future returned by `Sink::send`, which wraps an item.
255/// The item is sent to the sink, or returned if the sink is closed.
256#[pin_project]
257#[must_use = "futures do nothing unless polled"]
258pub struct SendFuture<'s, S>
259where
260    S: Sink + ?Sized,
261{
262    #[pin]
263    send: &'s mut S,
264    value: Option<S::Item>,
265    #[pin]
266    _pin: PhantomPinned,
267}
268
269impl<'s, S> SendFuture<'s, S>
270where
271    S: Sink + ?Sized,
272{
273    pub fn new(send: &'s mut S, value: S::Item) -> SendFuture<S> {
274        Self {
275            send,
276            value: Some(value),
277            _pin: PhantomPinned,
278        }
279    }
280}
281
282impl<'s, S> Future for SendFuture<'s, S>
283where
284    S: Sink + Unpin + ?Sized,
285{
286    type Output = Result<(), SendError<S::Item>>;
287
288    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
289        if self.value.is_none() {
290            return Poll::Ready(Ok(()));
291        }
292
293        let this = self.project();
294
295        let mut cx: crate::Context<'_> = cx.into();
296        match this.send.poll_send(&mut cx, this.value.take().unwrap()) {
297            PollSend::Ready => Poll::Ready(Ok(())),
298            PollSend::Pending(value) => {
299                *this.value = Some(value);
300                Poll::Pending
301            }
302            PollSend::Rejected(value) => Poll::Ready(Err(SendError(value))),
303        }
304    }
305}
306
307#[cfg(test)]
308mod tests {
309    #[cfg(feature = "blocking")]
310    #[test]
311    fn test_blocking() {
312        use super::Sink;
313        use crate::test::sink::ready;
314
315        let mut stream = ready();
316        assert_eq!(Ok(()), stream.blocking_send(1usize));
317    }
318}