1use std::{
9 cmp::{max, min, Reverse},
10 collections::{BTreeMap, BinaryHeap, VecDeque},
11 time::Duration,
12};
13
14use futures::TryFutureExt;
15use monero_serai::{block::Block, transaction::Transaction};
16use tokio::{
17 task::JoinSet,
18 time::{interval, timeout, MissedTickBehavior},
19};
20use tower::{util::BoxCloneService, Service, ServiceExt};
21use tracing::{instrument, Instrument, Span};
22
23use cuprate_async_buffer::{BufferAppender, BufferStream};
24use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE;
25use cuprate_p2p_core::{handles::ConnectionHandle, NetworkZone};
26use cuprate_pruning::PruningSeed;
27
28use crate::{
29 constants::{
30 BLOCK_DOWNLOADER_REQUEST_TIMEOUT, EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED, LONG_BAN,
31 MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES, MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE,
32 },
33 peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse},
34};
35
36mod block_queue;
37mod chain_tracker;
38mod download_batch;
39mod request_chain;
40#[cfg(test)]
41mod tests;
42
43use block_queue::{BlockQueue, ReadyQueueBatch};
44pub use chain_tracker::ChainEntry;
45use chain_tracker::{BlocksToRetrieve, ChainTracker};
46use download_batch::download_batch_task;
47use request_chain::{initial_chain_search, request_chain_entry_from_peer};
48
49#[derive(Debug, Clone)]
51pub struct BlockBatch {
52 pub blocks: Vec<(Block, Vec<Transaction>)>,
54 pub size: usize,
56 pub peer_handle: ConnectionHandle,
58}
59
60#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Ord, Eq)]
62pub struct BlockDownloaderConfig {
63 pub buffer_bytes: usize,
66 pub in_progress_queue_bytes: usize,
68 pub check_client_pool_interval: Duration,
70 pub target_batch_bytes: usize,
72 pub initial_batch_len: usize,
74}
75
76#[derive(Debug, thiserror::Error)]
78pub(crate) enum BlockDownloadError {
79 #[error("A request to a peer timed out.")]
80 TimedOut,
81 #[error("The block buffer was closed.")]
82 BufferWasClosed,
83 #[error("The peers we requested data from did not have all the data.")]
84 PeerDidNotHaveRequestedData,
85 #[error("The peers response to a request was invalid.")]
86 PeersResponseWasInvalid,
87 #[error("The chain we are following is invalid.")]
88 ChainInvalid,
89 #[error("Failed to find a more advanced chain to follow")]
90 FailedToFindAChainToFollow,
91 #[error("The peer did not send any overlapping blocks, unknown start height.")]
92 PeerSentNoOverlappingBlocks,
93 #[error("Service error: {0}")]
94 ServiceError(#[from] tower::BoxError),
95}
96
97pub enum ChainSvcRequest<N: NetworkZone> {
99 CompactHistory,
101 FindFirstUnknown(Vec<[u8; 32]>),
103 CumulativeDifficulty,
105
106 ValidateEntries(VecDeque<ChainEntry<N>>, usize),
107}
108
109pub enum ChainSvcResponse<N: NetworkZone> {
111 CompactHistory {
113 block_ids: Vec<[u8; 32]>,
117 cumulative_difficulty: u128,
119 },
120 FindFirstUnknown(Option<(usize, usize)>),
124 CumulativeDifficulty(u128),
128
129 ValidateEntries {
130 valid: VecDeque<ChainEntry<N>>,
131 unknown: VecDeque<ChainEntry<N>>,
132 },
133}
134
135#[instrument(level = "error", skip_all, name = "block_downloader")]
144pub fn download_blocks<N: NetworkZone, C>(
145 peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
146 our_chain_svc: C,
147 config: BlockDownloaderConfig,
148) -> BufferStream<BlockBatch>
149where
150 C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
151 + Send
152 + 'static,
153 C::Future: Send + 'static,
154{
155 let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_bytes);
156
157 let block_downloader = BlockDownloader::new(peer_set, our_chain_svc, buffer_appender, config);
158
159 tokio::spawn(
160 block_downloader
161 .run()
162 .inspect_err(|e| tracing::debug!("Error downloading blocks: {e}"))
163 .instrument(Span::current()),
164 );
165
166 buffer_stream
167}
168
169struct BlockDownloader<N: NetworkZone, C> {
195 peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
197
198 our_chain_svc: C,
200
201 most_recent_batch_sizes: BinaryHeap<Reverse<(usize, BatchSizeInformation)>>,
202
203 amount_of_empty_chain_entries: usize,
207
208 block_download_tasks: JoinSet<BlockDownloadTaskResponse<N>>,
210 #[expect(clippy::type_complexity)]
214 chain_entry_task: JoinSet<Result<(ClientDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
215
216 inflight_requests: BTreeMap<usize, BlocksToRetrieve<N>>,
220
221 failed_batches: BinaryHeap<Reverse<usize>>,
225
226 block_queue: BlockQueue,
227
228 config: BlockDownloaderConfig,
230}
231
232impl<N: NetworkZone, C> BlockDownloader<N, C>
233where
234 C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
235 + Send
236 + 'static,
237 C::Future: Send + 'static,
238{
239 fn new(
241 peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
242 our_chain_svc: C,
243 buffer_appender: BufferAppender<BlockBatch>,
244 config: BlockDownloaderConfig,
245 ) -> Self {
246 Self {
247 peer_set,
248 our_chain_svc,
249 most_recent_batch_sizes: BinaryHeap::new(),
250 amount_of_empty_chain_entries: 0,
251 block_download_tasks: JoinSet::new(),
252 chain_entry_task: JoinSet::new(),
253 inflight_requests: BTreeMap::new(),
254 block_queue: BlockQueue::new(buffer_appender),
255 failed_batches: BinaryHeap::new(),
256 config,
257 }
258 }
259
260 fn check_pending_peers(
262 &mut self,
263 chain_tracker: &mut ChainTracker<N>,
264 pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
265 ) {
266 tracing::debug!("Checking if we can give any work to pending peers.");
267
268 for (_, peers) in pending_peers.iter_mut() {
269 while let Some(peer) = peers.pop() {
270 if peer.info.handle.is_closed() {
271 continue;
273 }
274
275 let client = self.try_handle_free_client(chain_tracker, peer);
276 if let Some(peer) = client {
277 peers.push(peer);
280 break;
281 }
282 }
283 }
284 }
285
286 fn amount_of_blocks_to_request(&self) -> usize {
287 let biggest_batch = self
288 .most_recent_batch_sizes
289 .iter()
290 .max_by(|batch_1, batch_2| {
291 let av1 = batch_1.0 .1.byte_size / batch_1.0 .1.len;
292 let av2 = batch_2.0 .1.byte_size / batch_2.0 .1.len;
293
294 av1.cmp(&av2)
295 });
296
297 let Some(biggest_batch) = biggest_batch else {
298 return self.config.initial_batch_len;
299 };
300
301 calculate_next_block_batch_size(
302 biggest_batch.0 .1.byte_size,
303 biggest_batch.0 .1.len,
304 self.config.target_batch_bytes,
305 )
306 }
307
308 fn request_inflight_batch_again(
315 &mut self,
316 client: ClientDropGuard<N>,
317 ) -> Option<ClientDropGuard<N>> {
318 tracing::debug!(
319 "Requesting an inflight batch, current ready queue size: {}",
320 self.block_queue.size()
321 );
322
323 assert!(
324 !self.inflight_requests.is_empty(),
325 "We need requests inflight to be able to send the request again",
326 );
327
328 let oldest_ready_batch = self.block_queue.oldest_ready_batch().unwrap();
329
330 for (_, in_flight_batch) in self.inflight_requests.range_mut(0..oldest_ready_batch) {
331 if in_flight_batch.requests_sent >= 2 {
332 continue;
333 }
334
335 if !client_has_block_in_range(
336 &client.info.pruning_seed,
337 in_flight_batch.start_height,
338 in_flight_batch.ids.len(),
339 ) {
340 return Some(client);
341 }
342
343 self.block_download_tasks.spawn(download_batch_task(
344 client,
345 in_flight_batch.ids.clone(),
346 in_flight_batch.prev_id,
347 in_flight_batch.start_height,
348 in_flight_batch.requests_sent,
349 ));
350
351 return None;
352 }
353
354 tracing::debug!("Could not find an inflight request applicable for this peer.");
355
356 Some(client)
357 }
358
359 fn request_block_batch(
366 &mut self,
367 chain_tracker: &mut ChainTracker<N>,
368 client: ClientDropGuard<N>,
369 ) -> Option<ClientDropGuard<N>> {
370 tracing::trace!("Using peer to request a batch of blocks.");
371 while let Some(failed_request) = self.failed_batches.peek() {
373 let Some(request) = self.inflight_requests.get_mut(&failed_request.0) else {
376 self.failed_batches.pop();
378 continue;
379 };
380 if client_has_block_in_range(
382 &client.info.pruning_seed,
383 request.start_height,
384 request.ids.len(),
385 ) {
386 tracing::debug!("Using peer to request a failed batch");
387 request.requests_sent += 1;
390
391 self.block_download_tasks.spawn(download_batch_task(
392 client,
393 request.ids.clone(),
394 request.prev_id,
395 request.start_height,
396 request.requests_sent,
397 ));
398
399 self.failed_batches.pop();
401
402 return None;
403 }
404 break;
406 }
407
408 if self.block_queue.size() >= self.config.in_progress_queue_bytes {
410 return self.request_inflight_batch_again(client);
411 }
412
413 let Some(mut block_entry_to_get) = chain_tracker.blocks_to_get(
416 &client.info.pruning_seed,
417 self.amount_of_blocks_to_request(),
418 ) else {
419 return Some(client);
420 };
421
422 tracing::debug!("Requesting a new batch of blocks");
423
424 block_entry_to_get.requests_sent = 1;
425 self.inflight_requests
426 .insert(block_entry_to_get.start_height, block_entry_to_get.clone());
427
428 self.block_download_tasks.spawn(download_batch_task(
429 client,
430 block_entry_to_get.ids.clone(),
431 block_entry_to_get.prev_id,
432 block_entry_to_get.start_height,
433 block_entry_to_get.requests_sent,
434 ));
435
436 None
437 }
438
439 fn try_handle_free_client(
447 &mut self,
448 chain_tracker: &mut ChainTracker<N>,
449 client: ClientDropGuard<N>,
450 ) -> Option<ClientDropGuard<N>> {
451 if self.chain_entry_task.len() < 2
453 && self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED
455 && chain_tracker.block_requests_queued(self.amount_of_blocks_to_request()) < 500
458 && chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
460 {
461 tracing::debug!("Requesting next chain entry");
462
463 let history = chain_tracker.get_simple_history();
464
465 self.chain_entry_task.spawn(
466 async move {
467 timeout(
468 BLOCK_DOWNLOADER_REQUEST_TIMEOUT,
469 request_chain_entry_from_peer(client, history),
470 )
471 .await
472 .map_err(|_| BlockDownloadError::TimedOut)?
473 }
474 .instrument(tracing::debug_span!(
475 "request_chain_entry",
476 current_height = chain_tracker.top_height()
477 )),
478 );
479
480 return None;
481 }
482
483 self.request_block_batch(chain_tracker, client)
485 }
486
487 async fn check_for_free_clients(
489 &mut self,
490 chain_tracker: &mut ChainTracker<N>,
491 pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
492 ) -> Result<(), BlockDownloadError> {
493 tracing::debug!("Checking for free peers");
494
495 let ChainSvcResponse::CumulativeDifficulty(current_cumulative_difficulty) = self
497 .our_chain_svc
498 .ready()
499 .await?
500 .call(ChainSvcRequest::CumulativeDifficulty)
501 .await?
502 else {
503 panic!("Chain service returned wrong response.");
504 };
505
506 let PeerSetResponse::PeersWithMorePoW(clients) = self
507 .peer_set
508 .ready()
509 .await?
510 .call(PeerSetRequest::PeersWithMorePoW(
511 current_cumulative_difficulty,
512 ))
513 .await?
514 else {
515 unreachable!();
516 };
517
518 for client in clients {
519 pending_peers
520 .entry(client.info.pruning_seed)
521 .or_default()
522 .push(client);
523 }
524
525 self.check_pending_peers(chain_tracker, pending_peers);
526
527 Ok(())
528 }
529
530 async fn handle_download_batch_res(
532 &mut self,
533 start_height: usize,
534 res: Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError>,
535 chain_tracker: &mut ChainTracker<N>,
536 pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
537 ) -> Result<(), BlockDownloadError> {
538 tracing::debug!("Handling block download response");
539
540 match res {
541 Err(e) => {
542 if matches!(e, BlockDownloadError::ChainInvalid) {
543 self.inflight_requests.get(&start_height).inspect(|entry| {
546 tracing::warn!(
547 "Received an invalid chain from peer: {}, exiting block downloader (it should be restarted).",
548 entry.peer_who_told_us
549 );
550 entry.peer_who_told_us_handle.ban_peer(LONG_BAN);
551 });
552
553 return Err(e);
554 }
555
556 if let Some(batch) = self.inflight_requests.get_mut(&start_height) {
558 tracing::debug!("Error downloading batch: {e}");
559
560 batch.failures += 1;
561 if batch.failures > MAX_DOWNLOAD_FAILURES {
562 tracing::debug!(
563 "Too many errors downloading blocks, stopping the block downloader."
564 );
565 return Err(BlockDownloadError::TimedOut);
566 }
567
568 self.failed_batches.push(Reverse(start_height));
569 }
570
571 Ok(())
572 }
573 Ok((client, block_batch)) => {
574 if self.inflight_requests.remove(&start_height).is_none() {
576 tracing::debug!("Already retrieved batch");
577 pending_peers
579 .entry(client.info.pruning_seed)
580 .or_default()
581 .push(client);
582
583 self.check_pending_peers(chain_tracker, pending_peers);
584
585 return Ok(());
586 };
587
588 if start_height
591 > self
592 .most_recent_batch_sizes
593 .peek()
594 .map(|Reverse((height, _))| *height)
595 .unwrap_or_default()
596 {
597 self.most_recent_batch_sizes.push(Reverse((
598 start_height,
599 BatchSizeInformation {
600 len: block_batch.blocks.len(),
601 byte_size: block_batch.size,
602 },
603 )));
604
605 if self.most_recent_batch_sizes.len() > MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE
606 {
607 self.most_recent_batch_sizes.pop();
608 }
609 }
610
611 self.block_queue
612 .add_incoming_batch(
613 ReadyQueueBatch {
614 start_height,
615 block_batch,
616 },
617 self.inflight_requests.first_key_value().map(|(k, _)| *k),
618 )
619 .await?;
620
621 pending_peers
622 .entry(client.info.pruning_seed)
623 .or_default()
624 .push(client);
625
626 self.check_pending_peers(chain_tracker, pending_peers);
627
628 Ok(())
629 }
630 }
631 }
632
633 async fn run(mut self) -> Result<(), BlockDownloadError> {
635 let mut chain_tracker =
636 initial_chain_search(&mut self.peer_set, &mut self.our_chain_svc).await?;
637
638 let mut pending_peers = BTreeMap::new();
639
640 tracing::info!("Attempting to download blocks from peers, this may take a while.");
641
642 let mut check_client_pool_interval = interval(self.config.check_client_pool_interval);
643 check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
644
645 self.check_for_free_clients(&mut chain_tracker, &mut pending_peers)
646 .await?;
647
648 loop {
649 tokio::select! {
650 _ = check_client_pool_interval.tick() => {
651 tracing::debug!("Checking client pool for free peers, timer fired.");
652 self.check_for_free_clients(&mut chain_tracker, &mut pending_peers).await?;
653
654 if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
656 tracing::debug!("Failed to find any more chain entries, probably fround the top");
657 return Ok(());
658 }
659 }
660 Some(res) = self.block_download_tasks.join_next() => {
661 let BlockDownloadTaskResponse {
662 start_height,
663 result
664 } = res.expect("Download batch future panicked");
665
666 self.handle_download_batch_res(start_height, result, &mut chain_tracker, &mut pending_peers).await?;
667
668 if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
670 tracing::debug!("Failed to find any more chain entries, probably fround the top");
671 return Ok(());
672 }
673 }
674 Some(Ok(res)) = self.chain_entry_task.join_next() => {
675 match res {
676 Ok((client, entry)) => {
677 match chain_tracker.add_entry(entry, &mut self.our_chain_svc).await {
678 Ok(()) => {
679 tracing::debug!("Successfully added chain entry to chain tracker.");
680 self.amount_of_empty_chain_entries = 0;
681 }
682 Err(e) => {
683 tracing::debug!("Failed to add incoming chain entry to chain tracker: {e:?}");
684 self.amount_of_empty_chain_entries += 1;
685 }
686 }
687
688 pending_peers
689 .entry(client.info.pruning_seed)
690 .or_default()
691 .push(client);
692
693 self.check_pending_peers(&mut chain_tracker, &mut pending_peers);
694 }
695 Err(_) => self.amount_of_empty_chain_entries += 1
696 }
697 }
698 }
699 }
700 }
701}
702
703struct BlockDownloadTaskResponse<N: NetworkZone> {
705 start_height: usize,
707 result: Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError>,
709}
710
711const fn client_has_block_in_range(
713 pruning_seed: &PruningSeed,
714 start_height: usize,
715 length: usize,
716) -> bool {
717 pruning_seed.has_full_block(start_height, MAX_BLOCK_HEIGHT_USIZE)
718 && pruning_seed.has_full_block(start_height + length, MAX_BLOCK_HEIGHT_USIZE)
719}
720
721#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
722struct BatchSizeInformation {
723 len: usize,
724 byte_size: usize,
725}
726
727fn calculate_next_block_batch_size(
734 previous_batch_size: usize,
735 previous_batch_len: usize,
736 target_batch_size: usize,
737) -> usize {
738 let adjusted_average_block_size = max((previous_batch_size * 2) / previous_batch_len, 1);
741
742 let next_batch_len = max(target_batch_size / adjusted_average_block_size, 1);
744
745 let next_batch_len = min(next_batch_len, (previous_batch_len * 3).div_ceil(2));
748
749 min(next_batch_len, MAX_BLOCK_BATCH_LEN)
751}