cuprate_txpool/service/
read.rs

1#![expect(
2    unreachable_code,
3    unused_variables,
4    clippy::unnecessary_wraps,
5    reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
6)]
7use std::{
8    collections::{HashMap, HashSet},
9    sync::Arc,
10};
11
12use rayon::ThreadPool;
13
14use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError};
15use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
16
17use crate::{
18    ops::{get_transaction_verification_data, in_stem_pool},
19    service::{
20        interface::{TxpoolReadRequest, TxpoolReadResponse},
21        types::{ReadResponseResult, TxpoolReadHandle},
22    },
23    tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
24    types::{TransactionBlobHash, TransactionHash},
25};
26
27// TODO: update the docs here
28//---------------------------------------------------------------------------------------------------- init_read_service
29/// Initialize the [`TxpoolReadHandle`] thread-pool backed by `rayon`.
30///
31/// This spawns `threads` amount of reader threads
32/// attached to `env` and returns a handle to the pool.
33///
34/// Should be called _once_ per actual database.
35#[cold]
36#[inline(never)] // Only called once.
37pub(super) fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> TxpoolReadHandle {
38    init_read_service_with_pool(env, init_thread_pool(threads))
39}
40
41/// Initialize the [`TxpoolReadHandle`], with a specific rayon thread-pool instead of
42/// creating a new one.
43///
44/// Should be called _once_ per actual database.
45#[cold]
46#[inline(never)] // Only called once.
47pub(super) fn init_read_service_with_pool(
48    env: Arc<ConcreteEnv>,
49    pool: Arc<ThreadPool>,
50) -> TxpoolReadHandle {
51    DatabaseReadService::new(env, pool, map_request)
52}
53
54//---------------------------------------------------------------------------------------------------- Request Mapping
55// This function maps [`Request`]s to function calls
56// executed by the rayon DB reader threadpool.
57
58/// Map [`TxpoolReadRequest`]'s to specific database handler functions.
59///
60/// This is the main entrance into all `Request` handler functions.
61/// The basic structure is:
62/// 1. `Request` is mapped to a handler function
63/// 2. Handler function is called
64/// 3. [`TxpoolReadResponse`] is returned
65fn map_request(
66    env: &ConcreteEnv,          // Access to the database
67    request: TxpoolReadRequest, // The request we must fulfill
68) -> ReadResponseResult {
69    match request {
70        TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
71        TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
72        TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
73            filter_known_tx_blob_hashes(env, blob_hashes)
74        }
75        TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
76        TxpoolReadRequest::Backlog => backlog(env),
77        TxpoolReadRequest::Size {
78            include_sensitive_txs,
79        } => size(env, include_sensitive_txs),
80    }
81}
82
83//---------------------------------------------------------------------------------------------------- Handler functions
84// These are the actual functions that do stuff according to the incoming [`TxpoolReadRequest`].
85//
86// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
87// the enum variant name, e.g: `TxBlob` -> `tx_blob`.
88//
89// Each function will return the [`TxpoolReadResponse`] that we
90// should send back to the caller in [`map_request()`].
91//
92// INVARIANT:
93// These functions are called above in `tower::Service::call()`
94// using a custom threadpool which means any call to `par_*()` functions
95// will be using the custom rayon DB reader thread-pool, not the global one.
96//
97// All functions below assume that this is the case, such that
98// `par_*()` functions will not block the _global_ rayon thread-pool.
99
100/// [`TxpoolReadRequest::TxBlob`].
101#[inline]
102fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
103    let inner_env = env.env_inner();
104    let tx_ro = inner_env.tx_ro()?;
105
106    let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
107    let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
108
109    let tx_blob = tx_blobs_table.get(tx_hash)?.0;
110
111    Ok(TxpoolReadResponse::TxBlob {
112        tx_blob,
113        state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
114    })
115}
116
117/// [`TxpoolReadRequest::TxVerificationData`].
118#[inline]
119fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
120    let inner_env = env.env_inner();
121    let tx_ro = inner_env.tx_ro()?;
122
123    let tables = inner_env.open_tables(&tx_ro)?;
124
125    get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
126}
127
128/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
129fn filter_known_tx_blob_hashes(
130    env: &ConcreteEnv,
131    mut blob_hashes: HashSet<TransactionBlobHash>,
132) -> ReadResponseResult {
133    let inner_env = env.env_inner();
134    let tx_ro = inner_env.tx_ro()?;
135
136    let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
137    let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
138
139    let mut stem_pool_hashes = Vec::new();
140
141    // A closure that returns `true` if a tx with a certain blob hash is unknown.
142    // This also fills in `stem_tx_hashes`.
143    let mut tx_unknown = |blob_hash| -> DbResult<bool> {
144        match tx_blob_hashes.get(&blob_hash) {
145            Ok(tx_hash) => {
146                if in_stem_pool(&tx_hash, &tx_infos)? {
147                    stem_pool_hashes.push(tx_hash);
148                }
149                Ok(false)
150            }
151            Err(RuntimeError::KeyNotFound) => Ok(true),
152            Err(e) => Err(e),
153        }
154    };
155
156    let mut err = None;
157    blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
158        Ok(res) => res,
159        Err(e) => {
160            err = Some(e);
161            false
162        }
163    });
164
165    if let Some(e) = err {
166        return Err(e);
167    }
168
169    Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
170        unknown_blob_hashes: blob_hashes,
171        stem_pool_hashes,
172    })
173}
174
175/// [`TxpoolReadRequest::TxsForBlock`].
176fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
177    let inner_env = env.env_inner();
178    let tx_ro = inner_env.tx_ro()?;
179
180    let tables = inner_env.open_tables(&tx_ro)?;
181
182    let mut missing_tx_indexes = Vec::with_capacity(txs.len());
183    let mut txs_verification_data = HashMap::with_capacity(txs.len());
184
185    for (i, tx_hash) in txs.into_iter().enumerate() {
186        match get_transaction_verification_data(&tx_hash, &tables) {
187            Ok(tx) => {
188                txs_verification_data.insert(tx_hash, tx);
189            }
190            Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
191            Err(e) => return Err(e),
192        }
193    }
194
195    Ok(TxpoolReadResponse::TxsForBlock {
196        txs: txs_verification_data,
197        missing: missing_tx_indexes,
198    })
199}
200
201/// [`TxpoolReadRequest::Backlog`].
202#[inline]
203fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
204    Ok(TxpoolReadResponse::Backlog(todo!()))
205}
206
207/// [`TxpoolReadRequest::Size`].
208#[inline]
209fn size(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
210    Ok(TxpoolReadResponse::Size(todo!()))
211}