cuprate_p2p/block_downloader/
chain_tracker.rs1use std::{cmp::min, collections::VecDeque, mem};
2
3use cuprate_fixed_bytes::ByteArrayVec;
4use tower::{Service, ServiceExt};
5
6use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE;
7use cuprate_p2p_core::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
8use cuprate_pruning::PruningSeed;
9
10use crate::{
11 block_downloader::{ChainSvcRequest, ChainSvcResponse},
12 constants::MEDIUM_BAN,
13};
14
15#[derive(Debug)]
17pub struct ChainEntry<N: NetworkZone> {
18 pub ids: Vec<[u8; 32]>,
20 pub peer: InternalPeerID<N::Addr>,
22 pub handle: ConnectionHandle,
24}
25
26#[derive(Clone)]
28pub(crate) struct BlocksToRetrieve<N: NetworkZone> {
29 pub ids: ByteArrayVec<32>,
31 pub prev_id: [u8; 32],
33 pub start_height: usize,
35 pub peer_who_told_us: InternalPeerID<N::Addr>,
37 pub peer_who_told_us_handle: ConnectionHandle,
39 pub requests_sent: usize,
41 pub failures: usize,
43}
44
45#[derive(Debug)]
47pub(crate) enum ChainTrackerError {
48 NewEntryIsInvalid,
50 NewEntryIsEmpty,
51 NewEntryDoesNotFollowChain,
53 #[expect(dead_code)] ChainSvcError(tower::BoxError),
55}
56
57pub(crate) struct ChainTracker<N: NetworkZone> {
62 valid_entries: VecDeque<ChainEntry<N>>,
64 unknown_entries: VecDeque<ChainEntry<N>>,
66 first_height: usize,
68 top_seen_hash: [u8; 32],
70 previous_hash: [u8; 32],
72 our_genesis: [u8; 32],
74}
75
76impl<N: NetworkZone> ChainTracker<N> {
77 pub(crate) async fn new<C>(
79 new_entry: ChainEntry<N>,
80 first_height: usize,
81 our_genesis: [u8; 32],
82 previous_hash: [u8; 32],
83 our_chain_svc: &mut C,
84 ) -> Result<Self, ChainTrackerError>
85 where
86 C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
87 {
88 let top_seen_hash = *new_entry.ids.last().unwrap();
89 let mut entries = VecDeque::with_capacity(1);
90 entries.push_back(new_entry);
91
92 let ChainSvcResponse::ValidateEntries { valid, unknown } = our_chain_svc
93 .ready()
94 .await
95 .map_err(ChainTrackerError::ChainSvcError)?
96 .call(ChainSvcRequest::ValidateEntries(entries, first_height))
97 .await
98 .map_err(ChainTrackerError::ChainSvcError)?
99 else {
100 unreachable!()
101 };
102
103 Ok(Self {
104 valid_entries: valid,
105 unknown_entries: unknown,
106 first_height,
107 top_seen_hash,
108 previous_hash,
109 our_genesis,
110 })
111 }
112
113 pub(crate) fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool {
116 seed.has_full_block(self.top_height(), MAX_BLOCK_HEIGHT_USIZE)
117 }
118
119 pub(crate) const fn get_simple_history(&self) -> [[u8; 32]; 2] {
121 [self.top_seen_hash, self.our_genesis]
122 }
123
124 pub(crate) fn top_height(&self) -> usize {
126 let top_block_idx = self
127 .valid_entries
128 .iter()
129 .chain(self.unknown_entries.iter())
130 .map(|entry| entry.ids.len())
131 .sum::<usize>();
132
133 self.first_height + top_block_idx
134 }
135
136 pub(crate) fn block_requests_queued(&self, batch_size: usize) -> usize {
141 self.valid_entries
142 .iter()
143 .map(|entry| entry.ids.len().div_ceil(batch_size))
144 .sum()
145 }
146
147 pub(crate) async fn add_entry<C>(
149 &mut self,
150 mut chain_entry: ChainEntry<N>,
151 our_chain_svc: &mut C,
152 ) -> Result<(), ChainTrackerError>
153 where
154 C: Service<ChainSvcRequest<N>, Response = ChainSvcResponse<N>, Error = tower::BoxError>,
155 {
156 if chain_entry.ids.len() == 1 {
157 return Err(ChainTrackerError::NewEntryIsEmpty);
158 }
159
160 let Some(first) = chain_entry.ids.first() else {
161 chain_entry.handle.ban_peer(MEDIUM_BAN);
163 return Err(ChainTrackerError::NewEntryIsInvalid);
164 };
165
166 if *first != self.top_seen_hash {
167 return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
168 }
169
170 let new_entry = ChainEntry {
171 ids: chain_entry.ids.split_off(1),
173 peer: chain_entry.peer,
174 handle: chain_entry.handle,
175 };
176
177 self.top_seen_hash = *new_entry.ids.last().unwrap();
178
179 self.unknown_entries.push_back(new_entry);
180
181 let ChainSvcResponse::ValidateEntries { mut valid, unknown } = our_chain_svc
182 .ready()
183 .await
184 .map_err(ChainTrackerError::ChainSvcError)?
185 .call(ChainSvcRequest::ValidateEntries(
186 mem::take(&mut self.unknown_entries),
187 self.first_height
188 + self
189 .valid_entries
190 .iter()
191 .map(|e| e.ids.len())
192 .sum::<usize>(),
193 ))
194 .await
195 .map_err(ChainTrackerError::ChainSvcError)?
196 else {
197 unreachable!()
198 };
199
200 self.valid_entries.append(&mut valid);
201 self.unknown_entries = unknown;
202
203 Ok(())
204 }
205
206 pub(crate) fn blocks_to_get(
210 &mut self,
211 pruning_seed: &PruningSeed,
212 max_blocks: usize,
213 ) -> Option<BlocksToRetrieve<N>> {
214 if !pruning_seed.has_full_block(self.first_height, MAX_BLOCK_HEIGHT_USIZE) {
215 return None;
216 }
217
218 let entry = self.valid_entries.front_mut()?;
219
220 let end_idx = min(
225 min(entry.ids.len(), max_blocks),
226 pruning_seed
227 .get_next_pruned_block(self.first_height, MAX_BLOCK_HEIGHT_USIZE)
228 .expect("We use local values to calculate height which should be below the sanity limit")
229 .unwrap_or(MAX_BLOCK_HEIGHT_USIZE)
231 - self.first_height,
232 );
233
234 if end_idx == 0 {
235 return None;
236 }
237
238 let ids_to_get = entry.ids.drain(0..end_idx).collect::<Vec<_>>();
239
240 let blocks = BlocksToRetrieve {
241 ids: ids_to_get.into(),
242 prev_id: self.previous_hash,
243 start_height: self.first_height,
244 peer_who_told_us: entry.peer,
245 peer_who_told_us_handle: entry.handle.clone(),
246 requests_sent: 0,
247 failures: 0,
248 };
249
250 self.first_height += end_idx;
251 self.previous_hash = blocks.ids[blocks.ids.len() - 1];
253
254 if entry.ids.is_empty() {
255 self.valid_entries.pop_front();
256 }
257
258 Some(blocks)
259 }
260}