1use alloc::collections::VecDeque;
2use alloc::vec::Vec;
3use core::{cmp, mem};
4#[cfg(feature = "std")]
5use std::io;
6#[cfg(feature = "std")]
7use std::io::Read;
8
9#[cfg(feature = "std")]
10use crate::msgs::message::OutboundChunks;
11
/// A byte queue assembled from a deque of owned byte vectors.
///
/// Appending stores the caller's vector as a new chunk instead of copying its
/// contents; the cost is extra bookkeeping when bytes are read back out.
pub(crate) struct ChunkVecBuffer {
    /// Number of bytes at the start of the front chunk that were already
    /// handed out. `pop` resets this to zero.
    prefix_used: usize,

    /// Buffered chunks, oldest first. `append` never stores an empty vector.
    chunks: VecDeque<Vec<u8>>,

    /// Optional cap (in bytes) consulted by `apply_limit`; `None` means uncapped.
    limit: Option<usize>,
}

impl ChunkVecBuffer {
    /// Creates an empty buffer with the given optional byte `limit`.
    pub(crate) fn new(limit: Option<usize>) -> Self {
        let chunks = VecDeque::new();
        Self {
            prefix_used: 0,
            chunks,
            limit,
        }
    }

    /// Replaces the byte limit.
    ///
    /// Only the stored limit changes: data that is already buffered is left
    /// alone even if it exceeds `new_limit`. `None` removes the limit.
    pub(crate) fn set_limit(&mut self, new_limit: Option<usize>) {
        self.limit = new_limit;
    }

    /// Returns `true` when no chunk is buffered.
    pub(crate) fn is_empty(&self) -> bool {
        self.chunks.front().is_none()
    }

    /// Returns the number of unread bytes currently buffered.
    pub(crate) fn len(&self) -> usize {
        let stored: usize = self.chunks.iter().map(Vec::len).sum();
        // The already-consumed prefix of the front chunk does not count.
        stored - self.prefix_used
    }

    /// For a proposed append of `len` bytes, returns how many of them may be
    /// appended without exceeding the current limit.
    ///
    /// Returns `len` unchanged when no limit is set.
    pub(crate) fn apply_limit(&self, len: usize) -> usize {
        match self.limit {
            None => len,
            Some(limit) => {
                // Saturate so that an already over-full buffer yields zero space.
                let remaining = limit.saturating_sub(self.len());
                cmp::min(len, remaining)
            }
        }
    }

    /// Takes ownership of `bytes` and queues it as a new chunk.
    ///
    /// Empty vectors are not queued. Returns `bytes.len()`. The limit is not
    /// consulted here.
    pub(crate) fn append(&mut self, bytes: Vec<u8>) -> usize {
        match bytes.len() {
            0 => 0,
            len => {
                self.chunks.push_back(bytes);
                len
            }
        }
    }

    /// Removes and returns the front chunk, minus any bytes of it that were
    /// already consumed.
    ///
    /// Returns `None` when no chunk is buffered.
    pub(crate) fn pop(&mut self) -> Option<Vec<u8>> {
        let mut front = self.chunks.pop_front()?;

        // Slice off the consumed prefix; the next chunk starts unconsumed.
        let consumed = mem::take(&mut self.prefix_used);
        front.drain(..consumed);

        Some(front)
    }

    /// Moves buffered bytes into `cursor` until either the buffer is empty or
    /// the cursor has no capacity left.
    // NOTE(review): this uses `io` and `consume`, which this file only provides
    // with the `std` feature -- presumably `cfg(read_buf)` implies `std`; confirm.
    #[cfg(read_buf)]
    pub(crate) fn read_buf(&mut self, mut cursor: core::io::BorrowedCursor<'_>) -> io::Result<()> {
        while cursor.capacity() > 0 && !self.is_empty() {
            let pending = &self.chunks[0][self.prefix_used..];
            let take = cmp::min(pending.len(), cursor.capacity());
            cursor.append(&pending[..take]);
            self.consume(take);
        }

        Ok(())
    }
}
113
#[cfg(feature = "std")]
impl ChunkVecBuffer {
    /// Returns `true` when more bytes are buffered than the limit allows.
    ///
    /// Always `false` when no limit is set.
    pub(crate) fn is_full(&self) -> bool {
        match self.limit {
            Some(limit) => self.len() > limit,
            None => false,
        }
    }

