cuprated/txpool/manager.rs

use std::{
    cmp::min,
    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

use bytes::Bytes;
use futures::StreamExt;
use indexmap::IndexMap;
use rand::Rng;
use tokio::sync::{mpsc, oneshot};
use tokio_util::{time::delay_queue, time::DelayQueue};
use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};

use cuprate_dandelion_tower::{
    pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder},
    traits::DiffuseRequest,
    TxState,
};
use cuprate_helper::time::current_unix_timestamp;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::{
    interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
    TxpoolReadHandle, TxpoolWriteHandle,
};
use cuprate_types::TransactionVerificationData;

use crate::{
    config::TxpoolConfig,
    constants::PANIC_CRITICAL_SERVICE_ERROR,
    p2p::{CrossNetworkInternalPeerId, NetworkInterfaces},
    txpool::{
        dandelion::DiffuseService,
        incoming_tx::{DandelionTx, TxId},
    },
};

const INCOMING_TX_QUEUE_SIZE: usize = 100;

/// Starts the transaction pool manager service.
///
/// # Panics
///
/// This function may panic if any inner service has an unrecoverable error.
pub async fn start_txpool_manager(
    mut txpool_write_handle: TxpoolWriteHandle,
    mut txpool_read_handle: TxpoolReadHandle,
    promote_tx_channel: mpsc::UnboundedReceiver<[u8; 32]>,
    diffuse_service: DiffuseService<ClearNet>,
    dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
    config: TxpoolConfig,
) -> TxpoolManagerHandle {
    let TxpoolReadResponse::Backlog(backlog) = txpool_read_handle
        .ready()
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
        .call(TxpoolReadRequest::Backlog)
        .await
        .expect(PANIC_CRITICAL_SERVICE_ERROR)
    else {
        unreachable!()
    };

    tracing::info!(txs_in_pool = backlog.len(), "starting txpool manager");

    let mut stem_txs = Vec::new();

    let mut tx_timeouts = DelayQueue::with_capacity(backlog.len());
    let current_txs = backlog
        .into_iter()
        .map(|tx| {
            let timeout_key = if tx.private {
                stem_txs.push(tx.id);
                None
            } else {
                let next_timeout = calculate_next_timeout(tx.received_at, config.maximum_age_secs);
                Some(tx_timeouts.insert(tx.id, Duration::from_secs(next_timeout)))
            };

            (
                tx.id,
                TxInfo {
                    weight: tx.weight,
                    fee: tx.fee,
                    received_at: tx.received_at,
                    private: tx.private,
                    timeout_key,
                },
            )
        })
        .collect();

    let mut manager = TxpoolManager {
        current_txs,
        tx_timeouts,
        txpool_write_handle,
        txpool_read_handle,
        dandelion_pool_manager,
        promote_tx_channel,
        diffuse_service,
        config,
    };

    tracing::info!(stem_txs = stem_txs.len(), "promoting stem txs");

    for tx in stem_txs {
        manager.promote_tx(tx).await;
    }

    let (tx_tx, tx_rx) = mpsc::channel(INCOMING_TX_QUEUE_SIZE);
    let (spent_kis_tx, spent_kis_rx) = mpsc::channel(1);

    tokio::spawn(manager.run(tx_rx, spent_kis_rx));

    TxpoolManagerHandle {
        tx_tx,
        spent_kis_tx,
    }
}

/// A handle to the tx-pool manager.
#[derive(Clone)]
pub struct TxpoolManagerHandle {
    /// The incoming tx channel.
    pub tx_tx: mpsc::Sender<(
        TransactionVerificationData,
        TxState<CrossNetworkInternalPeerId>,
    )>,
    )>,

    /// The channel used to send the key images spent in a new block.
    spent_kis_tx: mpsc::Sender<(Vec<[u8; 32]>, oneshot::Sender<()>)>,
}

impl TxpoolManagerHandle {
    /// Create a mock [`TxpoolManagerHandle`] that does nothing.
    ///
    /// Useful for testing.
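    ///
    /// A minimal usage sketch (not compiled as a doc-test; the mock spawns
    /// tasks and therefore needs a tokio runtime):
    ///
    /// ```ignore
    /// let mut handle = TxpoolManagerHandle::mock();
    /// // The mock task acknowledges spent key images immediately.
    /// handle.new_block(vec![[0_u8; 32]]).await?;
    /// ```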
    #[expect(clippy::let_underscore_must_use)]
    pub fn mock() -> Self {
        let (spent_kis_tx, mut spent_kis_rx) = mpsc::channel(1);
        let (tx_tx, mut tx_rx) = mpsc::channel(100);

        tokio::spawn(async move {
            loop {
                let Some(rec): Option<(_, oneshot::Sender<()>)> = spent_kis_rx.recv().await else {
                    return;
                };

                let _ = rec.1.send(());
            }
        });

        tokio::spawn(async move {
            loop {
                if tx_rx.recv().await.is_none() {
                    return;
                }
            }
        });

        Self {
            tx_tx,
            spent_kis_tx,
        }
    }

    /// Tell the tx-pool about spent key images in an incoming block.
    pub async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();

        drop(self.spent_kis_tx.send((spent_key_images, tx)).await);

        rx.await
            .map_err(|_| anyhow::anyhow!("txpool manager stopped"))
    }
}

/// Information on a transaction in the tx-pool.
struct TxInfo {
    /// The weight of the transaction.
    weight: usize,
    /// The fee the transaction paid.
    fee: u64,
    /// The UNIX timestamp when the tx was received.
    received_at: u64,
    /// Whether the tx is in the private pool.
    private: bool,

    /// The [`delay_queue::Key`] for the timeout queue in the manager.
    ///
    /// This will be [`None`] if the tx is private as timeouts for them are handled in the dandelion pool.
    timeout_key: Option<delay_queue::Key>,
}

struct TxpoolManager {
    current_txs: IndexMap<[u8; 32], TxInfo>,

    /// A [`DelayQueue`] for waiting on tx timeouts.
    ///
    /// Timeouts can be for re-relaying or removal from the pool.
    tx_timeouts: DelayQueue<[u8; 32]>,

    txpool_write_handle: TxpoolWriteHandle,
    txpool_read_handle: TxpoolReadHandle,

    dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
    /// The channel the dandelion manager will use to communicate that a tx should be promoted to the
    /// public pool.
    promote_tx_channel: mpsc::UnboundedReceiver<[u8; 32]>,
    /// The [`DiffuseService`] to diffuse txs to the p2p network.
    ///
    /// Used for re-relays.
    diffuse_service: DiffuseService<ClearNet>,

    config: TxpoolConfig,
}

impl TxpoolManager {
    /// Removes a transaction from the tx-pool manager, and optionally the database too.
    ///
    /// # Panics
    ///
    /// This function will panic if the tx is not in the tx-pool manager.
    #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
    async fn remove_tx_from_pool(&mut self, tx: [u8; 32], remove_from_db: bool) {
        tracing::debug!("removing tx from pool");

        let tx_info = self.current_txs.swap_remove(&tx).unwrap();

        tx_info
            .timeout_key
            .and_then(|key| self.tx_timeouts.try_remove(&key));

        if remove_from_db {
            self.txpool_write_handle
                .ready()
                .await
                .expect(PANIC_CRITICAL_SERVICE_ERROR)
                .call(TxpoolWriteRequest::RemoveTransaction(tx))
                .await
                .expect(PANIC_CRITICAL_SERVICE_ERROR);
        }
    }

    /// Re-relay a tx to the network.
    ///
    /// # Panics
    ///
    /// This function will panic if the tx is not in the tx-pool.
    #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
    async fn rerelay_tx(&mut self, tx: [u8; 32]) {
        tracing::debug!("re-relaying tx to network");

        let TxpoolReadResponse::TxBlob {
            tx_blob,
            state_stem: _,
        } = self
            .txpool_read_handle
            .ready()
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR)
            .call(TxpoolReadRequest::TxBlob(tx))
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR)
        else {
            unreachable!()
        };

        self.diffuse_service
            .call(DiffuseRequest(DandelionTx(Bytes::from(tx_blob))))
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR);
    }

    /// Handles a transaction timeout, by either re-broadcasting the tx or dropping it from the pool.
    /// If a rebroadcast happens, this function will handle adding another timeout to the queue.
    #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
    async fn handle_tx_timeout(&mut self, tx: [u8; 32]) {
        let Some(tx_info) = self.current_txs.get(&tx) else {
            tracing::warn!("tx timed out, but tx not in pool");
            return;
        };

        let time_in_pool = current_unix_timestamp() - tx_info.received_at;

        // Check if the tx has timed out, with a small buffer to prevent rebroadcasting if the time is
        // slightly off.
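        // (E.g. if `maximum_age_secs` were 86_400, a tx received 86_395 seconds ago
        // would be dropped now rather than re-relayed for its final 5 seconds.)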
        if time_in_pool + 10 > self.config.maximum_age_secs {
            tracing::warn!("tx has been in pool too long, removing from pool");
            self.remove_tx_from_pool(tx, true).await;
            return;
        }

        let received_at = tx_info.received_at;

        tracing::debug!(time_in_pool, "tx timed out, resending to network");

        self.rerelay_tx(tx).await;

        let tx_info = self.current_txs.get_mut(&tx).unwrap();

        let next_timeout = calculate_next_timeout(received_at, self.config.maximum_age_secs);
        tracing::trace!(in_secs = next_timeout, "setting next tx timeout");

        tx_info.timeout_key = Some(
            self.tx_timeouts
                .insert(tx, Duration::from_secs(next_timeout)),
        );
    }

    /// Adds a tx to the tx-pool manager.
    #[instrument(level = "trace", skip_all, fields(tx_id = hex::encode(tx)))]
    fn track_tx(&mut self, tx: [u8; 32], weight: usize, fee: u64, private: bool) {
        let now = current_unix_timestamp();

        let timeout_key = if private {
            // The dandelion pool handles stem tx embargo.
            None
        } else {
            let timeout = calculate_next_timeout(now, self.config.maximum_age_secs);

            tracing::trace!(in_secs = timeout, "setting next tx timeout");

            Some(self.tx_timeouts.insert(tx, Duration::from_secs(timeout)))
        };

        self.current_txs.insert(
            tx,
            TxInfo {
                weight,
                fee,
                received_at: now,
                private,
                timeout_key,
            },
        );
    }

    /// Handles an incoming tx, adding it to the pool and routing it.
    #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx.tx_hash), state))]
    async fn handle_incoming_tx(
        &mut self,
        tx: TransactionVerificationData,
        state: TxState<CrossNetworkInternalPeerId>,
    ) {
        tracing::debug!("handling new tx");

        let incoming_tx =
            IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);

        let (tx_hash, tx_weight, tx_fee) = (tx.tx_hash, tx.tx_weight, tx.fee);

        let TxpoolWriteResponse::AddTransaction(double_spend) = self
            .txpool_write_handle
            .ready()
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR)
            .call(TxpoolWriteRequest::AddTransaction {
                tx: Box::new(tx),
                state_stem: state.is_stem_stage(),
            })
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR)
        else {
            unreachable!()
        };

        if let Some(tx_hash) = double_spend {
            tracing::debug!(
                double_spent = hex::encode(tx_hash),
                "transaction is a double spend, ignoring"
            );
            return;
        }

        self.track_tx(tx_hash, tx_weight, tx_fee, state.is_stem_stage());

        let incoming_tx = incoming_tx
            .with_routing_state(state)
            .with_state_in_db(None)
            .build()
            .unwrap();

        self.dandelion_pool_manager
            .ready()
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR)
            .call(incoming_tx)
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR);
    }

    /// Promote a tx to the public pool.
    #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
    async fn promote_tx(&mut self, tx: [u8; 32]) {
        let Some(tx_info) = self.current_txs.get_mut(&tx) else {
            tracing::debug!("not promoting tx, tx not in pool");
            return;
        };

        if !tx_info.private {
            tracing::trace!("not promoting tx, tx is already public");
            return;
        }

        tracing::debug!("promoting tx");

        // It's now in the public pool, pretend we just saw it.
        tx_info.received_at = current_unix_timestamp();

        let next_timeout =
            calculate_next_timeout(tx_info.received_at, self.config.maximum_age_secs);
        tracing::trace!(in_secs = next_timeout, "setting next tx timeout");
        tx_info.timeout_key = Some(
            self.tx_timeouts
                .insert(tx, Duration::from_secs(next_timeout)),
        );

        self.txpool_write_handle
            .ready()
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR)
            .call(TxpoolWriteRequest::Promote(tx))
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR);
    }

    /// Handles removing all transactions that have been included in, or double spent by, an incoming block.
    #[instrument(level = "debug", skip_all)]
    async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) {
        tracing::debug!("handling new block");

        let TxpoolWriteResponse::NewBlock(removed_txs) = self
            .txpool_write_handle
            .ready()
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR)
            .call(TxpoolWriteRequest::NewBlock { spent_key_images })
            .await
            .expect(PANIC_CRITICAL_SERVICE_ERROR)
        else {
            unreachable!()
        };

        for tx in removed_txs {
            self.remove_tx_from_pool(tx, false).await;
        }
    }

    #[expect(clippy::let_underscore_must_use)]
    async fn run(
        mut self,
        mut tx_rx: mpsc::Receiver<(
            TransactionVerificationData,
            TxState<CrossNetworkInternalPeerId>,
        )>,
        mut block_rx: mpsc::Receiver<(Vec<[u8; 32]>, oneshot::Sender<()>)>,
    ) {
        loop {
            tokio::select! {
                Some(tx) = self.tx_timeouts.next() => {
                    self.handle_tx_timeout(tx.into_inner()).await;
                }
                Some((tx, state)) = tx_rx.recv() => {
                    self.handle_incoming_tx(tx, state).await;
                }
                Some(tx) = self.promote_tx_channel.recv() => {
                    self.promote_tx(tx).await;
                }
                Some((spent_kis, tx)) = block_rx.recv() => {
                    self.new_block(spent_kis).await;
                    let _ = tx.send(());
                }
            }
        }
    }
}

/// Calculates the amount of time to wait before resending a tx to the network.
fn calculate_next_timeout(received_at: u64, max_time_in_pool: u64) -> u64 {
    /// The base time between re-relays to the p2p network.
    const TX_RERELAY_TIME: u64 = 300;

    /*
    This is a simple exponential backoff.
    The first timeout is TX_RERELAY_TIME seconds, the second is 2 * TX_RERELAY_TIME seconds, then 4, 8, 16, etc.
    */
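    // Worked example: a fresh tx (`timeouts == 0`) waits 300s; after 300s in the
    // pool (`timeouts == 1`) it waits 600s; then 1200s, 2400s, and so on, with the
    // wait always capped by the time remaining until `max_time_in_pool`.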
    let now = current_unix_timestamp();

    let time_in_pool = now - received_at;

    let time_till_max_timeout = max_time_in_pool.saturating_sub(time_in_pool);

    let timeouts = time_in_pool / TX_RERELAY_TIME;

    min((timeouts + 1) * TX_RERELAY_TIME, time_till_max_timeout)
}
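
// A minimal test sketch of the backoff schedule above. `calculate_next_timeout`
// reads the wall clock internally, so expectations are built relative to
// `current_unix_timestamp` and only tick-safe values are asserted exactly.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rerelay_backoff_doubles_and_is_capped() {
        let now = current_unix_timestamp();

        // A tx received just now waits the base 300 seconds.
        assert_eq!(calculate_next_timeout(now, 3600), 300);
        // After one 300s period in the pool, the wait doubles to 600 seconds.
        assert_eq!(calculate_next_timeout(now - 300, 3600), 600);
        // After three periods, it doubles again to 1200 seconds.
        assert_eq!(calculate_next_timeout(now - 900, 3600), 1200);
        // The wait never extends past the tx's remaining time in the pool.
        assert!(calculate_next_timeout(now - 300, 400) <= 100);
    }
}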