cuprate_p2p/block_downloader/
chain_tracker.rs

1use std::{cmp::min, collections::VecDeque, mem};
2
3use cuprate_fixed_bytes::ByteArrayVec;
4use tower::{Service, ServiceExt};
5
6use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE;
7use cuprate_p2p_core::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
8use cuprate_pruning::PruningSeed;
9
10use crate::{
11    block_downloader::{ChainSvcRequest, ChainSvcResponse},
12    constants::MEDIUM_BAN,
13};
14
15/// A new chain entry to add to our chain tracker.
16#[derive(Debug)]
17pub struct ChainEntry<N: NetworkZone> {
18    /// A list of block IDs.
19    pub ids: Vec<[u8; 32]>,
20    /// The peer who told us about this chain entry.
21    pub peer: InternalPeerID<N::Addr>,
22    /// The peer who told us about this chain entry's handle
23    pub handle: ConnectionHandle,
24}
25
26/// A batch of blocks to retrieve.
27#[derive(Clone)]
28pub(crate) struct BlocksToRetrieve<N: NetworkZone> {
29    /// The block IDs to get.
30    pub ids: ByteArrayVec<32>,
31    /// The hash of the last block before this batch.
32    pub prev_id: [u8; 32],
33    /// The expected height of the first block in [`BlocksToRetrieve::ids`].
34    pub start_height: usize,
35    /// The peer who told us about this batch.
36    pub peer_who_told_us: InternalPeerID<N::Addr>,
37    /// The peer who told us about this batch's handle.
38    pub peer_who_told_us_handle: ConnectionHandle,
39    /// The number of requests sent for this batch.
40    pub requests_sent: usize,
41    /// The number of times this batch has been requested from a peer and failed.
42    pub failures: usize,
43}
44
45/// An error returned from the [`ChainTracker`].
46#[derive(Debug)]
47pub(crate) enum ChainTrackerError {
48    /// The new chain entry is invalid.
49    NewEntryIsInvalid,
50    NewEntryIsEmpty,
51    /// The new chain entry does not follow from the top of our chain tracker.
52    NewEntryDoesNotFollowChain,
53    #[expect(dead_code)] // This is used for logging
54    ChainSvcError(tower::BoxError),
55}
56
57/// # Chain Tracker
58///
59/// This struct allows following a single chain. It takes in [`ChainEntry`]s and
60/// allows getting [`BlocksToRetrieve`].
61pub(crate) struct ChainTracker<N: NetworkZone> {
62    /// A list of [`ChainEntry`]s, in order, that we should request.
63    valid_entries: VecDeque<ChainEntry<N>>,
64    /// A list of [`ChainEntry`]s that are pending more [`ChainEntry`]s to check validity.
65    unknown_entries: VecDeque<ChainEntry<N>>,
66    /// The height of the first block, in the first entry in [`Self::entries`].
67    first_height: usize,
68    /// The hash of the last block in the last entry.
69    top_seen_hash: [u8; 32],
70    /// The hash of the block one below [`Self::first_height`].
71    previous_hash: [u8; 32],
72    /// The hash of the genesis block.
73    our_genesis: [u8; 32],
74}
75
76impl<N: NetworkZone> ChainTracker<N> {
77    /// Creates a new chain tracker.
78    pub(crate) async fn new<C>(
79        new_entry: ChainEntry<N>,
80        first_height: usize,
81        our_genesis: [u8; 32],
82        previous_hash: [u8; 32],
83        our_chain_svc: &mut C,
84    ) -> Result<Self, ChainTrackerError>
85    where
86        C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
87    {
88        let top_seen_hash = *new_entry.ids.last().unwrap();
89        let mut entries = VecDeque::with_capacity(1);
90        entries.push_back(new_entry);
91
92        let ChainSvcResponse::ValidateEntries { valid, unknown } = our_chain_svc
93            .ready()
94            .await
95            .map_err(ChainTrackerError::ChainSvcError)?
96            .call(ChainSvcRequest::ValidateEntries(entries, first_height))
97            .await
98            .map_err(ChainTrackerError::ChainSvcError)?
99        else {
100            unreachable!()
101        };
102
103        Ok(Self {
104            valid_entries: valid,
105            unknown_entries: unknown,
106            first_height,
107            top_seen_hash,
108            previous_hash,
109            our_genesis,
110        })
111    }
112
113    /// Returns `true` if the peer is expected to have the next block after our highest seen block
114    /// according to their pruning seed.
115    pub(crate) fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool {
116        seed.has_full_block(self.top_height(), MAX_BLOCK_HEIGHT_USIZE)
117    }
118
119    /// Returns the simple history, the highest seen block and the genesis block.
120    pub(crate) const fn get_simple_history(&self) -> [[u8; 32]; 2] {
121        [self.top_seen_hash, self.our_genesis]
122    }
123
124    /// Returns the height of the highest block we are tracking.
125    pub(crate) fn top_height(&self) -> usize {
126        let top_block_idx = self
127            .valid_entries
128            .iter()
129            .chain(self.unknown_entries.iter())
130            .map(|entry| entry.ids.len())
131            .sum::<usize>();
132
133        self.first_height + top_block_idx
134    }
135
136    /// Returns the total number of queued batches for a certain `batch_size`.
137    ///
138    /// # Panics
139    /// This function panics if `batch_size` is `0`.
140    pub(crate) fn block_requests_queued(&self, batch_size: usize) -> usize {
141        self.valid_entries
142            .iter()
143            .map(|entry| entry.ids.len().div_ceil(batch_size))
144            .sum()
145    }
146
147    /// Attempts to add an incoming [`ChainEntry`] to the chain tracker.
148    pub(crate) async fn add_entry<C>(
149        &mut self,
150        mut chain_entry: ChainEntry<N>,
151        our_chain_svc: &mut C,
152    ) -> Result<(), ChainTrackerError>
153    where
154        C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
155    {
156        if chain_entry.ids.len() == 1 {
157            return Err(ChainTrackerError::NewEntryIsEmpty);
158        }
159
160        let Some(first) = chain_entry.ids.first() else {
161            // The peer must send at lest one overlapping block.
162            chain_entry.handle.ban_peer(MEDIUM_BAN);
163            return Err(ChainTrackerError::NewEntryIsInvalid);
164        };
165
166        if *first != self.top_seen_hash {
167            return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
168        }
169
170        let new_entry = ChainEntry {
171            // ignore the first block - we already know it.
172            ids: chain_entry.ids.split_off(1),
173            peer: chain_entry.peer,
174            handle: chain_entry.handle,
175        };
176
177        self.top_seen_hash = *new_entry.ids.last().unwrap();
178
179        self.unknown_entries.push_back(new_entry);
180
181        let ChainSvcResponse::ValidateEntries { mut valid, unknown } = our_chain_svc
182            .ready()
183            .await
184            .map_err(ChainTrackerError::ChainSvcError)?
185            .call(ChainSvcRequest::ValidateEntries(
186                mem::take(&mut self.unknown_entries),
187                self.first_height
188                    + self
189                        .valid_entries
190                        .iter()
191                        .map(|e| e.ids.len())
192                        .sum::<usize>(),
193            ))
194            .await
195            .map_err(ChainTrackerError::ChainSvcError)?
196        else {
197            unreachable!()
198        };
199
200        self.valid_entries.append(&mut valid);
201        self.unknown_entries = unknown;
202
203        Ok(())
204    }
205
206    /// Returns a batch of blocks to request.
207    ///
208    /// The returned batches length will be less than or equal to `max_blocks`
209    pub(crate) fn blocks_to_get(
210        &mut self,
211        pruning_seed: &PruningSeed,
212        max_blocks: usize,
213    ) -> Option<BlocksToRetrieve<N>> {
214        if !pruning_seed.has_full_block(self.first_height, MAX_BLOCK_HEIGHT_USIZE) {
215            return None;
216        }
217
218        let entry = self.valid_entries.front_mut()?;
219
220        // Calculate the ending index for us to get in this batch, it will be one of these:
221        // - smallest out of `max_blocks`
222        // - length of the batch
223        // - index of the next pruned block for this seed
224        let end_idx = min(
225            min(entry.ids.len(), max_blocks),
226                pruning_seed
227                    .get_next_pruned_block(self.first_height, MAX_BLOCK_HEIGHT_USIZE)
228                    .expect("We use local values to calculate height which should be below the sanity limit")
229                    // Use a big value as a fallback if the seed does no pruning.
230                    .unwrap_or(MAX_BLOCK_HEIGHT_USIZE)
231                    - self.first_height,
232        );
233
234        if end_idx == 0 {
235            return None;
236        }
237
238        let ids_to_get = entry.ids.drain(0..end_idx).collect::<Vec<_>>();
239
240        let blocks = BlocksToRetrieve {
241            ids: ids_to_get.into(),
242            prev_id: self.previous_hash,
243            start_height: self.first_height,
244            peer_who_told_us: entry.peer,
245            peer_who_told_us_handle: entry.handle.clone(),
246            requests_sent: 0,
247            failures: 0,
248        };
249
250        self.first_height += end_idx;
251        // TODO: improve ByteArrayVec API.
252        self.previous_hash = blocks.ids[blocks.ids.len() - 1];
253
254        if entry.ids.is_empty() {
255            self.valid_entries.pop_front();
256        }
257
258        Some(blocks)
259    }
260}