1//! Levin Messages
2//!
3//! This module contains the [`LevinMessage`], which allows sending bucket body's, full buckets or dummy messages.
4//! The codec will not return [`LevinMessage`] instead it will only return bucket body's. [`LevinMessage`] allows
5//! for more control over what is actually sent over the wire at certain times.
6use bytes::{Bytes, BytesMut};
78use cuprate_helper::cast::usize_to_u64;
910use crate::{
11 header::{Flags, HEADER_SIZE},
12 Bucket, BucketBuilder, BucketError, BucketHead, LevinBody, LevinCommand, Protocol,
13};
1415/// A levin message that can be sent to a peer.
16pub enum LevinMessage<T: LevinBody> {
17/// A message body.
18 ///
19 /// A levin header will be added to this message before it is sent to the peer.
20Body(T),
21/// A full levin bucket.
22 ///
23 /// This bucket will be sent to the peer directly with no extra information.
24 ///
25 /// This should only be used to send fragmented messages: [`make_fragmented_messages`]
26Bucket(Bucket<T::Command>),
27/// A dummy message.
28 ///
29 /// A dummy message which the peer will ignore. The dummy message will be the exact size
30 /// (in bytes) of the given `usize` on the wire.
31Dummy(usize),
32}
3334impl<T: LevinBody> From<T> for LevinMessage<T> {
35fn from(value: T) -> Self {
36Self::Body(value)
37 }
38}
3940impl<T: LevinBody> From<Bucket<T::Command>> for LevinMessage<T> {
41fn from(value: Bucket<T::Command>) -> Self {
42Self::Bucket(value)
43 }
44}
4546/// This represents a dummy message to send to a peer.
47///
48/// The message, including the header, will be the exact size of the given `usize`.
49/// This exists because it seems weird to do this:
50/// ```rust,ignore
51/// peer.send(1_000);
52/// ```
53/// This is a lot clearer:
54/// ```rust,ignore
55/// peer.send(Dummy(1_000));
56/// ```
57pub struct Dummy(pub usize);
5859impl<T: LevinBody> From<Dummy> for LevinMessage<T> {
60fn from(value: Dummy) -> Self {
61Self::Dummy(value.0)
62 }
63}
6465/// Fragments the provided message into buckets which, when serialised, will all be the size of `fragment_size`.
66///
67/// This function will produce many buckets that have to be sent in order. When the peer receives these buckets
68/// they will combine them to produce the original message.
69///
70/// The last bucket may be padded with zeros to make it the correct size, the format used to encode the body must
71/// allow for extra data at the end of the message this to work.
72///
73/// `fragment_size` must be more than 2 * [`HEADER_SIZE`] otherwise this will panic.
74pub fn make_fragmented_messages<T: LevinBody>(
75 protocol: &Protocol,
76 fragment_size: usize,
77 message: T,
78) -> Result<Vec<Bucket<T::Command>>, BucketError> {
79assert!(
80 fragment_size >= 2 * HEADER_SIZE,
81"Fragment size: {fragment_size}, is too small, must be at least {}",
822 * HEADER_SIZE
83 );
8485let mut builder = BucketBuilder::new(protocol);
86 message.encode(&mut builder)?;
87let mut bucket = builder.finish();
8889// Make sure we are not trying to fragment a fragment.
90if !bucket
91 .header
92 .flags
93 .intersects(Flags::REQUEST | Flags::RESPONSE)
94 {
95// If a bucket does not have the request or response bits set it is a fragment.
96return Err(BucketError::InvalidFragmentedMessage(
97"Can't make a fragmented message out of a message which is already fragmented",
98 ));
99 }
100101// Check if the bucket can fit in one fragment.
102if bucket.body.len() + HEADER_SIZE <= fragment_size {
103// If it can pad the bucket upto the fragment size and just return this bucket.
104if bucket.body.len() + HEADER_SIZE < fragment_size {
105let mut new_body = BytesMut::from(bucket.body.as_ref());
106// Epee's binary format will ignore extra data at the end so just pad with 0.
107new_body.resize(fragment_size - HEADER_SIZE, 0);
108109 bucket.body = new_body.freeze();
110 bucket.header.size = usize_to_u64(fragment_size - HEADER_SIZE);
111 }
112113return Ok(vec![bucket]);
114 }
115116// A header put on all fragments.
117 // The first fragment will set the START flag, the last will set the END flag.
118let fragment_head = BucketHead {
119 signature: protocol.signature,
120 size: usize_to_u64(fragment_size - HEADER_SIZE),
121 have_to_return_data: false,
122// Just use a default command.
123command: T::Command::from(0),
124 return_code: 0,
125 flags: Flags::empty(),
126 protocol_version: protocol.version,
127 };
128129// data_space - the amount of actual data we can fit in each fragment.
130let data_space = fragment_size - HEADER_SIZE;
131132let amount_of_fragments = (bucket.body.len() + HEADER_SIZE).div_ceil(data_space);
133134let mut first_bucket_body = BytesMut::with_capacity(fragment_size);
135// Fragmented messages store the whole fragmented bucket in the combined payloads not just the body
136 // so the first bucket contains 2 headers, a fragment header and the actual bucket header we are sending.
137bucket.header.write_bytes_into(&mut first_bucket_body);
138 first_bucket_body.extend_from_slice(
139 bucket
140 .body
141 .split_to(fragment_size - (HEADER_SIZE * 2))
142 .as_ref(),
143 );
144145let mut buckets = Vec::with_capacity(amount_of_fragments);
146 buckets.push(Bucket {
147 header: fragment_head.clone(),
148 body: first_bucket_body.freeze(),
149 });
150151for mut bytes in (1..amount_of_fragments).map(|_| {
152 bucket
153 .body
154 .split_to((fragment_size - HEADER_SIZE).min(bucket.body.len()))
155 }) {
156// make sure this fragment has the correct size - the last one might not, so pad it.
157if bytes.len() + HEADER_SIZE < fragment_size {
158let mut new_bytes = BytesMut::from(bytes.as_ref());
159// Epee's binary format will ignore extra data at the end so just pad with 0.
160new_bytes.resize(fragment_size - HEADER_SIZE, 0);
161 bytes = new_bytes.freeze();
162 }
163164 buckets.push(Bucket {
165 header: fragment_head.clone(),
166 body: bytes,
167 });
168 }
169170 buckets
171 .first_mut()
172 .unwrap()
173 .header
174 .flags
175 .toggle(Flags::START_FRAGMENT);
176 buckets
177 .last_mut()
178 .unwrap()
179 .header
180 .flags
181 .toggle(Flags::END_FRAGMENT);
182183Ok(buckets)
184}
185186/// Makes a dummy message, which will be the size of `size` when sent over the wire.
187pub(crate) fn make_dummy_message<T: LevinCommand>(protocol: &Protocol, size: usize) -> Bucket<T> {
188// A header to put on the dummy message.
189let header = BucketHead {
190 signature: protocol.signature,
191 size: usize_to_u64(size),
192 have_to_return_data: false,
193// Just use a default command.
194command: T::from(0),
195 return_code: 0,
196 flags: Flags::DUMMY,
197 protocol_version: protocol.version,
198 };
199200let body = Bytes::from(vec![0; size - HEADER_SIZE]);
201202 Bucket { header, body }
203}