1use alloc::collections::VecDeque;
2use alloc::vec::Vec;
3use core::{cmp, mem};
4#[cfg(feature = "std")]
5use std::io;
6#[cfg(feature = "std")]
7use std::io::Read;
8
9#[cfg(feature = "std")]
10use crate::msgs::message::OutboundChunks;
11
/// A byte queue stored as a deque of separately allocated byte vectors.
///
/// Appending hands over a whole vector instead of copying its contents; in
/// exchange, readers have to walk across chunk boundaries.
pub(crate) struct ChunkVecBuffer {
    /// Number of bytes at the start of the front chunk that have already
    /// been consumed and must be skipped by readers.
    prefix_used: usize,

    /// Queued chunks, oldest at the front.
    chunks: VecDeque<Vec<u8>>,

    /// Optional cap (in bytes) used when deciding how much may be appended;
    /// `None` means no cap.
    limit: Option<usize>,
}
28
29impl ChunkVecBuffer {
30 pub(crate) fn new(limit: Option<usize>) -> Self {
31 Self {
32 prefix_used: 0,
33 chunks: VecDeque::new(),
34 limit,
35 }
36 }
37
38 pub(crate) fn set_limit(&mut self, new_limit: Option<usize>) {
46 self.limit = new_limit;
47 }
48
49 pub(crate) fn is_empty(&self) -> bool {
51 self.chunks.is_empty()
52 }
53
54 pub(crate) fn len(&self) -> usize {
56 self.chunks
57 .iter()
58 .fold(0usize, |acc, chunk| acc + chunk.len())
59 - self.prefix_used
60 }
61
62 pub(crate) fn apply_limit(&self, len: usize) -> usize {
66 if let Some(limit) = self.limit {
67 let space = limit.saturating_sub(self.len());
68 cmp::min(len, space)
69 } else {
70 len
71 }
72 }
73
74 pub(crate) fn append(&mut self, bytes: Vec<u8>) -> usize {
76 let len = bytes.len();
77
78 if !bytes.is_empty() {
79 if self.chunks.is_empty() {
80 debug_assert_eq!(self.prefix_used, 0);
81 }
82
83 self.chunks.push_back(bytes);
84 }
85
86 len
87 }
88
89 pub(crate) fn pop(&mut self) -> Option<Vec<u8>> {
93 let mut first = self.chunks.pop_front();
94
95 if let Some(first) = &mut first {
96 let prefix = mem::take(&mut self.prefix_used);
98 first.drain(0..prefix);
99 }
100
101 first
102 }
103
104 #[cfg(read_buf)]
105 pub(crate) fn read_buf(&mut self, mut cursor: core::io::BorrowedCursor<'_>) -> io::Result<()> {
107 while !self.is_empty() && cursor.capacity() > 0 {
108 let chunk = &self.chunks[0][self.prefix_used..];
109 let used = cmp::min(chunk.len(), cursor.capacity());
110 cursor.append(&chunk[..used]);
111 self.consume(used);
112 }
113
114 Ok(())
115 }
116
117 pub(crate) fn peek(&self) -> Option<&[u8]> {
119 self.chunks
120 .front()
121 .map(|ch| ch.as_slice())
122 }
123}
124
#[cfg(feature = "std")]
impl ChunkVecBuffer {
    /// Returns `true` when a limit is set and the number of buffered bytes
    /// is strictly greater than it. Always `false` without a limit.
    pub(crate) fn is_full(&self) -> bool {
        matches!(self.limit, Some(limit) if self.len() > limit)
    }

    /// Appends as much of the front of `payload` as the limit allows, as a
    /// newly allocated chunk.
    ///
    /// Returns the number of bytes that were accepted.
    pub(crate) fn append_limited_copy(&mut self, payload: OutboundChunks<'_>) -> usize {
        let accepted = self.apply_limit(payload.len());
        let (head, _tail) = payload.split_at(accepted);
        self.append(head.to_vec());
        accepted
    }

    /// Copies buffered bytes into `buf`, consuming them from this buffer.
    ///
    /// Stops when `buf` is full or no chunks remain, and returns the number
    /// of bytes written to `buf`.
    pub(crate) fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut filled = 0;

        while filled < buf.len() {
            // Unconsumed remainder of the front chunk; `None` once drained.
            let mut front = match self.chunk() {
                Some(front) => front,
                None => break,
            };

            let copied = front.read(&mut buf[filled..])?;
            self.consume(copied);
            filled += copied;
        }

