tokio/io/util/
copy_bidirectional.rs1use super::copy::CopyBuffer;
2
3use crate::io::{AsyncRead, AsyncWrite};
4
5use std::future::poll_fn;
6use std::io;
7use std::pin::Pin;
8use std::task::{ready, Context, Poll};
9
10enum TransferState {
11 Running(CopyBuffer),
12 ShuttingDown(u64),
13 Done(u64),
14}
15
16fn transfer_one_direction<A, B>(
17 cx: &mut Context<'_>,
18 state: &mut TransferState,
19 r: &mut A,
20 w: &mut B,
21) -> Poll<io::Result<u64>>
22where
23 A: AsyncRead + AsyncWrite + Unpin + ?Sized,
24 B: AsyncRead + AsyncWrite + Unpin + ?Sized,
25{
26 let mut r = Pin::new(r);
27 let mut w = Pin::new(w);
28
29 loop {
30 match state {
31 TransferState::Running(buf) => {
32 let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
33 *state = TransferState::ShuttingDown(count);
34 }
35 TransferState::ShuttingDown(count) => {
36 ready!(w.as_mut().poll_shutdown(cx))?;
37
38 *state = TransferState::Done(*count);
39 }
40 TransferState::Done(count) => return Poll::Ready(Ok(*count)),
41 }
42 }
43}
44#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
75pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> io::Result<(u64, u64)>
76where
77 A: AsyncRead + AsyncWrite + Unpin + ?Sized,
78 B: AsyncRead + AsyncWrite + Unpin + ?Sized,
79{
80 copy_bidirectional_impl(
81 a,
82 b,
83 CopyBuffer::new(super::DEFAULT_BUF_SIZE),
84 CopyBuffer::new(super::DEFAULT_BUF_SIZE),
85 )
86 .await
87}
88
89#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
94pub async fn copy_bidirectional_with_sizes<A, B>(
95 a: &mut A,
96 b: &mut B,
97 a_to_b_buf_size: usize,
98 b_to_a_buf_size: usize,
99) -> io::Result<(u64, u64)>
100where
101 A: AsyncRead + AsyncWrite + Unpin + ?Sized,
102 B: AsyncRead + AsyncWrite + Unpin + ?Sized,
103{
104 copy_bidirectional_impl(
105 a,
106 b,
107 CopyBuffer::new(a_to_b_buf_size),
108 CopyBuffer::new(b_to_a_buf_size),
109 )
110 .await
111}
112
113async fn copy_bidirectional_impl<A, B>(
114 a: &mut A,
115 b: &mut B,
116 a_to_b_buffer: CopyBuffer,
117 b_to_a_buffer: CopyBuffer,
118) -> io::Result<(u64, u64)>
119where
120 A: AsyncRead + AsyncWrite + Unpin + ?Sized,
121 B: AsyncRead + AsyncWrite + Unpin + ?Sized,
122{
123 let mut a_to_b = TransferState::Running(a_to_b_buffer);
124 let mut b_to_a = TransferState::Running(b_to_a_buffer);
125 poll_fn(|cx| {
126 let a_to_b = transfer_one_direction(cx, &mut a_to_b, a, b)?;
127 let b_to_a = transfer_one_direction(cx, &mut b_to_a, b, a)?;
128
129 let a_to_b = ready!(a_to_b);
132 let b_to_a = ready!(b_to_a);
133
134 Poll::Ready(Ok((a_to_b, b_to_a)))
135 })
136 .await
137}