    /// Appends a copy of `payload`, truncated to whatever `apply_limit`
    /// permits, and returns how many bytes were accepted.
    pub(crate) fn append_limited_copy(&mut self, payload: OutboundChunks<'_>) -> usize {
        let accepted = self.apply_limit(payload.len());
        let head = payload.split_at(accepted).0;
        self.append(head.to_vec());
        accepted
    }

    /// Copies buffered bytes into `buf`, removing them from this buffer, and
    /// returns how many bytes were written.
    ///
    /// Stops when `buf` is full or no data remains.
    pub(crate) fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut filled = 0;

        while filled < buf.len() && !self.is_empty() {
            // Unread remainder of the front chunk; `Read for &[u8]` does the copy.
            let mut pending = &self.chunks[0][self.prefix_used..];
            let copied = pending.read(&mut buf[filled..])?;

            self.consume(copied);
            filled += copied;
        }

        Ok(filled)
    }

    /// Marks `used` further bytes as read and drops every chunk that is now
    /// fully covered.
    fn consume(&mut self, used: usize) {
        // Extend the consumed region, possibly past the end of the front chunk...
        self.prefix_used += used;

        // ...then discard wholly-consumed chunks, rebasing the offset each time.
        while let Some(front_len) = self.chunks.front().map(Vec::len) {
            if self.prefix_used < front_len {
                break;
            }
            self.prefix_used -= front_len;
            self.chunks.pop_front();
        }
    }

    /// Writes buffered data to `wr` with a single `write_vectored` call and
    /// removes however many bytes the writer reports as written.
    ///
    /// At most 64 chunks are offered per call. Returns `Ok(0)` without
    /// touching `wr` when nothing is buffered.
    pub(crate) fn write_to(&mut self, wr: &mut dyn io::Write) -> io::Result<usize> {
        if self.is_empty() {
            return Ok(0);
        }

        let mut bufs = [io::IoSlice::new(&[]); 64];
        // Only the front chunk has a consumed prefix to skip.
        let mut skip = self.prefix_used;
        let mut filled = 0;
        for (slot, chunk) in bufs.iter_mut().zip(&self.chunks) {
            *slot = io::IoSlice::new(&chunk[skip..]);
            skip = 0;
            filled += 1;
        }

        let written = wr.write_vectored(&bufs[..filled])?;
        self.consume(written);
        Ok(written)
    }
}
179
#[cfg(all(test, feature = "std"))]
mod tests {
    use alloc::vec;
    use alloc::vec::Vec;

    use super::ChunkVecBuffer;

    /// Limited appends stop accepting bytes at the 12-byte limit, and the
    /// accepted bytes read back in order.
    #[test]
    fn short_append_copy_with_limit() {
        let attempts: [(&[u8], usize); 4] = [
            (b"hello", 5),
            (b"world", 5),
            (b"hello", 2),
            (b"world", 0),
        ];

        let mut cvb = ChunkVecBuffer::new(Some(12));
        for &(payload, accepted) in &attempts {
            assert_eq!(cvb.append_limited_copy(payload.into()), accepted);
        }

        let mut out = [0u8; 12];
        assert_eq!(cvb.read(&mut out).unwrap(), 12);
        assert_eq!(out.to_vec(), b"helloworldhe".to_vec());
    }

    /// A single chunk can be drained one byte at a time, after which reads
    /// return zero.
    #[test]
    fn read_byte_by_byte() {
        const DATA: &[u8] = b"test fixture data";

        let mut cvb = ChunkVecBuffer::new(None);
        cvb.append(DATA.to_vec());
        assert!(!cvb.is_empty());

        for &expect in DATA {
            let mut byte = [0u8];
            assert_eq!(cvb.read(&mut byte).unwrap(), 1);
            assert_eq!(byte[0], expect);
        }

        assert_eq!(cvb.read(&mut [0]).unwrap(), 0);
    }

