1use std::{
2 collections::HashSet,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::{future::BoxFuture, FutureExt};
9use monero_serai::transaction::Transaction;
10use tower::{BoxError, Service, ServiceExt};
11
12use cuprate_blockchain::service::BlockchainReadHandle;
13use cuprate_consensus::transactions::{start_tx_verification, PrepTransactions};
14use cuprate_consensus::{
15 transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
16 BlockchainContextService, ExtendedConsensusError,
17};
18use cuprate_dandelion_tower::{
19 pool::{DandelionPoolService, IncomingTxBuilder},
20 State, TxState,
21};
22use cuprate_helper::asynch::rayon_spawn_async;
23use cuprate_p2p::NetworkInterface;
24use cuprate_p2p_core::ClearNet;
25use cuprate_txpool::{
26 service::{
27 interface::{
28 TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
29 },
30 TxpoolReadHandle, TxpoolWriteHandle,
31 },
32 transaction_blob_hash,
33};
34use cuprate_types::TransactionVerificationData;
35
36use crate::{
37 blockchain::ConsensusBlockchainReadHandle,
38 constants::PANIC_CRITICAL_SERVICE_ERROR,
39 p2p::CrossNetworkInternalPeerId,
40 signals::REORG_LOCK,
41 txpool::{
42 dandelion,
43 txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
44 },
45};
46
47#[derive(Debug, thiserror::Error)]
49pub enum IncomingTxError {
50 #[error("Error parsing tx: {0}")]
51 Parse(std::io::Error),
52 #[error(transparent)]
53 Consensus(ExtendedConsensusError),
54 #[error("Duplicate tx in message")]
55 DuplicateTransaction,
56}
57
58pub struct IncomingTxs {
60 pub txs: Vec<Bytes>,
62 pub state: TxState<CrossNetworkInternalPeerId>,
64}
65
66#[derive(Clone)]
68pub struct DandelionTx(pub Bytes);
69
70pub(super) type TxId = [u8; 32];
72
73#[derive(Clone)]
77pub struct IncomingTxHandler {
78 pub(super) txs_being_handled: TxsBeingHandled,
80 pub(super) blockchain_context_cache: BlockchainContextService,
82 pub(super) dandelion_pool_manager:
84 DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
85 pub(super) txpool_write_handle: TxpoolWriteHandle,
87 pub(super) txpool_read_handle: TxpoolReadHandle,
89 pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle,
91}
92
93impl IncomingTxHandler {
94 #[expect(clippy::significant_drop_tightening)]
96 pub fn init(
97 clear_net: NetworkInterface<ClearNet>,
98 txpool_write_handle: TxpoolWriteHandle,
99 txpool_read_handle: TxpoolReadHandle,
100 blockchain_context_cache: BlockchainContextService,
101 blockchain_read_handle: BlockchainReadHandle,
102 ) -> Self {
103 let dandelion_router = dandelion::dandelion_router(clear_net);
104
105 let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
106 dandelion_router,
107 txpool_read_handle.clone(),
108 txpool_write_handle.clone(),
109 );
110
111 Self {
112 txs_being_handled: TxsBeingHandled::new(),
113 blockchain_context_cache,
114 dandelion_pool_manager,
115 txpool_write_handle,
116 txpool_read_handle,
117 blockchain_read_handle: ConsensusBlockchainReadHandle::new(
118 blockchain_read_handle,
119 BoxError::from,
120 ),
121 }
122 }
123}
124
125impl Service<IncomingTxs> for IncomingTxHandler {
126 type Response = ();
127 type Error = IncomingTxError;
128 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
129
130 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
131 Poll::Ready(Ok(()))
132 }
133
134 fn call(&mut self, req: IncomingTxs) -> Self::Future {
135 handle_incoming_txs(
136 req,
137 self.txs_being_handled.clone(),
138 self.blockchain_context_cache.clone(),
139 self.blockchain_read_handle.clone(),
140 self.txpool_write_handle.clone(),
141 self.txpool_read_handle.clone(),
142 self.dandelion_pool_manager.clone(),
143 )
144 .boxed()
145 }
146}
147
148async fn handle_incoming_txs(
150 IncomingTxs { txs, state }: IncomingTxs,
151 txs_being_handled: TxsBeingHandled,
152 mut blockchain_context_cache: BlockchainContextService,
153 blockchain_read_handle: ConsensusBlockchainReadHandle,
154 mut txpool_write_handle: TxpoolWriteHandle,
155 mut txpool_read_handle: TxpoolReadHandle,
156 mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
157) -> Result<(), IncomingTxError> {
158 let _reorg_guard = REORG_LOCK.read().await;
159
160 let (txs, stem_pool_txs, txs_being_handled_guard) =
161 prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
162
163 let context = blockchain_context_cache.blockchain_context();
164
165 let txs = start_tx_verification()
166 .append_prepped_txs(txs)
167 .prepare()
168 .map_err(|e| IncomingTxError::Consensus(e.into()))?
169 .full(
170 context.chain_height,
171 context.top_hash,
172 context.current_adjusted_timestamp_for_time_lock(),
173 context.current_hf,
174 blockchain_read_handle,
175 None,
176 )
177 .verify()
178 .await
179 .map_err(IncomingTxError::Consensus)?;
180
181 for tx in txs {
182 handle_valid_tx(
183 tx,
184 state.clone(),
185 &mut txpool_write_handle,
186 &mut dandelion_pool_manager,
187 )
188 .await;
189 }
190
191 for stem_tx in stem_pool_txs {
193 rerelay_stem_tx(
194 &stem_tx,
195 state.clone(),
196 &mut txpool_read_handle,
197 &mut dandelion_pool_manager,
198 )
199 .await;
200 }
201
202 Ok(())
203}
204
205async fn prepare_incoming_txs(
214 tx_blobs: Vec<Bytes>,
215 txs_being_handled: TxsBeingHandled,
216 txpool_read_handle: &mut TxpoolReadHandle,
217) -> Result<
218 (
219 Vec<TransactionVerificationData>,
220 Vec<TxId>,
221 TxsBeingHandledLocally,
222 ),
223 IncomingTxError,
224> {
225 let mut tx_blob_hashes = HashSet::new();
226 let mut txs_being_handled_locally = txs_being_handled.local_tracker();
227
228 let txs = tx_blobs
230 .into_iter()
231 .filter_map(|tx_blob| {
232 let tx_blob_hash = transaction_blob_hash(&tx_blob);
233
234 if !tx_blob_hashes.insert(tx_blob_hash) {
236 return Some(Err(IncomingTxError::DuplicateTransaction));
237 }
238
239 if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
241 return None;
242 }
243
244 Some(Ok((tx_blob_hash, tx_blob)))
245 })
246 .collect::<Result<Vec<_>, _>>()?;
247
248 let TxpoolReadResponse::FilterKnownTxBlobHashes {
251 unknown_blob_hashes,
252 stem_pool_hashes,
253 } = txpool_read_handle
254 .ready()
255 .await
256 .expect(PANIC_CRITICAL_SERVICE_ERROR)
257 .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
258 .await
259 .expect(PANIC_CRITICAL_SERVICE_ERROR)
260 else {
261 unreachable!()
262 };
263
264 rayon_spawn_async(move || {
266 let txs = txs
267 .into_iter()
268 .filter_map(|(tx_blob_hash, tx_blob)| {
269 if unknown_blob_hashes.contains(&tx_blob_hash) {
270 Some(tx_blob)
271 } else {
272 None
273 }
274 })
275 .map(|bytes| {
276 let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
277
278 let tx = new_tx_verification_data(tx)
279 .map_err(|e| IncomingTxError::Consensus(e.into()))?;
280
281 Ok(tx)
282 })
283 .collect::<Result<Vec<_>, IncomingTxError>>()?;
284
285 Ok((txs, stem_pool_hashes, txs_being_handled_locally))
286 })
287 .await
288}
289
290async fn handle_valid_tx(
294 tx: TransactionVerificationData,
295 state: TxState<CrossNetworkInternalPeerId>,
296 txpool_write_handle: &mut TxpoolWriteHandle,
297 dandelion_pool_manager: &mut DandelionPoolService<
298 DandelionTx,
299 TxId,
300 CrossNetworkInternalPeerId,
301 >,
302) {
303 let incoming_tx =
304 IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
305
306 let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
307 .ready()
308 .await
309 .expect(PANIC_CRITICAL_SERVICE_ERROR)
310 .call(TxpoolWriteRequest::AddTransaction {
311 tx: Box::new(tx),
312 state_stem: state.is_stem_stage(),
313 })
314 .await
315 .expect("TODO")
316 else {
317 unreachable!()
318 };
319
320 if let Some(tx_hash) = double_spend {
322 return;
323 };
324
325 let incoming_tx = incoming_tx
328 .with_routing_state(state)
329 .with_state_in_db(None)
330 .build()
331 .unwrap();
332
333 dandelion_pool_manager
334 .ready()
335 .await
336 .expect(PANIC_CRITICAL_SERVICE_ERROR)
337 .call(incoming_tx)
338 .await
339 .expect(PANIC_CRITICAL_SERVICE_ERROR);
340}
341
342async fn rerelay_stem_tx(
344 tx_hash: &TxId,
345 state: TxState<CrossNetworkInternalPeerId>,
346 txpool_read_handle: &mut TxpoolReadHandle,
347 dandelion_pool_manager: &mut DandelionPoolService<
348 DandelionTx,
349 TxId,
350 CrossNetworkInternalPeerId,
351 >,
352) {
353 let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
354 .ready()
355 .await
356 .expect(PANIC_CRITICAL_SERVICE_ERROR)
357 .call(TxpoolReadRequest::TxBlob(*tx_hash))
358 .await
359 else {
360 return;
362 };
363
364 let incoming_tx =
365 IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
366
367 let incoming_tx = incoming_tx
368 .with_routing_state(state)
369 .with_state_in_db(Some(State::Stem))
370 .build()
371 .unwrap();
372
373 dandelion_pool_manager
374 .ready()
375 .await
376 .expect(PANIC_CRITICAL_SERVICE_ERROR)
377 .call(incoming_tx)
378 .await
379 .expect(PANIC_CRITICAL_SERVICE_ERROR);
380}