1use std::{
2 collections::HashSet,
3 future::{ready, Ready},
4 hash::Hash,
5 task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use futures::{
10 future::{BoxFuture, Shared},
11 FutureExt,
12};
13use monero_serai::{block::Block, transaction::Transaction};
14use tokio::sync::{broadcast, oneshot, watch};
15use tokio_stream::wrappers::WatchStream;
16use tower::{Service, ServiceExt};
17
18use cuprate_blockchain::service::BlockchainReadHandle;
19use cuprate_consensus::{
20 transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
21 BlockchainContextService,
22};
23use cuprate_dandelion_tower::TxState;
24use cuprate_fixed_bytes::ByteArrayVec;
25use cuprate_helper::cast::u64_to_usize;
26use cuprate_helper::{
27 asynch::rayon_spawn_async,
28 cast::usize_to_u64,
29 map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
30};
31use cuprate_p2p::constants::{
32 MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN,
33};
34use cuprate_p2p_core::{
35 client::{InternalPeerID, PeerInformation},
36 NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse,
37};
38use cuprate_txpool::service::TxpoolReadHandle;
39use cuprate_types::{
40 blockchain::{BlockchainReadRequest, BlockchainResponse},
41 BlockCompleteEntry, TransactionBlobs, TxsInBlock,
42};
43use cuprate_wire::protocol::{
44 ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
45 GetObjectsResponse, NewFluffyBlock, NewTransactions,
46};
47
48use crate::{
49 blockchain::interface::{self as blockchain_interface, IncomingBlockError},
50 constants::PANIC_CRITICAL_SERVICE_ERROR,
51 p2p::CrossNetworkInternalPeerId,
52 txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs},
53};
54
55#[derive(Clone)]
57pub struct P2pProtocolRequestHandlerMaker {
58 pub blockchain_read_handle: BlockchainReadHandle,
59 pub blockchain_context_service: BlockchainContextService,
60 pub txpool_read_handle: TxpoolReadHandle,
61
62 pub incoming_tx_handler: Option<IncomingTxHandler>,
65
66 pub incoming_tx_handler_fut: Shared<oneshot::Receiver<IncomingTxHandler>>,
68}
69
70impl<A: NetZoneAddress> Service<PeerInformation<A>> for P2pProtocolRequestHandlerMaker
71where
72 InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
73{
74 type Response = P2pProtocolRequestHandler<A>;
75 type Error = tower::BoxError;
76 type Future = Ready<Result<Self::Response, Self::Error>>;
77
78 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
79 if self.incoming_tx_handler.is_none() {
80 return self
81 .incoming_tx_handler_fut
82 .poll_unpin(cx)
83 .map(|incoming_tx_handler| {
84 self.incoming_tx_handler = Some(incoming_tx_handler?);
85 Ok(())
86 });
87 }
88
89 Poll::Ready(Ok(()))
90 }
91
92 fn call(&mut self, peer_information: PeerInformation<A>) -> Self::Future {
93 let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else {
94 panic!("poll_ready was not called or did not return `Poll::Ready`")
95 };
96
97 let blockchain_read_handle = self.blockchain_read_handle.clone();
100 let txpool_read_handle = self.txpool_read_handle.clone();
101
102 ready(Ok(P2pProtocolRequestHandler {
103 peer_information,
104 blockchain_read_handle,
105 blockchain_context_service: self.blockchain_context_service.clone(),
106 txpool_read_handle,
107 incoming_tx_handler,
108 }))
109 }
110}
111
112#[derive(Clone)]
114pub struct P2pProtocolRequestHandler<N: NetZoneAddress> {
115 peer_information: PeerInformation<N>,
116 blockchain_read_handle: BlockchainReadHandle,
117 blockchain_context_service: BlockchainContextService,
118 txpool_read_handle: TxpoolReadHandle,
119 incoming_tx_handler: IncomingTxHandler,
120}
121
122impl<A: NetZoneAddress> Service<ProtocolRequest> for P2pProtocolRequestHandler<A>
123where
124 InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
125{
126 type Response = ProtocolResponse;
127 type Error = anyhow::Error;
128 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
129
130 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131 Poll::Ready(Ok(()))
132 }
133
134 fn call(&mut self, request: ProtocolRequest) -> Self::Future {
135 match request {
136 ProtocolRequest::GetObjects(r) => {
137 get_objects(r, self.blockchain_read_handle.clone()).boxed()
138 }
139 ProtocolRequest::GetChain(r) => {
140 get_chain(r, self.blockchain_read_handle.clone()).boxed()
141 }
142 ProtocolRequest::FluffyMissingTxs(r) => {
143 fluffy_missing_txs(r, self.blockchain_read_handle.clone()).boxed()
144 }
145 ProtocolRequest::NewBlock(_) => ready(Err(anyhow::anyhow!(
146 "Peer sent a full block when we support fluffy blocks"
147 )))
148 .boxed(),
149 ProtocolRequest::NewFluffyBlock(r) => new_fluffy_block(
150 self.peer_information.clone(),
151 r,
152 self.blockchain_read_handle.clone(),
153 self.txpool_read_handle.clone(),
154 )
155 .boxed(),
156 ProtocolRequest::NewTransactions(r) => new_transactions(
157 self.peer_information.clone(),
158 r,
159 self.blockchain_context_service.clone(),
160 self.incoming_tx_handler.clone(),
161 )
162 .boxed(),
163 ProtocolRequest::GetTxPoolCompliment(_) => ready(Ok(ProtocolResponse::NA)).boxed(), }
165 }
166}
167
168async fn get_objects(
172 request: GetObjectsRequest,
173 mut blockchain_read_handle: BlockchainReadHandle,
174) -> anyhow::Result<ProtocolResponse> {
175 if request.blocks.len() > MAX_BLOCK_BATCH_LEN {
176 anyhow::bail!("Peer requested more blocks than allowed.")
177 }
178
179 let block_hashes: Vec<[u8; 32]> = (&request.blocks).into();
180 drop(request);
182
183 let BlockchainResponse::BlockCompleteEntries {
184 blocks,
185 missing_hashes,
186 blockchain_height,
187 } = blockchain_read_handle
188 .ready()
189 .await?
190 .call(BlockchainReadRequest::BlockCompleteEntries(block_hashes))
191 .await?
192 else {
193 unreachable!();
194 };
195
196 Ok(ProtocolResponse::GetObjects(GetObjectsResponse {
197 blocks,
198 missed_ids: ByteArrayVec::from(missing_hashes),
199 current_blockchain_height: usize_to_u64(blockchain_height),
200 }))
201}
202
203async fn get_chain(
205 request: ChainRequest,
206 mut blockchain_read_handle: BlockchainReadHandle,
207) -> anyhow::Result<ProtocolResponse> {
208 if request.block_ids.len() > MAX_BLOCKS_IDS_IN_CHAIN_ENTRY {
209 anyhow::bail!("Peer sent too many block hashes in chain request.")
210 }
211
212 let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into();
213 let want_pruned_data = request.prune;
214 drop(request);
216
217 let BlockchainResponse::NextChainEntry {
218 start_height,
219 chain_height,
220 block_ids,
221 block_weights,
222 cumulative_difficulty,
223 first_block_blob,
224 } = blockchain_read_handle
225 .ready()
226 .await?
227 .call(BlockchainReadRequest::NextChainEntry(block_hashes, 10_000))
228 .await?
229 else {
230 unreachable!();
231 };
232
233 let Some(start_height) = start_height else {
234 anyhow::bail!("The peers chain has a different genesis block than ours.");
235 };
236
237 let (cumulative_difficulty_low64, cumulative_difficulty_top64) =
238 split_u128_into_low_high_bits(cumulative_difficulty);
239
240 Ok(ProtocolResponse::GetChain(ChainResponse {
241 start_height: usize_to_u64(start_height),
242 total_height: usize_to_u64(chain_height),
243 cumulative_difficulty_low64,
244 cumulative_difficulty_top64,
245 m_block_ids: ByteArrayVec::from(block_ids),
246 first_block: first_block_blob.map_or(Bytes::new(), Bytes::from),
247 m_block_weights: if want_pruned_data {
249 block_weights.into_iter().map(usize_to_u64).collect()
250 } else {
251 vec![]
252 },
253 }))
254}
255
256async fn fluffy_missing_txs(
258 mut request: FluffyMissingTransactionsRequest,
259 mut blockchain_read_handle: BlockchainReadHandle,
260) -> anyhow::Result<ProtocolResponse> {
261 let tx_indexes = std::mem::take(&mut request.missing_tx_indices);
262 let block_hash: [u8; 32] = *request.block_hash;
263 let current_blockchain_height = request.current_blockchain_height;
264
265 drop(request);
267
268 let BlockchainResponse::TxsInBlock(res) = blockchain_read_handle
269 .ready()
270 .await?
271 .call(BlockchainReadRequest::TxsInBlock {
272 block_hash,
273 tx_indexes,
274 })
275 .await?
276 else {
277 unreachable!();
278 };
279
280 let Some(TxsInBlock { block, txs }) = res else {
281 anyhow::bail!("The peer requested txs out of range.");
282 };
283
284 Ok(ProtocolResponse::NewFluffyBlock(NewFluffyBlock {
285 b: BlockCompleteEntry {
286 block: Bytes::from(block),
287 txs: TransactionBlobs::Normal(txs.into_iter().map(Bytes::from).collect()),
288 pruned: false,
289 block_weight: 0,
291 },
292 current_blockchain_height,
293 }))
294}
295
296async fn new_fluffy_block<A: NetZoneAddress>(
298 peer_information: PeerInformation<A>,
299 request: NewFluffyBlock,
300 mut blockchain_read_handle: BlockchainReadHandle,
301 mut txpool_read_handle: TxpoolReadHandle,
302) -> anyhow::Result<ProtocolResponse> {
303 let current_blockchain_height = request.current_blockchain_height;
305
306 peer_information
307 .core_sync_data
308 .lock()
309 .unwrap()
310 .current_height = current_blockchain_height;
311
312 let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> {
313 let block = Block::read(&mut request.b.block.as_ref())?;
314
315 let tx_blobs = request
316 .b
317 .txs
318 .take_normal()
319 .ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?;
320
321 let txs = tx_blobs
322 .into_iter()
323 .map(|tx_blob| {
324 if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
325 anyhow::bail!("Peer sent a transaction over the size limit.");
326 }
327
328 let tx = Transaction::read(&mut tx_blob.as_ref())?;
329
330 Ok((tx.hash(), tx))
331 })
332 .collect::<Result<_, anyhow::Error>>()?;
333
334 Ok((block, txs))
337 })
338 .await?;
339
340 let res = blockchain_interface::handle_incoming_block(
341 block,
342 txs,
343 &mut blockchain_read_handle,
344 &mut txpool_read_handle,
345 )
346 .await;
347
348 match res {
349 Ok(_) => Ok(ProtocolResponse::NA),
350 Err(IncomingBlockError::UnknownTransactions(block_hash, missing_tx_indices)) => Ok(
351 ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest {
352 block_hash: block_hash.into(),
353 current_blockchain_height,
354 missing_tx_indices: missing_tx_indices.into_iter().map(usize_to_u64).collect(),
355 }),
356 ),
357 Err(IncomingBlockError::Orphan) => {
358 Ok(ProtocolResponse::NA)
360 }
361 Err(e) => Err(e.into()),
362 }
363}
364
365async fn new_transactions<A>(
367 peer_information: PeerInformation<A>,
368 request: NewTransactions,
369 mut blockchain_context_service: BlockchainContextService,
370 mut incoming_tx_handler: IncomingTxHandler,
371) -> anyhow::Result<ProtocolResponse>
372where
373 A: NetZoneAddress,
374 InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
375{
376 let context = blockchain_context_service.blockchain_context();
377
378 if usize_to_u64(context.chain_height + 2)
380 < peer_information
381 .core_sync_data
382 .lock()
383 .unwrap()
384 .current_height
385 {
386 return Ok(ProtocolResponse::NA);
387 }
388
389 let state = if request.dandelionpp_fluff {
390 TxState::Fluff
391 } else {
392 TxState::Stem {
393 from: peer_information.id.into(),
394 }
395 };
396
397 let NewTransactions { txs, .. } = request;
399
400 let res = incoming_tx_handler
401 .ready()
402 .await
403 .expect(PANIC_CRITICAL_SERVICE_ERROR)
404 .call(IncomingTxs { txs, state })
405 .await;
406
407 match res {
408 Ok(()) => Ok(ProtocolResponse::NA),
409 Err(e) => Err(e.into()),
410 }
411}