tokio/io/util/
copy_bidirectional.rs

1use super::copy::CopyBuffer;
2
3use crate::io::{AsyncRead, AsyncWrite};
4
5use std::future::poll_fn;
6use std::io;
7use std::pin::Pin;
8use std::task::{ready, Context, Poll};
9
10enum TransferState {
11    Running(CopyBuffer),
12    ShuttingDown(u64),
13    Done(u64),
14}
15
16fn transfer_one_direction<A, B>(
17    cx: &mut Context<'_>,
18    state: &mut TransferState,
19    r: &mut A,
20    w: &mut B,
21) -> Poll<io::Result<u64>>
22where
23    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
24    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
25{
26    let mut r = Pin::new(r);
27    let mut w = Pin::new(w);
28
29    loop {
30        match state {
31            TransferState::Running(buf) => {
32                let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
33                *state = TransferState::ShuttingDown(count);
34            }
35            TransferState::ShuttingDown(count) => {
36                ready!(w.as_mut().poll_shutdown(cx))?;
37
38                *state = TransferState::Done(*count);
39            }
40            TransferState::Done(count) => return Poll::Ready(Ok(*count)),
41        }
42    }
43}
44/// Copies data in both directions between `a` and `b`.
45///
46/// This function returns a future that will read from both streams,
47/// writing any data read to the opposing stream.
48/// This happens in both directions concurrently.
49///
50/// If an EOF is observed on one stream, [`shutdown()`] will be invoked on
51/// the other, and reading from that stream will stop. Copying of data in
52/// the other direction will continue.
53///
54/// The future will complete successfully once both directions of communication has been shut down.
55/// A direction is shut down when the reader reports EOF,
56/// at which point [`shutdown()`] is called on the corresponding writer. When finished,
57/// it will return a tuple of the number of bytes copied from a to b
58/// and the number of bytes copied from b to a, in that order.
59///
60/// It uses two 8 KB buffers for transferring bytes between `a` and `b` by default.
61/// To set your own buffers sizes use [`copy_bidirectional_with_sizes()`].
62///
63/// [`shutdown()`]: crate::io::AsyncWriteExt::shutdown
64///
65/// # Errors
66///
67/// The future will immediately return an error if any IO operation on `a`
68/// or `b` returns an error. Some data read from either stream may be lost (not
69/// written to the other stream) in this case.
70///
71/// # Return value
72///
73/// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`.
74#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
75pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> io::Result<(u64, u64)>
76where
77    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
78    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
79{
80    copy_bidirectional_impl(
81        a,
82        b,
83        CopyBuffer::new(super::DEFAULT_BUF_SIZE),
84        CopyBuffer::new(super::DEFAULT_BUF_SIZE),
85    )
86    .await
87}
88
89/// Copies data in both directions between `a` and `b` using buffers of the specified size.
90///
91/// This method is the same as the [`copy_bidirectional()`], except that it allows you to set the
92/// size of the internal buffers used when copying data.
93#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
94pub async fn copy_bidirectional_with_sizes<A, B>(
95    a: &mut A,
96    b: &mut B,
97    a_to_b_buf_size: usize,
98    b_to_a_buf_size: usize,
99) -> io::Result<(u64, u64)>
100where
101    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
102    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
103{
104    copy_bidirectional_impl(
105        a,
106        b,
107        CopyBuffer::new(a_to_b_buf_size),
108        CopyBuffer::new(b_to_a_buf_size),
109    )
110    .await
111}
112
113async fn copy_bidirectional_impl<A, B>(
114    a: &mut A,
115    b: &mut B,
116    a_to_b_buffer: CopyBuffer,
117    b_to_a_buffer: CopyBuffer,
118) -> io::Result<(u64, u64)>
119where
120    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
121    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
122{
123    let mut a_to_b = TransferState::Running(a_to_b_buffer);
124    let mut b_to_a = TransferState::Running(b_to_a_buffer);
125    poll_fn(|cx| {
126        let a_to_b = transfer_one_direction(cx, &mut a_to_b, a, b)?;
127        let b_to_a = transfer_one_direction(cx, &mut b_to_a, b, a)?;
128
129        // It is not a problem if ready! returns early because transfer_one_direction for the
130        // other direction will keep returning TransferState::Done(count) in future calls to poll
131        let a_to_b = ready!(a_to_b);
132        let b_to_a = ready!(b_to_a);
133
134        Poll::Ready(Ok((a_to_b, b_to_a)))
135    })
136    .await
137}