cuprate_p2p/block_downloader/
block_queue.rs1use std::{cmp::Ordering, collections::BinaryHeap};
2
3use cuprate_async_buffer::BufferAppender;
4
5use super::{BlockBatch, BlockDownloadError};
6
7#[derive(Debug, Clone)]
16pub(crate) struct ReadyQueueBatch {
17 pub start_height: usize,
19 pub block_batch: BlockBatch,
21}
22
23impl Eq for ReadyQueueBatch {}
24
25impl PartialEq<Self> for ReadyQueueBatch {
26 fn eq(&self, other: &Self) -> bool {
27 self.start_height.eq(&other.start_height)
28 }
29}
30
31impl PartialOrd<Self> for ReadyQueueBatch {
32 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
33 Some(self.cmp(other))
34 }
35}
36
37impl Ord for ReadyQueueBatch {
38 fn cmp(&self, other: &Self) -> Ordering {
39 self.start_height.cmp(&other.start_height).reverse()
41 }
42}
43
44pub(crate) struct BlockQueue {
47 ready_batches: BinaryHeap<ReadyQueueBatch>,
49 ready_batches_size: usize,
51
52 buffer_appender: BufferAppender<BlockBatch>,
54}
55
56impl BlockQueue {
57 pub(crate) const fn new(buffer_appender: BufferAppender<BlockBatch>) -> Self {
59 Self {
60 ready_batches: BinaryHeap::new(),
61 ready_batches_size: 0,
62 buffer_appender,
63 }
64 }
65
66 pub(crate) fn oldest_ready_batch(&self) -> Option<usize> {
68 self.ready_batches.peek().map(|batch| batch.start_height)
69 }
70
71 pub(crate) const fn size(&self) -> usize {
73 self.ready_batches_size
74 }
75
76 pub(crate) async fn add_incoming_batch(
81 &mut self,
82 new_batch: ReadyQueueBatch,
83 oldest_in_flight_start_height: Option<usize>,
84 ) -> Result<(), BlockDownloadError> {
85 self.ready_batches_size += new_batch.block_batch.size;
86 self.ready_batches.push(new_batch);
87
88 let height_to_stop_at = oldest_in_flight_start_height.unwrap_or(usize::MAX);
90
91 while self
92 .ready_batches
93 .peek()
94 .is_some_and(|batch| batch.start_height <= height_to_stop_at)
95 {
96 let batch = self
97 .ready_batches
98 .pop()
99 .expect("We just checked we have a batch in the buffer");
100
101 let batch_size = batch.block_batch.size;
102
103 self.ready_batches_size -= batch_size;
104 self.buffer_appender
105 .send(batch.block_batch, batch_size)
106 .await
107 .map_err(|_| BlockDownloadError::BufferWasClosed)?;
108 }
109
110 Ok(())
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use std::collections::BTreeSet;
117
118 use futures::StreamExt;
119 use proptest::{collection::vec, prelude::*};
120 use tokio_test::block_on;
121
122 use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE;
123 use cuprate_p2p_core::handles::HandleBuilder;
124
125 use super::*;
126
127 prop_compose! {
128 fn ready_batch_strategy()(start_height in 0..MAX_BLOCK_HEIGHT_USIZE) -> ReadyQueueBatch {
129 let (_, peer_handle) = HandleBuilder::new().build();
130
131 ReadyQueueBatch {
132 start_height,
133 block_batch: BlockBatch {
134 blocks: vec![],
135 size: start_height,
136 peer_handle,
137 },
138 }
139 }
140 }
141
142 proptest! {
143 #[test]
144 #[allow(clippy::mutable_key_type)]
145 fn block_queue_returns_items_in_order(batches in vec(ready_batch_strategy(), 0..10_000)) {
146 block_on(async move {
147 let (buffer_tx, mut buffer_rx) = cuprate_async_buffer::new_buffer(usize::MAX);
148
149 let mut queue = BlockQueue::new(buffer_tx);
150
151 let mut sorted_batches = BTreeSet::from_iter(batches.clone());
152 let mut soreted_batch_2 = sorted_batches.clone();
153
154 for batch in batches {
155 if sorted_batches.remove(&batch) {
156 queue.add_incoming_batch(batch, sorted_batches.last().map(|batch| batch.start_height)).await.unwrap();
157 }
158 }
159
160 assert_eq!(queue.size(), 0);
161 assert!(queue.oldest_ready_batch().is_none());
162 drop(queue);
163
164 while let Some(batch) = buffer_rx.next().await {
165 let last_batch = soreted_batch_2.pop_last().unwrap();
166
167 assert_eq!(batch.size, last_batch.block_batch.size);
168 }
169 });
170 }
171 }
172}