1use std::{
2 collections::HashSet,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::{future::BoxFuture, FutureExt};
9use monero_serai::transaction::Transaction;
10use tower::{BoxError, Service, ServiceExt};
11
12use cuprate_blockchain::service::BlockchainReadHandle;
13use cuprate_consensus::transactions::{start_tx_verification, PrepTransactions};
14use cuprate_consensus::{
15 transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
16 BlockchainContextService, ExtendedConsensusError,
17};
18use cuprate_dandelion_tower::{
19 pool::{DandelionPoolService, IncomingTxBuilder},
20 State, TxState,
21};
22use cuprate_helper::asynch::rayon_spawn_async;
23use cuprate_p2p::NetworkInterface;
24use cuprate_p2p_core::ClearNet;
25use cuprate_txpool::{
26 service::{
27 interface::{
28 TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
29 },
30 TxpoolReadHandle, TxpoolWriteHandle,
31 },
32 transaction_blob_hash,
33};
34use cuprate_types::TransactionVerificationData;
35
36use crate::{
37 blockchain::ConsensusBlockchainReadHandle,
38 constants::PANIC_CRITICAL_SERVICE_ERROR,
39 p2p::CrossNetworkInternalPeerId,
40 signals::REORG_LOCK,
41 txpool::{
42 dandelion,
43 relay_rules::{check_tx_relay_rules, RelayRuleError},
44 txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
45 },
46};
47
48#[derive(Debug, thiserror::Error)]
50pub enum IncomingTxError {
51 #[error("Error parsing tx: {0}")]
52 Parse(std::io::Error),
53 #[error(transparent)]
54 Consensus(ExtendedConsensusError),
55 #[error("Duplicate tx in message")]
56 DuplicateTransaction,
57 #[error("Relay rule was broken: {0}")]
58 RelayRule(RelayRuleError),
59}
60
61pub struct IncomingTxs {
63 pub txs: Vec<Bytes>,
65 pub state: TxState<CrossNetworkInternalPeerId>,
67 pub drop_relay_rule_errors: bool,
71 pub do_not_relay: bool,
74}
75
76#[derive(Clone)]
78pub struct DandelionTx(pub Bytes);
79
80pub(super) type TxId = [u8; 32];
82
83#[derive(Clone)]
87pub struct IncomingTxHandler {
88 pub(super) txs_being_handled: TxsBeingHandled,
90 pub(super) blockchain_context_cache: BlockchainContextService,
92 pub(super) dandelion_pool_manager:
94 DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
95 pub(super) txpool_write_handle: TxpoolWriteHandle,
97 pub(super) txpool_read_handle: TxpoolReadHandle,
99 pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle,
101}
102
103impl IncomingTxHandler {
104 #[expect(clippy::significant_drop_tightening)]
106 pub fn init(
107 clear_net: NetworkInterface<ClearNet>,
108 txpool_write_handle: TxpoolWriteHandle,
109 txpool_read_handle: TxpoolReadHandle,
110 blockchain_context_cache: BlockchainContextService,
111 blockchain_read_handle: BlockchainReadHandle,
112 ) -> Self {
113 let dandelion_router = dandelion::dandelion_router(clear_net);
114
115 let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
116 dandelion_router,
117 txpool_read_handle.clone(),
118 txpool_write_handle.clone(),
119 );
120
121 Self {
122 txs_being_handled: TxsBeingHandled::new(),
123 blockchain_context_cache,
124 dandelion_pool_manager,
125 txpool_write_handle,
126 txpool_read_handle,
127 blockchain_read_handle: ConsensusBlockchainReadHandle::new(
128 blockchain_read_handle,
129 BoxError::from,
130 ),
131 }
132 }
133}
134
135impl Service<IncomingTxs> for IncomingTxHandler {
136 type Response = ();
137 type Error = IncomingTxError;
138 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
139
140 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
141 Poll::Ready(Ok(()))
142 }
143
144 fn call(&mut self, req: IncomingTxs) -> Self::Future {
145 handle_incoming_txs(
146 req,
147 self.txs_being_handled.clone(),
148 self.blockchain_context_cache.clone(),
149 self.blockchain_read_handle.clone(),
150 self.txpool_write_handle.clone(),
151 self.txpool_read_handle.clone(),
152 self.dandelion_pool_manager.clone(),
153 )
154 .boxed()
155 }
156}
157
158async fn handle_incoming_txs(
160 IncomingTxs {
161 txs,
162 state,
163 drop_relay_rule_errors,
164 do_not_relay,
165 }: IncomingTxs,
166 txs_being_handled: TxsBeingHandled,
167 mut blockchain_context_cache: BlockchainContextService,
168 blockchain_read_handle: ConsensusBlockchainReadHandle,
169 mut txpool_write_handle: TxpoolWriteHandle,
170 mut txpool_read_handle: TxpoolReadHandle,
171 mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
172) -> Result<(), IncomingTxError> {
173 let _reorg_guard = REORG_LOCK.read().await;
174
175 let (txs, stem_pool_txs, txs_being_handled_guard) =
176 prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
177
178 let context = blockchain_context_cache.blockchain_context();
179
180 let txs = start_tx_verification()
181 .append_prepped_txs(txs)
182 .prepare()
183 .map_err(|e| IncomingTxError::Consensus(e.into()))?
184 .full(
185 context.chain_height,
186 context.top_hash,
187 context.current_adjusted_timestamp_for_time_lock(),
188 context.current_hf,
189 blockchain_read_handle,
190 None,
191 )
192 .verify()
193 .await
194 .map_err(IncomingTxError::Consensus)?;
195
196 for tx in txs {
197 if let Err(e) = check_tx_relay_rules(&tx, context) {
200 if drop_relay_rule_errors {
201 tracing::debug!(err = %e, tx = hex::encode(tx.tx_hash), "Tx failed relay check, skipping.");
202 continue;
203 }
204
205 return Err(IncomingTxError::RelayRule(e));
206 }
207
208 if !do_not_relay {
209 handle_valid_tx(
210 tx,
211 state.clone(),
212 &mut txpool_write_handle,
213 &mut dandelion_pool_manager,
214 )
215 .await;
216 }
217 }
218
219 if !do_not_relay {
221 for stem_tx in stem_pool_txs {
222 rerelay_stem_tx(
223 &stem_tx,
224 state.clone(),
225 &mut txpool_read_handle,
226 &mut dandelion_pool_manager,
227 )
228 .await;
229 }
230 }
231
232 Ok(())
233}
234
235async fn prepare_incoming_txs(
244 tx_blobs: Vec<Bytes>,
245 txs_being_handled: TxsBeingHandled,
246 txpool_read_handle: &mut TxpoolReadHandle,
247) -> Result<
248 (
249 Vec<TransactionVerificationData>,
250 Vec<TxId>,
251 TxsBeingHandledLocally,
252 ),
253 IncomingTxError,
254> {
255 let mut tx_blob_hashes = HashSet::new();
256 let mut txs_being_handled_locally = txs_being_handled.local_tracker();
257
258 let txs = tx_blobs
260 .into_iter()
261 .filter_map(|tx_blob| {
262 let tx_blob_hash = transaction_blob_hash(&tx_blob);
263
264 if !tx_blob_hashes.insert(tx_blob_hash) {
266 return Some(Err(IncomingTxError::DuplicateTransaction));
267 }
268
269 if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
271 return None;
272 }
273
274 Some(Ok((tx_blob_hash, tx_blob)))
275 })
276 .collect::<Result<Vec<_>, _>>()?;
277
278 let TxpoolReadResponse::FilterKnownTxBlobHashes {
281 unknown_blob_hashes,
282 stem_pool_hashes,
283 } = txpool_read_handle
284 .ready()
285 .await
286 .expect(PANIC_CRITICAL_SERVICE_ERROR)
287 .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
288 .await
289 .expect(PANIC_CRITICAL_SERVICE_ERROR)
290 else {
291 unreachable!()
292 };
293
294 rayon_spawn_async(move || {
296 let txs = txs
297 .into_iter()
298 .filter_map(|(tx_blob_hash, tx_blob)| {
299 if unknown_blob_hashes.contains(&tx_blob_hash) {
300 Some(tx_blob)
301 } else {
302 None
303 }
304 })
305 .map(|bytes| {
306 let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
307
308 let tx = new_tx_verification_data(tx)
309 .map_err(|e| IncomingTxError::Consensus(e.into()))?;
310
311 Ok(tx)
312 })
313 .collect::<Result<Vec<_>, IncomingTxError>>()?;
314
315 Ok((txs, stem_pool_hashes, txs_being_handled_locally))
316 })
317 .await
318}
319
320async fn handle_valid_tx(
324 tx: TransactionVerificationData,
325 state: TxState<CrossNetworkInternalPeerId>,
326 txpool_write_handle: &mut TxpoolWriteHandle,
327 dandelion_pool_manager: &mut DandelionPoolService<
328 DandelionTx,
329 TxId,
330 CrossNetworkInternalPeerId,
331 >,
332) {
333 let incoming_tx =
334 IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
335
336 let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
337 .ready()
338 .await
339 .expect(PANIC_CRITICAL_SERVICE_ERROR)
340 .call(TxpoolWriteRequest::AddTransaction {
341 tx: Box::new(tx),
342 state_stem: state.is_stem_stage(),
343 })
344 .await
345 .expect("TODO")
346 else {
347 unreachable!()
348 };
349
350 if let Some(tx_hash) = double_spend {
352 return;
353 }
354
355 let incoming_tx = incoming_tx
358 .with_routing_state(state)
359 .with_state_in_db(None)
360 .build()
361 .unwrap();
362
363 dandelion_pool_manager
364 .ready()
365 .await
366 .expect(PANIC_CRITICAL_SERVICE_ERROR)
367 .call(incoming_tx)
368 .await
369 .expect(PANIC_CRITICAL_SERVICE_ERROR);
370}
371
372async fn rerelay_stem_tx(
374 tx_hash: &TxId,
375 state: TxState<CrossNetworkInternalPeerId>,
376 txpool_read_handle: &mut TxpoolReadHandle,
377 dandelion_pool_manager: &mut DandelionPoolService<
378 DandelionTx,
379 TxId,
380 CrossNetworkInternalPeerId,
381 >,
382) {
383 let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
384 .ready()
385 .await
386 .expect(PANIC_CRITICAL_SERVICE_ERROR)
387 .call(TxpoolReadRequest::TxBlob(*tx_hash))
388 .await
389 else {
390 return;
392 };
393
394 let incoming_tx =
395 IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
396
397 let incoming_tx = incoming_tx
398 .with_routing_state(state)
399 .with_state_in_db(Some(State::Stem))
400 .build()
401 .unwrap();
402
403 dandelion_pool_manager
404 .ready()
405 .await
406 .expect(PANIC_CRITICAL_SERVICE_ERROR)
407 .call(incoming_tx)
408 .await
409 .expect(PANIC_CRITICAL_SERVICE_ERROR);
410}