cuprate_fast_sync/
fast_sync.rs1use std::{
2 cmp::min,
3 collections::{HashMap, VecDeque},
4 sync::OnceLock,
5};
6
7use blake3::Hasher;
8use monero_serai::{
9 block::Block,
10 transaction::{Input, Transaction},
11};
12use tower::{Service, ServiceExt};
13
14use cuprate_blockchain::service::BlockchainReadHandle;
15use cuprate_consensus::transactions::new_tx_verification_data;
16use cuprate_consensus_context::BlockchainContext;
17use cuprate_p2p::block_downloader::ChainEntry;
18use cuprate_p2p_core::NetworkZone;
19use cuprate_types::{
20 blockchain::{BlockchainReadRequest, BlockchainResponse},
21 Chain, VerifiedBlockInformation, VerifiedTransactionInformation,
22};
23
/// Number of consecutive block hashes that are hashed together to form one fast-sync hash.
pub const FAST_SYNC_BATCH_LEN: usize = 512;

/// The table of fast-sync hashes, one 32-byte entry per batch of [`FAST_SYNC_BATCH_LEN`] blocks.
///
/// Empty until [`set_fast_sync_hashes`] is called; it can only be written once.
static FAST_SYNC_HASHES: OnceLock<&[[u8; 32]]> = OnceLock::new();
29
30pub fn fast_sync_stop_height() -> usize {
36 FAST_SYNC_HASHES.get().unwrap().len() * FAST_SYNC_BATCH_LEN
37}
38
39pub fn set_fast_sync_hashes(hashes: &'static [[u8; 32]]) {
45 FAST_SYNC_HASHES.set(hashes).unwrap();
46}
47
48pub async fn validate_entries<N: NetworkZone>(
65 mut entries: VecDeque<ChainEntry<N>>,
66 start_height: usize,
67 blockchain_read_handle: &mut BlockchainReadHandle,
68) -> Result<(VecDeque<ChainEntry<N>>, VecDeque<ChainEntry<N>>), tower::BoxError> {
69 if start_height >= fast_sync_stop_height() {
71 return Ok((entries, VecDeque::new()));
72 }
73
74 let hashes_start_height = (start_height / FAST_SYNC_BATCH_LEN) * FAST_SYNC_BATCH_LEN;
91 let amount_of_hashes = entries.iter().map(|e| e.ids.len()).sum::<usize>();
92 let last_height = amount_of_hashes + start_height;
93
94 let hashes_stop_height = min(
95 (last_height / FAST_SYNC_BATCH_LEN) * FAST_SYNC_BATCH_LEN,
96 fast_sync_stop_height(),
97 );
98
99 let mut hashes_stop_diff_last_height = last_height - hashes_stop_height;
100
101 let BlockchainResponse::BlockHashInRange(starting_hashes) = blockchain_read_handle
103 .ready()
104 .await?
105 .call(BlockchainReadRequest::BlockHashInRange(
106 hashes_start_height..start_height,
107 Chain::Main,
108 ))
109 .await?
110 else {
111 unreachable!()
112 };
113
114 if amount_of_hashes + starting_hashes.len() < FAST_SYNC_BATCH_LEN {
116 return Ok((VecDeque::new(), entries));
117 }
118
119 let mut unknown = VecDeque::new();
120
121 while !entries.is_empty() && hashes_stop_diff_last_height != 0 {
124 let back = entries.back_mut().unwrap();
125
126 if back.ids.len() >= hashes_stop_diff_last_height {
127 unknown.push_front(ChainEntry {
129 ids: back
130 .ids
131 .drain((back.ids.len() - hashes_stop_diff_last_height)..)
132 .collect(),
133 peer: back.peer,
134 handle: back.handle.clone(),
135 });
136
137 break;
138 }
139
140 let back = entries.pop_back().unwrap();
142 hashes_stop_diff_last_height -= back.ids.len();
143 unknown.push_front(back);
144 }
145
146 let mut hasher = Hasher::default();
148 let mut last_i = 1;
149 for (i, hash) in starting_hashes
150 .iter()
151 .chain(entries.iter().flat_map(|e| e.ids.iter()))
152 .enumerate()
153 {
154 hasher.update(hash);
155
156 if (i + 1) % FAST_SYNC_BATCH_LEN == 0 {
157 let got_hash = hasher.finalize();
158
159 if got_hash
160 != FAST_SYNC_HASHES.get().unwrap()
161 [get_hash_index_for_height(hashes_start_height + i)]
162 {
163 return Err("Hashes do not match".into());
164 }
165 hasher.reset();
166 }
167
168 last_i = i + 1;
169 }
170 assert_eq!(last_i % FAST_SYNC_BATCH_LEN, 0);
172
173 Ok((entries, unknown))
174}
175
176const fn get_hash_index_for_height(height: usize) -> usize {
178 height / FAST_SYNC_BATCH_LEN
179}
180
181pub fn block_to_verified_block_information(
187 block: Block,
188 txs: Vec<Transaction>,
189 blockchin_ctx: &BlockchainContext,
190) -> VerifiedBlockInformation {
191 let block_hash = block.hash();
192
193 let block_blob = block.serialize();
194
195 let Some(Input::Gen(height)) = block.miner_transaction.prefix().inputs.first() else {
196 panic!("fast sync block invalid");
197 };
198
199 assert_eq!(
200 *height, blockchin_ctx.chain_height,
201 "fast sync block invalid"
202 );
203
204 let mut txs = txs
205 .into_iter()
206 .map(|tx| {
207 let data = new_tx_verification_data(tx).expect("fast sync block invalid");
208
209 (data.tx_hash, data)
210 })
211 .collect::<HashMap<_, _>>();
212
213 let mut verified_txs = Vec::with_capacity(txs.len());
214 for tx in &block.transactions {
215 let data = txs.remove(tx).expect("fast sync block invalid");
216
217 verified_txs.push(VerifiedTransactionInformation {
218 tx_blob: data.tx_blob,
219 tx_weight: data.tx_weight,
220 fee: data.fee,
221 tx_hash: data.tx_hash,
222 tx: data.tx,
223 });
224 }
225
226 let total_fees = verified_txs.iter().map(|tx| tx.fee).sum::<u64>();
227 let total_outputs = block
228 .miner_transaction
229 .prefix()
230 .outputs
231 .iter()
232 .map(|output| output.amount.unwrap_or(0))
233 .sum::<u64>();
234
235 let generated_coins = total_outputs - total_fees;
236
237 let weight = block.miner_transaction.weight()
238 + verified_txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
239
240 VerifiedBlockInformation {
241 block_blob,
242 txs: verified_txs,
243 block_hash,
244 pow_hash: [u8::MAX; 32],
245 height: *height,
246 generated_coins,
247 weight,
248 long_term_weight: blockchin_ctx.next_block_long_term_weight(weight),
249 cumulative_difficulty: blockchin_ctx.cumulative_difficulty + blockchin_ctx.next_difficulty,
250 block,
251 }
252}
253
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, sync::LazyLock};

    use proptest::proptest;

    use cuprate_p2p::block_downloader::ChainEntry;
    use cuprate_p2p_core::{client::InternalPeerID, handles::HandleBuilder, ClearNet};

    use crate::{
        fast_sync_stop_height, set_fast_sync_hashes, validate_entries, FAST_SYNC_BATCH_LEN,
    };

    /// Synthetic block hashes shared by every test: hash `i` is `i` in little-endian
    /// bytes, zero padded to 32 bytes.
    ///
    /// Initialising this also installs the matching fast-sync hashes through
    /// [`set_fast_sync_hashes`], so it must be forced before [`validate_entries`] runs.
    static HASHES: LazyLock<&[[u8; 32]]> = LazyLock::new(|| {
        let hashes = (0..FAST_SYNC_BATCH_LEN * 2000)
            .map(|i| {
                let mut ret = [0; 32];
                ret[..8].copy_from_slice(&i.to_le_bytes());
                ret
            })
            .collect::<Vec<_>>();

        let hashes = hashes.leak();

        // One fast-sync hash per batch: the hash of the batch's block hashes laid
        // out back to back. `as_flattened` gives that byte view without `unsafe`.
        let fast_sync_hashes = hashes
            .chunks(FAST_SYNC_BATCH_LEN)
            .map(|chunk| blake3::hash(chunk.as_flattened()).into())
            .collect::<Vec<_>>();

        set_fast_sync_hashes(fast_sync_hashes.leak());

        hashes
    });

    /// Runs [`validate_entries`] from height 0 against a fresh, empty database and
    /// returns how many hashes ended up in the `(valid, unknown)` halves.
    fn validated_lens(entries: VecDeque<ChainEntry<ClearNet>>) -> (usize, usize) {
        // Install the fast-sync hashes even when the caller never touched `HASHES`
        // (for example when it built zero entries).
        LazyLock::force(&HASHES);

        let data_dir = tempfile::tempdir().unwrap();

        tokio_test::block_on(async move {
            let blockchain_config = cuprate_blockchain::config::ConfigBuilder::new()
                .data_directory(data_dir.path().to_path_buf())
                .build();

            let (mut blockchain_read_handle, _, _) =
                cuprate_blockchain::service::init(blockchain_config).unwrap();

            let (valid, unknown) =
                validate_entries::<ClearNet>(entries, 0, &mut blockchain_read_handle)
                    .await
                    .unwrap();

            let len_left = valid.iter().map(|e| e.ids.len()).sum::<usize>();
            let len_right = unknown.iter().map(|e| e.ids.len()).sum::<usize>();

            (len_left, len_right)
        })
    }

    proptest! {
        // One entry holding `len` hashes, padded with zero hashes past the end of `HASHES`.
        #[test]
        fn valid_entry(len in 0_usize..1_500_000) {
            let mut ids = HASHES.to_vec();
            ids.resize(len, [0_u8; 32]);

            let handle = HandleBuilder::new().build();

            let entry = ChainEntry {
                ids,
                peer: InternalPeerID::Unknown(1),
                handle: handle.1
            };

            let (len_left, len_right) = validated_lens(VecDeque::from([entry]));

            assert_eq!(len_left + len_right, len);
            assert!(len_left <= fast_sync_stop_height());
            assert!(len_right < FAST_SYNC_BATCH_LEN || len > fast_sync_stop_height());
        }

        // `len` entries holding one hash each, so batches span many entries.
        #[test]
        fn single_hash_entries(len in 0_usize..1_500_000) {
            let handle = HandleBuilder::new().build();
            let entries = (0..len).map(|i| {
                ChainEntry {
                    ids: vec![HASHES.get(i).copied().unwrap_or_default()],
                    peer: InternalPeerID::Unknown(1),
                    handle: handle.1.clone()
                }
            }).collect();

            let (len_left, len_right) = validated_lens(entries);

            assert_eq!(len_left + len_right, len);
            assert!(len_left <= fast_sync_stop_height());
            assert!(len_right < FAST_SYNC_BATCH_LEN || len > fast_sync_stop_height());
        }

        // Fewer hashes than one batch: nothing can be validated, everything is unknown.
        #[test]
        fn not_enough_hashes(len in 0_usize..FAST_SYNC_BATCH_LEN) {
            let hashes_start_height = FAST_SYNC_BATCH_LEN * 1234;

            let handle = HandleBuilder::new().build();
            let entry = ChainEntry {
                ids: HASHES[hashes_start_height..(hashes_start_height + len)].to_vec(),
                peer: InternalPeerID::Unknown(1),
                handle: handle.1
            };

            let (len_left, len_right) = validated_lens(VecDeque::from([entry]));

            assert_eq!(len_right, len);
            assert_eq!(len_left, 0);
        }
    }
}