1use std::{cmp::Ordering, collections::BinaryHeap};
23use cuprate_async_buffer::BufferAppender;
45use super::{BlockBatch, BlockDownloadError};
67/// A batch of blocks in the ready queue, waiting for previous blocks to come in, so they can
8/// be passed into the buffer.
9///
10/// The [`Eq`] and [`Ord`] impl on this type will only take into account the `start_height`, this
11/// is because the block downloader will only download one chain at once so no 2 batches can have
12/// the same `start_height`.
13///
14/// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`].
15#[derive(Debug, Clone)]
16pub(crate) struct ReadyQueueBatch {
17/// The start height of the batch.
18pub start_height: usize,
19/// The batch of blocks.
20pub block_batch: BlockBatch,
21}
2223impl Eq for ReadyQueueBatch {}
2425impl PartialEq<Self> for ReadyQueueBatch {
26fn eq(&self, other: &Self) -> bool {
27self.start_height.eq(&other.start_height)
28 }
29}
3031impl PartialOrd<Self> for ReadyQueueBatch {
32fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
33Some(self.cmp(other))
34 }
35}
3637impl Ord for ReadyQueueBatch {
38fn cmp(&self, other: &Self) -> Ordering {
39// reverse the ordering so older blocks (lower height) come first in a [`BinaryHeap`]
40self.start_height.cmp(&other.start_height).reverse()
41 }
42}
4344/// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the
45/// oldest batch has been downloaded.
46pub(crate) struct BlockQueue {
47/// A queue of ready batches.
48ready_batches: BinaryHeap<ReadyQueueBatch>,
49/// The size, in bytes, of all the batches in [`Self::ready_batches`].
50ready_batches_size: usize,
5152/// The [`BufferAppender`] that gives blocks to Cuprate.
53buffer_appender: BufferAppender<BlockBatch>,
54}
5556impl BlockQueue {
57/// Creates a new [`BlockQueue`].
58pub(crate) const fn new(buffer_appender: BufferAppender<BlockBatch>) -> Self {
59Self {
60 ready_batches: BinaryHeap::new(),
61 ready_batches_size: 0,
62 buffer_appender,
63 }
64 }
6566/// Returns the oldest batch that has not been put in the [`async_buffer`] yet.
67pub(crate) fn oldest_ready_batch(&self) -> Option<usize> {
68self.ready_batches.peek().map(|batch| batch.start_height)
69 }
7071/// Returns the size of all the batches that have not been put into the [`async_buffer`] yet.
72pub(crate) const fn size(&self) -> usize {
73self.ready_batches_size
74 }
7576/// Adds an incoming batch to the queue and checks if we can push any batches into the [`async_buffer`].
77 ///
78 /// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if
79 /// there are no batches inflight then this should be [`None`].
80pub(crate) async fn add_incoming_batch(
81&mut self,
82 new_batch: ReadyQueueBatch,
83 oldest_in_flight_start_height: Option<usize>,
84 ) -> Result<(), BlockDownloadError> {
85self.ready_batches_size += new_batch.block_batch.size;
86self.ready_batches.push(new_batch);
8788// The height to stop pushing batches into the buffer.
89let height_to_stop_at = oldest_in_flight_start_height.unwrap_or(usize::MAX);
9091while self
92.ready_batches
93 .peek()
94 .is_some_and(|batch| batch.start_height <= height_to_stop_at)
95 {
96let batch = self
97.ready_batches
98 .pop()
99 .expect("We just checked we have a batch in the buffer");
100101let batch_size = batch.block_batch.size;
102103self.ready_batches_size -= batch_size;
104self.buffer_appender
105 .send(batch.block_batch, batch_size)
106 .await
107.map_err(|_| BlockDownloadError::BufferWasClosed)?;
108 }
109110Ok(())
111 }
112}
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use futures::StreamExt;
    use proptest::{collection::vec, prelude::*};
    use tokio_test::block_on;

    use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE;
    use cuprate_p2p_core::handles::HandleBuilder;

    use super::*;

    prop_compose! {
        // Generates a batch with a random start height and no blocks. `size` is set to the
        // start height so a batch can be recognised by its size once it leaves the buffer.
        fn ready_batch_strategy()(start_height in 0..MAX_BLOCK_HEIGHT_USIZE) -> ReadyQueueBatch {
            let (_, peer_handle) = HandleBuilder::new().build();

            ReadyQueueBatch {
                start_height,
                block_batch: BlockBatch {
                    blocks: vec![],
                    size: start_height,
                    peer_handle,
                },
            }
        }
    }

    proptest! {
        // Feeds batches to the queue in arbitrary order and checks that they come out of the
        // buffer ordered by height, with none missing.
        #[test]
        // NOTE(review): presumably triggered by the peer handle stored inside `BlockBatch`;
        // it is harmless here because `ReadyQueueBatch`'s `Ord` only reads `start_height`.
        #[allow(clippy::mutable_key_type)]
        fn block_queue_returns_items_in_order(batches in vec(ready_batch_strategy(), 0..10_000)) {
            block_on(async move {
                let (buffer_tx, mut buffer_rx) = cuprate_async_buffer::new_buffer(usize::MAX);

                let mut queue = BlockQueue::new(buffer_tx);

                // Batches not yet handed to the queue, i.e. the ones still "in flight".
                // The set also drops batches with a duplicate start height.
                let mut in_flight_batches = BTreeSet::from_iter(batches.clone());
                // Every batch we expect to come out of the buffer.
                let mut expected_batches = in_flight_batches.clone();

                for batch in batches {
                    // `remove` only succeeds for the first batch of each height.
                    if in_flight_batches.remove(&batch) {
                        // `Ord` is reversed, so `last` is the lowest remaining height.
                        let oldest_in_flight =
                            in_flight_batches.last().map(|batch| batch.start_height);

                        queue.add_incoming_batch(batch, oldest_in_flight).await.unwrap();
                    }
                }

                assert_eq!(queue.size(), 0);
                assert!(queue.oldest_ready_batch().is_none());
                // Dropping the queue drops the appender, which lets the stream below end.
                drop(queue);

                while let Some(batch) = buffer_rx.next().await {
                    // `Ord` is reversed, so `pop_last` yields the lowest remaining height.
                    let expected_batch = expected_batches.pop_last().unwrap();

                    assert_eq!(batch.size, expected_batch.block_batch.size);
                }

                // Without this the test would pass even if the buffer yielded fewer batches
                // than were queued.
                assert!(expected_batches.is_empty());
            });
        }
    }
}