cuprated/txpool/
txs_being_handled.rs

1use std::sync::Arc;
2
3use dashmap::DashSet;
4
5/// A set of txs currently being handled, shared between instances of the incoming tx handler.
6#[derive(Clone)]
7pub struct TxsBeingHandled(Arc<DashSet<[u8; 32]>>);
8
9impl TxsBeingHandled {
10    /// Create a new [`TxsBeingHandled`]
11    pub fn new() -> Self {
12        Self(Arc::new(DashSet::new()))
13    }
14
15    /// Create a new [`TxsBeingHandledLocally`] that will keep track of txs being handled in a request.
16    pub fn local_tracker(&self) -> TxsBeingHandledLocally {
17        TxsBeingHandledLocally {
18            txs_being_handled: self.clone(),
19            txs: vec![],
20        }
21    }
22}
23
24/// A tracker of txs being handled in a single request. This will add the txs to the global [`TxsBeingHandled`]
25/// tracker as well.
26///
27/// When this is dropped the txs will be removed from [`TxsBeingHandled`].
28pub struct TxsBeingHandledLocally {
29    txs_being_handled: TxsBeingHandled,
30    txs: Vec<[u8; 32]>,
31}
32
33impl TxsBeingHandledLocally {
34    /// Try add a tx to the map from its [`transaction_blob_hash`](cuprate_txpool::transaction_blob_hash).
35    ///
36    /// Returns `true` if the tx was added and `false` if another task is already handling this tx.
37    pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool {
38        if !self.txs_being_handled.0.insert(tx_blob_hash) {
39            return false;
40        }
41
42        self.txs.push(tx_blob_hash);
43        true
44    }
45}
46
47impl Drop for TxsBeingHandledLocally {
48    fn drop(&mut self) {
49        for hash in &self.txs {
50            self.txs_being_handled.0.remove(hash);
51        }
52    }
53}