cuprate_txpool/service/
write.rs

1use std::sync::Arc;
2
3use cuprate_database::{
4    ConcreteEnv, DatabaseRo, DatabaseRw, DbResult, Env, EnvInner, RuntimeError, TxRw,
5};
6use cuprate_database_service::DatabaseWriteHandle;
7use cuprate_helper::time::current_unix_timestamp;
8use cuprate_types::TransactionVerificationData;
9
10use crate::{
11    ops::{self, TxPoolWriteError},
12    service::{
13        interface::{TxpoolWriteRequest, TxpoolWriteResponse},
14        types::TxpoolWriteHandle,
15    },
16    tables::{OpenTables, Tables, TransactionInfos},
17    types::{KeyImage, TransactionHash, TxStateFlags},
18};
19
20//---------------------------------------------------------------------------------------------------- init_write_service
21/// Initialize the txpool write service from a [`ConcreteEnv`].
22pub(super) fn init_write_service(env: Arc<ConcreteEnv>) -> TxpoolWriteHandle {
23    DatabaseWriteHandle::init(env, handle_txpool_request)
24}
25
26//---------------------------------------------------------------------------------------------------- handle_txpool_request
27/// Handle an incoming [`TxpoolWriteRequest`], returning a [`TxpoolWriteResponse`].
28fn handle_txpool_request(
29    env: &ConcreteEnv,
30    req: &TxpoolWriteRequest,
31) -> DbResult<TxpoolWriteResponse> {
32    match req {
33        TxpoolWriteRequest::AddTransaction { tx, state_stem } => {
34            add_transaction(env, tx, *state_stem)
35        }
36        TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
37        TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash),
38        TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images),
39    }
40}
41
42//---------------------------------------------------------------------------------------------------- Handler functions
43// These are the actual functions that do stuff according to the incoming [`TxpoolWriteRequest`].
44//
45// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
46// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`.
47//
48// Each function will return the [`Response`] that we
49// should send back to the caller in [`map_request()`].
50
51/// [`TxpoolWriteRequest::AddTransaction`]
52fn add_transaction(
53    env: &ConcreteEnv,
54    tx: &TransactionVerificationData,
55    state_stem: bool,
56) -> DbResult<TxpoolWriteResponse> {
57    let env_inner = env.env_inner();
58    let tx_rw = env_inner.tx_rw()?;
59
60    let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
61
62    if let Err(e) = ops::add_transaction(tx, state_stem, &mut tables_mut) {
63        drop(tables_mut);
64        // error adding the tx, abort the DB transaction.
65        TxRw::abort(tx_rw)
66            .expect("could not maintain database atomicity by aborting write transaction");
67
68        return match e {
69            TxPoolWriteError::DoubleSpend(tx_hash) => {
70                // If we couldn't add the tx due to a double spend still return ok, but include the tx
71                // this double spent.
72                // TODO: mark the double spent tx?
73                Ok(TxpoolWriteResponse::AddTransaction(Some(tx_hash)))
74            }
75            TxPoolWriteError::Database(e) => Err(e),
76        };
77    }
78
79    drop(tables_mut);
80    // The tx was added to the pool successfully.
81    TxRw::commit(tx_rw)?;
82    Ok(TxpoolWriteResponse::AddTransaction(None))
83}
84
85/// [`TxpoolWriteRequest::RemoveTransaction`]
86fn remove_transaction(
87    env: &ConcreteEnv,
88    tx_hash: &TransactionHash,
89) -> DbResult<TxpoolWriteResponse> {
90    let env_inner = env.env_inner();
91    let tx_rw = env_inner.tx_rw()?;
92
93    let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
94
95    if let Err(e) = ops::remove_transaction(tx_hash, &mut tables_mut) {
96        drop(tables_mut);
97        // error removing the tx, abort the DB transaction.
98        TxRw::abort(tx_rw)
99            .expect("could not maintain database atomicity by aborting write transaction");
100
101        return Err(e);
102    }
103
104    drop(tables_mut);
105
106    TxRw::commit(tx_rw)?;
107    Ok(TxpoolWriteResponse::Ok)
108}
109
110/// [`TxpoolWriteRequest::Promote`]
111fn promote(env: &ConcreteEnv, tx_hash: &TransactionHash) -> DbResult<TxpoolWriteResponse> {
112    let env_inner = env.env_inner();
113    let tx_rw = env_inner.tx_rw()?;
114
115    let res = || {
116        let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
117
118        tx_infos.update(tx_hash, |mut info| {
119            info.received_at = current_unix_timestamp();
120            info.flags.remove(TxStateFlags::STATE_STEM);
121            Some(info)
122        })
123    };
124
125    if let Err(e) = res() {
126        // error promoting the tx, abort the DB transaction.
127        TxRw::abort(tx_rw)
128            .expect("could not maintain database atomicity by aborting write transaction");
129
130        return Err(e);
131    }
132
133    TxRw::commit(tx_rw)?;
134    Ok(TxpoolWriteResponse::Ok)
135}
136
137/// [`TxpoolWriteRequest::NewBlock`]
138fn new_block(env: &ConcreteEnv, spent_key_images: &[KeyImage]) -> DbResult<TxpoolWriteResponse> {
139    let env_inner = env.env_inner();
140    let tx_rw = env_inner.tx_rw()?;
141
142    let mut txs_removed = Vec::new();
143
144    // FIXME: use try blocks once stable.
145    let mut result = || {
146        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
147
148        // Remove all txs which spend key images that were spent in the new block.
149        for key_image in spent_key_images {
150            match tables_mut
151                .spent_key_images()
152                .get(key_image)
153                .and_then(|tx_hash| {
154                    txs_removed.push(tx_hash);
155                    ops::remove_transaction(&tx_hash, &mut tables_mut)
156                }) {
157                Ok(()) | Err(RuntimeError::KeyNotFound) => (),
158                Err(e) => return Err(e),
159            }
160        }
161
162        Ok(())
163    };
164
165    if let Err(e) = result() {
166        TxRw::abort(tx_rw)?;
167        return Err(e);
168    }
169
170    TxRw::commit(tx_rw)?;
171    Ok(TxpoolWriteResponse::NewBlock(txs_removed))
172}