cuprate_txpool/service/
write.rs1use std::sync::Arc;
2
3use cuprate_database::{
4 ConcreteEnv, DatabaseRo, DatabaseRw, DbResult, Env, EnvInner, RuntimeError, TxRw,
5};
6use cuprate_database_service::DatabaseWriteHandle;
7use cuprate_types::TransactionVerificationData;
8
9use crate::{
10 ops::{self, TxPoolWriteError},
11 service::{
12 interface::{TxpoolWriteRequest, TxpoolWriteResponse},
13 types::TxpoolWriteHandle,
14 },
15 tables::{OpenTables, Tables, TransactionInfos},
16 types::{KeyImage, TransactionHash, TxStateFlags},
17};
18
19pub(super) fn init_write_service(env: Arc<ConcreteEnv>) -> TxpoolWriteHandle {
22 DatabaseWriteHandle::init(env, handle_txpool_request)
23}
24
25fn handle_txpool_request(
28 env: &ConcreteEnv,
29 req: &TxpoolWriteRequest,
30) -> DbResult<TxpoolWriteResponse> {
31 match req {
32 TxpoolWriteRequest::AddTransaction { tx, state_stem } => {
33 add_transaction(env, tx, *state_stem)
34 }
35 TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
36 TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash),
37 TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images),
38 }
39}
40
41fn add_transaction(
52 env: &ConcreteEnv,
53 tx: &TransactionVerificationData,
54 state_stem: bool,
55) -> DbResult<TxpoolWriteResponse> {
56 let env_inner = env.env_inner();
57 let tx_rw = env_inner.tx_rw()?;
58
59 let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
60
61 if let Err(e) = ops::add_transaction(tx, state_stem, &mut tables_mut) {
62 drop(tables_mut);
63 TxRw::abort(tx_rw)
65 .expect("could not maintain database atomicity by aborting write transaction");
66
67 return match e {
68 TxPoolWriteError::DoubleSpend(tx_hash) => {
69 Ok(TxpoolWriteResponse::AddTransaction(Some(tx_hash)))
73 }
74 TxPoolWriteError::Database(e) => Err(e),
75 };
76 };
77
78 drop(tables_mut);
79 TxRw::commit(tx_rw)?;
81 Ok(TxpoolWriteResponse::AddTransaction(None))
82}
83
84fn remove_transaction(
86 env: &ConcreteEnv,
87 tx_hash: &TransactionHash,
88) -> DbResult<TxpoolWriteResponse> {
89 let env_inner = env.env_inner();
90 let tx_rw = env_inner.tx_rw()?;
91
92 let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
93
94 if let Err(e) = ops::remove_transaction(tx_hash, &mut tables_mut) {
95 drop(tables_mut);
96 TxRw::abort(tx_rw)
98 .expect("could not maintain database atomicity by aborting write transaction");
99
100 return Err(e);
101 }
102
103 drop(tables_mut);
104
105 TxRw::commit(tx_rw)?;
106 Ok(TxpoolWriteResponse::Ok)
107}
108
109fn promote(env: &ConcreteEnv, tx_hash: &TransactionHash) -> DbResult<TxpoolWriteResponse> {
111 let env_inner = env.env_inner();
112 let tx_rw = env_inner.tx_rw()?;
113
114 let res = || {
115 let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
116
117 tx_infos.update(tx_hash, |mut info| {
118 info.flags.remove(TxStateFlags::STATE_STEM);
119 Some(info)
120 })
121 };
122
123 if let Err(e) = res() {
124 TxRw::abort(tx_rw)
126 .expect("could not maintain database atomicity by aborting write transaction");
127
128 return Err(e);
129 }
130
131 TxRw::commit(tx_rw)?;
132 Ok(TxpoolWriteResponse::Ok)
133}
134
135fn new_block(env: &ConcreteEnv, spent_key_images: &[KeyImage]) -> DbResult<TxpoolWriteResponse> {
137 let env_inner = env.env_inner();
138 let tx_rw = env_inner.tx_rw()?;
139
140 let result = || {
142 let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
143
144 for key_image in spent_key_images {
146 match tables_mut
147 .spent_key_images()
148 .get(key_image)
149 .and_then(|tx_hash| ops::remove_transaction(&tx_hash, &mut tables_mut))
150 {
151 Ok(()) | Err(RuntimeError::KeyNotFound) => (),
152 Err(e) => return Err(e),
153 }
154 }
155
156 Ok(())
157 };
158
159 if let Err(e) = result() {
160 TxRw::abort(tx_rw)?;
161 return Err(e);
162 }
163
164 TxRw::commit(tx_rw)?;
165 Ok(TxpoolWriteResponse::Ok)
166}