cuprated/txpool/incoming_tx.rs

use std::{
    collections::HashSet,
    sync::Arc,
    task::{Context, Poll},
};

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::transaction::Transaction;
use tokio::sync::mpsc;
use tower::{BoxError, Service, ServiceExt};
use tracing::instrument;

use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::{
    transactions::{new_tx_verification_data, start_tx_verification, PrepTransactions},
    BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
    ExtendedConsensusError,
};
use cuprate_dandelion_tower::{
    pool::{DandelionPoolService, IncomingTxBuilder},
    State, TxState,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::{ClearNet, Tor};
use cuprate_txpool::{
    service::{
        interface::{
            TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
        },
        TxpoolReadHandle, TxpoolWriteHandle,
    },
    transaction_blob_hash,
};
use cuprate_types::TransactionVerificationData;

use crate::{
    blockchain::ConsensusBlockchainReadHandle,
    config::TxpoolConfig,
    constants::PANIC_CRITICAL_SERVICE_ERROR,
    p2p::CrossNetworkInternalPeerId,
    signals::REORG_LOCK,
    txpool::{
        dandelion::{
            self, AnonTxService, ConcreteDandelionRouter, DiffuseService, MainDandelionRouter,
        },
        manager::{start_txpool_manager, TxpoolManagerHandle},
        relay_rules::{check_tx_relay_rules, RelayRuleError},
        txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
    },
};

/// An error that can happen when handling an incoming tx.
#[derive(Debug, thiserror::Error)]
pub enum IncomingTxError {
    #[error("Error parsing tx: {0}")]
    Parse(std::io::Error),
    #[error(transparent)]
    Consensus(ExtendedConsensusError),
    #[error("Duplicate tx in message")]
    DuplicateTransaction,
    #[error("Relay rule was broken: {0}")]
    RelayRule(RelayRuleError),
}

/// Incoming transactions.
pub struct IncomingTxs {
    /// The raw bytes of the transactions.
    pub txs: Vec<Bytes>,
    /// The routing state of the transactions.
    pub state: TxState<CrossNetworkInternalPeerId>,
    /// If [`true`], transactions that break relay rules will be skipped and
    /// processing will continue; otherwise the service will return an error
    /// on the first relay rule violation.
    pub drop_relay_rule_errors: bool,
    /// If [`true`], the transactions will only be checked;
    /// they will not be relayed.
    pub do_not_relay: bool,
}

/// The transaction type used for Dandelion++.
#[derive(Clone)]
pub struct DandelionTx(pub Bytes);

/// A transaction ID/hash.
pub(super) type TxId = [u8; 32];

/// The service that handles incoming transaction pool transactions.
///
/// This service handles everything, including verifying the tx, adding it to the pool, and routing it to other nodes.
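///
/// # Example
///
/// A minimal sketch of driving the service (not compiled here; it assumes the
/// handles passed to [`IncomingTxHandler::init`] are already set up and that a
/// serialized tx blob is at hand):
///
/// ```rust,ignore
/// use tower::{Service, ServiceExt};
///
/// let mut handler: IncomingTxHandler = /* IncomingTxHandler::init(..).await */;
///
/// handler
///     .ready()
///     .await?
///     .call(IncomingTxs {
///         txs: vec![tx_blob],
///         state: TxState::Fluff,
///         drop_relay_rule_errors: false,
///         do_not_relay: false,
///     })
///     .await?;
/// ```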
#[derive(Clone)]
pub struct IncomingTxHandler {
    /// A store of txs currently being handled in incoming tx requests.
    pub(super) txs_being_handled: TxsBeingHandled,
    /// The blockchain context cache.
    pub(super) blockchain_context_cache: BlockchainContextService,
    /// The dandelion txpool manager.
    pub(super) dandelion_pool_manager:
        DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
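    /// A handle to the txpool manager.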
    pub txpool_manager: TxpoolManagerHandle,
    /// The txpool read handle.
    pub(super) txpool_read_handle: TxpoolReadHandle,
    /// The blockchain read handle.
    pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle,
}

impl IncomingTxHandler {
    /// Initialize the [`IncomingTxHandler`].
    #[expect(clippy::significant_drop_tightening)]
    #[instrument(level = "info", skip_all, name = "start_txpool")]
    pub async fn init(
        txpool_config: TxpoolConfig,
        clear_net: NetworkInterface<ClearNet>,
        tor_net: Option<NetworkInterface<Tor>>,
        txpool_write_handle: TxpoolWriteHandle,
        txpool_read_handle: TxpoolReadHandle,
        blockchain_context_cache: BlockchainContextService,
        blockchain_read_handle: BlockchainReadHandle,
    ) -> Self {
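        // The diffuse service fluffs txs by broadcasting them over the clear-net broadcast service.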
        let diffuse_service = DiffuseService {
            clear_net_broadcast_service: clear_net.broadcast_svc(),
        };
        let clearnet_router = dandelion::dandelion_router(clear_net);
        let tor_router = tor_net.map(AnonTxService::new);

        let dandelion_router = MainDandelionRouter::new(clearnet_router, tor_router);

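        // Channel for promotion messages from the Dandelion++ pool manager to the txpool manager,
        // used when a stem-pool tx should be promoted to the public (fluff) pool.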
        let (promote_tx, promote_rx) = mpsc::unbounded_channel();

        let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
            dandelion_router,
            txpool_read_handle.clone(),
            promote_tx,
        );

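        // Spawn the txpool manager, wiring it up to the promotion channel, the diffuse service
        // and the Dandelion++ pool manager.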
        let txpool_manager = start_txpool_manager(
            txpool_write_handle,
            txpool_read_handle.clone(),
            promote_rx,
            diffuse_service,
            dandelion_pool_manager.clone(),
            txpool_config,
        )
        .await;

        Self {
            txs_being_handled: TxsBeingHandled::new(),
            blockchain_context_cache,
            dandelion_pool_manager,
            txpool_manager,
            txpool_read_handle,
            blockchain_read_handle: ConsensusBlockchainReadHandle::new(
                blockchain_read_handle,
                BoxError::from,
            ),
        }
    }
}

impl Service<IncomingTxs> for IncomingTxHandler {
    type Response = ();
    type Error = IncomingTxError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
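        // Readiness of the inner services is awaited inside the future returned by `call`,
        // so this service is always ready.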
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: IncomingTxs) -> Self::Future {
        handle_incoming_txs(
            req,
            self.txs_being_handled.clone(),
            self.blockchain_context_cache.clone(),
            self.blockchain_read_handle.clone(),
            self.txpool_read_handle.clone(),
            self.txpool_manager.clone(),
            self.dandelion_pool_manager.clone(),
        )
        .boxed()
    }
}

/// Handles the incoming txs.
async fn handle_incoming_txs(
    IncomingTxs {
        txs,
        state,
        drop_relay_rule_errors,
        do_not_relay,
    }: IncomingTxs,
    txs_being_handled: TxsBeingHandled,
    mut blockchain_context_cache: BlockchainContextService,
    blockchain_read_handle: ConsensusBlockchainReadHandle,
    mut txpool_read_handle: TxpoolReadHandle,
    mut txpool_manager_handle: TxpoolManagerHandle,
    mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
) -> Result<(), IncomingTxError> {
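    // Hold the reorg lock for the duration of the handling so the context we verify
    // the txs against can't be invalidated by a reorg.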
    let _reorg_guard = REORG_LOCK.read().await;

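    // Filter out txs we already have or are already handling, and collect the rest for verification.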
    let (txs, stem_pool_txs, txs_being_handled_guard) =
        prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;

    let context = blockchain_context_cache.blockchain_context();

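    // Fully verify the txs against the current chain tip.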
    let txs = start_tx_verification()
        .append_prepped_txs(txs)
        .prepare()
        .map_err(|e| IncomingTxError::Consensus(e.into()))?
        .full(
            context.chain_height,
            context.top_hash,
            context.current_adjusted_timestamp_for_time_lock(),
            context.current_hf,
            blockchain_read_handle,
            None,
        )
        .verify()
        .await
        .map_err(IncomingTxError::Consensus)?;

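    // Check the relay rules for each verified tx and hand the passing txs to the txpool manager.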
    for tx in txs {
        // TODO: this could be a DoS vector if someone spams us with txs that violate these rules.
        // Maybe we should remember these invalid txs for some time to prevent them being repeatedly sent.
        if let Err(e) = check_tx_relay_rules(&tx, context) {
            if drop_relay_rule_errors {
                tracing::debug!(err = %e, tx = hex::encode(tx.tx_hash), "Tx failed relay check, skipping.");
                continue;
            }

            return Err(IncomingTxError::RelayRule(e));
        }

        tracing::debug!(
            tx = hex::encode(tx.tx_hash),
            "passing tx to tx-pool manager"
        );

        // TODO: take into account `do_not_relay` in the tx-pool manager.

        if txpool_manager_handle
            .tx_tx
            .send((tx, state.clone()))
            .await
            .is_err()
        {
            tracing::warn!("The txpool manager has been stopped, dropping incoming txs");
            return Ok(());
        }
    }

    // Re-relay any txs we got in the message that were already in our stem pool.
    for stem_tx in stem_pool_txs {
        rerelay_stem_tx(
            &stem_tx,
            state.clone(),
            &mut txpool_read_handle,
            &mut dandelion_pool_manager,
        )
        .await;
    }

    Ok(())
}

/// Prepares the incoming transactions for verification.
///
/// This will filter out all transactions already in the pool or txs already being handled in another request.
///
/// Returns in order:
///   - The [`TransactionVerificationData`] for all the txs we did not already have
///   - The IDs of the transactions in the incoming message that are in our stem-pool
///   - A [`TxsBeingHandledLocally`] guard that prevents verifying the same tx at the same time across 2 tasks.
async fn prepare_incoming_txs(
    tx_blobs: Vec<Bytes>,
    txs_being_handled: TxsBeingHandled,
    txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<
    (
        Vec<TransactionVerificationData>,
        Vec<TxId>,
        TxsBeingHandledLocally,
    ),
    IncomingTxError,
> {
    let mut tx_blob_hashes = HashSet::new();
    let mut txs_being_handled_locally = txs_being_handled.local_tracker();

    // Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch.
    let txs = tx_blobs
        .into_iter()
        .filter_map(|tx_blob| {
            let tx_blob_hash = transaction_blob_hash(&tx_blob);

            // If a duplicate is in here, the incoming tx batch contained the same tx twice.
            if !tx_blob_hashes.insert(tx_blob_hash) {
                tracing::debug!("peer sent duplicate tx in batch, ignoring batch.");
                return Some(Err(IncomingTxError::DuplicateTransaction));
            }

            // If a duplicate is here, it is being handled in another batch.
            if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
                return None;
            }

            Some(Ok((tx_blob_hash, tx_blob)))
        })
        .collect::<Result<Vec<_>, _>>()?;

    // Filter out the txs already in the txpool.
    // This will leave the txs already in the pool in [`TxsBeingHandledLocally`] but that shouldn't be an issue.
    let TxpoolReadResponse::FilterKnownTxBlobHashes {
        unknown_blob_hashes,
        stem_pool_hashes,
    } = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
    else {
        unreachable!()
    };

    // Now prepare the txs for verification.
    rayon_spawn_async(move || {
        let txs = txs
            .into_iter()
            .filter_map(|(tx_blob_hash, tx_blob)| {
                if unknown_blob_hashes.contains(&tx_blob_hash) {
                    Some(tx_blob)
                } else {
                    None
                }
            })
            .map(|bytes| {
                let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;

                let tx = new_tx_verification_data(tx)
                    .map_err(|e| IncomingTxError::Consensus(e.into()))?;

                Ok(tx)
            })
            .collect::<Result<Vec<_>, IncomingTxError>>()?;

        Ok((txs, stem_pool_hashes, txs_being_handled_locally))
    })
    .await
}

/// Re-relay a tx that was already in our stem pool.
async fn rerelay_stem_tx(
    tx_hash: &TxId,
    state: TxState<CrossNetworkInternalPeerId>,
    txpool_read_handle: &mut TxpoolReadHandle,
    dandelion_pool_manager: &mut DandelionPoolService<
        DandelionTx,
        TxId,
        CrossNetworkInternalPeerId,
    >,
) {
    let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::TxBlob(*tx_hash))
        .await
    else {
        // The tx could have been dropped from the pool.
        return;
    };

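    // The tx is already in our pool in the stem state, let the Dandelion++ pool
    // manager know so it can route it correctly.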
    let incoming_tx =
        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);

    let incoming_tx = incoming_tx
        .with_routing_state(state)
        .with_state_in_db(Some(State::Stem))
        .build()
        .unwrap();

    dandelion_pool_manager
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(incoming_tx)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
}