cuprated/txpool/incoming_tx.rs

use std::{
    collections::HashSet,
    task::{Context, Poll},
};

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::transaction::Transaction;
use tower::{BoxError, Service, ServiceExt};

use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::{
    transactions::{new_tx_verification_data, start_tx_verification, PrepTransactions},
    BlockchainContextService, ExtendedConsensusError,
};
use cuprate_dandelion_tower::{
    pool::{DandelionPoolService, IncomingTxBuilder},
    State, TxState,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::{
    service::{
        interface::{
            TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
        },
        TxpoolReadHandle, TxpoolWriteHandle,
    },
    transaction_blob_hash,
};
use cuprate_types::TransactionVerificationData;

use crate::{
    blockchain::ConsensusBlockchainReadHandle,
    constants::PANIC_CRITICAL_SERVICE_ERROR,
    p2p::CrossNetworkInternalPeerId,
    signals::REORG_LOCK,
    txpool::{
        dandelion,
        relay_rules::check_tx_relay_rules,
        txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
    },
};

/// An error that can happen when handling an incoming tx.
#[derive(Debug, thiserror::Error)]
pub enum IncomingTxError {
    #[error("Error parsing tx: {0}")]
    Parse(std::io::Error),
    #[error(transparent)]
    Consensus(ExtendedConsensusError),
    #[error("Duplicate tx in message")]
    DuplicateTransaction,
}

/// Incoming transactions.
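///
/// A minimal sketch of constructing a request (`tx_blob` is a placeholder for
/// serialized tx bytes; the routing state depends on where the txs came from):
///
/// ```ignore
/// let req = IncomingTxs {
///     txs: vec![tx_blob],
///     state: TxState::Fluff,
/// };
/// ```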
pub struct IncomingTxs {
    /// The raw bytes of the transactions.
    pub txs: Vec<Bytes>,
    /// The routing state of the transactions.
    pub state: TxState<CrossNetworkInternalPeerId>,
}

/// The transaction type used for Dandelion++.
#[derive(Clone)]
pub struct DandelionTx(pub Bytes);

/// A transaction ID/hash.
pub(super) type TxId = [u8; 32];

/// The service that handles incoming transaction pool transactions.
///
/// This service handles everything, including verifying the tx, adding it to the pool, and routing it to other nodes.
#[derive(Clone)]
pub struct IncomingTxHandler {
    /// A store of txs currently being handled in incoming tx requests.
    pub(super) txs_being_handled: TxsBeingHandled,
    /// The blockchain context cache.
    pub(super) blockchain_context_cache: BlockchainContextService,
    /// The dandelion txpool manager.
    pub(super) dandelion_pool_manager:
        DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
    /// The txpool write handle.
    pub(super) txpool_write_handle: TxpoolWriteHandle,
    /// The txpool read handle.
    pub(super) txpool_read_handle: TxpoolReadHandle,
    /// The blockchain read handle.
    pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle,
}

impl IncomingTxHandler {
    /// Initialize the [`IncomingTxHandler`].
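    ///
    /// A sketch of wiring the handler up at node startup, assuming the handles
    /// have been created by the respective service initializers:
    ///
    /// ```ignore
    /// let handler = IncomingTxHandler::init(
    ///     clear_net,
    ///     txpool_write_handle,
    ///     txpool_read_handle,
    ///     blockchain_context_cache,
    ///     blockchain_read_handle,
    /// );
    /// ```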
    #[expect(clippy::significant_drop_tightening)]
    pub fn init(
        clear_net: NetworkInterface<ClearNet>,
        txpool_write_handle: TxpoolWriteHandle,
        txpool_read_handle: TxpoolReadHandle,
        blockchain_context_cache: BlockchainContextService,
        blockchain_read_handle: BlockchainReadHandle,
    ) -> Self {
        let dandelion_router = dandelion::dandelion_router(clear_net);

        let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
            dandelion_router,
            txpool_read_handle.clone(),
            txpool_write_handle.clone(),
        );

        Self {
            txs_being_handled: TxsBeingHandled::new(),
            blockchain_context_cache,
            dandelion_pool_manager,
            txpool_write_handle,
            txpool_read_handle,
            blockchain_read_handle: ConsensusBlockchainReadHandle::new(
                blockchain_read_handle,
                BoxError::from,
            ),
        }
    }
}

impl Service<IncomingTxs> for IncomingTxHandler {
    type Response = ();
    type Error = IncomingTxError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
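        // Always ready: `call` clones the inner handles, and readiness of the
        // underlying services is awaited inside the returned future.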
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: IncomingTxs) -> Self::Future {
        handle_incoming_txs(
            req,
            self.txs_being_handled.clone(),
            self.blockchain_context_cache.clone(),
            self.blockchain_read_handle.clone(),
            self.txpool_write_handle.clone(),
            self.txpool_read_handle.clone(),
            self.dandelion_pool_manager.clone(),
        )
        .boxed()
    }
}

/// Handles the incoming txs.
async fn handle_incoming_txs(
    IncomingTxs { txs, state }: IncomingTxs,
    txs_being_handled: TxsBeingHandled,
    mut blockchain_context_cache: BlockchainContextService,
    blockchain_read_handle: ConsensusBlockchainReadHandle,
    mut txpool_write_handle: TxpoolWriteHandle,
    mut txpool_read_handle: TxpoolReadHandle,
    mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
) -> Result<(), IncomingTxError> {
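    // Take the read half of the reorg lock: incoming txs can be handled
    // concurrently, but never while a reorg (which takes the write half) is in progress.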
    let _reorg_guard = REORG_LOCK.read().await;

    let (txs, stem_pool_txs, txs_being_handled_guard) =
        prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;

    let context = blockchain_context_cache.blockchain_context();

    let txs = start_tx_verification()
        .append_prepped_txs(txs)
        .prepare()
        .map_err(|e| IncomingTxError::Consensus(e.into()))?
        .full(
            context.chain_height,
            context.top_hash,
            context.current_adjusted_timestamp_for_time_lock(),
            context.current_hf,
            blockchain_read_handle,
            None,
        )
        .verify()
        .await
        .map_err(IncomingTxError::Consensus)?;

    for tx in txs {
        // TODO: this could be a DoS vector if someone spams us with txs that violate these rules.
        // Maybe we should remember these invalid txs for some time to prevent them from being repeatedly sent.
        if let Err(e) = check_tx_relay_rules(&tx, context) {
            tracing::debug!(err = %e, tx = hex::encode(tx.tx_hash), "Tx failed relay check, skipping.");

            continue;
        }

        handle_valid_tx(
            tx,
            state.clone(),
            &mut txpool_write_handle,
            &mut dandelion_pool_manager,
        )
        .await;
    }

    // Re-relay any txs we got in the message that were already in our stem pool.
    for stem_tx in stem_pool_txs {
        rerelay_stem_tx(
            &stem_tx,
            state.clone(),
            &mut txpool_read_handle,
            &mut dandelion_pool_manager,
        )
        .await;
    }

    Ok(())
}

/// Prepares the incoming transactions for verification.
///
/// This will filter out all transactions already in the pool or txs already being handled in another request.
///
/// Returns in order:
///   - The [`TransactionVerificationData`] for all the txs we did not already have
///   - The IDs of the transactions in the incoming message that are in our stem-pool
///   - A [`TxsBeingHandledLocally`] guard that prevents verifying the same tx at the same time across two tasks
async fn prepare_incoming_txs(
    tx_blobs: Vec<Bytes>,
    txs_being_handled: TxsBeingHandled,
    txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<
    (
        Vec<TransactionVerificationData>,
        Vec<TxId>,
        TxsBeingHandledLocally,
    ),
    IncomingTxError,
> {
    let mut tx_blob_hashes = HashSet::new();
    let mut txs_being_handled_locally = txs_being_handled.local_tracker();

    // Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch.
    let txs = tx_blobs
        .into_iter()
        .filter_map(|tx_blob| {
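            // A blob hash is a hash of the raw tx bytes; unlike the real tx
            // hash, it can be computed without parsing the tx.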
            let tx_blob_hash = transaction_blob_hash(&tx_blob);

            // If insertion fails, the incoming tx batch contained the same tx twice.
            if !tx_blob_hashes.insert(tx_blob_hash) {
                return Some(Err(IncomingTxError::DuplicateTransaction));
            }

            // If the tx is already being tracked, it is being handled in another batch.
            if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
                return None;
            }

            Some(Ok((tx_blob_hash, tx_blob)))
        })
        .collect::<Result<Vec<_>, _>>()?;

    // Filter out the txs already in the txpool.
    // This will leave the txs already in the pool in [`TxsBeingHandledLocally`], but that shouldn't be an issue.
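    // A response variant other than `FilterKnownTxBlobHashes` here would be a
    // bug in the txpool service, hence the `unreachable!()` below.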
    let TxpoolReadResponse::FilterKnownTxBlobHashes {
        unknown_blob_hashes,
        stem_pool_hashes,
    } = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
    else {
        unreachable!()
    };

    // Now prepare the txs for verification.
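    // Parsing and hashing is CPU-bound, so it is done on the rayon pool
    // rather than blocking this async task.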
    rayon_spawn_async(move || {
        let txs = txs
            .into_iter()
            .filter_map(|(tx_blob_hash, tx_blob)| {
                if unknown_blob_hashes.contains(&tx_blob_hash) {
                    Some(tx_blob)
                } else {
                    None
                }
            })
            .map(|bytes| {
                let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;

                let tx = new_tx_verification_data(tx)
                    .map_err(|e| IncomingTxError::Consensus(e.into()))?;

                Ok(tx)
            })
            .collect::<Result<Vec<_>, IncomingTxError>>()?;

        Ok((txs, stem_pool_hashes, txs_being_handled_locally))
    })
    .await
}

/// Handle a verified tx.
///
/// This will add the tx to the txpool and route it to the network.
async fn handle_valid_tx(
    tx: TransactionVerificationData,
    state: TxState<CrossNetworkInternalPeerId>,
    txpool_write_handle: &mut TxpoolWriteHandle,
    dandelion_pool_manager: &mut DandelionPoolService<
        DandelionTx,
        TxId,
        CrossNetworkInternalPeerId,
    >,
) {
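    // Build the D++ request first, as `tx` is moved into the write request below.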
    let incoming_tx =
        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);

    let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolWriteRequest::AddTransaction {
            tx: Box::new(tx),
            state_stem: state.is_stem_stage(),
        })
        .await
        .expect("TODO")
    else {
        unreachable!()
    };

    // TODO: track double spends to quickly ignore them from their blob hash.
    if double_spend.is_some() {
        return;
    }

    // TODO: There is a race condition possible if a tx and block come in at the same time: <https://github.com/Cuprate/cuprate/issues/314>.

    let incoming_tx = incoming_tx
        .with_routing_state(state)
        .with_state_in_db(None)
        .build()
        .unwrap();
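
    // Hand the tx over to the D++ pool manager, which decides whether to stem or fluff it.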
    dandelion_pool_manager
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(incoming_tx)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
}

/// Re-relay a tx that was already in our stem pool.
async fn rerelay_stem_tx(
    tx_hash: &TxId,
    state: TxState<CrossNetworkInternalPeerId>,
    txpool_read_handle: &mut TxpoolReadHandle,
    dandelion_pool_manager: &mut DandelionPoolService<
        DandelionTx,
        TxId,
        CrossNetworkInternalPeerId,
    >,
) {
    let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::TxBlob(*tx_hash))
        .await
    else {
        // The tx could have been dropped from the pool.
        return;
    };
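
    // The tx is already stored in our stem pool, so tell the D++ pool via
    // `with_state_in_db(Some(State::Stem))` so it can route it accordingly.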
    let incoming_tx =
        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);

    let incoming_tx = incoming_tx
        .with_routing_state(state)
        .with_state_in_db(Some(State::Stem))
        .build()
        .unwrap();

    dandelion_pool_manager
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(incoming_tx)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
}