cuprate_p2p/block_downloader/
download_batch.rs1use std::collections::HashSet;
2
3use monero_serai::{block::Block, transaction::Transaction};
4use rayon::prelude::*;
5use tower::{Service, ServiceExt};
6use tracing::instrument;
7
8use cuprate_fixed_bytes::ByteArrayVec;
9use cuprate_helper::asynch::rayon_spawn_async;
10use cuprate_p2p_core::{
11 handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse, ProtocolRequest,
12 ProtocolResponse,
13};
14use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse};
15
16use crate::{
17 block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse},
18 constants::{MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN},
19 peer_set::ClientDropGuard,
20};
21
22#[instrument(
24 level = "debug",
25 name = "download_batch",
26 skip_all,
27 fields(
28 start_height = expected_start_height,
29 attempt = _attempt
30 )
31)]
32#[expect(clippy::used_underscore_binding)]
33pub async fn download_batch_task<N: NetworkZone>(
34 client: ClientDropGuard<N>,
35 ids: ByteArrayVec<32>,
36 previous_id: [u8; 32],
37 expected_start_height: usize,
38 _attempt: usize,
39) -> BlockDownloadTaskResponse<N> {
40 BlockDownloadTaskResponse {
41 start_height: expected_start_height,
42 result: request_batch_from_peer(client, ids, previous_id, expected_start_height).await,
43 }
44}
45
46async fn request_batch_from_peer<N: NetworkZone>(
51 mut client: ClientDropGuard<N>,
52 ids: ByteArrayVec<32>,
53 previous_id: [u8; 32],
54 expected_start_height: usize,
55) -> Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError> {
56 let request = PeerRequest::Protocol(ProtocolRequest::GetObjects(GetObjectsRequest {
57 blocks: ids.clone(),
58 pruned: false,
59 }));
60
61 let blocks_response = {
63 let PeerResponse::Protocol(ProtocolResponse::GetObjects(blocks_response)) =
64 client.ready().await?.call(request).await?
65 else {
66 panic!("Connection task returned wrong response.");
67 };
68
69 blocks_response
70 };
71
72 if blocks_response.blocks.len() > ids.len() {
74 client.info.handle.ban_peer(MEDIUM_BAN);
75 return Err(BlockDownloadError::PeersResponseWasInvalid);
76 }
77
78 if blocks_response.blocks.len() != ids.len() {
79 return Err(BlockDownloadError::PeerDidNotHaveRequestedData);
80 }
81 let peer_handle = client.info.handle.clone();
82
83 let blocks = rayon_spawn_async(move || {
84 deserialize_batch(
85 blocks_response,
86 expected_start_height,
87 ids,
88 previous_id,
89 peer_handle,
90 )
91 })
92 .await;
93
94 let batch = blocks.inspect_err(|e| {
95 if matches!(e, BlockDownloadError::PeersResponseWasInvalid) {
97 client.info.handle.ban_peer(MEDIUM_BAN);
98 }
99 })?;
100
101 Ok((client, batch))
102}
103
104#[expect(clippy::needless_pass_by_value)]
105fn deserialize_batch(
106 blocks_response: GetObjectsResponse,
107 expected_start_height: usize,
108 requested_ids: ByteArrayVec<32>,
109 previous_id: [u8; 32],
110 peer_handle: ConnectionHandle,
111) -> Result<BlockBatch, BlockDownloadError> {
112 let blocks = blocks_response
113 .blocks
114 .into_par_iter()
115 .enumerate()
116 .map(|(i, block_entry)| {
117 let expected_height = i + expected_start_height;
118
119 let mut size = block_entry.block.len();
120
121 let block = Block::read(&mut block_entry.block.as_ref())
122 .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
123
124 let block_hash = block.hash();
125
126 if requested_ids[i] != block_hash || block.transactions.len() != block_entry.txs.len() {
128 return Err(BlockDownloadError::PeersResponseWasInvalid);
129 }
130
131 if i == 0 && block.header.previous != previous_id {
135 tracing::warn!(
136 "Invalid chain, peer told us a block follows the chain when it doesn't."
137 );
138
139 return Err(BlockDownloadError::ChainInvalid);
142 }
143
144 if block
147 .number()
148 .is_none_or(|height| height != expected_height)
149 {
150 tracing::warn!(
151 "Invalid chain, expected height: {expected_height}, got height: {:?}",
152 block.number()
153 );
154
155 return Err(BlockDownloadError::ChainInvalid);
158 }
159
160 let txs = block_entry
162 .txs
163 .take_normal()
164 .ok_or(BlockDownloadError::PeersResponseWasInvalid)?
165 .into_iter()
166 .map(|tx_blob| {
167 size += tx_blob.len();
168
169 if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
170 return Err(BlockDownloadError::PeersResponseWasInvalid);
171 }
172
173 Transaction::read(&mut tx_blob.as_ref())
174 .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)
175 })
176 .collect::<Result<Vec<_>, _>>()?;
177
178 let mut expected_txs = block.transactions.iter().collect::<HashSet<_>>();
180
181 for tx in &txs {
182 if !expected_txs.remove(&tx.hash()) {
183 return Err(BlockDownloadError::PeersResponseWasInvalid);
184 }
185 }
186
187 if !expected_txs.is_empty() {
188 return Err(BlockDownloadError::PeersResponseWasInvalid);
189 }
190
191 Ok(((block, txs), size))
192 })
193 .collect::<Result<(Vec<_>, Vec<_>), _>>()?;
194
195 Ok(BlockBatch {
196 blocks: blocks.0,
197 size: blocks.1.into_iter().sum(),
198 peer_handle,
199 })
200}