cuprate_levin/
message.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
//! Levin Messages
//!
//! This module contains the [`LevinMessage`], which allows sending bucket body's, full buckets or dummy messages.
//! The codec will not return [`LevinMessage`] instead it will only return bucket body's. [`LevinMessage`] allows
//! for more control over what is actually sent over the wire at certain times.
use bytes::{Bytes, BytesMut};

use cuprate_helper::cast::usize_to_u64;

use crate::{
    header::{Flags, HEADER_SIZE},
    Bucket, BucketBuilder, BucketError, BucketHead, LevinBody, LevinCommand, Protocol,
};

/// A levin message that can be sent to a peer.
pub enum LevinMessage<T: LevinBody> {
    /// A message body.
    ///
    /// A levin header will be added to this message before it is sent to the peer.
    Body(T),
    /// A full levin bucket.
    ///
    /// This bucket will be sent to the peer directly with no extra information.
    ///
    /// This should only be used to send fragmented messages: [`make_fragmented_messages`]
    Bucket(Bucket<T::Command>),
    /// A dummy message.
    ///
    /// A dummy message which the peer will ignore. The dummy message will be the exact size
    /// (in bytes) of the given `usize` on the wire.
    Dummy(usize),
}

impl<T: LevinBody> From<T> for LevinMessage<T> {
    fn from(value: T) -> Self {
        Self::Body(value)
    }
}

impl<T: LevinBody> From<Bucket<T::Command>> for LevinMessage<T> {
    fn from(value: Bucket<T::Command>) -> Self {
        Self::Bucket(value)
    }
}

/// This represents a dummy message to send to a peer.
///
/// The message, including the header, will be the exact size of the given `usize`.
/// This exists because it seems weird to do this:
/// ```rust,ignore
/// peer.send(1_000);
/// ```
/// This is a lot clearer:
/// ```rust,ignore
/// peer.send(Dummy(1_000));
/// ```
pub struct Dummy(pub usize);

impl<T: LevinBody> From<Dummy> for LevinMessage<T> {
    fn from(value: Dummy) -> Self {
        Self::Dummy(value.0)
    }
}

/// Fragments the provided message into buckets which, when serialised, will all be the size of `fragment_size`.
///
/// This function will produce many buckets that have to be sent in order. When the peer receives these buckets
/// they will combine them to produce the original message.
///
/// The last bucket may be padded with zeros to make it the correct size, the format used to encode the body must
/// allow for extra data at the end of the message this to work.
///
/// `fragment_size` must be more than 2 * [`HEADER_SIZE`] otherwise this will panic.
pub fn make_fragmented_messages<T: LevinBody>(
    protocol: &Protocol,
    fragment_size: usize,
    message: T,
) -> Result<Vec<Bucket<T::Command>>, BucketError> {
    assert!(
        fragment_size * 2 >= HEADER_SIZE,
        "Fragment size: {fragment_size}, is too small, must be at least {}",
        2 * HEADER_SIZE
    );

    let mut builder = BucketBuilder::new(protocol);
    message.encode(&mut builder)?;
    let mut bucket = builder.finish();

    // Make sure we are not trying to fragment a fragment.
    if !bucket
        .header
        .flags
        .intersects(Flags::REQUEST | Flags::RESPONSE)
    {
        // If a bucket does not have the request or response bits set it is a fragment.
        return Err(BucketError::InvalidFragmentedMessage(
            "Can't make a fragmented message out of a message which is already fragmented",
        ));
    }

    // Check if the bucket can fit in one fragment.
    if bucket.body.len() + HEADER_SIZE <= fragment_size {
        // If it can pad the bucket upto the fragment size and just return this bucket.
        if bucket.body.len() + HEADER_SIZE < fragment_size {
            let mut new_body = BytesMut::from(bucket.body.as_ref());
            // Epee's binary format will ignore extra data at the end so just pad with 0.
            new_body.resize(fragment_size - HEADER_SIZE, 0);

            bucket.body = new_body.freeze();
            bucket.header.size = usize_to_u64(fragment_size - HEADER_SIZE);
        }

        return Ok(vec![bucket]);
    }

    // A header put on all fragments.
    // The first fragment will set the START flag, the last will set the END flag.
    let fragment_head = BucketHead {
        signature: protocol.signature,
        size: usize_to_u64(fragment_size - HEADER_SIZE),
        have_to_return_data: false,
        // Just use a default command.
        command: T::Command::from(0),
        return_code: 0,
        flags: Flags::empty(),
        protocol_version: protocol.version,
    };

    // data_space - the amount of actual data we can fit in each fragment.
    let data_space = fragment_size - HEADER_SIZE;

    let amount_of_fragments = (bucket.body.len() + HEADER_SIZE).div_ceil(data_space);

    let mut first_bucket_body = BytesMut::with_capacity(fragment_size);
    // Fragmented messages store the whole fragmented bucket in the combined payloads not just the body
    // so the first bucket contains 2 headers, a fragment header and the actual bucket header we are sending.
    bucket.header.write_bytes_into(&mut first_bucket_body);
    first_bucket_body.extend_from_slice(
        bucket
            .body
            .split_to(fragment_size - (HEADER_SIZE * 2))
            .as_ref(),
    );

    let mut buckets = Vec::with_capacity(amount_of_fragments);
    buckets.push(Bucket {
        header: fragment_head.clone(),
        body: first_bucket_body.freeze(),
    });

    for mut bytes in (1..amount_of_fragments).map(|_| {
        bucket
            .body
            .split_to((fragment_size - HEADER_SIZE).min(bucket.body.len()))
    }) {
        // make sure this fragment has the correct size - the last one might not, so pad it.
        if bytes.len() + HEADER_SIZE < fragment_size {
            let mut new_bytes = BytesMut::from(bytes.as_ref());
            // Epee's binary format will ignore extra data at the end so just pad with 0.
            new_bytes.resize(fragment_size - HEADER_SIZE, 0);
            bytes = new_bytes.freeze();
        }

        buckets.push(Bucket {
            header: fragment_head.clone(),
            body: bytes,
        });
    }

    buckets
        .first_mut()
        .unwrap()
        .header
        .flags
        .toggle(Flags::START_FRAGMENT);
    buckets
        .last_mut()
        .unwrap()
        .header
        .flags
        .toggle(Flags::END_FRAGMENT);

    Ok(buckets)
}

/// Makes a dummy message, which will be the size of `size` when sent over the wire.
pub(crate) fn make_dummy_message<T: LevinCommand>(protocol: &Protocol, size: usize) -> Bucket<T> {
    // A header to put on the dummy message.
    let header = BucketHead {
        signature: protocol.signature,
        size: usize_to_u64(size),
        have_to_return_data: false,
        // Just use a default command.
        command: T::from(0),
        return_code: 0,
        flags: Flags::DUMMY,
        protocol_version: protocol.version,
    };

    let body = Bytes::from(vec![0; size - HEADER_SIZE]);

    Bucket { header, body }
}