postage/sink.rs
1//! A sink for values which are asynchronously accepted, until the target is closed.
2//!
3//! Postage channel senders implement Sink:
4//! ```rust
5//! use postage::mpsc::channel;
6//! use postage::sink::Sink;
7//!
8//! #[tokio::main]
9//! async fn main() {
10//! let (mut tx, rx) = channel(16);
11//! assert_eq!(Ok(()), tx.send(true).await);
12//! }
13//! ```
14//!
15//! Sinks return an error if the channel is closed, and the message cannot be accepted by the receiver:
16//! ```rust
17//! use postage::mpsc::channel;
18//! use postage::sink::{SendError, Sink};
19//!
20//! #[tokio::main]
21//! async fn main() {
22//! let (mut tx, rx) = channel(16);
23//! drop(rx);
24//! assert_eq!(Err(SendError(true)), tx.send(true).await);
25//! }
26//! ```
27//!
28//! Note that `Sink::send` returns an `Err` type, unlike `Stream::recv` which returns an option.
29//! This is because the failure to send a message sometimes needs to be interpreted as an application error:
30//! ```rust
31//! use postage::mpsc::channel;
32//! use postage::sink::{SendError, Sink};
33//!
34//! #[tokio::main]
35//! async fn main() -> Result<(), SendError<bool>> {
36//! let (mut tx, rx) = channel(16);
37//! tx.send(true).await?;
38//! Ok(())
39//! }
40//! ```
41//!
42//! Tasks can ignore send errors by using `Result::ok`:
43//! ```rust
44//! use postage::mpsc::channel;
45//! use postage::sink::Sink;
46//!
47//! #[tokio::main]
48//! async fn main() {
49//! let (mut tx, rx) = channel(16);
50//! tx.send(true).await.ok();
51//! }
52//! ```
53use std::marker::PhantomPinned;
54use std::{future::Future, ops::DerefMut, pin::Pin, task::Poll};
55
56use crate::Context;
57use pin_project::pin_project;
58
59mod chain;
60mod errors;
61mod filter;
62
63#[cfg(feature = "logging")]
64mod sink_log;
65
66pub use errors::*;
67
68/// A sink which can asynchronously accept messages, and at some point may refuse to accept any further messages.
69///
70/// Sinks implement `poll_send`, a poll-based method very similar to `std::future::Future`.
71///
72/// Sinks can be used in async code with `stream.send(value).await`, or with `stream.try_send(value)`. Note that
73/// `send` returns an error if the sink has been closed. And `try_send` returns an error if the sink is full, or it is closed.
74///
75/// Send errors can be ignored using `Result::ok`.
76///
77/// ```rust
78/// use postage::mpsc::channel;
79/// use postage::sink::{Sink, TrySendError};
80///
81/// #[tokio::main]
82/// async fn main() -> Result<(), TrySendError<bool>> {
83/// let (mut tx, mut rx) = channel(16);
84/// tx.send(true).await.ok();
85/// tx.try_send(true)?;
86/// drop(tx);
87/// Ok(())
88/// }
89/// ```
90///
91/// Sinks also support combinators, such as map, filter, chain, and log.
92/// ```rust
93/// use postage::mpsc::channel;
94/// use postage::sink::{Sink, SendError, TrySendError};
95/// use postage::stream::Stream;
96///
97/// #[tokio::main]
98/// async fn main() {
99/// let (mut tx, mut rx) = channel(16);
100/// let (tx2, mut rx2) = channel(16);
101///
102/// let mut combo = tx2
103/// .after(tx)
104/// .filter(|i| *i >= 2);
105///
106/// // The `logging` feature enables a combinator that logs values using the Debug trait.
107/// #[cfg(feature = "logging")]
108/// let mut combo = combo
109/// .log(log::Level::Info);
110///
111/// combo.send(1usize).await.ok();
112/// combo.send(2usize).await.ok();
113
114/// assert_eq!(Some(2usize), rx.recv().await);
115/// drop(rx);
116///
117/// combo.send(3usize).await.ok();
118/// combo.send(4usize).await.ok();
119/// assert_eq!(Some(3usize), rx2.recv().await);
120/// assert_eq!(Some(4usize), rx2.recv().await);
121///
122/// drop(rx2);
123/// assert_eq!(Err(SendError(5usize)), combo.send(5usize).await);
124/// }
125/// ```
126pub trait Sink {
127 type Item;
128
129 /// Attempts to accept the message, without blocking.
130 ///
131 /// Returns:
132 /// - `PollSend::Ready` if the value was sent
133 /// - `PollSend::Pending(value)` if the channel is full. The channel will call the waker in `cx` when the item may be accepted in the future.
134 /// - `PollSend::Rejected(value)` if the channel is closed, and will never accept the item.
135 fn poll_send(
136 self: Pin<&mut Self>,
137 cx: &mut Context<'_>,
138 value: Self::Item,
139 ) -> PollSend<Self::Item>;
140
141 /// Attempts to send a message into the sink.
142 ///
143 /// Returns:
144 /// - `Ok(())` if the value was accepted.
145 /// - `Err(SendError(value))` if the sink rejected the message.
146 fn send(&mut self, value: Self::Item) -> SendFuture<Self> {
147 SendFuture::new(self, value)
148 }
149
150 /// Attempts to send a message over the sink, without blocking.
151 ///
152 /// Returns:
153 /// - `Ok(())` if the value was accepted.
154 /// - `Err(TrySendError::Pending(value))` if the channel is full, and cannot accept the item at this time.
155 /// - `Err(TrySendError::Rejected(value))` if the channel is closed, and will never accept the item.
156 fn try_send(&mut self, value: Self::Item) -> Result<(), TrySendError<Self::Item>>
157 where
158 Self: Unpin,
159 {
160 let pin = Pin::new(self);
161
162 match pin.poll_send(&mut Context::empty(), value) {
163 PollSend::Ready => Ok(()),
164 PollSend::Pending(value) => Err(TrySendError::Pending(value)),
165 PollSend::Rejected(value) => Err(TrySendError::Rejected(value)),
166 }
167 }
168
169 /// Sends a message over the channel, blocking the current thread until the message is sent.
170 ///
171 /// Requires the `blocking` feature (enabled by default).
172 #[cfg(feature = "blocking")]
173 fn blocking_send(&mut self, value: Self::Item) -> Result<(), SendError<Self::Item>>
174 where
175 Self: Unpin,
176 {
177 pollster::block_on(self.send(value))
178 }
179
180 /// Chains two sink implementations. Messages will be transmitted to the argument until it rejects a message.
181 /// Then messages will be transmitted to self.
182 fn after<Before>(self, before: Before) -> chain::ChainSink<Before, Self>
183 where
184 Before: Sink<Item = Self::Item>,
185 Self: Sized,
186 {
187 chain::ChainSink::new(before, self)
188 }
189
190 /// Filters messages, forwarding them to the sink if the filter returns true
191 fn filter<Filter>(self, filter: Filter) -> filter::FilterSink<Filter, Self>
192 where
193 Filter: FnMut(&Self::Item) -> bool,
194 Self: Sized,
195 {
196 filter::FilterSink::new(filter, self)
197 }
198
199 /// Logs messages that are accepted by the sink using the Debug trait, at the provided log level.
200 ///
201 /// Requires the `logging` feature
202 #[cfg(feature = "logging")]
203 fn log(self, level: log::Level) -> sink_log::SinkLog<Self>
204 where
205 Self: Sized,
206 Self::Item: std::fmt::Debug,
207 {
208 sink_log::SinkLog::new(self, level)
209 }
210}
211
212impl<S> Sink for &mut S
213where
214 S: Sink + Unpin + ?Sized,
215{
216 type Item = S::Item;
217
218 fn poll_send(
219 mut self: Pin<&mut Self>,
220 cx: &mut Context<'_>,
221 value: Self::Item,
222 ) -> PollSend<Self::Item> {
223 S::poll_send(Pin::new(&mut **self), cx, value)
224 }
225}
226
227impl<P, S> Sink for Pin<P>
228where
229 P: DerefMut<Target = S> + Unpin,
230 S: Sink + Unpin + ?Sized,
231{
232 type Item = <S as Sink>::Item;
233
234 fn poll_send(
235 self: Pin<&mut Self>,
236 cx: &mut Context<'_>,
237 value: Self::Item,
238 ) -> PollSend<Self::Item> {
239 Pin::get_mut(self).as_mut().poll_send(cx, value)
240 }
241}
242
243/// An enum of poll responses that are produced by Sink implementations.
244#[derive(Debug, Clone, PartialEq, Eq)]
245pub enum PollSend<T> {
246 /// The item was accepted and sent
247 Ready,
248 /// The sender is pending, and has registered with the waker context
249 Pending(T),
250 /// The sender has been closed, and will never accept the item
251 Rejected(T),
252}
253
254/// A future returned by `Sink::send`, which wraps an item.
255/// The item is sent to the sink, or returned if the sink is closed.
256#[pin_project]
257#[must_use = "futures do nothing unless polled"]
258pub struct SendFuture<'s, S>
259where
260 S: Sink + ?Sized,
261{
262 #[pin]
263 send: &'s mut S,
264 value: Option<S::Item>,
265 #[pin]
266 _pin: PhantomPinned,
267}
268
269impl<'s, S> SendFuture<'s, S>
270where
271 S: Sink + ?Sized,
272{
273 pub fn new(send: &'s mut S, value: S::Item) -> SendFuture<S> {
274 Self {
275 send,
276 value: Some(value),
277 _pin: PhantomPinned,
278 }
279 }
280}
281
282impl<'s, S> Future for SendFuture<'s, S>
283where
284 S: Sink + Unpin + ?Sized,
285{
286 type Output = Result<(), SendError<S::Item>>;
287
288 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
289 if self.value.is_none() {
290 return Poll::Ready(Ok(()));
291 }
292
293 let this = self.project();
294
295 let mut cx: crate::Context<'_> = cx.into();
296 match this.send.poll_send(&mut cx, this.value.take().unwrap()) {
297 PollSend::Ready => Poll::Ready(Ok(())),
298 PollSend::Pending(value) => {
299 *this.value = Some(value);
300 Poll::Pending
301 }
302 PollSend::Rejected(value) => Poll::Ready(Err(SendError(value))),
303 }
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 #[cfg(feature = "blocking")]
310 #[test]
311 fn test_blocking() {
312 use super::Sink;
313 use crate::test::sink::ready;
314
315 let mut stream = ready();
316 assert_eq!(Ok(()), stream.blocking_send(1usize));
317 }
318}