cuprate_txpool/service/
read.rs1#![expect(
2 unreachable_code,
3 unused_variables,
4 clippy::unnecessary_wraps,
5 reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
6)]
7use std::{
8 collections::{HashMap, HashSet},
9 sync::Arc,
10};
11
12use rayon::ThreadPool;
13
14use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError};
15use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
16
17use crate::{
18 ops::{get_transaction_verification_data, in_stem_pool},
19 service::{
20 interface::{TxpoolReadRequest, TxpoolReadResponse},
21 types::{ReadResponseResult, TxpoolReadHandle},
22 },
23 tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
24 types::{TransactionBlobHash, TransactionHash},
25};
26
27#[cold]
36#[inline(never)] pub(super) fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> TxpoolReadHandle {
38 init_read_service_with_pool(env, init_thread_pool(threads))
39}
40
41#[cold]
46#[inline(never)] pub(super) fn init_read_service_with_pool(
48 env: Arc<ConcreteEnv>,
49 pool: Arc<ThreadPool>,
50) -> TxpoolReadHandle {
51 DatabaseReadService::new(env, pool, map_request)
52}
53
54fn map_request(
66 env: &ConcreteEnv, request: TxpoolReadRequest, ) -> ReadResponseResult {
69 match request {
70 TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
71 TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
72 TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
73 filter_known_tx_blob_hashes(env, blob_hashes)
74 }
75 TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
76 TxpoolReadRequest::Backlog => backlog(env),
77 TxpoolReadRequest::Size {
78 include_sensitive_txs,
79 } => size(env, include_sensitive_txs),
80 }
81}
82
83#[inline]
102fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
103 let inner_env = env.env_inner();
104 let tx_ro = inner_env.tx_ro()?;
105
106 let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
107 let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
108
109 let tx_blob = tx_blobs_table.get(tx_hash)?.0;
110
111 Ok(TxpoolReadResponse::TxBlob {
112 tx_blob,
113 state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
114 })
115}
116
117#[inline]
119fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
120 let inner_env = env.env_inner();
121 let tx_ro = inner_env.tx_ro()?;
122
123 let tables = inner_env.open_tables(&tx_ro)?;
124
125 get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
126}
127
128fn filter_known_tx_blob_hashes(
130 env: &ConcreteEnv,
131 mut blob_hashes: HashSet<TransactionBlobHash>,
132) -> ReadResponseResult {
133 let inner_env = env.env_inner();
134 let tx_ro = inner_env.tx_ro()?;
135
136 let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
137 let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
138
139 let mut stem_pool_hashes = Vec::new();
140
141 let mut tx_unknown = |blob_hash| -> DbResult<bool> {
144 match tx_blob_hashes.get(&blob_hash) {
145 Ok(tx_hash) => {
146 if in_stem_pool(&tx_hash, &tx_infos)? {
147 stem_pool_hashes.push(tx_hash);
148 }
149 Ok(false)
150 }
151 Err(RuntimeError::KeyNotFound) => Ok(true),
152 Err(e) => Err(e),
153 }
154 };
155
156 let mut err = None;
157 blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
158 Ok(res) => res,
159 Err(e) => {
160 err = Some(e);
161 false
162 }
163 });
164
165 if let Some(e) = err {
166 return Err(e);
167 }
168
169 Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
170 unknown_blob_hashes: blob_hashes,
171 stem_pool_hashes,
172 })
173}
174
175fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
177 let inner_env = env.env_inner();
178 let tx_ro = inner_env.tx_ro()?;
179
180 let tables = inner_env.open_tables(&tx_ro)?;
181
182 let mut missing_tx_indexes = Vec::with_capacity(txs.len());
183 let mut txs_verification_data = HashMap::with_capacity(txs.len());
184
185 for (i, tx_hash) in txs.into_iter().enumerate() {
186 match get_transaction_verification_data(&tx_hash, &tables) {
187 Ok(tx) => {
188 txs_verification_data.insert(tx_hash, tx);
189 }
190 Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
191 Err(e) => return Err(e),
192 }
193 }
194
195 Ok(TxpoolReadResponse::TxsForBlock {
196 txs: txs_verification_data,
197 missing: missing_tx_indexes,
198 })
199}
200
201#[inline]
203fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
204 Ok(TxpoolReadResponse::Backlog(todo!()))
205}
206
207#[inline]
209fn size(env: &ConcreteEnv, include_sensitive_txs: bool) -> ReadResponseResult {
210 Ok(TxpoolReadResponse::Size(todo!()))
211}