async_compression/futures/bufread/generic/
decoder.rs

1use crate::codecs::Decode;
2use crate::core::util::PartialBuffer;
3
4use core::{
5    pin::Pin,
6    task::{Context, Poll},
7};
8use futures_core::ready;
9use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
10use pin_project_lite::pin_project;
11use std::io::{IoSlice, Result};
12
13#[derive(Debug)]
14enum State {
15    Decoding,
16    Flushing,
17    Done,
18    Next,
19}
20
21pin_project! {
22    #[derive(Debug)]
23    pub struct Decoder<R, D> {
24        #[pin]
25        reader: R,
26        decoder: D,
27        state: State,
28        multiple_members: bool,
29    }
30}
31
32impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
33    pub fn new(reader: R, decoder: D) -> Self {
34        Self {
35            reader,
36            decoder,
37            state: State::Decoding,
38            multiple_members: false,
39        }
40    }
41}
42
43impl<R, D> Decoder<R, D> {
44    pub fn get_ref(&self) -> &R {
45        &self.reader
46    }
47
48    pub fn get_mut(&mut self) -> &mut R {
49        &mut self.reader
50    }
51
52    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
53        self.project().reader
54    }
55
56    pub fn into_inner(self) -> R {
57        self.reader
58    }
59
60    pub fn multiple_members(&mut self, enabled: bool) {
61        self.multiple_members = enabled;
62    }
63}
64
65impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
66    fn do_poll_read(
67        self: Pin<&mut Self>,
68        cx: &mut Context<'_>,
69        output: &mut PartialBuffer<&mut [u8]>,
70    ) -> Poll<Result<()>> {
71        let mut this = self.project();
72
73        let mut first = true;
74
75        loop {
76            *this.state = match this.state {
77                State::Decoding => {
78                    let input = if first {
79                        &[][..]
80                    } else {
81                        ready!(this.reader.as_mut().poll_fill_buf(cx))?
82                    };
83
84                    if input.is_empty() && !first {
85                        // Avoid attempting to reinitialise the decoder if the
86                        // reader has returned EOF.
87                        *this.multiple_members = false;
88
89                        State::Flushing
90                    } else {
91                        let mut input = PartialBuffer::new(input);
92                        let res = this.decoder.decode(&mut input, output).or_else(|err| {
93                            // ignore the first error, occurs when input is empty
94                            // but we need to run decode to flush
95                            if first {
96                                Ok(false)
97                            } else {
98                                Err(err)
99                            }
100                        });
101
102                        if !first {
103                            let len = input.written().len();
104                            this.reader.as_mut().consume(len);
105                        }
106
107                        first = false;
108
109                        if res? {
110                            State::Flushing
111                        } else {
112                            State::Decoding
113                        }
114                    }
115                }
116
117                State::Flushing => {
118                    if this.decoder.finish(output)? {
119                        if *this.multiple_members {
120                            this.decoder.reinit()?;
121                            State::Next
122                        } else {
123                            State::Done
124                        }
125                    } else {
126                        State::Flushing
127                    }
128                }
129
130                State::Done => State::Done,
131
132                State::Next => {
133                    let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
134                    if input.is_empty() {
135                        State::Done
136                    } else {
137                        State::Decoding
138                    }
139                }
140            };
141
142            if let State::Done = *this.state {
143                return Poll::Ready(Ok(()));
144            }
145            if output.unwritten().is_empty() {
146                return Poll::Ready(Ok(()));
147            }
148        }
149    }
150}
151
152impl<R: AsyncBufRead, D: Decode> AsyncRead for Decoder<R, D> {
153    fn poll_read(
154        self: Pin<&mut Self>,
155        cx: &mut Context<'_>,
156        buf: &mut [u8],
157    ) -> Poll<Result<usize>> {
158        if buf.is_empty() {
159            return Poll::Ready(Ok(0));
160        }
161
162        let mut output = PartialBuffer::new(buf);
163        match self.do_poll_read(cx, &mut output)? {
164            Poll::Pending if output.written().is_empty() => Poll::Pending,
165            _ => Poll::Ready(Ok(output.written().len())),
166        }
167    }
168}
169
170impl<R: AsyncWrite, D> AsyncWrite for Decoder<R, D> {
171    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
172        self.get_pin_mut().poll_write(cx, buf)
173    }
174
175    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
176        self.get_pin_mut().poll_flush(cx)
177    }
178
179    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
180        self.get_pin_mut().poll_close(cx)
181    }
182
183    fn poll_write_vectored(
184        self: Pin<&mut Self>,
185        cx: &mut Context<'_>,
186        bufs: &[IoSlice<'_>],
187    ) -> Poll<Result<usize>> {
188        self.get_pin_mut().poll_write_vectored(cx, bufs)
189    }
190}