cuprate_blockchain/service/
read.rs

1//! Database reader thread-pool definitions and logic.
2
3#![expect(
4    unreachable_code,
5    unused_variables,
6    clippy::unnecessary_wraps,
7    clippy::needless_pass_by_value,
8    reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
9)]
10
11//---------------------------------------------------------------------------------------------------- Import
12use std::{
13    cmp::min,
14    collections::{HashMap, HashSet},
15    ops::Range,
16    sync::Arc,
17};
18
19use indexmap::{IndexMap, IndexSet};
20use rayon::{
21    iter::{Either, IntoParallelIterator, ParallelIterator},
22    prelude::*,
23    ThreadPool,
24};
25use thread_local::ThreadLocal;
26
27use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError};
28use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
29use cuprate_helper::map::combine_low_high_bits_to_u128;
30use cuprate_types::{
31    blockchain::{BlockchainReadRequest, BlockchainResponse},
32    output_cache::OutputCache,
33    rpc::OutputHistogramInput,
34    Chain, ChainId, ExtendedBlockHeader, OutputDistributionInput, TxsInBlock,
35};
36
37use crate::{
38    ops::{
39        alt_block::{
40            get_alt_block, get_alt_block_extended_header_from_height, get_alt_block_hash,
41            get_alt_chain_history_ranges,
42        },
43        block::{
44            block_exists, get_block_blob_with_tx_indexes, get_block_complete_entry,
45            get_block_complete_entry_from_height, get_block_extended_header_from_height,
46            get_block_height, get_block_info,
47        },
48        blockchain::{cumulative_generated_coins, find_split_point, top_block_height},
49        key_image::key_image_exists,
50        output::id_to_output_on_chain,
51    },
52    service::{
53        free::{compact_history_genesis_not_included, compact_history_index_to_height_offset},
54        types::{BlockchainReadHandle, ResponseResult},
55    },
56    tables::{
57        AltBlockHeights, BlockHeights, BlockInfos, OpenTables, RctOutputs, Tables, TablesIter,
58        TxIds, TxOutputs,
59    },
60    types::{
61        AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId,
62    },
63};
64
65//---------------------------------------------------------------------------------------------------- init_read_service
66/// Initialize the [`BlockchainReadHandle`] thread-pool backed by [`rayon`].
67///
68/// This spawns `threads` amount of reader threads
69/// attached to `env` and returns a handle to the pool.
70///
71/// Should be called _once_ per actual database. Calling this function more than once will create
72/// multiple unnecessary rayon thread-pools.
73#[cold]
74#[inline(never)] // Only called once.
75pub fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> BlockchainReadHandle {
76    init_read_service_with_pool(env, init_thread_pool(threads))
77}
78
79/// Initialize the blockchain database read service, with a specific rayon thread-pool instead of
80/// creating a new one.
81///
82/// Should be called _once_ per actual database, although nothing bad will happen, cloning the [`BlockchainReadHandle`]
83/// is the correct way to get multiple handles to the database.
84#[cold]
85#[inline(never)] // Only called once.
86pub fn init_read_service_with_pool(
87    env: Arc<ConcreteEnv>,
88    pool: Arc<ThreadPool>,
89) -> BlockchainReadHandle {
90    DatabaseReadService::new(env, pool, map_request)
91}
92
93//---------------------------------------------------------------------------------------------------- Request Mapping
94// This function maps [`Request`]s to function calls
95// executed by the rayon DB reader threadpool.
96
97/// Map [`Request`]'s to specific database handler functions.
98///
99/// This is the main entrance into all `Request` handler functions.
100/// The basic structure is:
101/// 1. `Request` is mapped to a handler function
102/// 2. Handler function is called
103/// 3. [`BlockchainResponse`] is returned
104fn map_request(
105    env: &ConcreteEnv,              // Access to the database
106    request: BlockchainReadRequest, // The request we must fulfill
107) -> ResponseResult {
108    use BlockchainReadRequest as R;
109
110    /* SOMEDAY: pre-request handling, run some code for each request? */
111
112    match request {
113        R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes),
114        R::BlockCompleteEntriesByHeight(heights) => block_complete_entries_by_height(env, heights),
115        R::BlockExtendedHeader(block) => block_extended_header(env, block),
116        R::BlockHash(block, chain) => block_hash(env, block, chain),
117        R::BlockHashInRange(blocks, chain) => block_hash_in_range(env, blocks, chain),
118        R::FindBlock(block_hash) => find_block(env, block_hash),
119        R::FilterUnknownHashes(hashes) => filter_unknown_hashes(env, hashes),
120        R::BlockExtendedHeaderInRange(range, chain) => {
121            block_extended_header_in_range(env, range, chain)
122        }
123        R::ChainHeight => chain_height(env),
124        R::GeneratedCoins(height) => generated_coins(env, height),
125        R::Outputs {
126            outputs: map,
127            get_txid,
128        } => outputs(env, map, get_txid),
129        R::OutputsVec { outputs, get_txid } => outputs_vec(env, outputs, get_txid),
130        R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
131        R::KeyImagesSpent(set) => key_images_spent(env, set),
132        R::KeyImagesSpentVec(set) => key_images_spent_vec(env, set),
133        R::CompactChainHistory => compact_chain_history(env),
134        R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount),
135        R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
136        R::TxsInBlock {
137            block_hash,
138            tx_indexes,
139        } => txs_in_block(env, block_hash, tx_indexes),
140        R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id),
141        R::Block { height } => block(env, height),
142        R::BlockByHash(hash) => block_by_hash(env, hash),
143        R::TotalTxCount => total_tx_count(env),
144        R::DatabaseSize => database_size(env),
145        R::OutputHistogram(input) => output_histogram(env, input),
146        R::CoinbaseTxSum { height, count } => coinbase_tx_sum(env, height, count),
147        R::AltChains => alt_chains(env),
148        R::AltChainCount => alt_chain_count(env),
149        R::Transactions { tx_hashes } => transactions(env, tx_hashes),
150        R::TotalRctOutputs => total_rct_outputs(env),
151        R::TxOutputIndexes { tx_hash } => tx_output_indexes(env, &tx_hash),
152        R::OutputDistribution(input) => output_distribution(env, input),
153    }
154
155    /* SOMEDAY: post-request handling, run some code for each request? */
156}
157
158//---------------------------------------------------------------------------------------------------- Thread Local
159/// Q: Why does this exist?
160///
161/// A1: `heed`'s transactions and tables are not `Sync`, so we cannot use
162/// them with rayon, however, we set a feature such that they are `Send`.
163///
164/// A2: When sending to rayon, we want to ensure each read transaction
165/// is only being used by 1 thread only to scale reads
166///
167/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576762346>
168#[inline]
169fn thread_local<T: Send>(env: &impl Env) -> ThreadLocal<T> {
170    ThreadLocal::with_capacity(env.config().reader_threads.get())
171}
172
/// Get the current thread's tables out of a `ThreadLocal`, opening them on first use.
///
/// Expands to `$tables.get_or_try(..)`: if the calling thread has no entry yet,
/// `$env_inner.open_tables($tx_ro)` is called and the opened tables are stored
/// wrapped in `UnsafeSendable`. Open errors are passed through to the caller.
///
/// # Safety
/// See [`DatabaseRo`] docs.
///
/// We are safely using `UnsafeSendable` in `service`'s reader thread-pool
/// as we are pairing our usage with `ThreadLocal` - only 1 thread
/// will ever access a transaction at a time. This is an INVARIANT.
///
/// A `Mutex` was considered but:
/// - It is less performant
/// - It isn't technically needed for safety in our use-case
/// - It causes `DatabaseIter` function return issues as there is a `MutexGuard` object
///
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1581684698>
///
/// # Notes
/// This is used for other backends as well instead of branching with `cfg_if`.
/// The other backends (as of current) are `Send + Sync` so this is fine.
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1585618374>
macro_rules! get_tables {
    ($env_inner:ident, $tx_ro:ident, $tables:ident) => {{
        $tables.get_or_try(|| {
            $env_inner.open_tables($tx_ro).map(|opened| {
                // SAFETY: see above macro doc comment.
                unsafe { crate::unsafe_sendable::UnsafeSendable::new(opened) }
            })
        })
    }};
}
204
205//---------------------------------------------------------------------------------------------------- Handler functions
206// These are the actual functions that do stuff according to the incoming [`Request`].
207//
208// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
209// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`.
210//
211// Each function will return the [`Response`] that we
212// should send back to the caller in [`map_request()`].
213//
214// INVARIANT:
215// These functions are called above in `tower::Service::call()`
216// using a custom threadpool which means any call to `par_*()` functions
217// will be using the custom rayon DB reader thread-pool, not the global one.
218//
219// All functions below assume that this is the case, such that
220// `par_*()` functions will not block the _global_ rayon thread-pool.
221
222// FIXME: implement multi-transaction read atomicity.
223// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576874589>.
224
// TODO: The overhead of parallelism may be too much for every request; performance-test to find
// the optimal amount of parallelism.
227
228/// [`BlockchainReadRequest::BlockCompleteEntries`].
229fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec<BlockHash>) -> ResponseResult {
230    // Prepare tx/tables in `ThreadLocal`.
231    let env_inner = env.env_inner();
232    let tx_ro = thread_local(env);
233    let tables = thread_local(env);
234
235    let (missing_hashes, blocks) = block_hashes
236        .into_par_iter()
237        .map(|block_hash| {
238            let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
239            let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
240
241            match get_block_complete_entry(&block_hash, tables) {
242                Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)),
243                res => res.map(Either::Right),
244            }
245        })
246        .collect::<DbResult<_>>()?;
247
248    let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
249    let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
250
251    let blockchain_height = crate::ops::blockchain::chain_height(tables.block_heights())?;
252
253    Ok(BlockchainResponse::BlockCompleteEntries {
254        blocks,
255        missing_hashes,
256        blockchain_height,
257    })
258}
259
260/// [`BlockchainReadRequest::BlockCompleteEntriesByHeight`].
261fn block_complete_entries_by_height(
262    env: &ConcreteEnv,
263    block_heights: Vec<BlockHeight>,
264) -> ResponseResult {
265    // Prepare tx/tables in `ThreadLocal`.
266    let env_inner = env.env_inner();
267    let tx_ro = thread_local(env);
268    let tables = thread_local(env);
269
270    let blocks = block_heights
271        .into_par_iter()
272        .map(|height| {
273            let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
274            let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
275            get_block_complete_entry_from_height(&height, tables)
276        })
277        .collect::<DbResult<_>>()?;
278
279    let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
280    let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
281
282    Ok(BlockchainResponse::BlockCompleteEntriesByHeight(blocks))
283}
284
285/// [`BlockchainReadRequest::BlockExtendedHeader`].
286#[inline]
287fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
288    // Single-threaded, no `ThreadLocal` required.
289    let env_inner = env.env_inner();
290    let tx_ro = env_inner.tx_ro()?;
291    let tables = env_inner.open_tables(&tx_ro)?;
292
293    Ok(BlockchainResponse::BlockExtendedHeader(
294        get_block_extended_header_from_height(&block_height, &tables)?,
295    ))
296}
297
298/// [`BlockchainReadRequest::BlockHash`].
299#[inline]
300fn block_hash(env: &ConcreteEnv, block_height: BlockHeight, chain: Chain) -> ResponseResult {
301    // Single-threaded, no `ThreadLocal` required.
302    let env_inner = env.env_inner();
303    let tx_ro = env_inner.tx_ro()?;
304    let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
305
306    let block_hash = match chain {
307        Chain::Main => get_block_info(&block_height, &table_block_infos)?.block_hash,
308        Chain::Alt(chain) => {
309            get_alt_block_hash(&block_height, chain, &env_inner.open_tables(&tx_ro)?)?
310        }
311    };
312
313    Ok(BlockchainResponse::BlockHash(block_hash))
314}
315
316/// [`BlockchainReadRequest::BlockHashInRange`].
317#[inline]
318fn block_hash_in_range(env: &ConcreteEnv, range: Range<usize>, chain: Chain) -> ResponseResult {
319    // Prepare tx/tables in `ThreadLocal`.
320    let env_inner = env.env_inner();
321    let tx_ro = thread_local(env);
322
323    let block_hash = range
324        .into_par_iter()
325        .map(|block_height| {
326            let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
327
328            let table_block_infos = env_inner.open_db_ro::<BlockInfos>(tx_ro)?;
329
330            let block_hash = match chain {
331                Chain::Main => get_block_info(&block_height, &table_block_infos)?.block_hash,
332                Chain::Alt(chain) => {
333                    get_alt_block_hash(&block_height, chain, &env_inner.open_tables(tx_ro)?)?
334                }
335            };
336
337            Ok(block_hash)
338        })
339        .collect::<Result<_, RuntimeError>>()?;
340
341    Ok(BlockchainResponse::BlockHashInRange(block_hash))
342}
343
344/// [`BlockchainReadRequest::FindBlock`]
345fn find_block(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
346    // Single-threaded, no `ThreadLocal` required.
347    let env_inner = env.env_inner();
348    let tx_ro = env_inner.tx_ro()?;
349
350    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
351
352    // Check the main chain first.
353    match table_block_heights.get(&block_hash) {
354        Ok(height) => return Ok(BlockchainResponse::FindBlock(Some((Chain::Main, height)))),
355        Err(RuntimeError::KeyNotFound) => (),
356        Err(e) => return Err(e),
357    }
358
359    let table_alt_block_heights = env_inner.open_db_ro::<AltBlockHeights>(&tx_ro)?;
360
361    match table_alt_block_heights.get(&block_hash) {
362        Ok(height) => Ok(BlockchainResponse::FindBlock(Some((
363            Chain::Alt(height.chain_id.into()),
364            height.height,
365        )))),
366        Err(RuntimeError::KeyNotFound) => Ok(BlockchainResponse::FindBlock(None)),
367        Err(e) => Err(e),
368    }
369}
370
371/// [`BlockchainReadRequest::FilterUnknownHashes`].
372#[inline]
373fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet<BlockHash>) -> ResponseResult {
374    // Single-threaded, no `ThreadLocal` required.
375    let env_inner = env.env_inner();
376    let tx_ro = env_inner.tx_ro()?;
377
378    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
379
380    let mut err = None;
381
382    hashes.retain(
383        |block_hash| match block_exists(block_hash, &table_block_heights) {
384            Ok(exists) => exists,
385            Err(e) => {
386                err.get_or_insert(e);
387                false
388            }
389        },
390    );
391
392    if let Some(e) = err {
393        Err(e)
394    } else {
395        Ok(BlockchainResponse::FilterUnknownHashes(hashes))
396    }
397}
398
399/// [`BlockchainReadRequest::BlockExtendedHeaderInRange`].
400#[inline]
401fn block_extended_header_in_range(
402    env: &ConcreteEnv,
403    range: Range<BlockHeight>,
404    chain: Chain,
405) -> ResponseResult {
406    // Prepare tx/tables in `ThreadLocal`.
407    let env_inner = env.env_inner();
408    let tx_ro = thread_local(env);
409    let tables = thread_local(env);
410
411    // Collect results using `rayon`.
412    let vec = match chain {
413        Chain::Main => range
414            .into_par_iter()
415            .map(|block_height| {
416                let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
417                let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
418                get_block_extended_header_from_height(&block_height, tables)
419            })
420            .collect::<DbResult<Vec<ExtendedBlockHeader>>>()?,
421        Chain::Alt(chain_id) => {
422            let ranges = {
423                let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
424                let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
425                let alt_chains = tables.alt_chain_infos();
426
427                get_alt_chain_history_ranges(range, chain_id, alt_chains)?
428            };
429
430            ranges
431                .par_iter()
432                .rev()
433                .flat_map(|(chain, range)| {
434                    range.clone().into_par_iter().map(|height| {
435                        let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
436                        let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
437
438                        match *chain {
439                            Chain::Main => get_block_extended_header_from_height(&height, tables),
440                            Chain::Alt(chain_id) => get_alt_block_extended_header_from_height(
441                                &AltBlockHeight {
442                                    chain_id: chain_id.into(),
443                                    height,
444                                },
445                                tables,
446                            ),
447                        }
448                    })
449                })
450                .collect::<DbResult<Vec<_>>>()?
451        }
452    };
453
454    Ok(BlockchainResponse::BlockExtendedHeaderInRange(vec))
455}
456
457/// [`BlockchainReadRequest::ChainHeight`].
458#[inline]
459fn chain_height(env: &ConcreteEnv) -> ResponseResult {
460    // Single-threaded, no `ThreadLocal` required.
461    let env_inner = env.env_inner();
462    let tx_ro = env_inner.tx_ro()?;
463    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
464    let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
465
466    let chain_height = crate::ops::blockchain::chain_height(&table_block_heights)?;
467    let block_hash =
468        get_block_info(&chain_height.saturating_sub(1), &table_block_infos)?.block_hash;
469
470    Ok(BlockchainResponse::ChainHeight(chain_height, block_hash))
471}
472
473/// [`BlockchainReadRequest::GeneratedCoins`].
474#[inline]
475fn generated_coins(env: &ConcreteEnv, height: usize) -> ResponseResult {
476    // Single-threaded, no `ThreadLocal` required.
477    let env_inner = env.env_inner();
478    let tx_ro = env_inner.tx_ro()?;
479    let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
480
481    Ok(BlockchainResponse::GeneratedCoins(
482        cumulative_generated_coins(&height, &table_block_infos)?,
483    ))
484}
485
486/// [`BlockchainReadRequest::Outputs`].
487#[inline]
488fn outputs(
489    env: &ConcreteEnv,
490    outputs: IndexMap<Amount, IndexSet<AmountIndex>>,
491    get_txid: bool,
492) -> ResponseResult {
493    // Prepare tx/tables in `ThreadLocal`.
494    let env_inner = env.env_inner();
495    let tx_ro = thread_local(env);
496    let tables = thread_local(env);
497
498    let amount_of_outs = outputs
499        .par_iter()
500        .map(|(&amount, _)| {
501            let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
502            let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
503
504            if amount == 0 {
505                Ok((amount, tables.rct_outputs().len()?))
506            } else {
507                // v1 transactions.
508                match tables.num_outputs().get(&amount) {
509                    Ok(count) => Ok((amount, count)),
510                    // If we get a request for an `amount` that doesn't exist,
511                    // we return `0` instead of an error.
512                    Err(RuntimeError::KeyNotFound) => Ok((amount, 0)),
513                    Err(e) => Err(e),
514                }
515            }
516        })
517        .collect::<Result<_, _>>()?;
518
519    // The 2nd mapping function.
520    // This is pulled out from the below `map()` for readability.
521    let inner_map = |amount, amount_index| {
522        let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
523        let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
524
525        let id = PreRctOutputId {
526            amount,
527            amount_index,
528        };
529
530        let output_on_chain = match id_to_output_on_chain(&id, get_txid, tables) {
531            Ok(output) => output,
532            Err(RuntimeError::KeyNotFound) => return Ok(Either::Right(amount_index)),
533            Err(e) => return Err(e),
534        };
535
536        Ok(Either::Left((amount_index, output_on_chain)))
537    };
538
539    // Collect results using `rayon`.
540    let (map, wanted_outputs) = outputs
541        .into_par_iter()
542        .map(|(amount, amount_index_set)| {
543            let (left, right) = amount_index_set
544                .into_par_iter()
545                .map(|amount_index| inner_map(amount, amount_index))
546                .collect::<Result<_, _>>()?;
547
548            Ok(((amount, left), (amount, right)))
549        })
550        .collect::<DbResult<(IndexMap<_, IndexMap<_, _>>, IndexMap<_, IndexSet<_>>)>>()?;
551
552    let cache = OutputCache::new(map, amount_of_outs, wanted_outputs);
553
554    Ok(BlockchainResponse::Outputs(cache))
555}
556
/// [`BlockchainReadRequest::OutputsVec`].
///
/// Not implemented yet: the response payload is a `todo!()`, so calling
/// this handler panics instead of returning a response. None of the
/// parameters are read.
#[inline]
fn outputs_vec(
    env: &ConcreteEnv,
    outputs: Vec<(Amount, AmountIndex)>,
    get_txid: bool,
) -> ResponseResult {
    // TODO: build the actual response payload from `outputs` and `get_txid`.
    Ok(BlockchainResponse::OutputsVec(todo!()))
}
566
567/// [`BlockchainReadRequest::NumberOutputsWithAmount`].
568#[inline]
569fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec<Amount>) -> ResponseResult {
570    // Prepare tx/tables in `ThreadLocal`.
571    let env_inner = env.env_inner();
572    let tx_ro = thread_local(env);
573    let tables = thread_local(env);
574
575    // Cache the amount of RCT outputs once.
576    #[expect(
577        clippy::cast_possible_truncation,
578        reason = "INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`"
579    )]
580    let num_rct_outputs = {
581        let tx_ro = env_inner.tx_ro()?;
582        let tables = env_inner.open_tables(&tx_ro)?;
583        tables.rct_outputs().len()? as usize
584    };
585
586    // Collect results using `rayon`.
587    let map = amounts
588        .into_par_iter()
589        .map(|amount| {
590            let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
591            let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
592
593            if amount == 0 {
594                // v2 transactions.
595                Ok((amount, num_rct_outputs))
596            } else {
597                // v1 transactions.
598                match tables.num_outputs().get(&amount) {
599                    #[expect(
600                        clippy::cast_possible_truncation,
601                        reason = "INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`"
602                    )]
603                    Ok(count) => Ok((amount, count as usize)),
604                    // If we get a request for an `amount` that doesn't exist,
605                    // we return `0` instead of an error.
606                    Err(RuntimeError::KeyNotFound) => Ok((amount, 0)),
607                    Err(e) => Err(e),
608                }
609            }
610        })
611        .collect::<DbResult<HashMap<Amount, usize>>>()?;
612
613    Ok(BlockchainResponse::NumberOutputsWithAmount(map))
614}
615
616/// [`BlockchainReadRequest::KeyImagesSpent`].
617#[inline]
618fn key_images_spent(env: &ConcreteEnv, key_images: HashSet<KeyImage>) -> ResponseResult {
619    // Prepare tx/tables in `ThreadLocal`.
620    let env_inner = env.env_inner();
621    let tx_ro = thread_local(env);
622    let tables = thread_local(env);
623
624    // Key image check function.
625    let key_image_exists = |key_image| {
626        let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
627        let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
628        key_image_exists(&key_image, tables.key_images())
629    };
630
631    // FIXME:
632    // Create/use `enum cuprate_types::Exist { Does, DoesNot }`
633    // or similar instead of `bool` for clarity.
634    // <https://github.com/Cuprate/cuprate/pull/113#discussion_r1581536526>
635    //
636    // Collect results using `rayon`.
637    match key_images
638        .into_par_iter()
639        .map(key_image_exists)
640        // If the result is either:
641        // `Ok(true)` => a key image was found, return early
642        // `Err` => an error was found, return early
643        //
644        // Else, `Ok(false)` will continue the iterator.
645        .find_any(|result| !matches!(result, Ok(false)))
646    {
647        None | Some(Ok(false)) => Ok(BlockchainResponse::KeyImagesSpent(false)), // Key image was NOT found.
648        Some(Ok(true)) => Ok(BlockchainResponse::KeyImagesSpent(true)), // Key image was found.
649        Some(Err(e)) => Err(e), // A database error occurred.
650    }
651}
652
653/// [`BlockchainReadRequest::KeyImagesSpentVec`].
654fn key_images_spent_vec(env: &ConcreteEnv, key_images: Vec<KeyImage>) -> ResponseResult {
655    // Prepare tx/tables in `ThreadLocal`.
656    let env_inner = env.env_inner();
657    let tx_ro = thread_local(env);
658    let tables = thread_local(env);
659
660    // Key image check function.
661    let key_image_exists = |key_image| {
662        let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
663        let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
664        key_image_exists(&key_image, tables.key_images())
665    };
666
667    // Collect results using `rayon`.
668    Ok(BlockchainResponse::KeyImagesSpentVec(
669        key_images
670            .into_par_iter()
671            .map(key_image_exists)
672            .collect::<DbResult<_>>()?,
673    ))
674}
675
676/// [`BlockchainReadRequest::CompactChainHistory`]
677fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult {
678    let env_inner = env.env_inner();
679    let tx_ro = env_inner.tx_ro()?;
680
681    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
682    let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
683
684    let top_block_height = top_block_height(&table_block_heights)?;
685
686    let top_block_info = get_block_info(&top_block_height, &table_block_infos)?;
687    let cumulative_difficulty = combine_low_high_bits_to_u128(
688        top_block_info.cumulative_difficulty_low,
689        top_block_info.cumulative_difficulty_high,
690    );
691
692    /// The amount of top block IDs in the compact chain.
693    const INITIAL_BLOCKS: usize = 11;
694
695    // rayon is not used here because the amount of block IDs is expected to be small.
696    let mut block_ids = (0..)
697        .map(compact_history_index_to_height_offset::<INITIAL_BLOCKS>)
698        .map_while(|i| top_block_height.checked_sub(i))
699        .map(|height| Ok(get_block_info(&height, &table_block_infos)?.block_hash))
700        .collect::<DbResult<Vec<_>>>()?;
701
702    if compact_history_genesis_not_included::<INITIAL_BLOCKS>(top_block_height) {
703        block_ids.push(get_block_info(&0, &table_block_infos)?.block_hash);
704    }
705
706    Ok(BlockchainResponse::CompactChainHistory {
707        cumulative_difficulty,
708        block_ids,
709    })
710}
711
712/// [`BlockchainReadRequest::NextChainEntry`]
713///
714/// # Invariant
715/// `block_ids` must be sorted in reverse chronological block order, or else
716/// the returned result is unspecified and meaningless, as this function
717/// performs a binary search.
718fn next_chain_entry(
719    env: &ConcreteEnv,
720    block_ids: &[BlockHash],
721    next_entry_size: usize,
722) -> ResponseResult {
723    // Single-threaded, no `ThreadLocal` required.
724    let env_inner = env.env_inner();
725    let tx_ro = env_inner.tx_ro()?;
726
727    let tables = env_inner.open_tables(&tx_ro)?;
728    let table_block_heights = tables.block_heights();
729    let table_alt_block_heights = tables.alt_block_heights();
730    let table_block_infos = tables.block_infos_iter();
731
732    let idx = find_split_point(
733        block_ids,
734        false,
735        false,
736        table_block_heights,
737        table_alt_block_heights,
738    )?;
739
740    // This will happen if we have a different genesis block.
741    if idx == block_ids.len() {
742        return Ok(BlockchainResponse::NextChainEntry {
743            start_height: None,
744            chain_height: 0,
745            block_ids: vec![],
746            block_weights: vec![],
747            cumulative_difficulty: 0,
748            first_block_blob: None,
749        });
750    }
751
752    // The returned chain entry must overlap with one of the blocks  we were told about.
753    let first_known_block_hash = block_ids[idx];
754    let first_known_height = table_block_heights.get(&first_known_block_hash)?;
755
756    let chain_height = crate::ops::blockchain::chain_height(table_block_heights)?;
757    let last_height_in_chain_entry = min(first_known_height + next_entry_size, chain_height);
758
759    let (block_ids, block_weights) = (first_known_height..last_height_in_chain_entry)
760        .map(|height| {
761            let block_info = table_block_infos.get(&height)?;
762
763            Ok((block_info.block_hash, block_info.weight))
764        })
765        .collect::<DbResult<(Vec<_>, Vec<_>)>>()?;
766
767    let top_block_info = table_block_infos.get(&(chain_height - 1))?;
768
769    let first_block_blob = if block_ids.len() >= 2 {
770        Some(get_block_blob_with_tx_indexes(&(first_known_height + 1), &tables)?.0)
771    } else {
772        None
773    };
774
775    Ok(BlockchainResponse::NextChainEntry {
776        start_height: Some(first_known_height),
777        chain_height,
778        block_ids,
779        block_weights,
780        cumulative_difficulty: combine_low_high_bits_to_u128(
781            top_block_info.cumulative_difficulty_low,
782            top_block_info.cumulative_difficulty_high,
783        ),
784        first_block_blob,
785    })
786}
787
788/// [`BlockchainReadRequest::FindFirstUnknown`]
789///
790/// # Invariant
791/// `block_ids` must be sorted in chronological block order, or else
792/// the returned result is unspecified and meaningless, as this function
793/// performs a binary search.
794fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseResult {
795    let env_inner = env.env_inner();
796    let tx_ro = env_inner.tx_ro()?;
797
798    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
799    let table_alt_block_heights = env_inner.open_db_ro::<AltBlockHeights>(&tx_ro)?;
800
801    let idx = find_split_point(
802        block_ids,
803        true,
804        true,
805        &table_block_heights,
806        &table_alt_block_heights,
807    )?;
808
809    Ok(if idx == block_ids.len() {
810        BlockchainResponse::FindFirstUnknown(None)
811    } else if idx == 0 {
812        BlockchainResponse::FindFirstUnknown(Some((0, 0)))
813    } else {
814        let last_known_height = get_block_height(&block_ids[idx - 1], &table_block_heights)?;
815
816        BlockchainResponse::FindFirstUnknown(Some((idx, last_known_height + 1)))
817    })
818}
819
820/// [`BlockchainReadRequest::TxsInBlock`]
821fn txs_in_block(env: &ConcreteEnv, block_hash: [u8; 32], missing_txs: Vec<u64>) -> ResponseResult {
822    // Single-threaded, no `ThreadLocal` required.
823    let env_inner = env.env_inner();
824    let tx_ro = env_inner.tx_ro()?;
825    let tables = env_inner.open_tables(&tx_ro)?;
826
827    let block_height = tables.block_heights().get(&block_hash)?;
828
829    let (block, miner_tx_index, numb_txs) = get_block_blob_with_tx_indexes(&block_height, &tables)?;
830    let first_tx_index = miner_tx_index + 1;
831
832    if numb_txs < missing_txs.len() {
833        return Ok(BlockchainResponse::TxsInBlock(None));
834    }
835
836    let txs = missing_txs
837        .into_iter()
838        .map(|index_offset| Ok(tables.tx_blobs().get(&(first_tx_index + index_offset))?.0))
839        .collect::<DbResult<_>>()?;
840
841    Ok(BlockchainResponse::TxsInBlock(Some(TxsInBlock {
842        block,
843        txs,
844    })))
845}
846
847/// [`BlockchainReadRequest::AltBlocksInChain`]
848fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult {
849    // Prepare tx/tables in `ThreadLocal`.
850    let env_inner = env.env_inner();
851    let tx_ro = thread_local(env);
852    let tables = thread_local(env);
853
854    // Get the history of this alt-chain.
855    let history = {
856        let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
857        let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
858        get_alt_chain_history_ranges(0..usize::MAX, chain_id, tables.alt_chain_infos())?
859    };
860
861    // Get all the blocks until we join the main-chain.
862    let blocks = history
863        .par_iter()
864        .rev()
865        .skip(1)
866        .flat_map(|(chain_id, range)| {
867            let Chain::Alt(chain_id) = chain_id else {
868                panic!("Should not have main chain blocks here we skipped last range");
869            };
870
871            range.clone().into_par_iter().map(|height| {
872                let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
873                let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
874
875                get_alt_block(
876                    &AltBlockHeight {
877                        chain_id: (*chain_id).into(),
878                        height,
879                    },
880                    tables,
881                )
882            })
883        })
884        .collect::<DbResult<_>>()?;
885
886    Ok(BlockchainResponse::AltBlocksInChain(blocks))
887}
888
889/// [`BlockchainReadRequest::Block`]
890fn block(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
891    Ok(BlockchainResponse::Block(todo!()))
892}
893
894/// [`BlockchainReadRequest::BlockByHash`]
895fn block_by_hash(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
896    Ok(BlockchainResponse::Block(todo!()))
897}
898
899/// [`BlockchainReadRequest::TotalTxCount`]
900fn total_tx_count(env: &ConcreteEnv) -> ResponseResult {
901    Ok(BlockchainResponse::TotalTxCount(todo!()))
902}
903
904/// [`BlockchainReadRequest::DatabaseSize`]
905fn database_size(env: &ConcreteEnv) -> ResponseResult {
906    Ok(BlockchainResponse::DatabaseSize {
907        database_size: todo!(),
908        free_space: todo!(),
909    })
910}
911
912/// [`BlockchainReadRequest::OutputHistogram`]
913fn output_histogram(env: &ConcreteEnv, input: OutputHistogramInput) -> ResponseResult {
914    Ok(BlockchainResponse::OutputHistogram(todo!()))
915}
916
917/// [`BlockchainReadRequest::CoinbaseTxSum`]
918fn coinbase_tx_sum(env: &ConcreteEnv, height: usize, count: u64) -> ResponseResult {
919    Ok(BlockchainResponse::CoinbaseTxSum(todo!()))
920}
921
922/// [`BlockchainReadRequest::AltChains`]
923fn alt_chains(env: &ConcreteEnv) -> ResponseResult {
924    Ok(BlockchainResponse::AltChains(todo!()))
925}
926
927/// [`BlockchainReadRequest::AltChainCount`]
928fn alt_chain_count(env: &ConcreteEnv) -> ResponseResult {
929    Ok(BlockchainResponse::AltChainCount(todo!()))
930}
931
932/// [`BlockchainReadRequest::Transactions`]
933fn transactions(env: &ConcreteEnv, tx_hashes: HashSet<[u8; 32]>) -> ResponseResult {
934    Ok(BlockchainResponse::Transactions {
935        txs: todo!(),
936        missed_txs: todo!(),
937    })
938}
939
940/// [`BlockchainReadRequest::TotalRctOutputs`]
941fn total_rct_outputs(env: &ConcreteEnv) -> ResponseResult {
942    // Single-threaded, no `ThreadLocal` required.
943    let env_inner = env.env_inner();
944    let tx_ro = env_inner.tx_ro()?;
945    let len = env_inner.open_db_ro::<RctOutputs>(&tx_ro)?.len()?;
946
947    Ok(BlockchainResponse::TotalRctOutputs(len))
948}
949
950/// [`BlockchainReadRequest::TxOutputIndexes`]
951fn tx_output_indexes(env: &ConcreteEnv, tx_hash: &[u8; 32]) -> ResponseResult {
952    // Single-threaded, no `ThreadLocal` required.
953    let env_inner = env.env_inner();
954    let tx_ro = env_inner.tx_ro()?;
955    let tx_id = env_inner.open_db_ro::<TxIds>(&tx_ro)?.get(tx_hash)?;
956    let o_indexes = env_inner.open_db_ro::<TxOutputs>(&tx_ro)?.get(&tx_id)?;
957
958    Ok(BlockchainResponse::TxOutputIndexes(o_indexes.0))
959}
960
961/// [`BlockchainReadRequest::OutputDistribution`]
962fn output_distribution(env: &ConcreteEnv, input: OutputDistributionInput) -> ResponseResult {
963    Ok(BlockchainResponse::OutputDistribution(todo!()))
964}