cuprate_txpool/service/
write.rs

1use std::sync::Arc;
2
3use cuprate_database::{
4    ConcreteEnv, DatabaseRo, DatabaseRw, DbResult, Env, EnvInner, RuntimeError, TxRw,
5};
6use cuprate_database_service::DatabaseWriteHandle;
7use cuprate_types::TransactionVerificationData;
8
9use crate::{
10    ops::{self, TxPoolWriteError},
11    service::{
12        interface::{TxpoolWriteRequest, TxpoolWriteResponse},
13        types::TxpoolWriteHandle,
14    },
15    tables::{OpenTables, Tables, TransactionInfos},
16    types::{KeyImage, TransactionHash, TxStateFlags},
17};
18
19//---------------------------------------------------------------------------------------------------- init_write_service
/// Initialize the txpool write service from a [`ConcreteEnv`].
///
/// The shared environment is handed to [`DatabaseWriteHandle::init`] together
/// with [`handle_txpool_request`], which is the function used to process each
/// incoming [`TxpoolWriteRequest`].
///
/// Returns the resulting [`TxpoolWriteHandle`].
pub(super) fn init_write_service(env: Arc<ConcreteEnv>) -> TxpoolWriteHandle {
    DatabaseWriteHandle::init(env, handle_txpool_request)
}
24
25//---------------------------------------------------------------------------------------------------- handle_txpool_request
26/// Handle an incoming [`TxpoolWriteRequest`], returning a [`TxpoolWriteResponse`].
27fn handle_txpool_request(
28    env: &ConcreteEnv,
29    req: &TxpoolWriteRequest,
30) -> DbResult<TxpoolWriteResponse> {
31    match req {
32        TxpoolWriteRequest::AddTransaction { tx, state_stem } => {
33            add_transaction(env, tx, *state_stem)
34        }
35        TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
36        TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash),
37        TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images),
38    }
39}
40
41//---------------------------------------------------------------------------------------------------- Handler functions
// These are the actual functions that do stuff according to the incoming [`TxpoolWriteRequest`].
//
// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
// the enum variant name, e.g: `AddTransaction` -> `add_transaction`.
//
// Each function will return the [`TxpoolWriteResponse`] that we
// should send back to the caller in [`handle_txpool_request()`].
49
50/// [`TxpoolWriteRequest::AddTransaction`]
51fn add_transaction(
52    env: &ConcreteEnv,
53    tx: &TransactionVerificationData,
54    state_stem: bool,
55) -> DbResult<TxpoolWriteResponse> {
56    let env_inner = env.env_inner();
57    let tx_rw = env_inner.tx_rw()?;
58
59    let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
60
61    if let Err(e) = ops::add_transaction(tx, state_stem, &mut tables_mut) {
62        drop(tables_mut);
63        // error adding the tx, abort the DB transaction.
64        TxRw::abort(tx_rw)
65            .expect("could not maintain database atomicity by aborting write transaction");
66
67        return match e {
68            TxPoolWriteError::DoubleSpend(tx_hash) => {
69                // If we couldn't add the tx due to a double spend still return ok, but include the tx
70                // this double spent.
71                // TODO: mark the double spent tx?
72                Ok(TxpoolWriteResponse::AddTransaction(Some(tx_hash)))
73            }
74            TxPoolWriteError::Database(e) => Err(e),
75        };
76    };
77
78    drop(tables_mut);
79    // The tx was added to the pool successfully.
80    TxRw::commit(tx_rw)?;
81    Ok(TxpoolWriteResponse::AddTransaction(None))
82}
83
84/// [`TxpoolWriteRequest::RemoveTransaction`]
85fn remove_transaction(
86    env: &ConcreteEnv,
87    tx_hash: &TransactionHash,
88) -> DbResult<TxpoolWriteResponse> {
89    let env_inner = env.env_inner();
90    let tx_rw = env_inner.tx_rw()?;
91
92    let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
93
94    if let Err(e) = ops::remove_transaction(tx_hash, &mut tables_mut) {
95        drop(tables_mut);
96        // error removing the tx, abort the DB transaction.
97        TxRw::abort(tx_rw)
98            .expect("could not maintain database atomicity by aborting write transaction");
99
100        return Err(e);
101    }
102
103    drop(tables_mut);
104
105    TxRw::commit(tx_rw)?;
106    Ok(TxpoolWriteResponse::Ok)
107}
108
109/// [`TxpoolWriteRequest::Promote`]
110fn promote(env: &ConcreteEnv, tx_hash: &TransactionHash) -> DbResult<TxpoolWriteResponse> {
111    let env_inner = env.env_inner();
112    let tx_rw = env_inner.tx_rw()?;
113
114    let res = || {
115        let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
116
117        tx_infos.update(tx_hash, |mut info| {
118            info.flags.remove(TxStateFlags::STATE_STEM);
119            Some(info)
120        })
121    };
122
123    if let Err(e) = res() {
124        // error promoting the tx, abort the DB transaction.
125        TxRw::abort(tx_rw)
126            .expect("could not maintain database atomicity by aborting write transaction");
127
128        return Err(e);
129    }
130
131    TxRw::commit(tx_rw)?;
132    Ok(TxpoolWriteResponse::Ok)
133}
134
135/// [`TxpoolWriteRequest::NewBlock`]
136fn new_block(env: &ConcreteEnv, spent_key_images: &[KeyImage]) -> DbResult<TxpoolWriteResponse> {
137    let env_inner = env.env_inner();
138    let tx_rw = env_inner.tx_rw()?;
139
140    // FIXME: use try blocks once stable.
141    let result = || {
142        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
143
144        // Remove all txs which spend key images that were spent in the new block.
145        for key_image in spent_key_images {
146            match tables_mut
147                .spent_key_images()
148                .get(key_image)
149                .and_then(|tx_hash| ops::remove_transaction(&tx_hash, &mut tables_mut))
150            {
151                Ok(()) | Err(RuntimeError::KeyNotFound) => (),
152                Err(e) => return Err(e),
153            }
154        }
155
156        Ok(())
157    };
158
159    if let Err(e) = result() {
160        TxRw::abort(tx_rw)?;
161        return Err(e);
162    }
163
164    TxRw::commit(tx_rw)?;
165    Ok(TxpoolWriteResponse::Ok)
166}