cuprated/txpool/incoming_tx.rs

use std::{
    collections::HashSet,
    sync::Arc,
    task::{Context, Poll},
};

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::transaction::Transaction;
use tower::{BoxError, Service, ServiceExt};

use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::{
    transactions::{new_tx_verification_data, start_tx_verification, PrepTransactions},
    BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
    ExtendedConsensusError,
};
use cuprate_dandelion_tower::{
    pool::{DandelionPoolService, IncomingTxBuilder},
    State, TxState,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::{
    service::{
        interface::{
            TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
        },
        TxpoolReadHandle, TxpoolWriteHandle,
    },
    transaction_blob_hash,
};
use cuprate_types::TransactionVerificationData;

use crate::{
    blockchain::ConsensusBlockchainReadHandle,
    constants::PANIC_CRITICAL_SERVICE_ERROR,
    p2p::CrossNetworkInternalPeerId,
    signals::REORG_LOCK,
    txpool::{
        dandelion,
        relay_rules::{check_tx_relay_rules, RelayRuleError},
        txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
    },
};

/// An error that can happen handling an incoming tx.
#[derive(Debug, thiserror::Error)]
pub enum IncomingTxError {
    #[error("Error parsing tx: {0}")]
    Parse(std::io::Error),
    #[error(transparent)]
    Consensus(ExtendedConsensusError),
    #[error("Duplicate tx in message")]
    DuplicateTransaction,
    #[error("Relay rule was broken: {0}")]
    RelayRule(RelayRuleError),
}

/// Incoming transactions.
pub struct IncomingTxs {
    /// The raw bytes of the transactions.
    pub txs: Vec<Bytes>,
    /// The routing state of the transactions.
    pub state: TxState<CrossNetworkInternalPeerId>,
    /// If [`true`], transactions that break relay rules will be ignored
    /// and processing will continue; otherwise the service will return
    /// an early error.
    pub drop_relay_rule_errors: bool,
    /// If [`true`], only checks will be done; the transactions will not
    /// be relayed.
    pub do_not_relay: bool,
}

/// The transaction type used for Dandelion++.
#[derive(Clone)]
pub struct DandelionTx(pub Bytes);

/// A transaction ID/hash.
pub(super) type TxId = [u8; 32];

/// The service that handles incoming transaction pool transactions.
///
/// This service handles everything, including verifying the tx, adding it to the pool, and routing it to other nodes.
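///
/// A minimal sketch of driving the service with [`ServiceExt::oneshot`]
/// (`handler` and `tx_blob` here are hypothetical):
///
/// ```rust,ignore
/// handler
///     .clone()
///     .oneshot(IncomingTxs {
///         txs: vec![tx_blob],
///         state: TxState::Local,
///         drop_relay_rule_errors: false,
///         do_not_relay: false,
///     })
///     .await?;
/// ```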
#[derive(Clone)]
pub struct IncomingTxHandler {
    /// A store of txs currently being handled in incoming tx requests.
    pub(super) txs_being_handled: TxsBeingHandled,
    /// The blockchain context cache.
    pub(super) blockchain_context_cache: BlockchainContextService,
    /// The dandelion txpool manager.
    pub(super) dandelion_pool_manager:
        DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
    /// The txpool write handle.
    pub(super) txpool_write_handle: TxpoolWriteHandle,
    /// The txpool read handle.
    pub(super) txpool_read_handle: TxpoolReadHandle,
    /// The blockchain read handle.
    pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle,
}

impl IncomingTxHandler {
    /// Initialize the [`IncomingTxHandler`].
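    ///
    /// A minimal sketch, assuming the handles were created during node
    /// startup (all variable names here are hypothetical):
    ///
    /// ```rust,ignore
    /// let handler = IncomingTxHandler::init(
    ///     clear_net,
    ///     txpool_write_handle,
    ///     txpool_read_handle,
    ///     blockchain_context_cache,
    ///     blockchain_read_handle,
    /// );
    /// ```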
    #[expect(clippy::significant_drop_tightening)]
    pub fn init(
        clear_net: NetworkInterface<ClearNet>,
        txpool_write_handle: TxpoolWriteHandle,
        txpool_read_handle: TxpoolReadHandle,
        blockchain_context_cache: BlockchainContextService,
        blockchain_read_handle: BlockchainReadHandle,
    ) -> Self {
        let dandelion_router = dandelion::dandelion_router(clear_net);

        let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
            dandelion_router,
            txpool_read_handle.clone(),
            txpool_write_handle.clone(),
        );

        Self {
            txs_being_handled: TxsBeingHandled::new(),
            blockchain_context_cache,
            dandelion_pool_manager,
            txpool_write_handle,
            txpool_read_handle,
            blockchain_read_handle: ConsensusBlockchainReadHandle::new(
                blockchain_read_handle,
                BoxError::from,
            ),
        }
    }
}

impl Service<IncomingTxs> for IncomingTxHandler {
    type Response = ();
    type Error = IncomingTxError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

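    // The handler itself is always ready; backpressure comes from the
    // `ready()` calls on the inner services inside the returned future.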
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: IncomingTxs) -> Self::Future {
        handle_incoming_txs(
            req,
            self.txs_being_handled.clone(),
            self.blockchain_context_cache.clone(),
            self.blockchain_read_handle.clone(),
            self.txpool_write_handle.clone(),
            self.txpool_read_handle.clone(),
            self.dandelion_pool_manager.clone(),
        )
        .boxed()
    }
}

/// Handles the incoming txs.
async fn handle_incoming_txs(
    IncomingTxs {
        txs,
        state,
        drop_relay_rule_errors,
        do_not_relay,
    }: IncomingTxs,
    txs_being_handled: TxsBeingHandled,
    mut blockchain_context_cache: BlockchainContextService,
    blockchain_read_handle: ConsensusBlockchainReadHandle,
    mut txpool_write_handle: TxpoolWriteHandle,
    mut txpool_read_handle: TxpoolReadHandle,
    mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
) -> Result<(), IncomingTxError> {
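    // Hold a shared guard on the reorg lock so a reorg cannot happen while
    // this batch is verified and added to the pool.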
    let _reorg_guard = REORG_LOCK.read().await;

    let (txs, stem_pool_txs, _txs_being_handled_guard) =
        prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;

    let context = blockchain_context_cache.blockchain_context();

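    // Verification runs in two stages: `prepare()` performs the stateless
    // semantic checks, then `full()` performs the contextual checks against
    // the current chain tip.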
    let txs = start_tx_verification()
        .append_prepped_txs(txs)
        .prepare()
        .map_err(|e| IncomingTxError::Consensus(e.into()))?
        .full(
            context.chain_height,
            context.top_hash,
            context.current_adjusted_timestamp_for_time_lock(),
            context.current_hf,
            blockchain_read_handle,
            None,
        )
        .verify()
        .await
        .map_err(IncomingTxError::Consensus)?;

    for tx in txs {
        // TODO: this could be a DoS if someone spams us with txs that violate these rules.
        // Maybe we should remember these invalid txs for some time to prevent them being repeatedly sent.
        if let Err(e) = check_tx_relay_rules(&tx, context) {
            if drop_relay_rule_errors {
                tracing::debug!(err = %e, tx = hex::encode(tx.tx_hash), "Tx failed relay check, skipping.");
                continue;
            }

            return Err(IncomingTxError::RelayRule(e));
        }

        if !do_not_relay {
            handle_valid_tx(
                tx,
                state.clone(),
                &mut txpool_write_handle,
                &mut dandelion_pool_manager,
            )
            .await;
        }
    }

    // Re-relay any txs in the incoming message that were already in our stem pool.
    if !do_not_relay {
        for stem_tx in stem_pool_txs {
            rerelay_stem_tx(
                &stem_tx,
                state.clone(),
                &mut txpool_read_handle,
                &mut dandelion_pool_manager,
            )
            .await;
        }
    }

    Ok(())
}

/// Prepares the incoming transactions for verification.
///
/// This will filter out all transactions already in the pool or already being handled in another request.
///
/// Returns in order:
///   - The [`TransactionVerificationData`] for all the txs we did not already have
///   - The IDs of the transactions in the incoming message that are in our stem pool
///   - A [`TxsBeingHandledLocally`] guard that prevents verifying the same tx at the same time across two tasks
async fn prepare_incoming_txs(
    tx_blobs: Vec<Bytes>,
    txs_being_handled: TxsBeingHandled,
    txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<
    (
        Vec<TransactionVerificationData>,
        Vec<TxId>,
        TxsBeingHandledLocally,
    ),
    IncomingTxError,
> {
    let mut tx_blob_hashes = HashSet::new();
    let mut txs_being_handled_locally = txs_being_handled.local_tracker();

    // Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch.
    let txs = tx_blobs
        .into_iter()
        .filter_map(|tx_blob| {
            let tx_blob_hash = transaction_blob_hash(&tx_blob);

            // If insertion fails, the incoming tx batch contained the same tx twice.
            if !tx_blob_hashes.insert(tx_blob_hash) {
                return Some(Err(IncomingTxError::DuplicateTransaction));
            }

            // If the tx is already being tracked, it is being handled in another batch.
            if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
                return None;
            }

            Some(Ok((tx_blob_hash, tx_blob)))
        })
        .collect::<Result<Vec<_>, _>>()?;

    // Filter out the txs already in the txpool.
    // This will leave the txs already in the pool in [`TxsBeingHandledLocally`], but that shouldn't be an issue.
    let TxpoolReadResponse::FilterKnownTxBlobHashes {
        unknown_blob_hashes,
        stem_pool_hashes,
    } = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
    else {
        unreachable!()
    };

    // Now prepare the txs for verification.
    rayon_spawn_async(move || {
        let txs = txs
            .into_iter()
            .filter(|(tx_blob_hash, _)| unknown_blob_hashes.contains(tx_blob_hash))
            .map(|(_, tx_blob)| {
                let tx =
                    Transaction::read(&mut tx_blob.as_ref()).map_err(IncomingTxError::Parse)?;

                let tx = new_tx_verification_data(tx)
                    .map_err(|e| IncomingTxError::Consensus(e.into()))?;

                Ok(tx)
            })
            .collect::<Result<Vec<_>, IncomingTxError>>()?;

        Ok((txs, stem_pool_hashes, txs_being_handled_locally))
    })
    .await
}

/// Handle a verified tx.
///
/// This will add the tx to the txpool and route it to the network.
async fn handle_valid_tx(
    tx: TransactionVerificationData,
    state: TxState<CrossNetworkInternalPeerId>,
    txpool_write_handle: &mut TxpoolWriteHandle,
    dandelion_pool_manager: &mut DandelionPoolService<
        DandelionTx,
        TxId,
        CrossNetworkInternalPeerId,
    >,
) {
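    // Build the Dandelion++ tx up front, before `tx` is moved into the
    // write request below.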
    let incoming_tx =
        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);

    let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolWriteRequest::AddTransaction {
            tx: Box::new(tx),
            state_stem: state.is_stem_stage(),
        })
        .await
        .expect("TODO")
    else {
        unreachable!()
    };

    // TODO: track double spends to quickly ignore them from their blob hash.
    if double_spend.is_some() {
        return;
    }


    // TODO: There is a race condition possible if a tx and block come in at the same time: <https://github.com/Cuprate/cuprate/issues/314>.

    let incoming_tx = incoming_tx
        .with_routing_state(state)
        .with_state_in_db(None)
        .build()
        .unwrap();

    dandelion_pool_manager
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(incoming_tx)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
}

/// Re-relay a tx that was already in our stem pool.
async fn rerelay_stem_tx(
    tx_hash: &TxId,
    state: TxState<CrossNetworkInternalPeerId>,
    txpool_read_handle: &mut TxpoolReadHandle,
    dandelion_pool_manager: &mut DandelionPoolService<
        DandelionTx,
        TxId,
        CrossNetworkInternalPeerId,
    >,
) {
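    // Only the tx hash is available here; fetch the blob from the pool to
    // rebuild the Dandelion++ tx.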
    let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::TxBlob(*tx_hash))
        .await
    else {
        // The tx could have been dropped from the pool.
        return;
    };

    let incoming_tx =
        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);

    let incoming_tx = incoming_tx
        .with_routing_state(state)
        .with_state_in_db(Some(State::Stem))
        .build()
        .unwrap();

    dandelion_pool_manager
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(incoming_tx)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
}