cuprate_p2p/block_downloader/
block_queue.rs

1use std::{cmp::Ordering, collections::BinaryHeap};
2
3use cuprate_async_buffer::BufferAppender;
4
5use super::{BlockBatch, BlockDownloadError};
6
7/// A batch of blocks in the ready queue, waiting for previous blocks to come in, so they can
8/// be passed into the buffer.
9///
10/// The [`Eq`] and [`Ord`] impl on this type will only take into account the `start_height`, this
11/// is because the block downloader will only download one chain at once so no 2 batches can have
12/// the same `start_height`.
13///
14/// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`].
15#[derive(Debug, Clone)]
16pub(crate) struct ReadyQueueBatch {
17    /// The start height of the batch.
18    pub start_height: usize,
19    /// The batch of blocks.
20    pub block_batch: BlockBatch,
21}
22
23impl Eq for ReadyQueueBatch {}
24
25impl PartialEq<Self> for ReadyQueueBatch {
26    fn eq(&self, other: &Self) -> bool {
27        self.start_height.eq(&other.start_height)
28    }
29}
30
31impl PartialOrd<Self> for ReadyQueueBatch {
32    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
33        Some(self.cmp(other))
34    }
35}
36
37impl Ord for ReadyQueueBatch {
38    fn cmp(&self, other: &Self) -> Ordering {
39        // reverse the ordering so older blocks (lower height) come first in a [`BinaryHeap`]
40        self.start_height.cmp(&other.start_height).reverse()
41    }
42}
43
44/// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the
45/// oldest batch has been downloaded.
46pub(crate) struct BlockQueue {
47    /// A queue of ready batches.
48    ready_batches: BinaryHeap<ReadyQueueBatch>,
49    /// The size, in bytes, of all the batches in [`Self::ready_batches`].
50    ready_batches_size: usize,
51
52    /// The [`BufferAppender`] that gives blocks to Cuprate.
53    buffer_appender: BufferAppender<BlockBatch>,
54}
55
56impl BlockQueue {
57    /// Creates a new [`BlockQueue`].
58    pub(crate) const fn new(buffer_appender: BufferAppender<BlockBatch>) -> Self {
59        Self {
60            ready_batches: BinaryHeap::new(),
61            ready_batches_size: 0,
62            buffer_appender,
63        }
64    }
65
66    /// Returns the oldest batch that has not been put in the [`async_buffer`] yet.
67    pub(crate) fn oldest_ready_batch(&self) -> Option<usize> {
68        self.ready_batches.peek().map(|batch| batch.start_height)
69    }
70
71    /// Returns the size of all the batches that have not been put into the [`async_buffer`] yet.
72    pub(crate) const fn size(&self) -> usize {
73        self.ready_batches_size
74    }
75
76    /// Adds an incoming batch to the queue and checks if we can push any batches into the [`async_buffer`].
77    ///
78    /// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if
79    /// there are no batches inflight then this should be [`None`].
80    pub(crate) async fn add_incoming_batch(
81        &mut self,
82        new_batch: ReadyQueueBatch,
83        oldest_in_flight_start_height: Option<usize>,
84    ) -> Result<(), BlockDownloadError> {
85        self.ready_batches_size += new_batch.block_batch.size;
86        self.ready_batches.push(new_batch);
87
88        // The height to stop pushing batches into the buffer.
89        let height_to_stop_at = oldest_in_flight_start_height.unwrap_or(usize::MAX);
90
91        while self
92            .ready_batches
93            .peek()
94            .is_some_and(|batch| batch.start_height <= height_to_stop_at)
95        {
96            let batch = self
97                .ready_batches
98                .pop()
99                .expect("We just checked we have a batch in the buffer");
100
101            let batch_size = batch.block_batch.size;
102
103            self.ready_batches_size -= batch_size;
104            self.buffer_appender
105                .send(batch.block_batch, batch_size)
106                .await
107                .map_err(|_| BlockDownloadError::BufferWasClosed)?;
108        }
109
110        Ok(())
111    }
112}
113
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use futures::StreamExt;
    use proptest::{collection::vec, prelude::*};
    use tokio_test::block_on;

    use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE;
    use cuprate_p2p_core::handles::HandleBuilder;

    use super::*;

    prop_compose! {
        // Generates a `ReadyQueueBatch` with a random start height and no blocks.
        //
        // The batch `size` is set to the start height so a batch can be identified by its
        // size once it comes out of the buffer.
        fn ready_batch_strategy()(start_height in 0..MAX_BLOCK_HEIGHT_USIZE) -> ReadyQueueBatch {
            let (_, peer_handle)  = HandleBuilder::new().build();

            ReadyQueueBatch {
                start_height,
                block_batch: BlockBatch {
                    blocks: vec![],
                    size: start_height,
                    peer_handle,
                },
            }
        }
    }

    proptest! {
        // Feeds batches into the queue in arbitrary order and checks that they come out of
        // the buffer ordered by ascending start height, with none missing.
        #[test]
        // NOTE(review): presumably needed because `BlockBatch` holds a peer handle with
        // interior mutability; the `Ord` impl only reads `start_height` - confirm.
        #[allow(clippy::mutable_key_type)]
        fn block_queue_returns_items_in_order(batches in vec(ready_batch_strategy(), 0..10_000)) {
            block_on(async move {
                let (buffer_tx, mut buffer_rx) = cuprate_async_buffer::new_buffer(usize::MAX);

                let mut queue = BlockQueue::new(buffer_tx);

                // Because `Ord` is reversed, `last()` / `pop_last()` on these sets yield the
                // batch with the lowest start height. The set also removes duplicate heights.
                let mut sorted_batches = BTreeSet::from_iter(batches.clone());
                let mut sorted_batches_2 = sorted_batches.clone();

                for batch in batches {
                    // Batches still in `sorted_batches` play the role of in-flight batches;
                    // duplicates of an already-added height are skipped.
                    if sorted_batches.remove(&batch) {
                        queue.add_incoming_batch(batch, sorted_batches.last().map(|batch| batch.start_height)).await.unwrap();
                    }
                }

                assert_eq!(queue.size(), 0);
                assert!(queue.oldest_ready_batch().is_none());
                // Dropping the queue drops the appender, which lets the receive loop below end.
                drop(queue);

                while let Some(batch) = buffer_rx.next().await {
                    let last_batch = sorted_batches_2.pop_last().unwrap();

                    assert_eq!(batch.size, last_batch.block_batch.size);
                }

                // Every expected batch must have been received, not just a correctly ordered prefix.
                assert!(sorted_batches_2.is_empty());
            });
        }
    }
}