futures_channel/mpsc/
sink_impl.rs

1use super::{SendError, Sender, TrySendError, UnboundedSender};
2use futures_core::task::{Context, Poll};
3use futures_sink::Sink;
4use std::pin::Pin;
5
6impl<T> Sink<T> for Sender<T> {
7    type Error = SendError;
8
9    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
10        (*self).poll_ready(cx)
11    }
12
13    fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
14        (*self).start_send(msg)
15    }
16
17    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
18        match (*self).poll_ready(cx) {
19            Poll::Ready(Err(ref e)) if e.is_disconnected() => {
20                // If the receiver disconnected, we consider the sink to be flushed.
21                Poll::Ready(Ok(()))
22            }
23            x => x,
24        }
25    }
26
27    fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
28        self.disconnect();
29        Poll::Ready(Ok(()))
30    }
31}
32
33impl<T> Sink<T> for UnboundedSender<T> {
34    type Error = SendError;
35
36    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
37        Self::poll_ready(&*self, cx)
38    }
39
40    fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
41        Self::start_send(&mut *self, msg)
42    }
43
44    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
45        Poll::Ready(Ok(()))
46    }
47
48    fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49        self.disconnect();
50        Poll::Ready(Ok(()))
51    }
52}
53
54impl<T> Sink<T> for &UnboundedSender<T> {
55    type Error = SendError;
56
57    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58        UnboundedSender::poll_ready(*self, cx)
59    }
60
61    fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
62        self.unbounded_send(msg).map_err(TrySendError::into_send_error)
63    }
64
65    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66        Poll::Ready(Ok(()))
67    }
68
69    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
70        self.close_channel();
71        Poll::Ready(Ok(()))
72    }
73}