cuprated/txpool/incoming_tx.rs

use std::{
    collections::HashSet,
    task::{Context, Poll},
};

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::transaction::Transaction;
use tower::{BoxError, Service, ServiceExt};

use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::{
    transactions::{new_tx_verification_data, start_tx_verification, PrepTransactions},
    BlockchainContextService, ExtendedConsensusError,
};
use cuprate_dandelion_tower::{
    pool::{DandelionPoolService, IncomingTxBuilder},
    State, TxState,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::{
    service::{
        interface::{
            TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
        },
        TxpoolReadHandle, TxpoolWriteHandle,
    },
    transaction_blob_hash,
};
use cuprate_types::TransactionVerificationData;

use crate::{
    blockchain::ConsensusBlockchainReadHandle,
    constants::PANIC_CRITICAL_SERVICE_ERROR,
    p2p::CrossNetworkInternalPeerId,
    signals::REORG_LOCK,
    txpool::{
        dandelion,
        txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
    },
};

/// An error that can happen when handling an incoming tx.
#[derive(Debug, thiserror::Error)]
pub enum IncomingTxError {
    #[error("Error parsing tx: {0}")]
    Parse(std::io::Error),
    #[error(transparent)]
    Consensus(ExtendedConsensusError),
    #[error("Duplicate tx in message")]
    DuplicateTransaction,
}

/// Incoming transactions.
pub struct IncomingTxs {
    /// The raw bytes of the transactions.
    pub txs: Vec<Bytes>,
    /// The routing state of the transactions.
    pub state: TxState<CrossNetworkInternalPeerId>,
}

/// The transaction type used for Dandelion++.
#[derive(Clone)]
pub struct DandelionTx(pub Bytes);

/// A transaction ID/hash.
pub(super) type TxId = [u8; 32];

/// The service that handles incoming transaction pool transactions.
///
/// This service handles everything, including verifying the tx, adding it to the pool, and routing it to other nodes.
#[derive(Clone)]
pub struct IncomingTxHandler {
    /// A store of txs currently being handled in incoming tx requests.
    pub(super) txs_being_handled: TxsBeingHandled,
    /// The blockchain context cache.
    pub(super) blockchain_context_cache: BlockchainContextService,
    /// The Dandelion++ txpool manager.
    pub(super) dandelion_pool_manager:
        DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
    /// The txpool write handle.
    pub(super) txpool_write_handle: TxpoolWriteHandle,
    /// The txpool read handle.
    pub(super) txpool_read_handle: TxpoolReadHandle,
    /// The blockchain read handle.
    pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle,
}

impl IncomingTxHandler {
    /// Initialize the [`IncomingTxHandler`].
    #[expect(clippy::significant_drop_tightening)]
    pub fn init(
        clear_net: NetworkInterface<ClearNet>,
        txpool_write_handle: TxpoolWriteHandle,
        txpool_read_handle: TxpoolReadHandle,
        blockchain_context_cache: BlockchainContextService,
        blockchain_read_handle: BlockchainReadHandle,
    ) -> Self {
        let dandelion_router = dandelion::dandelion_router(clear_net);

        let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
            dandelion_router,
            txpool_read_handle.clone(),
            txpool_write_handle.clone(),
        );

        Self {
            txs_being_handled: TxsBeingHandled::new(),
            blockchain_context_cache,
            dandelion_pool_manager,
            txpool_write_handle,
            txpool_read_handle,
            blockchain_read_handle: ConsensusBlockchainReadHandle::new(
                blockchain_read_handle,
                BoxError::from,
            ),
        }
    }
}

impl Service<IncomingTxs> for IncomingTxHandler {
    type Response = ();
    type Error = IncomingTxError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: IncomingTxs) -> Self::Future {
        handle_incoming_txs(
            req,
            self.txs_being_handled.clone(),
            self.blockchain_context_cache.clone(),
            self.blockchain_read_handle.clone(),
            self.txpool_write_handle.clone(),
            self.txpool_read_handle.clone(),
            self.dandelion_pool_manager.clone(),
        )
        .boxed()
    }
}
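
// Usage sketch (hypothetical caller, not part of this module): the handler is a
// `tower::Service`, so a caller would drive it via `ServiceExt`, e.g.
// `handler.ready().await?.call(IncomingTxs { txs, state }).await?`.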

/// Handles the incoming txs.
async fn handle_incoming_txs(
    IncomingTxs { txs, state }: IncomingTxs,
    txs_being_handled: TxsBeingHandled,
    mut blockchain_context_cache: BlockchainContextService,
    blockchain_read_handle: ConsensusBlockchainReadHandle,
    mut txpool_write_handle: TxpoolWriteHandle,
    mut txpool_read_handle: TxpoolReadHandle,
    mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
) -> Result<(), IncomingTxError> {
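    // Hold the reorg lock for the whole handler so the chain cannot reorg while
    // these txs are verified against the current context and added to the pool.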
    let _reorg_guard = REORG_LOCK.read().await;

    let (txs, stem_pool_txs, _txs_being_handled_guard) =
        prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;

    let context = blockchain_context_cache.blockchain_context();

    let txs = start_tx_verification()
        .append_prepped_txs(txs)
        .prepare()
        .map_err(|e| IncomingTxError::Consensus(e.into()))?
        .full(
            context.chain_height,
            context.top_hash,
            context.current_adjusted_timestamp_for_time_lock(),
            context.current_hf,
            blockchain_read_handle,
            None,
        )
        .verify()
        .await
        .map_err(IncomingTxError::Consensus)?;

    for tx in txs {
        handle_valid_tx(
            tx,
            state.clone(),
            &mut txpool_write_handle,
            &mut dandelion_pool_manager,
        )
        .await;
    }

    // Re-relay any txs in the message that were already in our stem pool.
    for stem_tx in stem_pool_txs {
        rerelay_stem_tx(
            &stem_tx,
            state.clone(),
            &mut txpool_read_handle,
            &mut dandelion_pool_manager,
        )
        .await;
    }

    Ok(())
}

/// Prepares the incoming transactions for verification.
///
/// This will filter out all txs already in the pool or already being handled in another request.
///
/// Returns in order:
///   - The [`TransactionVerificationData`] for all the txs we did not already have
///   - The IDs of the transactions in the incoming message that are in our stem-pool
///   - A [`TxsBeingHandledLocally`] guard that prevents verifying the same tx at the same time across two tasks.
async fn prepare_incoming_txs(
    tx_blobs: Vec<Bytes>,
    txs_being_handled: TxsBeingHandled,
    txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<
    (
        Vec<TransactionVerificationData>,
        Vec<TxId>,
        TxsBeingHandledLocally,
    ),
    IncomingTxError,
> {
    let mut tx_blob_hashes = HashSet::new();
    let mut txs_being_handled_locally = txs_being_handled.local_tracker();
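    // `txs_being_handled_locally` is a guard (see [`TxsBeingHandledLocally`]): the
    // hashes it claims are tied to its lifetime, so other batches can pick them up
    // once this request is done with them.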

    // Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch.
    let txs = tx_blobs
        .into_iter()
        .filter_map(|tx_blob| {
            let tx_blob_hash = transaction_blob_hash(&tx_blob);

            // If this insert fails, the incoming tx batch contained the same tx twice.
            if !tx_blob_hashes.insert(tx_blob_hash) {
                return Some(Err(IncomingTxError::DuplicateTransaction));
            }

            // If this add fails, the tx is being handled in another batch.
            if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
                return None;
            }

            Some(Ok((tx_blob_hash, tx_blob)))
        })
        .collect::<Result<Vec<_>, _>>()?;

    // Filter out the txs already in the txpool.
    // This will leave the txs already in the pool in [`TxsBeingHandledLocally`], but that shouldn't be an issue.
    let TxpoolReadResponse::FilterKnownTxBlobHashes {
        unknown_blob_hashes,
        stem_pool_hashes,
    } = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
    else {
        unreachable!()
    };

    // Now prepare the txs for verification.
    rayon_spawn_async(move || {
        let txs = txs
            .into_iter()
            .filter_map(|(tx_blob_hash, tx_blob)| {
                if unknown_blob_hashes.contains(&tx_blob_hash) {
                    Some(tx_blob)
                } else {
                    None
                }
            })
            .map(|bytes| {
                let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;

                let tx = new_tx_verification_data(tx)
                    .map_err(|e| IncomingTxError::Consensus(e.into()))?;

                Ok(tx)
            })
            .collect::<Result<Vec<_>, IncomingTxError>>()?;

        Ok((txs, stem_pool_hashes, txs_being_handled_locally))
    })
    .await
}

/// Handle a verified tx.
///
/// This will add the tx to the txpool and route it to the network.
async fn handle_valid_tx(
    tx: TransactionVerificationData,
    state: TxState<CrossNetworkInternalPeerId>,
    txpool_write_handle: &mut TxpoolWriteHandle,
    dandelion_pool_manager: &mut DandelionPoolService<
        DandelionTx,
        TxId,
        CrossNetworkInternalPeerId,
    >,
) {
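    // Build the Dandelion++ tx first: `tx` is moved into the `AddTransaction`
    // request below, so take the blob and hash before it goes.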
    let incoming_tx =
        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);

    let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolWriteRequest::AddTransaction {
            tx: Box::new(tx),
            state_stem: state.is_stem_stage(),
        })
        .await
        .expect("TODO")
    else {
        unreachable!()
    };

    // TODO: track double spends to quickly ignore them from their blob hash.
    if double_spend.is_some() {
        return;
    }

    // TODO: There is a race condition possible if a tx and block come in at the same time: <https://github.com/Cuprate/cuprate/issues/314>.

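    // `with_state_in_db(None)`: the tx was not already in our pool, so it has no
    // pre-existing Dandelion++ state in the DB.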
    let incoming_tx = incoming_tx
        .with_routing_state(state)
        .with_state_in_db(None)
        .build()
        .unwrap();

    dandelion_pool_manager
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(incoming_tx)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
}

/// Re-relay a tx that was already in our stem pool.
async fn rerelay_stem_tx(
    tx_hash: &TxId,
    state: TxState<CrossNetworkInternalPeerId>,
    txpool_read_handle: &mut TxpoolReadHandle,
    dandelion_pool_manager: &mut DandelionPoolService<
        DandelionTx,
        TxId,
        CrossNetworkInternalPeerId,
    >,
) {
    let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::TxBlob(*tx_hash))
        .await
    else {
        // The tx could have been dropped from the pool.
        return;
    };

    let incoming_tx =
        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);

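    // `with_state_in_db(Some(State::Stem))`: the tx is already in our stem pool,
    // so the D++ manager treats this as a re-relay rather than a brand-new tx.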
    let incoming_tx = incoming_tx
        .with_routing_state(state)
        .with_state_in_db(Some(State::Stem))
        .build()
        .unwrap();

    dandelion_pool_manager
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(incoming_tx)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR);
}