tor_async_utils/
sink_close_channel.rs1use std::pin::Pin;
4
5use futures::channel::mpsc;
6use futures::Sink;
7
8pub trait SinkCloseChannel<T>: Sink<T> {
12 fn close_channel(self: Pin<&mut Self>);
22}
23
24impl<T> SinkCloseChannel<T> for mpsc::Sender<T> {
27 fn close_channel(self: Pin<&mut Self>) {
28 let self_: &mut Self = Pin::into_inner(self);
29 self_.close_channel();
30 }
31}
32
#[cfg(test)]
mod test {
    // Lint relaxations applied to this test module only.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::arithmetic_side_effects)]
    #![allow(clippy::useless_format)]

    use super::*;
    use futures::{SinkExt as _, StreamExt as _};

    /// Closing the channel through the `SinkCloseChannel` impl on one sender
    /// makes sends on a clone fail, while items queued before the close are
    /// still delivered and the receiver then sees end-of-stream.
    #[test]
    fn close_channel() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            let (mut tx, mut rx) = mpsc::channel::<i32>(20);
            tx.send(0).await.unwrap();
            let mut tx2 = tx.clone();
            tx2.send(1).await.unwrap();

            // `tx2.close_channel()` would resolve to the inherent
            // `mpsc::Sender::close_channel(&mut self)` and bypass the trait
            // impl entirely.  Name the trait and pass a pinned receiver so
            // that the impl under test is what actually runs.
            SinkCloseChannel::<i32>::close_channel(Pin::new(&mut tx2));

            // The channel is closed for every sender, including the original.
            let _: mpsc::SendError = tx.send(66).await.unwrap_err();

            // Items sent before the close are still received, in order.
            for i in 0..=1 {
                assert_eq!(rx.next().await.unwrap(), i);
            }
            // After draining, the stream terminates.
            assert_eq!(rx.next().await, None);
        });
    }
}