async_compression/futures/bufread/generic/
decoder.rs1use crate::codecs::Decode;
2use crate::core::util::PartialBuffer;
3
4use core::{
5 pin::Pin,
6 task::{Context, Poll},
7};
8use futures_core::ready;
9use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
10use pin_project_lite::pin_project;
11use std::io::{IoSlice, Result};
12
13#[derive(Debug)]
14enum State {
15 Decoding,
16 Flushing,
17 Done,
18 Next,
19}
20
21pin_project! {
22 #[derive(Debug)]
23 pub struct Decoder<R, D> {
24 #[pin]
25 reader: R,
26 decoder: D,
27 state: State,
28 multiple_members: bool,
29 }
30}
31
32impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
33 pub fn new(reader: R, decoder: D) -> Self {
34 Self {
35 reader,
36 decoder,
37 state: State::Decoding,
38 multiple_members: false,
39 }
40 }
41}
42
43impl<R, D> Decoder<R, D> {
44 pub fn get_ref(&self) -> &R {
45 &self.reader
46 }
47
48 pub fn get_mut(&mut self) -> &mut R {
49 &mut self.reader
50 }
51
52 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
53 self.project().reader
54 }
55
56 pub fn into_inner(self) -> R {
57 self.reader
58 }
59
60 pub fn multiple_members(&mut self, enabled: bool) {
61 self.multiple_members = enabled;
62 }
63}
64
65impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
66 fn do_poll_read(
67 self: Pin<&mut Self>,
68 cx: &mut Context<'_>,
69 output: &mut PartialBuffer<&mut [u8]>,
70 ) -> Poll<Result<()>> {
71 let mut this = self.project();
72
73 let mut first = true;
74
75 loop {
76 *this.state = match this.state {
77 State::Decoding => {
78 let input = if first {
79 &[][..]
80 } else {
81 ready!(this.reader.as_mut().poll_fill_buf(cx))?
82 };
83
84 if input.is_empty() && !first {
85 *this.multiple_members = false;
88
89 State::Flushing
90 } else {
91 let mut input = PartialBuffer::new(input);
92 let res = this.decoder.decode(&mut input, output).or_else(|err| {
93 if first {
96 Ok(false)
97 } else {
98 Err(err)
99 }
100 });
101
102 if !first {
103 let len = input.written().len();
104 this.reader.as_mut().consume(len);
105 }
106
107 first = false;
108
109 if res? {
110 State::Flushing
111 } else {
112 State::Decoding
113 }
114 }
115 }
116
117 State::Flushing => {
118 if this.decoder.finish(output)? {
119 if *this.multiple_members {
120 this.decoder.reinit()?;
121 State::Next
122 } else {
123 State::Done
124 }
125 } else {
126 State::Flushing
127 }
128 }
129
130 State::Done => State::Done,
131
132 State::Next => {
133 let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
134 if input.is_empty() {
135 State::Done
136 } else {
137 State::Decoding
138 }
139 }
140 };
141
142 if let State::Done = *this.state {
143 return Poll::Ready(Ok(()));
144 }
145 if output.unwritten().is_empty() {
146 return Poll::Ready(Ok(()));
147 }
148 }
149 }
150}
151
152impl<R: AsyncBufRead, D: Decode> AsyncRead for Decoder<R, D> {
153 fn poll_read(
154 self: Pin<&mut Self>,
155 cx: &mut Context<'_>,
156 buf: &mut [u8],
157 ) -> Poll<Result<usize>> {
158 if buf.is_empty() {
159 return Poll::Ready(Ok(0));
160 }
161
162 let mut output = PartialBuffer::new(buf);
163 match self.do_poll_read(cx, &mut output)? {
164 Poll::Pending if output.written().is_empty() => Poll::Pending,
165 _ => Poll::Ready(Ok(output.written().len())),
166 }
167 }
168}
169
170impl<R: AsyncWrite, D> AsyncWrite for Decoder<R, D> {
171 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
172 self.get_pin_mut().poll_write(cx, buf)
173 }
174
175 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
176 self.get_pin_mut().poll_flush(cx)
177 }
178
179 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
180 self.get_pin_mut().poll_close(cx)
181 }
182
183 fn poll_write_vectored(
184 self: Pin<&mut Self>,
185 cx: &mut Context<'_>,
186 bufs: &[IoSlice<'_>],
187 ) -> Poll<Result<usize>> {
188 self.get_pin_mut().poll_write_vectored(cx, bufs)
189 }
190}