1use std::{
2 collections::HashSet,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::{future::BoxFuture, FutureExt};
9use monero_serai::transaction::Transaction;
10use tokio::sync::mpsc;
11use tower::{BoxError, Service, ServiceExt};
12use tracing::instrument;
13
14use cuprate_blockchain::service::BlockchainReadHandle;
15use cuprate_consensus::{
16 transactions::{new_tx_verification_data, start_tx_verification, PrepTransactions},
17 BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
18 ExtendedConsensusError,
19};
20use cuprate_dandelion_tower::{
21 pool::{DandelionPoolService, IncomingTxBuilder},
22 State, TxState,
23};
24use cuprate_helper::asynch::rayon_spawn_async;
25use cuprate_p2p::NetworkInterface;
26use cuprate_p2p_core::{ClearNet, Tor};
27use cuprate_txpool::{
28 service::{
29 interface::{
30 TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
31 },
32 TxpoolReadHandle, TxpoolWriteHandle,
33 },
34 transaction_blob_hash,
35};
36use cuprate_types::TransactionVerificationData;
37
38use crate::{
39 blockchain::ConsensusBlockchainReadHandle,
40 config::TxpoolConfig,
41 constants::PANIC_CRITICAL_SERVICE_ERROR,
42 p2p::CrossNetworkInternalPeerId,
43 signals::REORG_LOCK,
44 txpool::{
45 dandelion::{
46 self, AnonTxService, ConcreteDandelionRouter, DiffuseService, MainDandelionRouter,
47 },
48 manager::{start_txpool_manager, TxpoolManagerHandle},
49 relay_rules::{check_tx_relay_rules, RelayRuleError},
50 txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
51 },
52};
53
54#[derive(Debug, thiserror::Error)]
56pub enum IncomingTxError {
57 #[error("Error parsing tx: {0}")]
58 Parse(std::io::Error),
59 #[error(transparent)]
60 Consensus(ExtendedConsensusError),
61 #[error("Duplicate tx in message")]
62 DuplicateTransaction,
63 #[error("Relay rule was broken: {0}")]
64 RelayRule(RelayRuleError),
65}
66
67pub struct IncomingTxs {
69 pub txs: Vec<Bytes>,
71 pub state: TxState<CrossNetworkInternalPeerId>,
73 pub drop_relay_rule_errors: bool,
77 pub do_not_relay: bool,
80}
81
82#[derive(Clone)]
84pub struct DandelionTx(pub Bytes);
85
86pub(super) type TxId = [u8; 32];
88
89#[derive(Clone)]
93pub struct IncomingTxHandler {
94 pub(super) txs_being_handled: TxsBeingHandled,
96 pub(super) blockchain_context_cache: BlockchainContextService,
98 pub(super) dandelion_pool_manager:
100 DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
101 pub txpool_manager: TxpoolManagerHandle,
102 pub(super) txpool_read_handle: TxpoolReadHandle,
104 pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle,
106}
107
108impl IncomingTxHandler {
109 #[expect(clippy::significant_drop_tightening)]
111 #[instrument(level = "info", skip_all, name = "start_txpool")]
112 pub async fn init(
113 txpool_config: TxpoolConfig,
114 clear_net: NetworkInterface<ClearNet>,
115 tor_net: Option<NetworkInterface<Tor>>,
116 txpool_write_handle: TxpoolWriteHandle,
117 txpool_read_handle: TxpoolReadHandle,
118 blockchain_context_cache: BlockchainContextService,
119 blockchain_read_handle: BlockchainReadHandle,
120 ) -> Self {
121 let diffuse_service = DiffuseService {
122 clear_net_broadcast_service: clear_net.broadcast_svc(),
123 };
124 let clearnet_router = dandelion::dandelion_router(clear_net);
125 let tor_router = tor_net.map(AnonTxService::new);
126
127 let dandelion_router = MainDandelionRouter::new(clearnet_router, tor_router);
128
129 let (promote_tx, promote_rx) = mpsc::unbounded_channel();
130
131 let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
132 dandelion_router,
133 txpool_read_handle.clone(),
134 promote_tx,
135 );
136
137 let txpool_manager = start_txpool_manager(
138 txpool_write_handle,
139 txpool_read_handle.clone(),
140 promote_rx,
141 diffuse_service,
142 dandelion_pool_manager.clone(),
143 txpool_config,
144 )
145 .await;
146
147 Self {
148 txs_being_handled: TxsBeingHandled::new(),
149 blockchain_context_cache,
150 dandelion_pool_manager,
151 txpool_manager,
152 txpool_read_handle,
153 blockchain_read_handle: ConsensusBlockchainReadHandle::new(
154 blockchain_read_handle,
155 BoxError::from,
156 ),
157 }
158 }
159}
160
161impl Service<IncomingTxs> for IncomingTxHandler {
162 type Response = ();
163 type Error = IncomingTxError;
164 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
165
166 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167 Poll::Ready(Ok(()))
168 }
169
170 fn call(&mut self, req: IncomingTxs) -> Self::Future {
171 handle_incoming_txs(
172 req,
173 self.txs_being_handled.clone(),
174 self.blockchain_context_cache.clone(),
175 self.blockchain_read_handle.clone(),
176 self.txpool_read_handle.clone(),
177 self.txpool_manager.clone(),
178 self.dandelion_pool_manager.clone(),
179 )
180 .boxed()
181 }
182}
183
184async fn handle_incoming_txs(
186 IncomingTxs {
187 txs,
188 state,
189 drop_relay_rule_errors,
190 do_not_relay,
191 }: IncomingTxs,
192 txs_being_handled: TxsBeingHandled,
193 mut blockchain_context_cache: BlockchainContextService,
194 blockchain_read_handle: ConsensusBlockchainReadHandle,
195 mut txpool_read_handle: TxpoolReadHandle,
196 mut txpool_manager_handle: TxpoolManagerHandle,
197 mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
198) -> Result<(), IncomingTxError> {
199 let _reorg_guard = REORG_LOCK.read().await;
200
201 let (txs, stem_pool_txs, txs_being_handled_guard) =
202 prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
203
204 let context = blockchain_context_cache.blockchain_context();
205
206 let txs = start_tx_verification()
207 .append_prepped_txs(txs)
208 .prepare()
209 .map_err(|e| IncomingTxError::Consensus(e.into()))?
210 .full(
211 context.chain_height,
212 context.top_hash,
213 context.current_adjusted_timestamp_for_time_lock(),
214 context.current_hf,
215 blockchain_read_handle,
216 None,
217 )
218 .verify()
219 .await
220 .map_err(IncomingTxError::Consensus)?;
221
222 for tx in txs {
223 if let Err(e) = check_tx_relay_rules(&tx, context) {
226 if drop_relay_rule_errors {
227 tracing::debug!(err = %e, tx = hex::encode(tx.tx_hash), "Tx failed relay check, skipping.");
228 continue;
229 }
230
231 return Err(IncomingTxError::RelayRule(e));
232 }
233
234 tracing::debug!(
235 tx = hex::encode(tx.tx_hash),
236 "passing tx to tx-pool manager"
237 );
238
239 if txpool_manager_handle
242 .tx_tx
243 .send((tx, state.clone()))
244 .await
245 .is_err()
246 {
247 tracing::warn!("The txpool manager has been stopped, dropping incoming txs");
248 return Ok(());
249 }
250 }
251
252 for stem_tx in stem_pool_txs {
254 rerelay_stem_tx(
255 &stem_tx,
256 state.clone(),
257 &mut txpool_read_handle,
258 &mut dandelion_pool_manager,
259 )
260 .await;
261 }
262
263 Ok(())
264}
265
266async fn prepare_incoming_txs(
275 tx_blobs: Vec<Bytes>,
276 txs_being_handled: TxsBeingHandled,
277 txpool_read_handle: &mut TxpoolReadHandle,
278) -> Result<
279 (
280 Vec<TransactionVerificationData>,
281 Vec<TxId>,
282 TxsBeingHandledLocally,
283 ),
284 IncomingTxError,
285> {
286 let mut tx_blob_hashes = HashSet::new();
287 let mut txs_being_handled_locally = txs_being_handled.local_tracker();
288
289 let txs = tx_blobs
291 .into_iter()
292 .filter_map(|tx_blob| {
293 let tx_blob_hash = transaction_blob_hash(&tx_blob);
294
295 if !tx_blob_hashes.insert(tx_blob_hash) {
297 tracing::debug!("peer sent duplicate tx in batch, ignoring batch.");
298 return Some(Err(IncomingTxError::DuplicateTransaction));
299 }
300
301 if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
303 return None;
304 }
305
306 Some(Ok((tx_blob_hash, tx_blob)))
307 })
308 .collect::<Result<Vec<_>, _>>()?;
309
310 let TxpoolReadResponse::FilterKnownTxBlobHashes {
313 unknown_blob_hashes,
314 stem_pool_hashes,
315 } = txpool_read_handle
316 .ready()
317 .await
318 .expect(PANIC_CRITICAL_SERVICE_ERROR)
319 .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
320 .await
321 .expect(PANIC_CRITICAL_SERVICE_ERROR)
322 else {
323 unreachable!()
324 };
325
326 rayon_spawn_async(move || {
328 let txs = txs
329 .into_iter()
330 .filter_map(|(tx_blob_hash, tx_blob)| {
331 if unknown_blob_hashes.contains(&tx_blob_hash) {
332 Some(tx_blob)
333 } else {
334 None
335 }
336 })
337 .map(|bytes| {
338 let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
339
340 let tx = new_tx_verification_data(tx)
341 .map_err(|e| IncomingTxError::Consensus(e.into()))?;
342
343 Ok(tx)
344 })
345 .collect::<Result<Vec<_>, IncomingTxError>>()?;
346
347 Ok((txs, stem_pool_hashes, txs_being_handled_locally))
348 })
349 .await
350}
351
352async fn rerelay_stem_tx(
354 tx_hash: &TxId,
355 state: TxState<CrossNetworkInternalPeerId>,
356 txpool_read_handle: &mut TxpoolReadHandle,
357 dandelion_pool_manager: &mut DandelionPoolService<
358 DandelionTx,
359 TxId,
360 CrossNetworkInternalPeerId,
361 >,
362) {
363 let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
364 .ready()
365 .await
366 .expect(PANIC_CRITICAL_SERVICE_ERROR)
367 .call(TxpoolReadRequest::TxBlob(*tx_hash))
368 .await
369 else {
370 return;
372 };
373
374 let incoming_tx =
375 IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
376
377 let incoming_tx = incoming_tx
378 .with_routing_state(state)
379 .with_state_in_db(Some(State::Stem))
380 .build()
381 .unwrap();
382
383 dandelion_pool_manager
384 .ready()
385 .await
386 .expect(PANIC_CRITICAL_SERVICE_ERROR)
387 .call(incoming_tx)
388 .await
389 .expect(PANIC_CRITICAL_SERVICE_ERROR);
390}