cuprate_txpool/service/
read.rs

1#![expect(
2    unreachable_code,
3    unused_variables,
4    clippy::unnecessary_wraps,
5    clippy::needless_pass_by_value,
6    reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
7)]
8use std::{
9    collections::{HashMap, HashSet},
10    num::NonZero,
11    sync::Arc,
12};
13
14use rayon::ThreadPool;
15
16use cuprate_database::{
17    ConcreteEnv, DatabaseIter, DatabaseRo, DbResult, Env, EnvInner, RuntimeError,
18};
19use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
20
21use crate::{
22    ops::{get_transaction_verification_data, in_stem_pool},
23    service::{
24        interface::{TxpoolReadRequest, TxpoolReadResponse},
25        types::{ReadResponseResult, TxpoolReadHandle},
26    },
27    tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
28    types::{TransactionBlobHash, TransactionHash},
29    TxEntry,
30};
31
32// TODO: update the docs here
33//---------------------------------------------------------------------------------------------------- init_read_service
34/// Initialize the [`TxpoolReadHandle`] thread-pool backed by `rayon`.
35///
36/// This spawns `threads` amount of reader threads
37/// attached to `env` and returns a handle to the pool.
38///
39/// Should be called _once_ per actual database.
40#[cold]
41#[inline(never)] // Only called once.
42pub(super) fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> TxpoolReadHandle {
43    init_read_service_with_pool(env, init_thread_pool(threads))
44}
45
46/// Initialize the [`TxpoolReadHandle`], with a specific rayon thread-pool instead of
47/// creating a new one.
48///
49/// Should be called _once_ per actual database.
50#[cold]
51#[inline(never)] // Only called once.
52pub(super) fn init_read_service_with_pool(
53    env: Arc<ConcreteEnv>,
54    pool: Arc<ThreadPool>,
55) -> TxpoolReadHandle {
56    DatabaseReadService::new(env, pool, map_request)
57}
58
59//---------------------------------------------------------------------------------------------------- Request Mapping
60// This function maps [`Request`]s to function calls
61// executed by the rayon DB reader threadpool.
62
63/// Map [`TxpoolReadRequest`]'s to specific database handler functions.
64///
65/// This is the main entrance into all `Request` handler functions.
66/// The basic structure is:
67/// 1. `Request` is mapped to a handler function
68/// 2. Handler function is called
69/// 3. [`TxpoolReadResponse`] is returned
70fn map_request(
71    env: &ConcreteEnv,          // Access to the database
72    request: TxpoolReadRequest, // The request we must fulfill
73) -> ReadResponseResult {
74    match request {
75        TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
76        TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
77        TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
78            filter_known_tx_blob_hashes(env, blob_hashes)
79        }
80        TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
81        TxpoolReadRequest::Backlog => backlog(env),
82        TxpoolReadRequest::Size {
83            include_sensitive_txs,
84        } => size(env, include_sensitive_txs),
85        TxpoolReadRequest::PoolInfo {
86            include_sensitive_txs,
87            max_tx_count,
88            start_time,
89        } => pool_info(env, include_sensitive_txs, max_tx_count, start_time),
90        TxpoolReadRequest::TxsByHash {
91            tx_hashes,
92            include_sensitive_txs,
93        } => txs_by_hash(env, tx_hashes, include_sensitive_txs),
94        TxpoolReadRequest::KeyImagesSpent {
95            key_images,
96            include_sensitive_txs,
97        } => key_images_spent(env, key_images, include_sensitive_txs),
98        TxpoolReadRequest::KeyImagesSpentVec {
99            key_images,
100            include_sensitive_txs,
101        } => key_images_spent_vec(env, key_images, include_sensitive_txs),
102        TxpoolReadRequest::Pool {
103            include_sensitive_txs,
104        } => pool(env, include_sensitive_txs),
105        TxpoolReadRequest::PoolStats {
106            include_sensitive_txs,
107        } => pool_stats(env, include_sensitive_txs),
108        TxpoolReadRequest::AllHashes {
109            include_sensitive_txs,
110        } => all_hashes(env, include_sensitive_txs),
111    }
112}
113
114//---------------------------------------------------------------------------------------------------- Handler functions
115// These are the actual functions that do stuff according to the incoming [`TxpoolReadRequest`].
116//
117// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
118// the enum variant name, e.g: `TxBlob` -> `tx_blob`.
119//
120// Each function will return the [`TxpoolReadResponse`] that we
121// should send back to the caller in [`map_request()`].
122//
123// INVARIANT:
124// These functions are called above in `tower::Service::call()`
125// using a custom threadpool which means any call to `par_*()` functions
126// will be using the custom rayon DB reader thread-pool, not the global one.
127//
128// All functions below assume that this is the case, such that
129// `par_*()` functions will not block the _global_ rayon thread-pool.
130
131/// [`TxpoolReadRequest::TxBlob`].
132#[inline]
133fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
134    let inner_env = env.env_inner();
135    let tx_ro = inner_env.tx_ro()?;
136
137    let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
138    let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
139
140    let tx_blob = tx_blobs_table.get(tx_hash)?.0;
141
142    Ok(TxpoolReadResponse::TxBlob {
143        tx_blob,
144        state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
145    })
146}
147
148/// [`TxpoolReadRequest::TxVerificationData`].
149#[inline]
150fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
151    let inner_env = env.env_inner();
152    let tx_ro = inner_env.tx_ro()?;
153
154    let tables = inner_env.open_tables(&tx_ro)?;
155
156    get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
157}
158
159/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
160fn filter_known_tx_blob_hashes(
161    env: &ConcreteEnv,
162    mut blob_hashes: HashSet<TransactionBlobHash>,
163) -> ReadResponseResult {
164    let inner_env = env.env_inner();
165    let tx_ro = inner_env.tx_ro()?;
166
167    let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
168    let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
169
170    let mut stem_pool_hashes = Vec::new();
171
172    // A closure that returns `true` if a tx with a certain blob hash is unknown.
173    // This also fills in `stem_tx_hashes`.
174    let mut tx_unknown = |blob_hash| -> DbResult<bool> {
175        match tx_blob_hashes.get(&blob_hash) {
176            Ok(tx_hash) => {
177                if in_stem_pool(&tx_hash, &tx_infos)? {
178                    stem_pool_hashes.push(tx_hash);
179                }
180                Ok(false)
181            }
182            Err(RuntimeError::KeyNotFound) => Ok(true),
183            Err(e) => Err(e),
184        }
185    };
186
187    let mut err = None;
188    blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
189        Ok(res) => res,
190        Err(e) => {
191            err = Some(e);
192            false
193        }
194    });
195
196    if let Some(e) = err {
197        return Err(e);
198    }
199
200    Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
201        unknown_blob_hashes: blob_hashes,
202        stem_pool_hashes,
203    })
204}
205
206/// [`TxpoolReadRequest::TxsForBlock`].
207fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
208    let inner_env = env.env_inner();
209    let tx_ro = inner_env.tx_ro()?;
210
211    let tables = inner_env.open_tables(&tx_ro)?;
212
213    let mut missing_tx_indexes = Vec::with_capacity(txs.len());
214    let mut txs_verification_data = HashMap::with_capacity(txs.len());
215
216    for (i, tx_hash) in txs.into_iter().enumerate() {
217        match get_transaction_verification_data(&tx_hash, &tables) {
218            Ok(tx) => {
219                txs_verification_data.insert(tx_hash, tx);
220            }
221            Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
222            Err(e) => return Err(e),
223        }
224    }
225
226    Ok(TxpoolReadResponse::TxsForBlock {
227        txs: txs_verification_data,
228        missing: missing_tx_indexes,
229    })
230}
231
232/// [`TxpoolReadRequest::Backlog`].
233#[inline]
234fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
235    let inner_env = env.env_inner();
236    let tx_ro = inner_env.tx_ro()?;
237
238    let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
239
240    let backlog = tx_infos_table
241        .iter()?
242        .map(|info| {
243            let (id, info) = info?;
244
245            Ok(TxEntry {
246                id,
247                weight: info.weight,
248                fee: info.fee,
249                private: info.flags.private(),
250                received_at: info.received_at,
251            })
252        })
253        .collect::<Result<_, RuntimeError>>()?;
254
255    Ok(TxpoolReadResponse::Backlog(backlog))
256}
257
258/// [`TxpoolReadRequest::Size`].
259#[inline]
260fn size(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
261    Ok(TxpoolReadResponse::Size(todo!()))
262}
263
264/// [`TxpoolReadRequest::PoolInfo`].
265fn pool_info(
266    env: &ConcreteEnv,
267    include_sensitive_txs: bool,
268    max_tx_count: usize,
269    start_time: Option<NonZero<usize>>,
270) -> ReadResponseResult {
271    Ok(TxpoolReadResponse::PoolInfo(todo!()))
272}
273
274/// [`TxpoolReadRequest::TxsByHash`].
275fn txs_by_hash(
276    env: &ConcreteEnv,
277    tx_hashes: Vec<[u8; 32]>,
278    include_sensitive_txs: bool,
279) -> ReadResponseResult {
280    Ok(TxpoolReadResponse::TxsByHash(todo!()))
281}
282
283/// [`TxpoolReadRequest::KeyImagesSpent`].
284fn key_images_spent(
285    env: &ConcreteEnv,
286    key_images: HashSet<[u8; 32]>,
287    include_sensitive_txs: bool,
288) -> ReadResponseResult {
289    Ok(TxpoolReadResponse::KeyImagesSpent(todo!()))
290}
291
292/// [`TxpoolReadRequest::KeyImagesSpentVec`].
293fn key_images_spent_vec(
294    env: &ConcreteEnv,
295    key_images: Vec<[u8; 32]>,
296    include_sensitive_txs: bool,
297) -> ReadResponseResult {
298    Ok(TxpoolReadResponse::KeyImagesSpent(todo!()))
299}
300
301/// [`TxpoolReadRequest::Pool`].
302fn pool(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
303    Ok(TxpoolReadResponse::Pool {
304        txs: todo!(),
305        spent_key_images: todo!(),
306    })
307}
308
309/// [`TxpoolReadRequest::PoolStats`].
310fn pool_stats(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
311    Ok(TxpoolReadResponse::PoolStats(todo!()))
312}
313
314/// [`TxpoolReadRequest::AllHashes`].
315fn all_hashes(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
316    Ok(TxpoolReadResponse::AllHashes(todo!()))
317}