cuprate_txpool/service/
read.rs

1#![expect(
2    unreachable_code,
3    unused_variables,
4    clippy::unnecessary_wraps,
5    clippy::needless_pass_by_value,
6    reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
7)]
8use std::{
9    collections::{HashMap, HashSet},
10    num::NonZero,
11    sync::Arc,
12};
13
14use rayon::ThreadPool;
15
16use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError};
17use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
18
19use crate::{
20    ops::{get_transaction_verification_data, in_stem_pool},
21    service::{
22        interface::{TxpoolReadRequest, TxpoolReadResponse},
23        types::{ReadResponseResult, TxpoolReadHandle},
24    },
25    tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
26    types::{TransactionBlobHash, TransactionHash},
27};
28
29// TODO: update the docs here
30//---------------------------------------------------------------------------------------------------- init_read_service
31/// Initialize the [`TxpoolReadHandle`] thread-pool backed by `rayon`.
32///
33/// This spawns `threads` amount of reader threads
34/// attached to `env` and returns a handle to the pool.
35///
36/// Should be called _once_ per actual database.
37#[cold]
38#[inline(never)] // Only called once.
39pub(super) fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> TxpoolReadHandle {
40    init_read_service_with_pool(env, init_thread_pool(threads))
41}
42
43/// Initialize the [`TxpoolReadHandle`], with a specific rayon thread-pool instead of
44/// creating a new one.
45///
46/// Should be called _once_ per actual database.
47#[cold]
48#[inline(never)] // Only called once.
49pub(super) fn init_read_service_with_pool(
50    env: Arc<ConcreteEnv>,
51    pool: Arc<ThreadPool>,
52) -> TxpoolReadHandle {
53    DatabaseReadService::new(env, pool, map_request)
54}
55
56//---------------------------------------------------------------------------------------------------- Request Mapping
57// This function maps [`Request`]s to function calls
58// executed by the rayon DB reader threadpool.
59
60/// Map [`TxpoolReadRequest`]'s to specific database handler functions.
61///
62/// This is the main entrance into all `Request` handler functions.
63/// The basic structure is:
64/// 1. `Request` is mapped to a handler function
65/// 2. Handler function is called
66/// 3. [`TxpoolReadResponse`] is returned
67fn map_request(
68    env: &ConcreteEnv,          // Access to the database
69    request: TxpoolReadRequest, // The request we must fulfill
70) -> ReadResponseResult {
71    match request {
72        TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
73        TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
74        TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
75            filter_known_tx_blob_hashes(env, blob_hashes)
76        }
77        TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
78        TxpoolReadRequest::Backlog => backlog(env),
79        TxpoolReadRequest::Size {
80            include_sensitive_txs,
81        } => size(env, include_sensitive_txs),
82        TxpoolReadRequest::PoolInfo {
83            include_sensitive_txs,
84            max_tx_count,
85            start_time,
86        } => pool_info(env, include_sensitive_txs, max_tx_count, start_time),
87        TxpoolReadRequest::TxsByHash {
88            tx_hashes,
89            include_sensitive_txs,
90        } => txs_by_hash(env, tx_hashes, include_sensitive_txs),
91        TxpoolReadRequest::KeyImagesSpent {
92            key_images,
93            include_sensitive_txs,
94        } => key_images_spent(env, key_images, include_sensitive_txs),
95        TxpoolReadRequest::KeyImagesSpentVec {
96            key_images,
97            include_sensitive_txs,
98        } => key_images_spent_vec(env, key_images, include_sensitive_txs),
99        TxpoolReadRequest::Pool {
100            include_sensitive_txs,
101        } => pool(env, include_sensitive_txs),
102        TxpoolReadRequest::PoolStats {
103            include_sensitive_txs,
104        } => pool_stats(env, include_sensitive_txs),
105        TxpoolReadRequest::AllHashes {
106            include_sensitive_txs,
107        } => all_hashes(env, include_sensitive_txs),
108    }
109}
110
111//---------------------------------------------------------------------------------------------------- Handler functions
112// These are the actual functions that do stuff according to the incoming [`TxpoolReadRequest`].
113//
114// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
115// the enum variant name, e.g: `TxBlob` -> `tx_blob`.
116//
117// Each function will return the [`TxpoolReadResponse`] that we
118// should send back to the caller in [`map_request()`].
119//
120// INVARIANT:
121// These functions are called above in `tower::Service::call()`
122// using a custom threadpool which means any call to `par_*()` functions
123// will be using the custom rayon DB reader thread-pool, not the global one.
124//
125// All functions below assume that this is the case, such that
126// `par_*()` functions will not block the _global_ rayon thread-pool.
127
128/// [`TxpoolReadRequest::TxBlob`].
129#[inline]
130fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
131    let inner_env = env.env_inner();
132    let tx_ro = inner_env.tx_ro()?;
133
134    let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
135    let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
136
137    let tx_blob = tx_blobs_table.get(tx_hash)?.0;
138
139    Ok(TxpoolReadResponse::TxBlob {
140        tx_blob,
141        state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
142    })
143}
144
145/// [`TxpoolReadRequest::TxVerificationData`].
146#[inline]
147fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
148    let inner_env = env.env_inner();
149    let tx_ro = inner_env.tx_ro()?;
150
151    let tables = inner_env.open_tables(&tx_ro)?;
152
153    get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
154}
155
156/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
157fn filter_known_tx_blob_hashes(
158    env: &ConcreteEnv,
159    mut blob_hashes: HashSet<TransactionBlobHash>,
160) -> ReadResponseResult {
161    let inner_env = env.env_inner();
162    let tx_ro = inner_env.tx_ro()?;
163
164    let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
165    let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
166
167    let mut stem_pool_hashes = Vec::new();
168
169    // A closure that returns `true` if a tx with a certain blob hash is unknown.
170    // This also fills in `stem_tx_hashes`.
171    let mut tx_unknown = |blob_hash| -> DbResult<bool> {
172        match tx_blob_hashes.get(&blob_hash) {
173            Ok(tx_hash) => {
174                if in_stem_pool(&tx_hash, &tx_infos)? {
175                    stem_pool_hashes.push(tx_hash);
176                }
177                Ok(false)
178            }
179            Err(RuntimeError::KeyNotFound) => Ok(true),
180            Err(e) => Err(e),
181        }
182    };
183
184    let mut err = None;
185    blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
186        Ok(res) => res,
187        Err(e) => {
188            err = Some(e);
189            false
190        }
191    });
192
193    if let Some(e) = err {
194        return Err(e);
195    }
196
197    Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
198        unknown_blob_hashes: blob_hashes,
199        stem_pool_hashes,
200    })
201}
202
203/// [`TxpoolReadRequest::TxsForBlock`].
204fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
205    let inner_env = env.env_inner();
206    let tx_ro = inner_env.tx_ro()?;
207
208    let tables = inner_env.open_tables(&tx_ro)?;
209
210    let mut missing_tx_indexes = Vec::with_capacity(txs.len());
211    let mut txs_verification_data = HashMap::with_capacity(txs.len());
212
213    for (i, tx_hash) in txs.into_iter().enumerate() {
214        match get_transaction_verification_data(&tx_hash, &tables) {
215            Ok(tx) => {
216                txs_verification_data.insert(tx_hash, tx);
217            }
218            Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
219            Err(e) => return Err(e),
220        }
221    }
222
223    Ok(TxpoolReadResponse::TxsForBlock {
224        txs: txs_verification_data,
225        missing: missing_tx_indexes,
226    })
227}
228
229/// [`TxpoolReadRequest::Backlog`].
230#[inline]
231fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
232    Ok(TxpoolReadResponse::Backlog(todo!()))
233}
234
235/// [`TxpoolReadRequest::Size`].
236#[inline]
237fn size(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
238    Ok(TxpoolReadResponse::Size(todo!()))
239}
240
241/// [`TxpoolReadRequest::PoolInfo`].
242fn pool_info(
243    env: &ConcreteEnv,
244    include_sensitive_txs: bool,
245    max_tx_count: usize,
246    start_time: Option<NonZero<usize>>,
247) -> ReadResponseResult {
248    Ok(TxpoolReadResponse::PoolInfo(todo!()))
249}
250
251/// [`TxpoolReadRequest::TxsByHash`].
252fn txs_by_hash(
253    env: &ConcreteEnv,
254    tx_hashes: Vec<[u8; 32]>,
255    include_sensitive_txs: bool,
256) -> ReadResponseResult {
257    Ok(TxpoolReadResponse::TxsByHash(todo!()))
258}
259
260/// [`TxpoolReadRequest::KeyImagesSpent`].
261fn key_images_spent(
262    env: &ConcreteEnv,
263    key_images: HashSet<[u8; 32]>,
264    include_sensitive_txs: bool,
265) -> ReadResponseResult {
266    Ok(TxpoolReadResponse::KeyImagesSpent(todo!()))
267}
268
269/// [`TxpoolReadRequest::KeyImagesSpentVec`].
270fn key_images_spent_vec(
271    env: &ConcreteEnv,
272    key_images: Vec<[u8; 32]>,
273    include_sensitive_txs: bool,
274) -> ReadResponseResult {
275    Ok(TxpoolReadResponse::KeyImagesSpent(todo!()))
276}
277
278/// [`TxpoolReadRequest::Pool`].
279fn pool(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
280    Ok(TxpoolReadResponse::Pool {
281        txs: todo!(),
282        spent_key_images: todo!(),
283    })
284}
285
286/// [`TxpoolReadRequest::PoolStats`].
287fn pool_stats(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
288    Ok(TxpoolReadResponse::PoolStats(todo!()))
289}
290
291/// [`TxpoolReadRequest::AllHashes`].
292fn all_hashes(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
293    Ok(TxpoolReadResponse::AllHashes(todo!()))
294}