1use bytes::{Bytes, BytesMut};
7
8use cuprate_helper::cast::usize_to_u64;
9
10use crate::{
11 header::{Flags, HEADER_SIZE},
12 Bucket, BucketBuilder, BucketError, BucketHead, LevinBody, LevinCommand, Protocol,
13};
14
15pub enum LevinMessage<T: LevinBody> {
17 Body(T),
21 Bucket(Bucket<T::Command>),
27 Dummy(usize),
32}
33
34impl<T: LevinBody> From<T> for LevinMessage<T> {
35 fn from(value: T) -> Self {
36 Self::Body(value)
37 }
38}
39
40impl<T: LevinBody> From<Bucket<T::Command>> for LevinMessage<T> {
41 fn from(value: Bucket<T::Command>) -> Self {
42 Self::Bucket(value)
43 }
44}
45
46pub struct Dummy(pub usize);
58
59impl<T: LevinBody> From<Dummy> for LevinMessage<T> {
60 fn from(value: Dummy) -> Self {
61 Self::Dummy(value.0)
62 }
63}
64
65pub fn make_fragmented_messages<T: LevinBody>(
75 protocol: &Protocol,
76 fragment_size: usize,
77 message: T,
78) -> Result<Vec<Bucket<T::Command>>, BucketError> {
79 assert!(
80 fragment_size >= 2 * HEADER_SIZE,
81 "Fragment size: {fragment_size}, is too small, must be at least {}",
82 2 * HEADER_SIZE
83 );
84
85 let mut builder = BucketBuilder::new(protocol);
86 message.encode(&mut builder)?;
87 let mut bucket = builder.finish();
88
89 if !bucket
91 .header
92 .flags
93 .intersects(Flags::REQUEST | Flags::RESPONSE)
94 {
95 return Err(BucketError::InvalidFragmentedMessage(
97 "Can't make a fragmented message out of a message which is already fragmented",
98 ));
99 }
100
101 if bucket.body.len() + HEADER_SIZE <= fragment_size {
103 if bucket.body.len() + HEADER_SIZE < fragment_size {
105 let mut new_body = BytesMut::from(bucket.body.as_ref());
106 new_body.resize(fragment_size - HEADER_SIZE, 0);
108
109 bucket.body = new_body.freeze();
110 bucket.header.size = usize_to_u64(fragment_size - HEADER_SIZE);
111 }
112
113 return Ok(vec![bucket]);
114 }
115
116 let fragment_head = BucketHead {
119 signature: protocol.signature,
120 size: usize_to_u64(fragment_size - HEADER_SIZE),
121 have_to_return_data: false,
122 command: T::Command::from(0),
124 return_code: 0,
125 flags: Flags::empty(),
126 protocol_version: protocol.version,
127 };
128
129 let data_space = fragment_size - HEADER_SIZE;
131
132 let amount_of_fragments = (bucket.body.len() + HEADER_SIZE).div_ceil(data_space);
133
134 let mut first_bucket_body = BytesMut::with_capacity(fragment_size);
135 bucket.header.write_bytes_into(&mut first_bucket_body);
138 first_bucket_body.extend_from_slice(
139 bucket
140 .body
141 .split_to(fragment_size - (HEADER_SIZE * 2))
142 .as_ref(),
143 );
144
145 let mut buckets = Vec::with_capacity(amount_of_fragments);
146 buckets.push(Bucket {
147 header: fragment_head.clone(),
148 body: first_bucket_body.freeze(),
149 });
150
151 for mut bytes in (1..amount_of_fragments).map(|_| {
152 bucket
153 .body
154 .split_to((fragment_size - HEADER_SIZE).min(bucket.body.len()))
155 }) {
156 if bytes.len() + HEADER_SIZE < fragment_size {
158 let mut new_bytes = BytesMut::from(bytes.as_ref());
159 new_bytes.resize(fragment_size - HEADER_SIZE, 0);
161 bytes = new_bytes.freeze();
162 }
163
164 buckets.push(Bucket {
165 header: fragment_head.clone(),
166 body: bytes,
167 });
168 }
169
170 buckets
171 .first_mut()
172 .unwrap()
173 .header
174 .flags
175 .toggle(Flags::START_FRAGMENT);
176 buckets
177 .last_mut()
178 .unwrap()
179 .header
180 .flags
181 .toggle(Flags::END_FRAGMENT);
182
183 Ok(buckets)
184}
185
186pub(crate) fn make_dummy_message<T: LevinCommand>(protocol: &Protocol, size: usize) -> Bucket<T> {
188 let header = BucketHead {
190 signature: protocol.signature,
191 size: usize_to_u64(size),
192 have_to_return_data: false,
193 command: T::from(0),
195 return_code: 0,
196 flags: Flags::DUMMY,
197 protocol_version: protocol.version,
198 };
199
200 let body = Bytes::from(vec![0; size - HEADER_SIZE]);
201
202 Bucket { header, body }
203}