cuprate_p2p/
block_downloader.rs

1//! # Block Downloader
2//!
3//! This module contains the block downloader, which finds a chain to
4//! download from our connected peers and downloads it. See the actual
5//! `struct` documentation for implementation details.
6//!
7//! The block downloader is started by [`download_blocks`].
8use std::{
9    cmp::{max, min, Reverse},
10    collections::{BTreeMap, BinaryHeap, VecDeque},
11    time::Duration,
12};
13
14use futures::TryFutureExt;
15use monero_serai::{block::Block, transaction::Transaction};
16use tokio::{
17    task::JoinSet,
18    time::{interval, timeout, MissedTickBehavior},
19};
20use tower::{util::BoxCloneService, Service, ServiceExt};
21use tracing::{instrument, Instrument, Span};
22
23use cuprate_async_buffer::{BufferAppender, BufferStream};
24use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE;
25use cuprate_p2p_core::{handles::ConnectionHandle, NetworkZone};
26use cuprate_pruning::PruningSeed;
27
28use crate::{
29    constants::{
30        BLOCK_DOWNLOADER_REQUEST_TIMEOUT, EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED, LONG_BAN,
31        MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES, MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE,
32    },
33    peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse},
34};
35
36mod block_queue;
37mod chain_tracker;
38mod download_batch;
39mod request_chain;
40#[cfg(test)]
41mod tests;
42
43use block_queue::{BlockQueue, ReadyQueueBatch};
44pub use chain_tracker::ChainEntry;
45use chain_tracker::{BlocksToRetrieve, ChainTracker};
46use download_batch::download_batch_task;
47use request_chain::{initial_chain_search, request_chain_entry_from_peer};
48
49/// A downloaded batch of blocks.
50#[derive(Debug, Clone)]
51pub struct BlockBatch {
52    /// The blocks.
53    pub blocks: Vec<(Block, Vec<Transaction>)>,
54    /// The size in bytes of this batch.
55    pub size: usize,
56    /// The peer that gave us this batch.
57    pub peer_handle: ConnectionHandle,
58}
59
60/// The block downloader config.
61#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Ord, Eq)]
62pub struct BlockDownloaderConfig {
63    /// The size in bytes of the buffer between the block downloader and the place which
64    /// is consuming the downloaded blocks.
65    pub buffer_bytes: usize,
66    /// The size of the in progress queue (in bytes) at which we stop requesting more blocks.
67    pub in_progress_queue_bytes: usize,
68    /// The [`Duration`] between checking the client pool for free peers.
69    pub check_client_pool_interval: Duration,
70    /// The target size of a single batch of blocks (in bytes).
71    pub target_batch_bytes: usize,
72    /// The initial amount of blocks to request (in number of blocks)
73    pub initial_batch_len: usize,
74}
75
76/// An error that occurred in the [`BlockDownloader`].
77#[derive(Debug, thiserror::Error)]
78pub(crate) enum BlockDownloadError {
79    #[error("A request to a peer timed out.")]
80    TimedOut,
81    #[error("The block buffer was closed.")]
82    BufferWasClosed,
83    #[error("The peers we requested data from did not have all the data.")]
84    PeerDidNotHaveRequestedData,
85    #[error("The peers response to a request was invalid.")]
86    PeersResponseWasInvalid,
87    #[error("The chain we are following is invalid.")]
88    ChainInvalid,
89    #[error("Failed to find a more advanced chain to follow")]
90    FailedToFindAChainToFollow,
91    #[error("The peer did not send any overlapping blocks, unknown start height.")]
92    PeerSentNoOverlappingBlocks,
93    #[error("Service error: {0}")]
94    ServiceError(#[from] tower::BoxError),
95}
96
97/// The request type for the chain service.
98pub enum ChainSvcRequest<N: NetworkZone> {
99    /// A request for the current chain history.
100    CompactHistory,
101    /// A request to find the first unknown block ID in a list of block IDs.
102    FindFirstUnknown(Vec<[u8; 32]>),
103    /// A request for our current cumulative difficulty.
104    CumulativeDifficulty,
105
106    ValidateEntries(VecDeque<ChainEntry<N>>, usize),
107}
108
109/// The response type for the chain service.
110pub enum ChainSvcResponse<N: NetworkZone> {
111    /// The response for [`ChainSvcRequest::CompactHistory`].
112    CompactHistory {
113        /// A list of blocks IDs in our chain, starting with the most recent block, all the way to the genesis block.
114        ///
115        /// These blocks should be in reverse chronological order, not every block is needed.
116        block_ids: Vec<[u8; 32]>,
117        /// The current cumulative difficulty of the chain.
118        cumulative_difficulty: u128,
119    },
120    /// The response for [`ChainSvcRequest::FindFirstUnknown`].
121    ///
122    /// Contains the index of the first unknown block and its expected height.
123    FindFirstUnknown(Option<(usize, usize)>),
124    /// The response for [`ChainSvcRequest::CumulativeDifficulty`].
125    ///
126    /// The current cumulative difficulty of our chain.
127    CumulativeDifficulty(u128),
128
129    ValidateEntries {
130        valid: VecDeque<ChainEntry<N>>,
131        unknown: VecDeque<ChainEntry<N>>,
132    },
133}
134
135/// This function starts the block downloader and returns a [`BufferStream`] that will produce
136/// a sequential stream of blocks.
137///
138/// The block downloader will pick the longest chain and will follow it for as long as possible,
139/// the blocks given from the [`BufferStream`] will be in order.
140///
141/// The block downloader may fail before the whole chain is downloaded. If this is the case you can
142/// call this function again, so it can start the search again.
143#[instrument(level = "error", skip_all, name = "block_downloader")]
144pub fn download_blocks<N: NetworkZone, C>(
145    peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
146    our_chain_svc: C,
147    config: BlockDownloaderConfig,
148) -> BufferStream<BlockBatch>
149where
150    C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
151        + Send
152        + 'static,
153    C::Future: Send + 'static,
154{
155    let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_bytes);
156
157    let block_downloader = BlockDownloader::new(peer_set, our_chain_svc, buffer_appender, config);
158
159    tokio::spawn(
160        block_downloader
161            .run()
162            .inspect_err(|e| tracing::debug!("Error downloading blocks: {e}"))
163            .instrument(Span::current()),
164    );
165
166    buffer_stream
167}
168
169/// # Block Downloader
170///
171/// This is the block downloader, which finds a chain to follow and attempts to follow it, adding the
172/// downloaded blocks to an [`async_buffer`].
173///
174/// ## Implementation Details
175///
176/// The first step to downloading blocks is to find a chain to follow, this is done by [`initial_chain_search`],
177/// docs can be found on that function for details on how this is done.
178///
179/// With an initial list of block IDs to follow the block downloader will then look for available peers
180/// to download blocks from.
181///
182/// For each peer we will then allocate a batch of blocks for them to retrieve, as these blocks come in
183/// we add them to the [`BlockQueue`] for pushing into the [`async_buffer`], once we have the oldest block downloaded
184/// we send it into the buffer, repeating this until the oldest current block is still being downloaded.
185///
186/// When a peer has finished downloading blocks we add it to our list of ready peers, so it can be used to
187/// request more data from.
188///
189/// Ready peers will either:
190/// - download the next batch of blocks
191/// - request the next chain entry
192/// - download an already requested batch of blocks (this might happen due to an error in the previous request
193///   or because the queue of ready blocks is too large, so we need the oldest block to clear it).
194struct BlockDownloader<N: NetworkZone, C> {
195    /// The peer set.
196    peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
197
198    /// The service that holds our current chain state.
199    our_chain_svc: C,
200
201    most_recent_batch_sizes: BinaryHeap<Reverse<(usize, BatchSizeInformation)>>,
202
203    /// The amount of consecutive empty chain entries we received.
204    ///
205    /// An empty chain entry means we reached the peer's chain tip.
206    amount_of_empty_chain_entries: usize,
207
208    /// The running block download tasks.
209    block_download_tasks: JoinSet<BlockDownloadTaskResponse<N>>,
210    /// The running chain entry tasks.
211    ///
212    /// Returns a result of the chain entry or an error.
213    #[expect(clippy::type_complexity)]
214    chain_entry_task: JoinSet<Result<(ClientDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
215
216    /// The current inflight requests.
217    ///
218    /// This is a map of batch start heights to block IDs and related information of the batch.
219    inflight_requests: BTreeMap<usize, BlocksToRetrieve<N>>,
220
221    /// A queue of start heights from failed batches that should be retried.
222    ///
223    /// Wrapped in [`Reverse`] so we prioritize early batches.
224    failed_batches: BinaryHeap<Reverse<usize>>,
225
226    block_queue: BlockQueue,
227
228    /// The [`BlockDownloaderConfig`].
229    config: BlockDownloaderConfig,
230}
231
232impl<N: NetworkZone, C> BlockDownloader<N, C>
233where
234    C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>
235        + Send
236        + 'static,
237    C::Future: Send + 'static,
238{
239    /// Creates a new [`BlockDownloader`]
240    fn new(
241        peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
242        our_chain_svc: C,
243        buffer_appender: BufferAppender<BlockBatch>,
244        config: BlockDownloaderConfig,
245    ) -> Self {
246        Self {
247            peer_set,
248            our_chain_svc,
249            most_recent_batch_sizes: BinaryHeap::new(),
250            amount_of_empty_chain_entries: 0,
251            block_download_tasks: JoinSet::new(),
252            chain_entry_task: JoinSet::new(),
253            inflight_requests: BTreeMap::new(),
254            block_queue: BlockQueue::new(buffer_appender),
255            failed_batches: BinaryHeap::new(),
256            config,
257        }
258    }
259
260    /// Checks if we can make use of any peers that are currently pending requests.
261    fn check_pending_peers(
262        &mut self,
263        chain_tracker: &mut ChainTracker<N>,
264        pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
265    ) {
266        tracing::debug!("Checking if we can give any work to pending peers.");
267
268        for (_, peers) in pending_peers.iter_mut() {
269            while let Some(peer) = peers.pop() {
270                if peer.info.handle.is_closed() {
271                    // Peer has disconnected, drop it.
272                    continue;
273                }
274
275                let client = self.try_handle_free_client(chain_tracker, peer);
276                if let Some(peer) = client {
277                    // This peer is ok however it does not have the data we currently need, this will only happen
278                    // because of its pruning seed so just skip over all peers with this pruning seed.
279                    peers.push(peer);
280                    break;
281                }
282            }
283        }
284    }
285
286    fn amount_of_blocks_to_request(&self) -> usize {
287        let biggest_batch = self
288            .most_recent_batch_sizes
289            .iter()
290            .max_by(|batch_1, batch_2| {
291                let av1 = batch_1.0 .1.byte_size / batch_1.0 .1.len;
292                let av2 = batch_2.0 .1.byte_size / batch_2.0 .1.len;
293
294                av1.cmp(&av2)
295            });
296
297        let Some(biggest_batch) = biggest_batch else {
298            return self.config.initial_batch_len;
299        };
300
301        calculate_next_block_batch_size(
302            biggest_batch.0 .1.byte_size,
303            biggest_batch.0 .1.len,
304            self.config.target_batch_bytes,
305        )
306    }
307
308    /// Attempts to send another request for an inflight batch
309    ///
310    /// This function will find the batch(es) that we are waiting on to clear our ready queue and sends another request
311    /// for them.
312    ///
313    /// Returns the [`ClientDropGuard`] back if it doesn't have the batch according to its pruning seed.
314    fn request_inflight_batch_again(
315        &mut self,
316        client: ClientDropGuard<N>,
317    ) -> Option<ClientDropGuard<N>> {
318        tracing::debug!(
319            "Requesting an inflight batch, current ready queue size: {}",
320            self.block_queue.size()
321        );
322
323        assert!(
324            !self.inflight_requests.is_empty(),
325            "We need requests inflight to be able to send the request again",
326        );
327
328        let oldest_ready_batch = self.block_queue.oldest_ready_batch().unwrap();
329
330        for (_, in_flight_batch) in self.inflight_requests.range_mut(0..oldest_ready_batch) {
331            if in_flight_batch.requests_sent >= 2 {
332                continue;
333            }
334
335            if !client_has_block_in_range(
336                &client.info.pruning_seed,
337                in_flight_batch.start_height,
338                in_flight_batch.ids.len(),
339            ) {
340                return Some(client);
341            }
342
343            self.block_download_tasks.spawn(download_batch_task(
344                client,
345                in_flight_batch.ids.clone(),
346                in_flight_batch.prev_id,
347                in_flight_batch.start_height,
348                in_flight_batch.requests_sent,
349            ));
350
351            return None;
352        }
353
354        tracing::debug!("Could not find an inflight request applicable for this peer.");
355
356        Some(client)
357    }
358
359    /// Spawns a task to request blocks from the given peer.
360    ///
361    /// The batch requested will depend on our current state, failed batches will be prioritised.
362    ///
363    /// Returns the [`ClientDropGuard`] back if it doesn't have the data we currently need according
364    /// to its pruning seed.
365    fn request_block_batch(
366        &mut self,
367        chain_tracker: &mut ChainTracker<N>,
368        client: ClientDropGuard<N>,
369    ) -> Option<ClientDropGuard<N>> {
370        tracing::trace!("Using peer to request a batch of blocks.");
371        // First look to see if we have any failed requests.
372        while let Some(failed_request) = self.failed_batches.peek() {
373            // Check if we still have the request that failed - another peer could have completed it after
374            // failure.
375            let Some(request) = self.inflight_requests.get_mut(&failed_request.0) else {
376                // We don't have the request in flight so remove the failure.
377                self.failed_batches.pop();
378                continue;
379            };
380            // Check if this peer has the blocks according to their pruning seed.
381            if client_has_block_in_range(
382                &client.info.pruning_seed,
383                request.start_height,
384                request.ids.len(),
385            ) {
386                tracing::debug!("Using peer to request a failed batch");
387                // They should have the blocks so send the re-request to this peer.
388
389                request.requests_sent += 1;
390
391                self.block_download_tasks.spawn(download_batch_task(
392                    client,
393                    request.ids.clone(),
394                    request.prev_id,
395                    request.start_height,
396                    request.requests_sent,
397                ));
398
399                // Remove the failure, we have just handled it.
400                self.failed_batches.pop();
401
402                return None;
403            }
404            // The peer doesn't have the batch according to its pruning seed.
405            break;
406        }
407
408        // If our ready queue is too large send duplicate requests for the blocks we are waiting on.
409        if self.block_queue.size() >= self.config.in_progress_queue_bytes {
410            return self.request_inflight_batch_again(client);
411        }
412
413        // No failed requests that we can handle, request some new blocks.
414
415        let Some(mut block_entry_to_get) = chain_tracker.blocks_to_get(
416            &client.info.pruning_seed,
417            self.amount_of_blocks_to_request(),
418        ) else {
419            return Some(client);
420        };
421
422        tracing::debug!("Requesting a new batch of blocks");
423
424        block_entry_to_get.requests_sent = 1;
425        self.inflight_requests
426            .insert(block_entry_to_get.start_height, block_entry_to_get.clone());
427
428        self.block_download_tasks.spawn(download_batch_task(
429            client,
430            block_entry_to_get.ids.clone(),
431            block_entry_to_get.prev_id,
432            block_entry_to_get.start_height,
433            block_entry_to_get.requests_sent,
434        ));
435
436        None
437    }
438
439    /// Attempts to give work to a free client.
440    ///
441    /// This function will use our current state to decide if we should send a request for a chain entry
442    /// or if we should request a batch of blocks.
443    ///
444    /// Returns the [`ClientDropGuard`] back if it doesn't have the data we currently need according
445    /// to its pruning seed.
446    fn try_handle_free_client(
447        &mut self,
448        chain_tracker: &mut ChainTracker<N>,
449        client: ClientDropGuard<N>,
450    ) -> Option<ClientDropGuard<N>> {
451        // We send 2 requests, so if one of them is slow or doesn't have the next chain, we still have a backup.
452        if self.chain_entry_task.len() < 2
453            // If we have had too many failures then assume the tip has been found so no more chain entries.
454            && self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED
455            // Check we have a big buffer of pending block IDs to retrieve, we don't want to be waiting around
456            // for a chain entry.
457            && chain_tracker.block_requests_queued(self.amount_of_blocks_to_request()) < 500
458            // Make sure this peer actually has the chain.
459            && chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
460        {
461            tracing::debug!("Requesting next chain entry");
462
463            let history = chain_tracker.get_simple_history();
464
465            self.chain_entry_task.spawn(
466                async move {
467                    timeout(
468                        BLOCK_DOWNLOADER_REQUEST_TIMEOUT,
469                        request_chain_entry_from_peer(client, history),
470                    )
471                    .await
472                    .map_err(|_| BlockDownloadError::TimedOut)?
473                }
474                .instrument(tracing::debug_span!(
475                    "request_chain_entry",
476                    current_height = chain_tracker.top_height()
477                )),
478            );
479
480            return None;
481        }
482
483        // Request a batch of blocks instead.
484        self.request_block_batch(chain_tracker, client)
485    }
486
487    /// Checks the [`ClientPool`] for free peers.
488    async fn check_for_free_clients(
489        &mut self,
490        chain_tracker: &mut ChainTracker<N>,
491        pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
492    ) -> Result<(), BlockDownloadError> {
493        tracing::debug!("Checking for free peers");
494
495        // This value might be slightly behind but that's ok.
496        let ChainSvcResponse::CumulativeDifficulty(current_cumulative_difficulty) = self
497            .our_chain_svc
498            .ready()
499            .await?
500            .call(ChainSvcRequest::CumulativeDifficulty)
501            .await?
502        else {
503            panic!("Chain service returned wrong response.");
504        };
505
506        let PeerSetResponse::PeersWithMorePoW(clients) = self
507            .peer_set
508            .ready()
509            .await?
510            .call(PeerSetRequest::PeersWithMorePoW(
511                current_cumulative_difficulty,
512            ))
513            .await?
514        else {
515            unreachable!();
516        };
517
518        for client in clients {
519            pending_peers
520                .entry(client.info.pruning_seed)
521                .or_default()
522                .push(client);
523        }
524
525        self.check_pending_peers(chain_tracker, pending_peers);
526
527        Ok(())
528    }
529
530    /// Handles a response to a request to get blocks from a peer.
531    async fn handle_download_batch_res(
532        &mut self,
533        start_height: usize,
534        res: Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError>,
535        chain_tracker: &mut ChainTracker<N>,
536        pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
537    ) -> Result<(), BlockDownloadError> {
538        tracing::debug!("Handling block download response");
539
540        match res {
541            Err(e) => {
542                if matches!(e, BlockDownloadError::ChainInvalid) {
543                    // If the chain was invalid ban the peer who told us about it and error here to stop the
544                    // block downloader.
545                    self.inflight_requests.get(&start_height).inspect(|entry| {
546                        tracing::warn!(
547                            "Received an invalid chain from peer: {}, exiting block downloader (it should be restarted).",
548                            entry.peer_who_told_us
549                        );
550                        entry.peer_who_told_us_handle.ban_peer(LONG_BAN);
551                    });
552
553                    return Err(e);
554                }
555
556                // Add the request to the failed list.
557                if let Some(batch) = self.inflight_requests.get_mut(&start_height) {
558                    tracing::debug!("Error downloading batch: {e}");
559
560                    batch.failures += 1;
561                    if batch.failures > MAX_DOWNLOAD_FAILURES {
562                        tracing::debug!(
563                            "Too many errors downloading blocks, stopping the block downloader."
564                        );
565                        return Err(BlockDownloadError::TimedOut);
566                    }
567
568                    self.failed_batches.push(Reverse(start_height));
569                }
570
571                Ok(())
572            }
573            Ok((client, block_batch)) => {
574                // Remove the batch from the inflight batches.
575                if self.inflight_requests.remove(&start_height).is_none() {
576                    tracing::debug!("Already retrieved batch");
577                    // If it was already retrieved then there is nothing else to do.
578                    pending_peers
579                        .entry(client.info.pruning_seed)
580                        .or_default()
581                        .push(client);
582
583                    self.check_pending_peers(chain_tracker, pending_peers);
584
585                    return Ok(());
586                };
587
588                // If the batch is higher than the last time we updated `amount_of_blocks_to_request`, update it
589                // again.
590                if start_height
591                    > self
592                        .most_recent_batch_sizes
593                        .peek()
594                        .map(|Reverse((height, _))| *height)
595                        .unwrap_or_default()
596                {
597                    self.most_recent_batch_sizes.push(Reverse((
598                        start_height,
599                        BatchSizeInformation {
600                            len: block_batch.blocks.len(),
601                            byte_size: block_batch.size,
602                        },
603                    )));
604
605                    if self.most_recent_batch_sizes.len() > MOST_RECENT_BATCH_WEIGHTS_FOR_BATCH_SIZE
606                    {
607                        self.most_recent_batch_sizes.pop();
608                    }
609                }
610
611                self.block_queue
612                    .add_incoming_batch(
613                        ReadyQueueBatch {
614                            start_height,
615                            block_batch,
616                        },
617                        self.inflight_requests.first_key_value().map(|(k, _)| *k),
618                    )
619                    .await?;
620
621                pending_peers
622                    .entry(client.info.pruning_seed)
623                    .or_default()
624                    .push(client);
625
626                self.check_pending_peers(chain_tracker, pending_peers);
627
628                Ok(())
629            }
630        }
631    }
632
633    /// Starts the main loop of the block downloader.
634    async fn run(mut self) -> Result<(), BlockDownloadError> {
635        let mut chain_tracker =
636            initial_chain_search(&mut self.peer_set, &mut self.our_chain_svc).await?;
637
638        let mut pending_peers = BTreeMap::new();
639
640        tracing::info!("Attempting to download blocks from peers, this may take a while.");
641
642        let mut check_client_pool_interval = interval(self.config.check_client_pool_interval);
643        check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
644
645        self.check_for_free_clients(&mut chain_tracker, &mut pending_peers)
646            .await?;
647
648        loop {
649            tokio::select! {
650                _ = check_client_pool_interval.tick() => {
651                    tracing::debug!("Checking client pool for free peers, timer fired.");
652                    self.check_for_free_clients(&mut chain_tracker, &mut pending_peers).await?;
653
654                     // If we have no inflight requests, and we have had too many empty chain entries in a row assume the top has been found.
655                    if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
656                        tracing::debug!("Failed to find any more chain entries, probably fround the top");
657                        return Ok(());
658                    }
659                }
660                Some(res) = self.block_download_tasks.join_next() => {
661                    let BlockDownloadTaskResponse {
662                        start_height,
663                        result
664                    } = res.expect("Download batch future panicked");
665
666                    self.handle_download_batch_res(start_height, result, &mut chain_tracker, &mut pending_peers).await?;
667
668                    // If we have no inflight requests, and we have had too many empty chain entries in a row assume the top has been found.
669                    if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
670                        tracing::debug!("Failed to find any more chain entries, probably fround the top");
671                        return Ok(());
672                    }
673                }
674                Some(Ok(res)) = self.chain_entry_task.join_next() => {
675                    match res {
676                        Ok((client, entry)) => {
677                            match chain_tracker.add_entry(entry, &mut self.our_chain_svc).await {
678                                Ok(()) => {
679                                    tracing::debug!("Successfully added chain entry to chain tracker.");
680                                    self.amount_of_empty_chain_entries = 0;
681                                }
682                                Err(e) => {
683                                    tracing::debug!("Failed to add incoming chain entry to chain tracker: {e:?}");
684                                    self.amount_of_empty_chain_entries += 1;
685                                }
686                            }
687
688                            pending_peers
689                                .entry(client.info.pruning_seed)
690                                .or_default()
691                                .push(client);
692
693                            self.check_pending_peers(&mut chain_tracker, &mut pending_peers);
694                        }
695                        Err(_) => self.amount_of_empty_chain_entries += 1
696                    }
697                }
698            }
699        }
700    }
701}
702
703/// The return value from the block download tasks.
704struct BlockDownloadTaskResponse<N: NetworkZone> {
705    /// The start height of the batch.
706    start_height: usize,
707    /// A result containing the batch or an error.
708    result: Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError>,
709}
710
711/// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`].
712const fn client_has_block_in_range(
713    pruning_seed: &PruningSeed,
714    start_height: usize,
715    length: usize,
716) -> bool {
717    pruning_seed.has_full_block(start_height, MAX_BLOCK_HEIGHT_USIZE)
718        && pruning_seed.has_full_block(start_height + length, MAX_BLOCK_HEIGHT_USIZE)
719}
720
721#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
722struct BatchSizeInformation {
723    len: usize,
724    byte_size: usize,
725}
726
727/// Calculates the next amount of blocks to request in a batch.
728///
729/// Parameters:
730/// - `previous_batch_size` is the size, in bytes, of the last batch
731/// - `previous_batch_len` is the amount of blocks in the last batch
732/// - `target_batch_size` is the target size, in bytes, of a batch
733fn calculate_next_block_batch_size(
734    previous_batch_size: usize,
735    previous_batch_len: usize,
736    target_batch_size: usize,
737) -> usize {
738    // The average block size of the last batch of blocks, multiplied by 2 as a safety margin for
739    // future blocks.
740    let adjusted_average_block_size = max((previous_batch_size * 2) / previous_batch_len, 1);
741
742    // Set the amount of blocks to request equal to our target batch size divided by the adjusted_average_block_size.
743    let next_batch_len = max(target_batch_size / adjusted_average_block_size, 1);
744
745    // Cap the amount of growth to 1.5x the previous batch len, to prevent a small block causing us to request
746    // a huge amount of blocks.
747    let next_batch_len = min(next_batch_len, (previous_batch_len * 3).div_ceil(2));
748
749    // Cap the length to the maximum allowed.
750    min(next_batch_len, MAX_BLOCK_BATCH_LEN)
751}