cuprate_p2p/block_downloader/
download_batch.rs

1use std::collections::HashSet;
2
3use monero_serai::{block::Block, transaction::Transaction};
4use rayon::prelude::*;
5use tower::{Service, ServiceExt};
6use tracing::instrument;
7
8use cuprate_fixed_bytes::ByteArrayVec;
9use cuprate_helper::asynch::rayon_spawn_async;
10use cuprate_p2p_core::{
11    handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse, ProtocolRequest,
12    ProtocolResponse,
13};
14use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse};
15
16use crate::{
17    block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse},
18    constants::{MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN},
19    peer_set::ClientDropGuard,
20};
21
22/// Attempts to request a batch of blocks from a peer, returning [`BlockDownloadTaskResponse`].
23#[instrument(
24    level = "debug",
25    name = "download_batch",
26    skip_all,
27    fields(
28        start_height = expected_start_height,
29        attempt = _attempt
30    )
31)]
32#[expect(clippy::used_underscore_binding)]
33pub async fn download_batch_task<N: NetworkZone>(
34    client: ClientDropGuard<N>,
35    ids: ByteArrayVec<32>,
36    previous_id: [u8; 32],
37    expected_start_height: usize,
38    _attempt: usize,
39) -> BlockDownloadTaskResponse<N> {
40    BlockDownloadTaskResponse {
41        start_height: expected_start_height,
42        result: request_batch_from_peer(client, ids, previous_id, expected_start_height).await,
43    }
44}
45
46/// Requests a sequential batch of blocks from a peer.
47///
48/// This function will validate the blocks that were downloaded were the ones asked for and that they match
49/// the expected height.
50async fn request_batch_from_peer<N: NetworkZone>(
51    mut client: ClientDropGuard<N>,
52    ids: ByteArrayVec<32>,
53    previous_id: [u8; 32],
54    expected_start_height: usize,
55) -> Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError> {
56    let request = PeerRequest::Protocol(ProtocolRequest::GetObjects(GetObjectsRequest {
57        blocks: ids.clone(),
58        pruned: false,
59    }));
60
61    // Request the blocks and add a timeout to the request
62    let blocks_response = {
63        let PeerResponse::Protocol(ProtocolResponse::GetObjects(blocks_response)) =
64            client.ready().await?.call(request).await?
65        else {
66            panic!("Connection task returned wrong response.");
67        };
68
69        blocks_response
70    };
71
72    // Initial sanity checks
73    if blocks_response.blocks.len() > ids.len() {
74        client.info.handle.ban_peer(MEDIUM_BAN);
75        return Err(BlockDownloadError::PeersResponseWasInvalid);
76    }
77
78    if blocks_response.blocks.len() != ids.len() {
79        return Err(BlockDownloadError::PeerDidNotHaveRequestedData);
80    }
81    let peer_handle = client.info.handle.clone();
82
83    let blocks = rayon_spawn_async(move || {
84        deserialize_batch(
85            blocks_response,
86            expected_start_height,
87            ids,
88            previous_id,
89            peer_handle,
90        )
91    })
92    .await;
93
94    let batch = blocks.inspect_err(|e| {
95        // If the peers response was invalid, ban it.
96        if matches!(e, BlockDownloadError::PeersResponseWasInvalid) {
97            client.info.handle.ban_peer(MEDIUM_BAN);
98        }
99    })?;
100
101    Ok((client, batch))
102}
103
104#[expect(clippy::needless_pass_by_value)]
105fn deserialize_batch(
106    blocks_response: GetObjectsResponse,
107    expected_start_height: usize,
108    requested_ids: ByteArrayVec<32>,
109    previous_id: [u8; 32],
110    peer_handle: ConnectionHandle,
111) -> Result<BlockBatch, BlockDownloadError> {
112    let blocks = blocks_response
113        .blocks
114        .into_par_iter()
115        .enumerate()
116        .map(|(i, block_entry)| {
117            let expected_height = i + expected_start_height;
118
119            let mut size = block_entry.block.len();
120
121            let block = Block::read(&mut block_entry.block.as_ref())
122                .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
123
124            let block_hash = block.hash();
125
126            // Check the block matches the one requested and the peer sent enough transactions.
127            if requested_ids[i] != block_hash || block.transactions.len() != block_entry.txs.len() {
128                return Err(BlockDownloadError::PeersResponseWasInvalid);
129            }
130
131            // Check that the previous ID is correct for the first block.
132            // This is to protect use against banning the wrong peer.
133            // This must happen after the hash check.
134            if i == 0 && block.header.previous != previous_id {
135                tracing::warn!(
136                    "Invalid chain, peer told us a block follows the chain when it doesn't."
137                );
138
139                // This peer probably did nothing wrong, it was the peer who told us this blockID which
140                // is misbehaving.
141                return Err(BlockDownloadError::ChainInvalid);
142            }
143
144            // Check the height lines up as expected.
145            // This must happen after the hash check.
146            if block
147                .number()
148                .is_none_or(|height| height != expected_height)
149            {
150                tracing::warn!(
151                    "Invalid chain, expected height: {expected_height}, got height: {:?}",
152                    block.number()
153                );
154
155                // This peer probably did nothing wrong, it was the peer who told us this blockID which
156                // is misbehaving.
157                return Err(BlockDownloadError::ChainInvalid);
158            }
159
160            // Deserialize the transactions.
161            let txs = block_entry
162                .txs
163                .take_normal()
164                .ok_or(BlockDownloadError::PeersResponseWasInvalid)?
165                .into_iter()
166                .map(|tx_blob| {
167                    size += tx_blob.len();
168
169                    if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
170                        return Err(BlockDownloadError::PeersResponseWasInvalid);
171                    }
172
173                    Transaction::read(&mut tx_blob.as_ref())
174                        .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)
175                })
176                .collect::<Result<Vec<_>, _>>()?;
177
178            // Make sure the transactions in the block were the ones the peer sent.
179            let mut expected_txs = block.transactions.iter().collect::<HashSet<_>>();
180
181            for tx in &txs {
182                if !expected_txs.remove(&tx.hash()) {
183                    return Err(BlockDownloadError::PeersResponseWasInvalid);
184                }
185            }
186
187            if !expected_txs.is_empty() {
188                return Err(BlockDownloadError::PeersResponseWasInvalid);
189            }
190
191            Ok(((block, txs), size))
192        })
193        .collect::<Result<(Vec<_>, Vec<_>), _>>()?;
194
195    Ok(BlockBatch {
196        blocks: blocks.0,
197        size: blocks.1.into_iter().sum(),
198        peer_handle,
199    })
200}