cuprate_levin/
message.rs

//! Levin Messages
//!
//! This module contains the [`LevinMessage`], which allows sending bucket bodies, full buckets or dummy messages.
//! The codec will not return [`LevinMessage`]; instead it will only return bucket bodies. [`LevinMessage`] allows
//! for more control over what is actually sent over the wire at certain times.
6use bytes::{Bytes, BytesMut};
7
8use cuprate_helper::cast::usize_to_u64;
9
10use crate::{
11    header::{Flags, HEADER_SIZE},
12    Bucket, BucketBuilder, BucketError, BucketHead, LevinBody, LevinCommand, Protocol,
13};
14
15/// A levin message that can be sent to a peer.
16pub enum LevinMessage<T: LevinBody> {
17    /// A message body.
18    ///
19    /// A levin header will be added to this message before it is sent to the peer.
20    Body(T),
21    /// A full levin bucket.
22    ///
23    /// This bucket will be sent to the peer directly with no extra information.
24    ///
25    /// This should only be used to send fragmented messages: [`make_fragmented_messages`]
26    Bucket(Bucket<T::Command>),
27    /// A dummy message.
28    ///
29    /// A dummy message which the peer will ignore. The dummy message will be the exact size
30    /// (in bytes) of the given `usize` on the wire.
31    Dummy(usize),
32}
33
34impl<T: LevinBody> From<T> for LevinMessage<T> {
35    fn from(value: T) -> Self {
36        Self::Body(value)
37    }
38}
39
40impl<T: LevinBody> From<Bucket<T::Command>> for LevinMessage<T> {
41    fn from(value: Bucket<T::Command>) -> Self {
42        Self::Bucket(value)
43    }
44}
45
/// This represents a dummy message to send to a peer.
///
/// The message, including the header, will be the exact size of the given `usize`.
///
/// This wrapper exists for readability at the call site, because it seems weird to do this:
/// ```rust,ignore
/// peer.send(1_000);
/// ```
/// This is a lot clearer:
/// ```rust,ignore
/// peer.send(Dummy(1_000));
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Dummy(pub usize);
58
59impl<T: LevinBody> From<Dummy> for LevinMessage<T> {
60    fn from(value: Dummy) -> Self {
61        Self::Dummy(value.0)
62    }
63}
64
65/// Fragments the provided message into buckets which, when serialised, will all be the size of `fragment_size`.
66///
67/// This function will produce many buckets that have to be sent in order. When the peer receives these buckets
68/// they will combine them to produce the original message.
69///
70/// The last bucket may be padded with zeros to make it the correct size, the format used to encode the body must
71/// allow for extra data at the end of the message this to work.
72///
73/// `fragment_size` must be more than 2 * [`HEADER_SIZE`] otherwise this will panic.
74pub fn make_fragmented_messages<T: LevinBody>(
75    protocol: &Protocol,
76    fragment_size: usize,
77    message: T,
78) -> Result<Vec<Bucket<T::Command>>, BucketError> {
79    assert!(
80        fragment_size >= 2 * HEADER_SIZE,
81        "Fragment size: {fragment_size}, is too small, must be at least {}",
82        2 * HEADER_SIZE
83    );
84
85    let mut builder = BucketBuilder::new(protocol);
86    message.encode(&mut builder)?;
87    let mut bucket = builder.finish();
88
89    // Make sure we are not trying to fragment a fragment.
90    if !bucket
91        .header
92        .flags
93        .intersects(Flags::REQUEST | Flags::RESPONSE)
94    {
95        // If a bucket does not have the request or response bits set it is a fragment.
96        return Err(BucketError::InvalidFragmentedMessage(
97            "Can't make a fragmented message out of a message which is already fragmented",
98        ));
99    }
100
101    // Check if the bucket can fit in one fragment.
102    if bucket.body.len() + HEADER_SIZE <= fragment_size {
103        // If it can pad the bucket upto the fragment size and just return this bucket.
104        if bucket.body.len() + HEADER_SIZE < fragment_size {
105            let mut new_body = BytesMut::from(bucket.body.as_ref());
106            // Epee's binary format will ignore extra data at the end so just pad with 0.
107            new_body.resize(fragment_size - HEADER_SIZE, 0);
108
109            bucket.body = new_body.freeze();
110            bucket.header.size = usize_to_u64(fragment_size - HEADER_SIZE);
111        }
112
113        return Ok(vec![bucket]);
114    }
115
116    // A header put on all fragments.
117    // The first fragment will set the START flag, the last will set the END flag.
118    let fragment_head = BucketHead {
119        signature: protocol.signature,
120        size: usize_to_u64(fragment_size - HEADER_SIZE),
121        have_to_return_data: false,
122        // Just use a default command.
123        command: T::Command::from(0),
124        return_code: 0,
125        flags: Flags::empty(),
126        protocol_version: protocol.version,
127    };
128
129    // data_space - the amount of actual data we can fit in each fragment.
130    let data_space = fragment_size - HEADER_SIZE;
131
132    let amount_of_fragments = (bucket.body.len() + HEADER_SIZE).div_ceil(data_space);
133
134    let mut first_bucket_body = BytesMut::with_capacity(fragment_size);
135    // Fragmented messages store the whole fragmented bucket in the combined payloads not just the body
136    // so the first bucket contains 2 headers, a fragment header and the actual bucket header we are sending.
137    bucket.header.write_bytes_into(&mut first_bucket_body);
138    first_bucket_body.extend_from_slice(
139        bucket
140            .body
141            .split_to(fragment_size - (HEADER_SIZE * 2))
142            .as_ref(),
143    );
144
145    let mut buckets = Vec::with_capacity(amount_of_fragments);
146    buckets.push(Bucket {
147        header: fragment_head.clone(),
148        body: first_bucket_body.freeze(),
149    });
150
151    for mut bytes in (1..amount_of_fragments).map(|_| {
152        bucket
153            .body
154            .split_to((fragment_size - HEADER_SIZE).min(bucket.body.len()))
155    }) {
156        // make sure this fragment has the correct size - the last one might not, so pad it.
157        if bytes.len() + HEADER_SIZE < fragment_size {
158            let mut new_bytes = BytesMut::from(bytes.as_ref());
159            // Epee's binary format will ignore extra data at the end so just pad with 0.
160            new_bytes.resize(fragment_size - HEADER_SIZE, 0);
161            bytes = new_bytes.freeze();
162        }
163
164        buckets.push(Bucket {
165            header: fragment_head.clone(),
166            body: bytes,
167        });
168    }
169
170    buckets
171        .first_mut()
172        .unwrap()
173        .header
174        .flags
175        .toggle(Flags::START_FRAGMENT);
176    buckets
177        .last_mut()
178        .unwrap()
179        .header
180        .flags
181        .toggle(Flags::END_FRAGMENT);
182
183    Ok(buckets)
184}
185
186/// Makes a dummy message, which will be the size of `size` when sent over the wire.
187pub(crate) fn make_dummy_message<T: LevinCommand>(protocol: &Protocol, size: usize) -> Bucket<T> {
188    // A header to put on the dummy message.
189    let header = BucketHead {
190        signature: protocol.signature,
191        size: usize_to_u64(size),
192        have_to_return_data: false,
193        // Just use a default command.
194        command: T::from(0),
195        return_code: 0,
196        flags: Flags::DUMMY,
197        protocol_version: protocol.version,
198    };
199
200    let body = Bytes::from(vec![0; size - HEADER_SIZE]);
201
202    Bucket { header, body }
203}