1use std::{
2 collections::HashSet,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::{future::BoxFuture, FutureExt};
9use monero_serai::transaction::Transaction;
10use tower::{BoxError, Service, ServiceExt};
11
12use cuprate_blockchain::service::BlockchainReadHandle;
13use cuprate_consensus::transactions::{start_tx_verification, PrepTransactions};
14use cuprate_consensus::{
15 transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
16 BlockchainContextService, ExtendedConsensusError,
17};
18use cuprate_dandelion_tower::{
19 pool::{DandelionPoolService, IncomingTxBuilder},
20 State, TxState,
21};
22use cuprate_helper::asynch::rayon_spawn_async;
23use cuprate_p2p::NetworkInterface;
24use cuprate_p2p_core::ClearNet;
25use cuprate_txpool::{
26 service::{
27 interface::{
28 TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
29 },
30 TxpoolReadHandle, TxpoolWriteHandle,
31 },
32 transaction_blob_hash,
33};
34use cuprate_types::TransactionVerificationData;
35
36use crate::{
37 blockchain::ConsensusBlockchainReadHandle,
38 constants::PANIC_CRITICAL_SERVICE_ERROR,
39 p2p::CrossNetworkInternalPeerId,
40 signals::REORG_LOCK,
41 txpool::{
42 dandelion,
43 relay_rules::check_tx_relay_rules,
44 txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
45 },
46};
47
48#[derive(Debug, thiserror::Error)]
50pub enum IncomingTxError {
51 #[error("Error parsing tx: {0}")]
52 Parse(std::io::Error),
53 #[error(transparent)]
54 Consensus(ExtendedConsensusError),
55 #[error("Duplicate tx in message")]
56 DuplicateTransaction,
57}
58
59pub struct IncomingTxs {
61 pub txs: Vec<Bytes>,
63 pub state: TxState<CrossNetworkInternalPeerId>,
65}
66
67#[derive(Clone)]
69pub struct DandelionTx(pub Bytes);
70
71pub(super) type TxId = [u8; 32];
73
74#[derive(Clone)]
78pub struct IncomingTxHandler {
79 pub(super) txs_being_handled: TxsBeingHandled,
81 pub(super) blockchain_context_cache: BlockchainContextService,
83 pub(super) dandelion_pool_manager:
85 DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
86 pub(super) txpool_write_handle: TxpoolWriteHandle,
88 pub(super) txpool_read_handle: TxpoolReadHandle,
90 pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle,
92}
93
94impl IncomingTxHandler {
95 #[expect(clippy::significant_drop_tightening)]
97 pub fn init(
98 clear_net: NetworkInterface<ClearNet>,
99 txpool_write_handle: TxpoolWriteHandle,
100 txpool_read_handle: TxpoolReadHandle,
101 blockchain_context_cache: BlockchainContextService,
102 blockchain_read_handle: BlockchainReadHandle,
103 ) -> Self {
104 let dandelion_router = dandelion::dandelion_router(clear_net);
105
106 let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
107 dandelion_router,
108 txpool_read_handle.clone(),
109 txpool_write_handle.clone(),
110 );
111
112 Self {
113 txs_being_handled: TxsBeingHandled::new(),
114 blockchain_context_cache,
115 dandelion_pool_manager,
116 txpool_write_handle,
117 txpool_read_handle,
118 blockchain_read_handle: ConsensusBlockchainReadHandle::new(
119 blockchain_read_handle,
120 BoxError::from,
121 ),
122 }
123 }
124}
125
126impl Service<IncomingTxs> for IncomingTxHandler {
127 type Response = ();
128 type Error = IncomingTxError;
129 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
130
131 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
132 Poll::Ready(Ok(()))
133 }
134
135 fn call(&mut self, req: IncomingTxs) -> Self::Future {
136 handle_incoming_txs(
137 req,
138 self.txs_being_handled.clone(),
139 self.blockchain_context_cache.clone(),
140 self.blockchain_read_handle.clone(),
141 self.txpool_write_handle.clone(),
142 self.txpool_read_handle.clone(),
143 self.dandelion_pool_manager.clone(),
144 )
145 .boxed()
146 }
147}
148
149async fn handle_incoming_txs(
151 IncomingTxs { txs, state }: IncomingTxs,
152 txs_being_handled: TxsBeingHandled,
153 mut blockchain_context_cache: BlockchainContextService,
154 blockchain_read_handle: ConsensusBlockchainReadHandle,
155 mut txpool_write_handle: TxpoolWriteHandle,
156 mut txpool_read_handle: TxpoolReadHandle,
157 mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
158) -> Result<(), IncomingTxError> {
159 let _reorg_guard = REORG_LOCK.read().await;
160
161 let (txs, stem_pool_txs, txs_being_handled_guard) =
162 prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
163
164 let context = blockchain_context_cache.blockchain_context();
165
166 let txs = start_tx_verification()
167 .append_prepped_txs(txs)
168 .prepare()
169 .map_err(|e| IncomingTxError::Consensus(e.into()))?
170 .full(
171 context.chain_height,
172 context.top_hash,
173 context.current_adjusted_timestamp_for_time_lock(),
174 context.current_hf,
175 blockchain_read_handle,
176 None,
177 )
178 .verify()
179 .await
180 .map_err(IncomingTxError::Consensus)?;
181
182 for tx in txs {
183 if let Err(e) = check_tx_relay_rules(&tx, context) {
186 tracing::debug!(err = %e, tx = hex::encode(tx.tx_hash), "Tx failed relay check, skipping.");
187
188 continue;
189 }
190
191 handle_valid_tx(
192 tx,
193 state.clone(),
194 &mut txpool_write_handle,
195 &mut dandelion_pool_manager,
196 )
197 .await;
198 }
199
200 for stem_tx in stem_pool_txs {
202 rerelay_stem_tx(
203 &stem_tx,
204 state.clone(),
205 &mut txpool_read_handle,
206 &mut dandelion_pool_manager,
207 )
208 .await;
209 }
210
211 Ok(())
212}
213
214async fn prepare_incoming_txs(
223 tx_blobs: Vec<Bytes>,
224 txs_being_handled: TxsBeingHandled,
225 txpool_read_handle: &mut TxpoolReadHandle,
226) -> Result<
227 (
228 Vec<TransactionVerificationData>,
229 Vec<TxId>,
230 TxsBeingHandledLocally,
231 ),
232 IncomingTxError,
233> {
234 let mut tx_blob_hashes = HashSet::new();
235 let mut txs_being_handled_locally = txs_being_handled.local_tracker();
236
237 let txs = tx_blobs
239 .into_iter()
240 .filter_map(|tx_blob| {
241 let tx_blob_hash = transaction_blob_hash(&tx_blob);
242
243 if !tx_blob_hashes.insert(tx_blob_hash) {
245 return Some(Err(IncomingTxError::DuplicateTransaction));
246 }
247
248 if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
250 return None;
251 }
252
253 Some(Ok((tx_blob_hash, tx_blob)))
254 })
255 .collect::<Result<Vec<_>, _>>()?;
256
257 let TxpoolReadResponse::FilterKnownTxBlobHashes {
260 unknown_blob_hashes,
261 stem_pool_hashes,
262 } = txpool_read_handle
263 .ready()
264 .await
265 .expect(PANIC_CRITICAL_SERVICE_ERROR)
266 .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
267 .await
268 .expect(PANIC_CRITICAL_SERVICE_ERROR)
269 else {
270 unreachable!()
271 };
272
273 rayon_spawn_async(move || {
275 let txs = txs
276 .into_iter()
277 .filter_map(|(tx_blob_hash, tx_blob)| {
278 if unknown_blob_hashes.contains(&tx_blob_hash) {
279 Some(tx_blob)
280 } else {
281 None
282 }
283 })
284 .map(|bytes| {
285 let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
286
287 let tx = new_tx_verification_data(tx)
288 .map_err(|e| IncomingTxError::Consensus(e.into()))?;
289
290 Ok(tx)
291 })
292 .collect::<Result<Vec<_>, IncomingTxError>>()?;
293
294 Ok((txs, stem_pool_hashes, txs_being_handled_locally))
295 })
296 .await
297}
298
299async fn handle_valid_tx(
303 tx: TransactionVerificationData,
304 state: TxState<CrossNetworkInternalPeerId>,
305 txpool_write_handle: &mut TxpoolWriteHandle,
306 dandelion_pool_manager: &mut DandelionPoolService<
307 DandelionTx,
308 TxId,
309 CrossNetworkInternalPeerId,
310 >,
311) {
312 let incoming_tx =
313 IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
314
315 let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
316 .ready()
317 .await
318 .expect(PANIC_CRITICAL_SERVICE_ERROR)
319 .call(TxpoolWriteRequest::AddTransaction {
320 tx: Box::new(tx),
321 state_stem: state.is_stem_stage(),
322 })
323 .await
324 .expect("TODO")
325 else {
326 unreachable!()
327 };
328
329 if let Some(tx_hash) = double_spend {
331 return;
332 }
333
334 let incoming_tx = incoming_tx
337 .with_routing_state(state)
338 .with_state_in_db(None)
339 .build()
340 .unwrap();
341
342 dandelion_pool_manager
343 .ready()
344 .await
345 .expect(PANIC_CRITICAL_SERVICE_ERROR)
346 .call(incoming_tx)
347 .await
348 .expect(PANIC_CRITICAL_SERVICE_ERROR);
349}
350
351async fn rerelay_stem_tx(
353 tx_hash: &TxId,
354 state: TxState<CrossNetworkInternalPeerId>,
355 txpool_read_handle: &mut TxpoolReadHandle,
356 dandelion_pool_manager: &mut DandelionPoolService<
357 DandelionTx,
358 TxId,
359 CrossNetworkInternalPeerId,
360 >,
361) {
362 let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
363 .ready()
364 .await
365 .expect(PANIC_CRITICAL_SERVICE_ERROR)
366 .call(TxpoolReadRequest::TxBlob(*tx_hash))
367 .await
368 else {
369 return;
371 };
372
373 let incoming_tx =
374 IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
375
376 let incoming_tx = incoming_tx
377 .with_routing_state(state)
378 .with_state_in_db(Some(State::Stem))
379 .build()
380 .unwrap();
381
382 dandelion_pool_manager
383 .ready()
384 .await
385 .expect(PANIC_CRITICAL_SERVICE_ERROR)
386 .call(incoming_tx)
387 .await
388 .expect(PANIC_CRITICAL_SERVICE_ERROR);
389}