cuprated/p2p/
request_handler.rs

1use std::{
2    collections::HashSet,
3    future::{ready, Ready},
4    hash::Hash,
5    task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use futures::{
10    future::{BoxFuture, Shared},
11    FutureExt,
12};
13use monero_serai::{block::Block, transaction::Transaction};
14use tokio::sync::{broadcast, oneshot, watch};
15use tokio_stream::wrappers::WatchStream;
16use tower::{Service, ServiceExt};
17
18use cuprate_blockchain::service::BlockchainReadHandle;
19use cuprate_consensus::{
20    transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
21    BlockchainContextService,
22};
23use cuprate_dandelion_tower::TxState;
24use cuprate_fixed_bytes::ByteArrayVec;
25use cuprate_helper::cast::u64_to_usize;
26use cuprate_helper::{
27    asynch::rayon_spawn_async,
28    cast::usize_to_u64,
29    map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
30};
31use cuprate_p2p::constants::{
32    MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN,
33};
34use cuprate_p2p_core::{
35    client::{InternalPeerID, PeerInformation},
36    NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse,
37};
38use cuprate_txpool::service::TxpoolReadHandle;
39use cuprate_types::{
40    blockchain::{BlockchainReadRequest, BlockchainResponse},
41    BlockCompleteEntry, TransactionBlobs, TxsInBlock,
42};
43use cuprate_wire::protocol::{
44    ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
45    GetObjectsResponse, NewFluffyBlock, NewTransactions,
46};
47
48use crate::{
49    blockchain::interface::{self as blockchain_interface, IncomingBlockError},
50    constants::PANIC_CRITICAL_SERVICE_ERROR,
51    p2p::CrossNetworkInternalPeerId,
52    txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs},
53};
54
55/// The P2P protocol request handler [`MakeService`](tower::MakeService).
56#[derive(Clone)]
57pub struct P2pProtocolRequestHandlerMaker {
58    pub blockchain_read_handle: BlockchainReadHandle,
59    pub blockchain_context_service: BlockchainContextService,
60    pub txpool_read_handle: TxpoolReadHandle,
61
62    /// The [`IncomingTxHandler`], wrapped in an [`Option`] as there is a cyclic reference between [`P2pProtocolRequestHandlerMaker`]
63    /// and the [`IncomingTxHandler`].
64    pub incoming_tx_handler: Option<IncomingTxHandler>,
65
66    /// A [`Future`](std::future::Future) that produces the [`IncomingTxHandler`].
67    pub incoming_tx_handler_fut: Shared<oneshot::Receiver<IncomingTxHandler>>,
68}
69
70impl<A: NetZoneAddress> Service<PeerInformation<A>> for P2pProtocolRequestHandlerMaker
71where
72    InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
73{
74    type Response = P2pProtocolRequestHandler<A>;
75    type Error = tower::BoxError;
76    type Future = Ready<Result<Self::Response, Self::Error>>;
77
78    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
79        if self.incoming_tx_handler.is_none() {
80            return self
81                .incoming_tx_handler_fut
82                .poll_unpin(cx)
83                .map(|incoming_tx_handler| {
84                    self.incoming_tx_handler = Some(incoming_tx_handler?);
85                    Ok(())
86                });
87        }
88
89        Poll::Ready(Ok(()))
90    }
91
92    fn call(&mut self, peer_information: PeerInformation<A>) -> Self::Future {
93        let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else {
94            panic!("poll_ready was not called or did not return `Poll::Ready`")
95        };
96
97        // TODO: check sync info?
98
99        let blockchain_read_handle = self.blockchain_read_handle.clone();
100        let txpool_read_handle = self.txpool_read_handle.clone();
101
102        ready(Ok(P2pProtocolRequestHandler {
103            peer_information,
104            blockchain_read_handle,
105            blockchain_context_service: self.blockchain_context_service.clone(),
106            txpool_read_handle,
107            incoming_tx_handler,
108        }))
109    }
110}
111
112/// The P2P protocol request handler.
113#[derive(Clone)]
114pub struct P2pProtocolRequestHandler<N: NetZoneAddress> {
115    peer_information: PeerInformation<N>,
116    blockchain_read_handle: BlockchainReadHandle,
117    blockchain_context_service: BlockchainContextService,
118    txpool_read_handle: TxpoolReadHandle,
119    incoming_tx_handler: IncomingTxHandler,
120}
121
122impl<A: NetZoneAddress> Service<ProtocolRequest> for P2pProtocolRequestHandler<A>
123where
124    InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
125{
126    type Response = ProtocolResponse;
127    type Error = anyhow::Error;
128    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
129
130    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131        Poll::Ready(Ok(()))
132    }
133
134    fn call(&mut self, request: ProtocolRequest) -> Self::Future {
135        match request {
136            ProtocolRequest::GetObjects(r) => {
137                get_objects(r, self.blockchain_read_handle.clone()).boxed()
138            }
139            ProtocolRequest::GetChain(r) => {
140                get_chain(r, self.blockchain_read_handle.clone()).boxed()
141            }
142            ProtocolRequest::FluffyMissingTxs(r) => {
143                fluffy_missing_txs(r, self.blockchain_read_handle.clone()).boxed()
144            }
145            ProtocolRequest::NewBlock(_) => ready(Err(anyhow::anyhow!(
146                "Peer sent a full block when we support fluffy blocks"
147            )))
148            .boxed(),
149            ProtocolRequest::NewFluffyBlock(r) => new_fluffy_block(
150                self.peer_information.clone(),
151                r,
152                self.blockchain_read_handle.clone(),
153                self.txpool_read_handle.clone(),
154            )
155            .boxed(),
156            ProtocolRequest::NewTransactions(r) => new_transactions(
157                self.peer_information.clone(),
158                r,
159                self.blockchain_context_service.clone(),
160                self.incoming_tx_handler.clone(),
161            )
162            .boxed(),
163            ProtocolRequest::GetTxPoolCompliment(_) => ready(Ok(ProtocolResponse::NA)).boxed(), // TODO: should we support this?
164        }
165    }
166}
167
168//---------------------------------------------------------------------------------------------------- Handler functions
169
170/// [`ProtocolRequest::GetObjects`]
171async fn get_objects(
172    request: GetObjectsRequest,
173    mut blockchain_read_handle: BlockchainReadHandle,
174) -> anyhow::Result<ProtocolResponse> {
175    if request.blocks.len() > MAX_BLOCK_BATCH_LEN {
176        anyhow::bail!("Peer requested more blocks than allowed.")
177    }
178
179    let block_hashes: Vec<[u8; 32]> = (&request.blocks).into();
180    // deallocate the backing `Bytes`.
181    drop(request);
182
183    let BlockchainResponse::BlockCompleteEntries {
184        blocks,
185        missing_hashes,
186        blockchain_height,
187    } = blockchain_read_handle
188        .ready()
189        .await?
190        .call(BlockchainReadRequest::BlockCompleteEntries(block_hashes))
191        .await?
192    else {
193        unreachable!();
194    };
195
196    Ok(ProtocolResponse::GetObjects(GetObjectsResponse {
197        blocks,
198        missed_ids: ByteArrayVec::from(missing_hashes),
199        current_blockchain_height: usize_to_u64(blockchain_height),
200    }))
201}
202
203/// [`ProtocolRequest::GetChain`]
204async fn get_chain(
205    request: ChainRequest,
206    mut blockchain_read_handle: BlockchainReadHandle,
207) -> anyhow::Result<ProtocolResponse> {
208    if request.block_ids.len() > MAX_BLOCKS_IDS_IN_CHAIN_ENTRY {
209        anyhow::bail!("Peer sent too many block hashes in chain request.")
210    }
211
212    let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into();
213    let want_pruned_data = request.prune;
214    // deallocate the backing `Bytes`.
215    drop(request);
216
217    let BlockchainResponse::NextChainEntry {
218        start_height,
219        chain_height,
220        block_ids,
221        block_weights,
222        cumulative_difficulty,
223        first_block_blob,
224    } = blockchain_read_handle
225        .ready()
226        .await?
227        .call(BlockchainReadRequest::NextChainEntry(block_hashes, 10_000))
228        .await?
229    else {
230        unreachable!();
231    };
232
233    let Some(start_height) = start_height else {
234        anyhow::bail!("The peers chain has a different genesis block than ours.");
235    };
236
237    let (cumulative_difficulty_low64, cumulative_difficulty_top64) =
238        split_u128_into_low_high_bits(cumulative_difficulty);
239
240    Ok(ProtocolResponse::GetChain(ChainResponse {
241        start_height: usize_to_u64(start_height),
242        total_height: usize_to_u64(chain_height),
243        cumulative_difficulty_low64,
244        cumulative_difficulty_top64,
245        m_block_ids: ByteArrayVec::from(block_ids),
246        first_block: first_block_blob.map_or(Bytes::new(), Bytes::from),
247        // only needed when pruned
248        m_block_weights: if want_pruned_data {
249            block_weights.into_iter().map(usize_to_u64).collect()
250        } else {
251            vec![]
252        },
253    }))
254}
255
256/// [`ProtocolRequest::FluffyMissingTxs`]
257async fn fluffy_missing_txs(
258    mut request: FluffyMissingTransactionsRequest,
259    mut blockchain_read_handle: BlockchainReadHandle,
260) -> anyhow::Result<ProtocolResponse> {
261    let tx_indexes = std::mem::take(&mut request.missing_tx_indices);
262    let block_hash: [u8; 32] = *request.block_hash;
263    let current_blockchain_height = request.current_blockchain_height;
264
265    // deallocate the backing `Bytes`.
266    drop(request);
267
268    let BlockchainResponse::TxsInBlock(res) = blockchain_read_handle
269        .ready()
270        .await?
271        .call(BlockchainReadRequest::TxsInBlock {
272            block_hash,
273            tx_indexes,
274        })
275        .await?
276    else {
277        unreachable!();
278    };
279
280    let Some(TxsInBlock { block, txs }) = res else {
281        anyhow::bail!("The peer requested txs out of range.");
282    };
283
284    Ok(ProtocolResponse::NewFluffyBlock(NewFluffyBlock {
285        b: BlockCompleteEntry {
286            block: Bytes::from(block),
287            txs: TransactionBlobs::Normal(txs.into_iter().map(Bytes::from).collect()),
288            pruned: false,
289            // only needed for pruned blocks.
290            block_weight: 0,
291        },
292        current_blockchain_height,
293    }))
294}
295
296/// [`ProtocolRequest::NewFluffyBlock`]
297async fn new_fluffy_block<A: NetZoneAddress>(
298    peer_information: PeerInformation<A>,
299    request: NewFluffyBlock,
300    mut blockchain_read_handle: BlockchainReadHandle,
301    mut txpool_read_handle: TxpoolReadHandle,
302) -> anyhow::Result<ProtocolResponse> {
303    // TODO: check context service here and ignore the block?
304    let current_blockchain_height = request.current_blockchain_height;
305
306    peer_information
307        .core_sync_data
308        .lock()
309        .unwrap()
310        .current_height = current_blockchain_height;
311
312    let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> {
313        let block = Block::read(&mut request.b.block.as_ref())?;
314
315        let tx_blobs = request
316            .b
317            .txs
318            .take_normal()
319            .ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?;
320
321        let txs = tx_blobs
322            .into_iter()
323            .map(|tx_blob| {
324                if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
325                    anyhow::bail!("Peer sent a transaction over the size limit.");
326                }
327
328                let tx = Transaction::read(&mut tx_blob.as_ref())?;
329
330                Ok((tx.hash(), tx))
331            })
332            .collect::<Result<_, anyhow::Error>>()?;
333
334        // The backing `Bytes` will be deallocated when this closure returns.
335
336        Ok((block, txs))
337    })
338    .await?;
339
340    let res = blockchain_interface::handle_incoming_block(
341        block,
342        txs,
343        &mut blockchain_read_handle,
344        &mut txpool_read_handle,
345    )
346    .await;
347
348    match res {
349        Ok(_) => Ok(ProtocolResponse::NA),
350        Err(IncomingBlockError::UnknownTransactions(block_hash, missing_tx_indices)) => Ok(
351            ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest {
352                block_hash: block_hash.into(),
353                current_blockchain_height,
354                missing_tx_indices: missing_tx_indices.into_iter().map(usize_to_u64).collect(),
355            }),
356        ),
357        Err(IncomingBlockError::Orphan) => {
358            // Block's parent was unknown, could be syncing?
359            Ok(ProtocolResponse::NA)
360        }
361        Err(e) => Err(e.into()),
362    }
363}
364
365/// [`ProtocolRequest::NewTransactions`]
366async fn new_transactions<A>(
367    peer_information: PeerInformation<A>,
368    request: NewTransactions,
369    mut blockchain_context_service: BlockchainContextService,
370    mut incoming_tx_handler: IncomingTxHandler,
371) -> anyhow::Result<ProtocolResponse>
372where
373    A: NetZoneAddress,
374    InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
375{
376    let context = blockchain_context_service.blockchain_context();
377
378    // If we are more than 2 blocks behind the peer then ignore the txs - we are probably still syncing.
379    if usize_to_u64(context.chain_height + 2)
380        < peer_information
381            .core_sync_data
382            .lock()
383            .unwrap()
384            .current_height
385    {
386        return Ok(ProtocolResponse::NA);
387    }
388
389    let state = if request.dandelionpp_fluff {
390        TxState::Fluff
391    } else {
392        TxState::Stem {
393            from: peer_information.id.into(),
394        }
395    };
396
397    // Drop all the data except the stuff we still need.
398    let NewTransactions { txs, .. } = request;
399
400    let res = incoming_tx_handler
401        .ready()
402        .await
403        .expect(PANIC_CRITICAL_SERVICE_ERROR)
404        .call(IncomingTxs { txs, state })
405        .await;
406
407    match res {
408        Ok(()) => Ok(ProtocolResponse::NA),
409        Err(e) => Err(e.into()),
410    }
411}