tor_circmgr/mgr/
streams.rs1use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
5use futures::task::{Context, Poll};
6use pin_project::pin_project;
7use std::pin::Pin;
8
9#[derive(Debug, Copy, Clone, Eq, PartialEq)]
11pub(super) enum Source {
12 Left,
14 Right,
16}
17
18#[pin_project]
22pub(super) struct SelectBiased<S, T> {
23 #[pin]
29 left: Fuse<S>,
30 #[pin]
32 right: Fuse<T>,
33}
34
35pub(super) fn select_biased<S, T>(left: S, right: T) -> SelectBiased<S, T>
57where
58 S: Stream,
59 T: Stream<Item = S::Item>,
60{
61 SelectBiased {
62 left: left.fuse(),
63 right: right.fuse(),
64 }
65}
66
67impl<S, T> FusedStream for SelectBiased<S, T>
68where
69 S: Stream,
70 T: Stream<Item = S::Item>,
71{
72 fn is_terminated(&self) -> bool {
73 self.left.is_terminated()
76 }
77}
78
79impl<S, T> Stream for SelectBiased<S, T>
80where
81 S: Stream,
82 T: Stream<Item = S::Item>,
83{
84 type Item = (Source, S::Item);
85
86 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 let this = self.project();
88 match this.left.poll_next(cx) {
90 Poll::Ready(Some(val)) => {
91 return Poll::Ready(Some((Source::Left, val)));
93 }
94 Poll::Ready(None) => {
95 return Poll::Ready(None);
97 }
98 Poll::Pending => {}
99 }
100
101 match this.right.poll_next(cx) {
104 Poll::Ready(Some(val)) => {
105 Poll::Ready(Some((Source::Right, val)))
107 }
108 _ => {
109 Poll::Pending
112 }
113 }
114 }
115}
116
#[cfg(test)]
mod test {
    // Lints relaxed for test code.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use futures_await_test::async_test;

    /// With an always-ready left stream, only left items are produced and
    /// the combined stream ends when the left stream does.
    #[async_test]
    async fn left_only() {
        use futures::stream::iter;
        use Source::Left as L;

        // Left has items, right is empty.
        let merged = select_biased(iter(vec![1_usize, 2, 3]), iter(vec![]));
        let got: Vec<_> = merged.collect().await;
        assert_eq!(got, vec![(L, 1_usize), (L, 2), (L, 3)]);

        // Both have items: the right-hand items are never yielded, because
        // the left stream is ready on every poll until it finishes.
        let merged = select_biased(iter(vec![1_usize, 2, 3]), iter(vec![4, 5, 6]));
        let got: Vec<_> = merged.collect().await;
        assert_eq!(got, vec![(L, 1_usize), (L, 2), (L, 3)]);

        // Left is empty: the combined stream ends immediately.
        let merged = select_biased(iter(vec![]), iter(vec![4_usize, 5, 6]));
        let got: Vec<_> = merged.collect().await;
        assert_eq!(got, vec![]);
    }

    /// While the left stream stays pending, right items are passed through.
    #[async_test]
    async fn right_only() {
        use futures::stream::{iter, pending};
        use Source::Right as R;

        let mut merged = select_biased(pending(), iter(vec![4_usize, 5, 6]));
        for expected in 4_usize..=6 {
            assert_eq!(merged.next().await, Some((R, expected)));
        }
    }

    /// Interleave sends on two channels and check ordering and termination.
    #[async_test]
    async fn multiplex() {
        use futures::SinkExt;
        use Source::{Left as L, Right as R};

        let (mut tx_left, rx_left) = futures::channel::mpsc::channel(5);
        let (mut tx_right, rx_right) = futures::channel::mpsc::channel(5);
        let mut merged = select_biased(rx_left, rx_right);

        tx_left.send(1_usize).await.unwrap();
        tx_right.send(4_usize).await.unwrap();
        tx_left.send(2_usize).await.unwrap();

        // Both queued left items come out before the right item, even
        // though the right item was sent before the second left one.
        for expected in [(L, 1), (L, 2), (R, 4)] {
            assert_eq!(merged.next().await, Some(expected));
        }

        tx_right.send(5_usize).await.unwrap();
        tx_left.send(3_usize).await.unwrap();

        assert!(!merged.is_terminated());
        // Closing the right side must not end the combined stream.
        drop(tx_right);

        for expected in [(L, 3), (R, 5)] {
            assert_eq!(merged.next().await, Some(expected));
        }

        // Closing the left side ends the combined stream.
        drop(tx_left);
        assert_eq!(merged.next().await, None);

        assert!(merged.is_terminated());
    }
}