        Ok(filled)
    }

    /// Marks `used` bytes of the front chunk as consumed.
    ///
    /// # Panics
    ///
    /// Panics if `used` exceeds the unconsumed length of the front chunk
    /// (which is treated as zero when no chunks are queued).
    pub(crate) fn consume_first_chunk(&mut self, used: usize) {
        let available = self.chunk().map_or(0, <[u8]>::len);
        assert!(used <= available, "illegal `BufRead::consume` usage",);
        self.consume(used);
    }

    /// Advances the read position by `used` bytes, dropping every chunk that
    /// becomes fully consumed.
    fn consume(&mut self, used: usize) {
        // Record the new extent first; it may temporarily reach past the end
        // of the front chunk.
        self.prefix_used += used;

        // Then discard wholly covered chunks, carrying the excess forward.
        while let Some(front_len) = self.chunks.front().map(Vec::len) {
            if self.prefix_used < front_len {
                return;
            }

            self.prefix_used -= front_len;
            self.chunks.pop_front();
        }

        // Reaching here means every chunk was dropped; any leftover count
        // would be bytes that were never in the buffer.
        debug_assert_eq!(
            self.prefix_used, 0,
            "attempted to `ChunkVecBuffer::consume` more than available"
        );
    }

    /// Writes buffered data to `wr` with a single `write_vectored` call and
    /// consumes whatever the writer reports as written.
    ///
    /// At most 64 chunks are offered per call. Returns `Ok(0)` without
    /// touching `wr` when the buffer is empty.
    ///
    /// # Errors
    ///
    /// Propagates any error from `wr`. Also returns an error if `wr` claims
    /// to have written more bytes than it was offered; in that case all
    /// offered bytes are consumed before the error is returned.
    pub(crate) fn write_to(&mut self, wr: &mut dyn io::Write) -> io::Result<usize> {
        if self.is_empty() {
            return Ok(0);
        }

        let mut slots = [io::IoSlice::new(&[]); 64];
        let mut skip = self.prefix_used;
        let mut filled = 0;
        for (slot, chunk) in slots.iter_mut().zip(&self.chunks) {
            // Only the front chunk has a consumed prefix to skip.
            *slot = io::IoSlice::new(&chunk[skip..]);
            skip = 0;
            filled += 1;
        }

        let offered = &slots[..filled];
        let used = wr.write_vectored(offered)?;
        let available_bytes: usize = offered.iter().map(|iov| iov.len()).sum();

        if used <= available_bytes {
            self.consume(used);
            return Ok(used);
        }

        // The writer's count is impossible, so how much was really written
        // is unknown; drop everything that was offered.
        self.consume(available_bytes);
        Err(io::Error::new(
            io::ErrorKind::Other,
            std::format!("illegal write_vectored return value ({used} > {available_bytes})"),
        ))
    }

    /// Returns the unconsumed part of the front chunk, or `None` if no
    /// chunks are queued.
    pub(crate) fn chunk(&self) -> Option<&[u8]> {
        let front = self.chunks.front()?;
        Some(&front[self.prefix_used..])
    }
}
229
#[cfg(all(test, feature = "std"))]
mod tests {
    use alloc::vec;
    use alloc::vec::Vec;

    use super::ChunkVecBuffer;

    /// Limited appends accept only as many bytes as still fit under the limit.
    #[test]
    fn short_append_copy_with_limit() {
        let mut buffer = ChunkVecBuffer::new(Some(12));
        assert_eq!(buffer.append_limited_copy(b"hello"[..].into()), 5);
        assert_eq!(buffer.append_limited_copy(b"world"[..].into()), 5);
        assert_eq!(buffer.append_limited_copy(b"hello"[..].into()), 2);
        assert_eq!(buffer.append_limited_copy(b"world"[..].into()), 0);

        let mut out = [0u8; 12];
        let read = buffer.read(&mut out).unwrap();
        assert_eq!(read, 12);
        assert_eq!(out.to_vec(), b"helloworldhe".to_vec());
    }

    /// Reading through a one-byte buffer yields every byte in order.
    #[test]
    fn read_byte_by_byte() {
        let mut buffer = ChunkVecBuffer::new(None);
        buffer.append(b"test fixture data".to_vec());
        assert!(!buffer.is_empty());

        for expected in b"test fixture data" {
            let mut single = [0];
            assert_eq!(buffer.read(&mut single).unwrap(), 1);
            assert_eq!(single[0], *expected);
        }

        // Everything has been drained.
        assert_eq!(buffer.read(&mut [0]).unwrap(), 0);
    }

