cuprate_txpool/service/
write.rs1use std::sync::Arc;
2
3use cuprate_database::{
4 ConcreteEnv, DatabaseRo, DatabaseRw, DbResult, Env, EnvInner, RuntimeError, TxRw,
5};
6use cuprate_database_service::DatabaseWriteHandle;
7use cuprate_helper::time::current_unix_timestamp;
8use cuprate_types::TransactionVerificationData;
9
10use crate::{
11 ops::{self, TxPoolWriteError},
12 service::{
13 interface::{TxpoolWriteRequest, TxpoolWriteResponse},
14 types::TxpoolWriteHandle,
15 },
16 tables::{OpenTables, Tables, TransactionInfos},
17 types::{KeyImage, TransactionHash, TxStateFlags},
18};
19
20pub(super) fn init_write_service(env: Arc<ConcreteEnv>) -> TxpoolWriteHandle {
23 DatabaseWriteHandle::init(env, handle_txpool_request)
24}
25
26fn handle_txpool_request(
29 env: &ConcreteEnv,
30 req: &TxpoolWriteRequest,
31) -> DbResult<TxpoolWriteResponse> {
32 match req {
33 TxpoolWriteRequest::AddTransaction { tx, state_stem } => {
34 add_transaction(env, tx, *state_stem)
35 }
36 TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
37 TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash),
38 TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images),
39 }
40}
41
42fn add_transaction(
53 env: &ConcreteEnv,
54 tx: &TransactionVerificationData,
55 state_stem: bool,
56) -> DbResult<TxpoolWriteResponse> {
57 let env_inner = env.env_inner();
58 let tx_rw = env_inner.tx_rw()?;
59
60 let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
61
62 if let Err(e) = ops::add_transaction(tx, state_stem, &mut tables_mut) {
63 drop(tables_mut);
64 TxRw::abort(tx_rw)
66 .expect("could not maintain database atomicity by aborting write transaction");
67
68 return match e {
69 TxPoolWriteError::DoubleSpend(tx_hash) => {
70 Ok(TxpoolWriteResponse::AddTransaction(Some(tx_hash)))
74 }
75 TxPoolWriteError::Database(e) => Err(e),
76 };
77 }
78
79 drop(tables_mut);
80 TxRw::commit(tx_rw)?;
82 Ok(TxpoolWriteResponse::AddTransaction(None))
83}
84
85fn remove_transaction(
87 env: &ConcreteEnv,
88 tx_hash: &TransactionHash,
89) -> DbResult<TxpoolWriteResponse> {
90 let env_inner = env.env_inner();
91 let tx_rw = env_inner.tx_rw()?;
92
93 let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
94
95 if let Err(e) = ops::remove_transaction(tx_hash, &mut tables_mut) {
96 drop(tables_mut);
97 TxRw::abort(tx_rw)
99 .expect("could not maintain database atomicity by aborting write transaction");
100
101 return Err(e);
102 }
103
104 drop(tables_mut);
105
106 TxRw::commit(tx_rw)?;
107 Ok(TxpoolWriteResponse::Ok)
108}
109
110fn promote(env: &ConcreteEnv, tx_hash: &TransactionHash) -> DbResult<TxpoolWriteResponse> {
112 let env_inner = env.env_inner();
113 let tx_rw = env_inner.tx_rw()?;
114
115 let res = || {
116 let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
117
118 tx_infos.update(tx_hash, |mut info| {
119 info.received_at = current_unix_timestamp();
120 info.flags.remove(TxStateFlags::STATE_STEM);
121 Some(info)
122 })
123 };
124
125 if let Err(e) = res() {
126 TxRw::abort(tx_rw)
128 .expect("could not maintain database atomicity by aborting write transaction");
129
130 return Err(e);
131 }
132
133 TxRw::commit(tx_rw)?;
134 Ok(TxpoolWriteResponse::Ok)
135}
136
137fn new_block(env: &ConcreteEnv, spent_key_images: &[KeyImage]) -> DbResult<TxpoolWriteResponse> {
139 let env_inner = env.env_inner();
140 let tx_rw = env_inner.tx_rw()?;
141
142 let mut txs_removed = Vec::new();
143
144 let mut result = || {
146 let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
147
148 for key_image in spent_key_images {
150 match tables_mut
151 .spent_key_images()
152 .get(key_image)
153 .and_then(|tx_hash| {
154 txs_removed.push(tx_hash);
155 ops::remove_transaction(&tx_hash, &mut tables_mut)
156 }) {
157 Ok(()) | Err(RuntimeError::KeyNotFound) => (),
158 Err(e) => return Err(e),
159 }
160 }
161
162 Ok(())
163 };
164
165 if let Err(e) = result() {
166 TxRw::abort(tx_rw)?;
167 return Err(e);
168 }
169
170 TxRw::commit(tx_rw)?;
171 Ok(TxpoolWriteResponse::NewBlock(txs_removed))
172}