    /// Every combination of input chunk size (1..64) and read size (1..65)
    /// reproduces the input stream exactly.
    #[test]
    fn every_possible_chunk_interleaving() {
        let input: Vec<u8> = (0..=0xffu8).cycle().take(4096).collect();

        for input_chunk_len in 1..64usize {
            for output_chunk_len in 1..65usize {
                std::println!("check input={input_chunk_len} output={output_chunk_len}");

                let mut cvb = ChunkVecBuffer::new(None);
                input.chunks(input_chunk_len).for_each(|chunk| {
                    cvb.append(chunk.to_vec());
                });
                assert_eq!(cvb.len(), input.len());

                let mut out = vec![0u8; output_chunk_len];
                for expect in input.chunks(output_chunk_len) {
                    assert_eq!(expect.len(), cvb.read(&mut out).unwrap());
                    assert_eq!(expect, &out[..expect.len()]);
                }

                assert_eq!(cvb.read(&mut [0]).unwrap(), 0);
            }
        }
    }

    /// `read_buf` fills a borrowed cursor across chunk boundaries and stops
    /// when the data runs out.
    #[cfg(read_buf)]
    #[test]
    fn read_buf() {
        use core::io::BorrowedBuf;
        use core::mem::MaybeUninit;

        // Several chunks drained through a small, repeatedly reused buffer.
        {
            let mut cvb = ChunkVecBuffer::new(None);
            for piece in [&b"test "[..], &b"fixture "[..], &b"data"[..]] {
                cvb.append(piece.to_vec());
            }

            let mut storage = [MaybeUninit::<u8>::uninit(); 8];
            let mut buf: BorrowedBuf<'_> = storage.as_mut_slice().into();
            for expect in [&b"test fix"[..], &b"ture dat"[..], &b"a"[..]] {
                buf.clear();
                cvb.read_buf(buf.unfilled()).unwrap();
                assert_eq!(buf.filled(), expect);
            }
        }

        // One chunk that is shorter than the destination buffer.
        {
            let mut cvb = ChunkVecBuffer::new(None);
            cvb.append(b"short message".to_vec());

            let mut storage = [MaybeUninit::<u8>::uninit(); 1024];
            let mut buf: BorrowedBuf<'_> = storage.as_mut_slice().into();
            cvb.read_buf(buf.unfilled()).unwrap();
            assert_eq!(buf.filled(), b"short message");
        }
    }
}
277
#[cfg(bench)]
mod benchmarks {
    use alloc::vec;

    use super::ChunkVecBuffer;

    /// Size in bytes of the single chunk every benchmark starts from.
    const MESSAGE_LEN: usize = 16_384;

    /// Builds an unlimited buffer holding one zero-filled chunk of `MESSAGE_LEN` bytes.
    fn large_message() -> ChunkVecBuffer {
        let mut cvb = ChunkVecBuffer::new(None);
        cvb.append(vec![0u8; MESSAGE_LEN]);
        cvb
    }

    /// Reads a single byte out of a freshly filled buffer.
    #[bench]
    fn read_one_byte_from_large_message(b: &mut test::Bencher) {
        b.iter(|| {
            let mut cvb = large_message();
            assert_eq!(1, cvb.read(&mut [0u8]).unwrap());
        });
    }

    /// Drains the whole buffer with one-byte reads.
    #[bench]
    fn read_all_individual_from_large_message(b: &mut test::Bencher) {
        b.iter(|| {
            let mut cvb = large_message();
            // Keep reading until a read reports zero bytes.
            while !matches!(cvb.read(&mut [0u8]), Ok(0)) {}
        });
    }

    /// Drains the buffer in two half-sized reads.
    #[bench]
    fn read_half_bytes_from_large_message(b: &mut test::Bencher) {
        b.iter(|| {
            let mut cvb = large_message();
            for _ in 0..2 {
                assert_eq!(8192, cvb.read(&mut [0u8; 8192]).unwrap());
            }
        });
    }

    /// Drains the buffer with a single full-sized read.
    #[bench]
    fn read_entire_large_message(b: &mut test::Bencher) {
        b.iter(|| {
            let mut cvb = large_message();
            assert_eq!(MESSAGE_LEN, cvb.read(&mut [0u8; MESSAGE_LEN]).unwrap());
        });
    }
}