1#![expect(
2 unreachable_code,
3 unused_variables,
4 clippy::unnecessary_wraps,
5 clippy::needless_pass_by_value,
6 reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
7)]
8use std::{
9 collections::{HashMap, HashSet},
10 num::NonZero,
11 sync::Arc,
12};
13
14use rayon::ThreadPool;
15
16use cuprate_database::{
17 ConcreteEnv, DatabaseIter, DatabaseRo, DbResult, Env, EnvInner, RuntimeError,
18};
19use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
20
21use crate::{
22 ops::{get_transaction_verification_data, in_stem_pool},
23 service::{
24 interface::{TxpoolReadRequest, TxpoolReadResponse},
25 types::{ReadResponseResult, TxpoolReadHandle},
26 },
27 tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
28 types::{TransactionBlobHash, TransactionHash},
29 TxEntry,
30};
31
32#[cold]
41#[inline(never)] pub(super) fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> TxpoolReadHandle {
43 init_read_service_with_pool(env, init_thread_pool(threads))
44}
45
46#[cold]
51#[inline(never)] pub(super) fn init_read_service_with_pool(
53 env: Arc<ConcreteEnv>,
54 pool: Arc<ThreadPool>,
55) -> TxpoolReadHandle {
56 DatabaseReadService::new(env, pool, map_request)
57}
58
59fn map_request(
71 env: &ConcreteEnv, request: TxpoolReadRequest, ) -> ReadResponseResult {
74 match request {
75 TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
76 TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
77 TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
78 filter_known_tx_blob_hashes(env, blob_hashes)
79 }
80 TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
81 TxpoolReadRequest::Backlog => backlog(env),
82 TxpoolReadRequest::Size {
83 include_sensitive_txs,
84 } => size(env, include_sensitive_txs),
85 TxpoolReadRequest::PoolInfo {
86 include_sensitive_txs,
87 max_tx_count,
88 start_time,
89 } => pool_info(env, include_sensitive_txs, max_tx_count, start_time),
90 TxpoolReadRequest::TxsByHash {
91 tx_hashes,
92 include_sensitive_txs,
93 } => txs_by_hash(env, tx_hashes, include_sensitive_txs),
94 TxpoolReadRequest::KeyImagesSpent {
95 key_images,
96 include_sensitive_txs,
97 } => key_images_spent(env, key_images, include_sensitive_txs),
98 TxpoolReadRequest::KeyImagesSpentVec {
99 key_images,
100 include_sensitive_txs,
101 } => key_images_spent_vec(env, key_images, include_sensitive_txs),
102 TxpoolReadRequest::Pool {
103 include_sensitive_txs,
104 } => pool(env, include_sensitive_txs),
105 TxpoolReadRequest::PoolStats {
106 include_sensitive_txs,
107 } => pool_stats(env, include_sensitive_txs),
108 TxpoolReadRequest::AllHashes {
109 include_sensitive_txs,
110 } => all_hashes(env, include_sensitive_txs),
111 }
112}
113
114#[inline]
133fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
134 let inner_env = env.env_inner();
135 let tx_ro = inner_env.tx_ro()?;
136
137 let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
138 let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
139
140 let tx_blob = tx_blobs_table.get(tx_hash)?.0;
141
142 Ok(TxpoolReadResponse::TxBlob {
143 tx_blob,
144 state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
145 })
146}
147
148#[inline]
150fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
151 let inner_env = env.env_inner();
152 let tx_ro = inner_env.tx_ro()?;
153
154 let tables = inner_env.open_tables(&tx_ro)?;
155
156 get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
157}
158
159fn filter_known_tx_blob_hashes(
161 env: &ConcreteEnv,
162 mut blob_hashes: HashSet<TransactionBlobHash>,
163) -> ReadResponseResult {
164 let inner_env = env.env_inner();
165 let tx_ro = inner_env.tx_ro()?;
166
167 let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
168 let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
169
170 let mut stem_pool_hashes = Vec::new();
171
172 let mut tx_unknown = |blob_hash| -> DbResult<bool> {
175 match tx_blob_hashes.get(&blob_hash) {
176 Ok(tx_hash) => {
177 if in_stem_pool(&tx_hash, &tx_infos)? {
178 stem_pool_hashes.push(tx_hash);
179 }
180 Ok(false)
181 }
182 Err(RuntimeError::KeyNotFound) => Ok(true),
183 Err(e) => Err(e),
184 }
185 };
186
187 let mut err = None;
188 blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
189 Ok(res) => res,
190 Err(e) => {
191 err = Some(e);
192 false
193 }
194 });
195
196 if let Some(e) = err {
197 return Err(e);
198 }
199
200 Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
201 unknown_blob_hashes: blob_hashes,
202 stem_pool_hashes,
203 })
204}
205
206fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
208 let inner_env = env.env_inner();
209 let tx_ro = inner_env.tx_ro()?;
210
211 let tables = inner_env.open_tables(&tx_ro)?;
212
213 let mut missing_tx_indexes = Vec::with_capacity(txs.len());
214 let mut txs_verification_data = HashMap::with_capacity(txs.len());
215
216 for (i, tx_hash) in txs.into_iter().enumerate() {
217 match get_transaction_verification_data(&tx_hash, &tables) {
218 Ok(tx) => {
219 txs_verification_data.insert(tx_hash, tx);
220 }
221 Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
222 Err(e) => return Err(e),
223 }
224 }
225
226 Ok(TxpoolReadResponse::TxsForBlock {
227 txs: txs_verification_data,
228 missing: missing_tx_indexes,
229 })
230}
231
232#[inline]
234fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
235 let inner_env = env.env_inner();
236 let tx_ro = inner_env.tx_ro()?;
237
238 let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
239
240 let backlog = tx_infos_table
241 .iter()?
242 .map(|info| {
243 let (id, info) = info?;
244
245 Ok(TxEntry {
246 id,
247 weight: info.weight,
248 fee: info.fee,
249 private: info.flags.private(),
250 received_at: info.received_at,
251 })
252 })
253 .collect::<Result<_, RuntimeError>>()?;
254
255 Ok(TxpoolReadResponse::Backlog(backlog))
256}
257
258#[inline]
260fn size(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
261 Ok(TxpoolReadResponse::Size(todo!()))
262}
263
264fn pool_info(
266 env: &ConcreteEnv,
267 include_sensitive_txs: bool,
268 max_tx_count: usize,
269 start_time: Option<NonZero<usize>>,
270) -> ReadResponseResult {
271 Ok(TxpoolReadResponse::PoolInfo(todo!()))
272}
273
274fn txs_by_hash(
276 env: &ConcreteEnv,
277 tx_hashes: Vec<[u8; 32]>,
278 include_sensitive_txs: bool,
279) -> ReadResponseResult {
280 Ok(TxpoolReadResponse::TxsByHash(todo!()))
281}
282
283fn key_images_spent(
285 env: &ConcreteEnv,
286 key_images: HashSet<[u8; 32]>,
287 include_sensitive_txs: bool,
288) -> ReadResponseResult {
289 Ok(TxpoolReadResponse::KeyImagesSpent(todo!()))
290}
291
292fn key_images_spent_vec(
294 env: &ConcreteEnv,
295 key_images: Vec<[u8; 32]>,
296 include_sensitive_txs: bool,
297) -> ReadResponseResult {
298 Ok(TxpoolReadResponse::KeyImagesSpent(todo!()))
299}
300
301fn pool(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
303 Ok(TxpoolReadResponse::Pool {
304 txs: todo!(),
305 spent_key_images: todo!(),
306 })
307}
308
309fn pool_stats(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
311 Ok(TxpoolReadResponse::PoolStats(todo!()))
312}
313
314fn all_hashes(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
316 Ok(TxpoolReadResponse::AllHashes(todo!()))
317}