    /// Checks reads for many combinations of input and output chunk sizes.
    #[test]
    fn every_possible_chunk_interleaving() {
        let input: Vec<u8> = (0..=0xffu8).cycle().take(4096).collect();

        for input_chunk_len in 1..64usize {
            for output_chunk_len in 1..65usize {
                std::println!("check input={input_chunk_len} output={output_chunk_len}");

                let mut buffer = ChunkVecBuffer::new(None);
                for piece in input.chunks(input_chunk_len) {
                    buffer.append(piece.to_vec());
                }
                assert_eq!(buffer.len(), input.len());

                let mut out = vec![0u8; output_chunk_len];
                for expected in input.chunks(output_chunk_len) {
                    assert_eq!(expected.len(), buffer.read(&mut out).unwrap());
                    assert_eq!(expected, &out[..expected.len()]);
                }

                assert_eq!(buffer.read(&mut [0]).unwrap(), 0);
            }
        }
    }

    /// Exercises `read_buf` with a cursor smaller and larger than the data.
    #[cfg(read_buf)]
    #[test]
    fn read_buf() {
        use core::io::BorrowedBuf;
        use core::mem::MaybeUninit;

        // Cursor smaller than the buffered data: drained over several calls.
        {
            let mut buffer = ChunkVecBuffer::new(None);
            buffer.append(b"test ".to_vec());
            buffer.append(b"fixture ".to_vec());
            buffer.append(b"data".to_vec());

            let mut storage = [MaybeUninit::<u8>::uninit(); 8];
            let mut out: BorrowedBuf<'_> = storage.as_mut_slice().into();

            buffer.read_buf(out.unfilled()).unwrap();
            assert_eq!(out.filled(), b"test fix");
            out.clear();

            buffer.read_buf(out.unfilled()).unwrap();
            assert_eq!(out.filled(), b"ture dat");
            out.clear();

            buffer.read_buf(out.unfilled()).unwrap();
            assert_eq!(out.filled(), b"a");
        }

        // Cursor larger than the buffered data: one call takes everything.
        {
            let mut buffer = ChunkVecBuffer::new(None);
            buffer.append(b"short message".to_vec());

            let mut storage = [MaybeUninit::<u8>::uninit(); 1024];
            let mut out: BorrowedBuf<'_> = storage.as_mut_slice().into();

            buffer.read_buf(out.unfilled()).unwrap();
            assert_eq!(out.filled(), b"short message");
        }
    }
}
327
// These benchmarks call `ChunkVecBuffer::read`, which only exists with the
// `std` feature, so the module is gated on it just like `mod tests`.
#[cfg(all(bench, feature = "std"))]
mod benchmarks {
    use alloc::vec;

    use super::ChunkVecBuffer;

    /// Reads a single byte out of one 16 KiB chunk.
    #[bench]
    fn read_one_byte_from_large_message(b: &mut test::Bencher) {
        b.iter(|| {
            let mut cvb = ChunkVecBuffer::new(None);
            cvb.append(vec![0u8; 16_384]);
            assert_eq!(1, cvb.read(&mut [0u8]).unwrap());
        });
    }

    /// Drains one 16 KiB chunk one byte at a time.
    #[bench]
    fn read_all_individual_from_large_message(b: &mut test::Bencher) {
        b.iter(|| {
            let mut cvb = ChunkVecBuffer::new(None);
            cvb.append(vec![0u8; 16_384]);
            // Keep reading until a read reports zero bytes.
            while !matches!(cvb.read(&mut [0u8]), Ok(0)) {}
        });
    }

    /// Drains one 16 KiB chunk in two 8 KiB reads.
    #[bench]
    fn read_half_bytes_from_large_message(b: &mut test::Bencher) {
        b.iter(|| {
            let mut cvb = ChunkVecBuffer::new(None);
            cvb.append(vec![0u8; 16_384]);
            assert_eq!(8192, cvb.read(&mut [0u8; 8192]).unwrap());
            assert_eq!(8192, cvb.read(&mut [0u8; 8192]).unwrap());
        });
    }

    /// Drains one 16 KiB chunk in a single read.
    #[bench]
    fn read_entire_large_message(b: &mut test::Bencher) {
        b.iter(|| {
            let mut cvb = ChunkVecBuffer::new(None);
            cvb.append(vec![0u8; 16_384]);
            assert_eq!(16_384, cvb.read(&mut [0u8; 16_384]).unwrap());
        });
    }
}