cuprate_blockchain/service/
read.rs

1//! Database reader thread-pool definitions and logic.
2
3#![expect(
4    unreachable_code,
5    unused_variables,
6    clippy::unnecessary_wraps,
7    clippy::needless_pass_by_value,
8    reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
9)]
10
11//---------------------------------------------------------------------------------------------------- Import
12use std::{
13    cmp::min,
14    collections::{HashMap, HashSet},
15    ops::Range,
16    sync::Arc,
17};
18
19use indexmap::{IndexMap, IndexSet};
20use rayon::{
21    iter::{Either, IntoParallelIterator, ParallelIterator},
22    prelude::*,
23    ThreadPool,
24};
25use thread_local::ThreadLocal;
26
27use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError};
28use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
29use cuprate_helper::map::combine_low_high_bits_to_u128;
30use cuprate_types::{
31    blockchain::{BlockchainReadRequest, BlockchainResponse},
32    output_cache::OutputCache,
33    Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, TxsInBlock,
34};
35
36use crate::{
37    ops::{
38        alt_block::{
39            get_alt_block, get_alt_block_extended_header_from_height, get_alt_block_hash,
40            get_alt_chain_history_ranges,
41        },
42        block::{
43            block_exists, get_block_blob_with_tx_indexes, get_block_complete_entry,
44            get_block_extended_header_from_height, get_block_height, get_block_info,
45        },
46        blockchain::{cumulative_generated_coins, find_split_point, top_block_height},
47        key_image::key_image_exists,
48        output::id_to_output_on_chain,
49    },
50    service::{
51        free::{compact_history_genesis_not_included, compact_history_index_to_height_offset},
52        types::{BlockchainReadHandle, ResponseResult},
53    },
54    tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables, TablesIter},
55    types::{
56        AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId,
57    },
58};
59
60//---------------------------------------------------------------------------------------------------- init_read_service
61/// Initialize the [`BlockchainReadHandle`] thread-pool backed by [`rayon`].
62///
63/// This spawns `threads` amount of reader threads
64/// attached to `env` and returns a handle to the pool.
65///
66/// Should be called _once_ per actual database. Calling this function more than once will create
67/// multiple unnecessary rayon thread-pools.
68#[cold]
69#[inline(never)] // Only called once.
70pub fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> BlockchainReadHandle {
71    init_read_service_with_pool(env, init_thread_pool(threads))
72}
73
/// Initialize the blockchain database read service with a specific rayon thread-pool,
/// instead of creating a new one.
///
/// This should be called _once_ per actual database. Nothing bad will happen if it is
/// called more than once, but cloning the returned [`BlockchainReadHandle`] is the
/// correct way to get multiple handles to the database.
#[cold]
#[inline(never)] // Only called once.
pub fn init_read_service_with_pool(
    env: Arc<ConcreteEnv>,
    pool: Arc<ThreadPool>,
) -> BlockchainReadHandle {
    // `map_request` is the function each incoming request is dispatched through.
    DatabaseReadService::new(env, pool, map_request)
}
87
88//---------------------------------------------------------------------------------------------------- Request Mapping
89// This function maps [`Request`]s to function calls
90// executed by the rayon DB reader threadpool.
91
92/// Map [`Request`]'s to specific database handler functions.
93///
94/// This is the main entrance into all `Request` handler functions.
95/// The basic structure is:
96/// 1. `Request` is mapped to a handler function
97/// 2. Handler function is called
98/// 3. [`BlockchainResponse`] is returned
99fn map_request(
100    env: &ConcreteEnv,              // Access to the database
101    request: BlockchainReadRequest, // The request we must fulfill
102) -> ResponseResult {
103    use BlockchainReadRequest as R;
104
105    /* SOMEDAY: pre-request handling, run some code for each request? */
106
107    match request {
108        R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes),
109        R::BlockExtendedHeader(block) => block_extended_header(env, block),
110        R::BlockHash(block, chain) => block_hash(env, block, chain),
111        R::BlockHashInRange(blocks, chain) => block_hash_in_range(env, blocks, chain),
112        R::FindBlock(block_hash) => find_block(env, block_hash),
113        R::FilterUnknownHashes(hashes) => filter_unknown_hashes(env, hashes),
114        R::BlockExtendedHeaderInRange(range, chain) => {
115            block_extended_header_in_range(env, range, chain)
116        }
117        R::ChainHeight => chain_height(env),
118        R::GeneratedCoins(height) => generated_coins(env, height),
119        R::Outputs(map) => outputs(env, map),
120        R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
121        R::KeyImagesSpent(set) => key_images_spent(env, set),
122        R::CompactChainHistory => compact_chain_history(env),
123        R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount),
124        R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
125        R::TxsInBlock {
126            block_hash,
127            tx_indexes,
128        } => txs_in_block(env, block_hash, tx_indexes),
129        R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id),
130        R::Block { height } => block(env, height),
131        R::BlockByHash(hash) => block_by_hash(env, hash),
132        R::TotalTxCount => total_tx_count(env),
133        R::DatabaseSize => database_size(env),
134        R::OutputHistogram(input) => output_histogram(env, input),
135        R::CoinbaseTxSum { height, count } => coinbase_tx_sum(env, height, count),
136        R::AltChains => alt_chains(env),
137        R::AltChainCount => alt_chain_count(env),
138    }
139
140    /* SOMEDAY: post-request handling, run some code for each request? */
141}
142
143//---------------------------------------------------------------------------------------------------- Thread Local
144/// Q: Why does this exist?
145///
146/// A1: `heed`'s transactions and tables are not `Sync`, so we cannot use
147/// them with rayon, however, we set a feature such that they are `Send`.
148///
149/// A2: When sending to rayon, we want to ensure each read transaction
150/// is only being used by 1 thread only to scale reads
151///
152/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576762346>
153#[inline]
154fn thread_local<T: Send>(env: &impl Env) -> ThreadLocal<T> {
155    ThreadLocal::with_capacity(env.config().reader_threads.get())
156}
157
/// Given a `ThreadLocal<impl Tables>`, return the calling thread's `&impl Tables + Send`,
/// opening the tables on `$tx_ro` the first time a thread asks for them.
///
/// # Safety
/// See [`DatabaseRo`] docs.
///
/// We are safely using `UnsafeSendable` in `service`'s reader thread-pool
/// as we are pairing our usage with `ThreadLocal` - only 1 thread
/// will ever access a transaction at a time. This is an INVARIANT.
///
/// A `Mutex` was considered but:
/// - It is less performant
/// - It isn't technically needed for safety in our use-case
/// - It causes `DatabaseIter` function return issues as there is a `MutexGuard` object
///
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1581684698>
///
/// # Notes
/// This is used for other backends as well instead of branching with `cfg_if`.
/// The other backends (as of current) are `Send + Sync` so this is fine.
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1585618374>
macro_rules! get_tables {
    ($env_inner:ident, $tx_ro:ident, $tables:ident) => {{
        $tables.get_or_try(|| {
            $env_inner.open_tables($tx_ro).map(|tables| {
                // SAFETY: see above macro doc comment.
                unsafe { crate::unsafe_sendable::UnsafeSendable::new(tables) }
            })
        })
    }};
}
189
190//---------------------------------------------------------------------------------------------------- Handler functions
191// These are the actual functions that do stuff according to the incoming [`Request`].
192//
193// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
194// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`.
195//
196// Each function will return the [`Response`] that we
197// should send back to the caller in [`map_request()`].
198//
199// INVARIANT:
200// These functions are called above in `tower::Service::call()`
201// using a custom threadpool which means any call to `par_*()` functions
202// will be using the custom rayon DB reader thread-pool, not the global one.
203//
204// All functions below assume that this is the case, such that
205// `par_*()` functions will not block the _global_ rayon thread-pool.
206
207// FIXME: implement multi-transaction read atomicity.
208// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576874589>.
209
// TODO: The overhead of parallelism may be too much for every request; performance-test
// to find the optimal amount of parallelism.
212
213/// [`BlockchainReadRequest::BlockCompleteEntries`].
214fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec<BlockHash>) -> ResponseResult {
215    // Prepare tx/tables in `ThreadLocal`.
216    let env_inner = env.env_inner();
217    let tx_ro = thread_local(env);
218    let tables = thread_local(env);
219
220    let (missing_hashes, blocks) = block_hashes
221        .into_par_iter()
222        .map(|block_hash| {
223            let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
224            let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
225
226            match get_block_complete_entry(&block_hash, tables) {
227                Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)),
228                res => res.map(Either::Right),
229            }
230        })
231        .collect::<DbResult<_>>()?;
232
233    let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
234    let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
235
236    let blockchain_height = crate::ops::blockchain::chain_height(tables.block_heights())?;
237
238    Ok(BlockchainResponse::BlockCompleteEntries {
239        blocks,
240        missing_hashes,
241        blockchain_height,
242    })
243}
244
245/// [`BlockchainReadRequest::BlockExtendedHeader`].
246#[inline]
247fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
248    // Single-threaded, no `ThreadLocal` required.
249    let env_inner = env.env_inner();
250    let tx_ro = env_inner.tx_ro()?;
251    let tables = env_inner.open_tables(&tx_ro)?;
252
253    Ok(BlockchainResponse::BlockExtendedHeader(
254        get_block_extended_header_from_height(&block_height, &tables)?,
255    ))
256}
257
258/// [`BlockchainReadRequest::BlockHash`].
259#[inline]
260fn block_hash(env: &ConcreteEnv, block_height: BlockHeight, chain: Chain) -> ResponseResult {
261    // Single-threaded, no `ThreadLocal` required.
262    let env_inner = env.env_inner();
263    let tx_ro = env_inner.tx_ro()?;
264    let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
265
266    let block_hash = match chain {
267        Chain::Main => get_block_info(&block_height, &table_block_infos)?.block_hash,
268        Chain::Alt(chain) => {
269            get_alt_block_hash(&block_height, chain, &env_inner.open_tables(&tx_ro)?)?
270        }
271    };
272
273    Ok(BlockchainResponse::BlockHash(block_hash))
274}
275
276/// [`BlockchainReadRequest::BlockHashInRange`].
277#[inline]
278fn block_hash_in_range(env: &ConcreteEnv, range: Range<usize>, chain: Chain) -> ResponseResult {
279    // Prepare tx/tables in `ThreadLocal`.
280    let env_inner = env.env_inner();
281    let tx_ro = thread_local(env);
282
283    let block_hash = range
284        .into_par_iter()
285        .map(|block_height| {
286            let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
287
288            let table_block_infos = env_inner.open_db_ro::<BlockInfos>(tx_ro)?;
289
290            let block_hash = match chain {
291                Chain::Main => get_block_info(&block_height, &table_block_infos)?.block_hash,
292                Chain::Alt(chain) => {
293                    get_alt_block_hash(&block_height, chain, &env_inner.open_tables(tx_ro)?)?
294                }
295            };
296
297            Ok(block_hash)
298        })
299        .collect::<Result<_, RuntimeError>>()?;
300
301    Ok(BlockchainResponse::BlockHashInRange(block_hash))
302}
303
304/// [`BlockchainReadRequest::FindBlock`]
305fn find_block(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
306    // Single-threaded, no `ThreadLocal` required.
307    let env_inner = env.env_inner();
308    let tx_ro = env_inner.tx_ro()?;
309
310    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
311
312    // Check the main chain first.
313    match table_block_heights.get(&block_hash) {
314        Ok(height) => return Ok(BlockchainResponse::FindBlock(Some((Chain::Main, height)))),
315        Err(RuntimeError::KeyNotFound) => (),
316        Err(e) => return Err(e),
317    }
318
319    let table_alt_block_heights = env_inner.open_db_ro::<AltBlockHeights>(&tx_ro)?;
320
321    match table_alt_block_heights.get(&block_hash) {
322        Ok(height) => Ok(BlockchainResponse::FindBlock(Some((
323            Chain::Alt(height.chain_id.into()),
324            height.height,
325        )))),
326        Err(RuntimeError::KeyNotFound) => Ok(BlockchainResponse::FindBlock(None)),
327        Err(e) => Err(e),
328    }
329}
330
331/// [`BlockchainReadRequest::FilterUnknownHashes`].
332#[inline]
333fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet<BlockHash>) -> ResponseResult {
334    // Single-threaded, no `ThreadLocal` required.
335    let env_inner = env.env_inner();
336    let tx_ro = env_inner.tx_ro()?;
337
338    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
339
340    let mut err = None;
341
342    hashes.retain(
343        |block_hash| match block_exists(block_hash, &table_block_heights) {
344            Ok(exists) => exists,
345            Err(e) => {
346                err.get_or_insert(e);
347                false
348            }
349        },
350    );
351
352    if let Some(e) = err {
353        Err(e)
354    } else {
355        Ok(BlockchainResponse::FilterUnknownHashes(hashes))
356    }
357}
358
359/// [`BlockchainReadRequest::BlockExtendedHeaderInRange`].
360#[inline]
361fn block_extended_header_in_range(
362    env: &ConcreteEnv,
363    range: Range<BlockHeight>,
364    chain: Chain,
365) -> ResponseResult {
366    // Prepare tx/tables in `ThreadLocal`.
367    let env_inner = env.env_inner();
368    let tx_ro = thread_local(env);
369    let tables = thread_local(env);
370
371    // Collect results using `rayon`.
372    let vec = match chain {
373        Chain::Main => range
374            .into_par_iter()
375            .map(|block_height| {
376                let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
377                let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
378                get_block_extended_header_from_height(&block_height, tables)
379            })
380            .collect::<DbResult<Vec<ExtendedBlockHeader>>>()?,
381        Chain::Alt(chain_id) => {
382            let ranges = {
383                let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
384                let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
385                let alt_chains = tables.alt_chain_infos();
386
387                get_alt_chain_history_ranges(range, chain_id, alt_chains)?
388            };
389
390            ranges
391                .par_iter()
392                .rev()
393                .flat_map(|(chain, range)| {
394                    range.clone().into_par_iter().map(|height| {
395                        let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
396                        let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
397
398                        match *chain {
399                            Chain::Main => get_block_extended_header_from_height(&height, tables),
400                            Chain::Alt(chain_id) => get_alt_block_extended_header_from_height(
401                                &AltBlockHeight {
402                                    chain_id: chain_id.into(),
403                                    height,
404                                },
405                                tables,
406                            ),
407                        }
408                    })
409                })
410                .collect::<DbResult<Vec<_>>>()?
411        }
412    };
413
414    Ok(BlockchainResponse::BlockExtendedHeaderInRange(vec))
415}
416
417/// [`BlockchainReadRequest::ChainHeight`].
418#[inline]
419fn chain_height(env: &ConcreteEnv) -> ResponseResult {
420    // Single-threaded, no `ThreadLocal` required.
421    let env_inner = env.env_inner();
422    let tx_ro = env_inner.tx_ro()?;
423    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
424    let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
425
426    let chain_height = crate::ops::blockchain::chain_height(&table_block_heights)?;
427    let block_hash =
428        get_block_info(&chain_height.saturating_sub(1), &table_block_infos)?.block_hash;
429
430    Ok(BlockchainResponse::ChainHeight(chain_height, block_hash))
431}
432
433/// [`BlockchainReadRequest::GeneratedCoins`].
434#[inline]
435fn generated_coins(env: &ConcreteEnv, height: usize) -> ResponseResult {
436    // Single-threaded, no `ThreadLocal` required.
437    let env_inner = env.env_inner();
438    let tx_ro = env_inner.tx_ro()?;
439    let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
440
441    Ok(BlockchainResponse::GeneratedCoins(
442        cumulative_generated_coins(&height, &table_block_infos)?,
443    ))
444}
445
/// [`BlockchainReadRequest::Outputs`].
///
/// `outputs` maps each requested amount to the set of amount indexes wanted for it.
///
/// The work is done in two parallel passes:
/// 1. For every requested amount, read how many outputs exist with that amount.
/// 2. For every `(amount, amount_index)` pair, resolve the output on chain; pairs
///    that are not in the database are collected separately instead of failing.
///
/// The three resulting collections are handed to [`OutputCache::new`], and the
/// cache is returned in [`BlockchainResponse::Outputs`].
#[inline]
fn outputs(env: &ConcreteEnv, outputs: IndexMap<Amount, IndexSet<AmountIndex>>) -> ResponseResult {
    // Prepare tx/tables in `ThreadLocal`.
    let env_inner = env.env_inner();
    let tx_ro = thread_local(env);
    let tables = thread_local(env);

    // 1st pass: the number of outputs for each requested amount.
    let amount_of_outs = outputs
        .par_iter()
        .map(|(&amount, _)| {
            let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
            let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();

            if amount == 0 {
                // Amount `0`: the count is the length of the RCT outputs table.
                Ok((amount, tables.rct_outputs().len()?))
            } else {
                // v1 transactions.
                match tables.num_outputs().get(&amount) {
                    Ok(count) => Ok((amount, count)),
                    // If we get a request for an `amount` that doesn't exist,
                    // we return `0` instead of an error.
                    Err(RuntimeError::KeyNotFound) => Ok((amount, 0)),
                    Err(e) => Err(e),
                }
            }
        })
        .collect::<Result<_, _>>()?;

    // The 2nd mapping function.
    // This is pulled out from the below `map()` for readability.
    //
    // `Either::Left` carries an output that was found, `Either::Right`
    // carries the amount index of an output that is not in the database.
    let inner_map = |amount, amount_index| {
        let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
        let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();

        let id = PreRctOutputId {
            amount,
            amount_index,
        };

        let output_on_chain = match id_to_output_on_chain(&id, tables) {
            Ok(output) => output,
            Err(RuntimeError::KeyNotFound) => return Ok(Either::Right(amount_index)),
            Err(e) => return Err(e),
        };

        Ok(Either::Left((amount_index, output_on_chain)))
    };

    // Collect results using `rayon`.
    //
    // 2nd pass: `map` receives the found outputs, `wanted_outputs` the
    // amount indexes that could not be found, both keyed by amount.
    let (map, wanted_outputs) = outputs
        .into_par_iter()
        .map(|(amount, amount_index_set)| {
            let (left, right) = amount_index_set
                .into_par_iter()
                .map(|amount_index| inner_map(amount, amount_index))
                .collect::<Result<_, _>>()?;

            Ok(((amount, left), (amount, right)))
        })
        .collect::<DbResult<(IndexMap<_, IndexMap<_, _>>, IndexMap<_, IndexSet<_>>)>>()?;

    let cache = OutputCache::new(map, amount_of_outs, wanted_outputs);

    Ok(BlockchainResponse::Outputs(cache))
}
512
513/// [`BlockchainReadRequest::NumberOutputsWithAmount`].
514#[inline]
515fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec<Amount>) -> ResponseResult {
516    // Prepare tx/tables in `ThreadLocal`.
517    let env_inner = env.env_inner();
518    let tx_ro = thread_local(env);
519    let tables = thread_local(env);
520
521    // Cache the amount of RCT outputs once.
522    #[expect(
523        clippy::cast_possible_truncation,
524        reason = "INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`"
525    )]
526    let num_rct_outputs = {
527        let tx_ro = env_inner.tx_ro()?;
528        let tables = env_inner.open_tables(&tx_ro)?;
529        tables.rct_outputs().len()? as usize
530    };
531
532    // Collect results using `rayon`.
533    let map = amounts
534        .into_par_iter()
535        .map(|amount| {
536            let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
537            let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
538
539            if amount == 0 {
540                // v2 transactions.
541                Ok((amount, num_rct_outputs))
542            } else {
543                // v1 transactions.
544                match tables.num_outputs().get(&amount) {
545                    #[expect(
546                        clippy::cast_possible_truncation,
547                        reason = "INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`"
548                    )]
549                    Ok(count) => Ok((amount, count as usize)),
550                    // If we get a request for an `amount` that doesn't exist,
551                    // we return `0` instead of an error.
552                    Err(RuntimeError::KeyNotFound) => Ok((amount, 0)),
553                    Err(e) => Err(e),
554                }
555            }
556        })
557        .collect::<DbResult<HashMap<Amount, usize>>>()?;
558
559    Ok(BlockchainResponse::NumberOutputsWithAmount(map))
560}
561
562/// [`BlockchainReadRequest::KeyImagesSpent`].
563#[inline]
564fn key_images_spent(env: &ConcreteEnv, key_images: HashSet<KeyImage>) -> ResponseResult {
565    // Prepare tx/tables in `ThreadLocal`.
566    let env_inner = env.env_inner();
567    let tx_ro = thread_local(env);
568    let tables = thread_local(env);
569
570    // Key image check function.
571    let key_image_exists = |key_image| {
572        let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
573        let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
574        key_image_exists(&key_image, tables.key_images())
575    };
576
577    // FIXME:
578    // Create/use `enum cuprate_types::Exist { Does, DoesNot }`
579    // or similar instead of `bool` for clarity.
580    // <https://github.com/Cuprate/cuprate/pull/113#discussion_r1581536526>
581    //
582    // Collect results using `rayon`.
583    match key_images
584        .into_par_iter()
585        .map(key_image_exists)
586        // If the result is either:
587        // `Ok(true)` => a key image was found, return early
588        // `Err` => an error was found, return early
589        //
590        // Else, `Ok(false)` will continue the iterator.
591        .find_any(|result| !matches!(result, Ok(false)))
592    {
593        None | Some(Ok(false)) => Ok(BlockchainResponse::KeyImagesSpent(false)), // Key image was NOT found.
594        Some(Ok(true)) => Ok(BlockchainResponse::KeyImagesSpent(true)), // Key image was found.
595        Some(Err(e)) => Err(e), // A database error occurred.
596    }
597}
598
599/// [`BlockchainReadRequest::CompactChainHistory`]
600fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult {
601    let env_inner = env.env_inner();
602    let tx_ro = env_inner.tx_ro()?;
603
604    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
605    let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
606
607    let top_block_height = top_block_height(&table_block_heights)?;
608
609    let top_block_info = get_block_info(&top_block_height, &table_block_infos)?;
610    let cumulative_difficulty = combine_low_high_bits_to_u128(
611        top_block_info.cumulative_difficulty_low,
612        top_block_info.cumulative_difficulty_high,
613    );
614
615    /// The amount of top block IDs in the compact chain.
616    const INITIAL_BLOCKS: usize = 11;
617
618    // rayon is not used here because the amount of block IDs is expected to be small.
619    let mut block_ids = (0..)
620        .map(compact_history_index_to_height_offset::<INITIAL_BLOCKS>)
621        .map_while(|i| top_block_height.checked_sub(i))
622        .map(|height| Ok(get_block_info(&height, &table_block_infos)?.block_hash))
623        .collect::<DbResult<Vec<_>>>()?;
624
625    if compact_history_genesis_not_included::<INITIAL_BLOCKS>(top_block_height) {
626        block_ids.push(get_block_info(&0, &table_block_infos)?.block_hash);
627    }
628
629    Ok(BlockchainResponse::CompactChainHistory {
630        cumulative_difficulty,
631        block_ids,
632    })
633}
634
635/// [`BlockchainReadRequest::NextChainEntry`]
636///
637/// # Invariant
638/// `block_ids` must be sorted in reverse chronological block order, or else
639/// the returned result is unspecified and meaningless, as this function
640/// performs a binary search.
641fn next_chain_entry(
642    env: &ConcreteEnv,
643    block_ids: &[BlockHash],
644    next_entry_size: usize,
645) -> ResponseResult {
646    // Single-threaded, no `ThreadLocal` required.
647    let env_inner = env.env_inner();
648    let tx_ro = env_inner.tx_ro()?;
649
650    let tables = env_inner.open_tables(&tx_ro)?;
651    let table_block_heights = tables.block_heights();
652    let table_alt_block_heights = tables.alt_block_heights();
653    let table_block_infos = tables.block_infos_iter();
654
655    let idx = find_split_point(
656        block_ids,
657        false,
658        false,
659        table_block_heights,
660        table_alt_block_heights,
661    )?;
662
663    // This will happen if we have a different genesis block.
664    if idx == block_ids.len() {
665        return Ok(BlockchainResponse::NextChainEntry {
666            start_height: None,
667            chain_height: 0,
668            block_ids: vec![],
669            block_weights: vec![],
670            cumulative_difficulty: 0,
671            first_block_blob: None,
672        });
673    }
674
675    // The returned chain entry must overlap with one of the blocks  we were told about.
676    let first_known_block_hash = block_ids[idx];
677    let first_known_height = table_block_heights.get(&first_known_block_hash)?;
678
679    let chain_height = crate::ops::blockchain::chain_height(table_block_heights)?;
680    let last_height_in_chain_entry = min(first_known_height + next_entry_size, chain_height);
681
682    let (block_ids, block_weights) = (first_known_height..last_height_in_chain_entry)
683        .map(|height| {
684            let block_info = table_block_infos.get(&height)?;
685
686            Ok((block_info.block_hash, block_info.weight))
687        })
688        .collect::<DbResult<(Vec<_>, Vec<_>)>>()?;
689
690    let top_block_info = table_block_infos.get(&(chain_height - 1))?;
691
692    let first_block_blob = if block_ids.len() >= 2 {
693        Some(get_block_blob_with_tx_indexes(&(first_known_height + 1), &tables)?.0)
694    } else {
695        None
696    };
697
698    Ok(BlockchainResponse::NextChainEntry {
699        start_height: Some(first_known_height),
700        chain_height,
701        block_ids,
702        block_weights,
703        cumulative_difficulty: combine_low_high_bits_to_u128(
704            top_block_info.cumulative_difficulty_low,
705            top_block_info.cumulative_difficulty_high,
706        ),
707        first_block_blob,
708    })
709}
710
711/// [`BlockchainReadRequest::FindFirstUnknown`]
712///
713/// # Invariant
714/// `block_ids` must be sorted in chronological block order, or else
715/// the returned result is unspecified and meaningless, as this function
716/// performs a binary search.
717fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseResult {
718    let env_inner = env.env_inner();
719    let tx_ro = env_inner.tx_ro()?;
720
721    let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
722    let table_alt_block_heights = env_inner.open_db_ro::<AltBlockHeights>(&tx_ro)?;
723
724    let idx = find_split_point(
725        block_ids,
726        true,
727        true,
728        &table_block_heights,
729        &table_alt_block_heights,
730    )?;
731
732    Ok(if idx == block_ids.len() {
733        BlockchainResponse::FindFirstUnknown(None)
734    } else if idx == 0 {
735        BlockchainResponse::FindFirstUnknown(Some((0, 0)))
736    } else {
737        let last_known_height = get_block_height(&block_ids[idx - 1], &table_block_heights)?;
738
739        BlockchainResponse::FindFirstUnknown(Some((idx, last_known_height + 1)))
740    })
741}
742
743/// [`BlockchainReadRequest::TxsInBlock`]
744fn txs_in_block(env: &ConcreteEnv, block_hash: [u8; 32], missing_txs: Vec<u64>) -> ResponseResult {
745    // Single-threaded, no `ThreadLocal` required.
746    let env_inner = env.env_inner();
747    let tx_ro = env_inner.tx_ro()?;
748    let tables = env_inner.open_tables(&tx_ro)?;
749
750    let block_height = tables.block_heights().get(&block_hash)?;
751
752    let (block, miner_tx_index, numb_txs) = get_block_blob_with_tx_indexes(&block_height, &tables)?;
753    let first_tx_index = miner_tx_index + 1;
754
755    if numb_txs < missing_txs.len() {
756        return Ok(BlockchainResponse::TxsInBlock(None));
757    }
758
759    let txs = missing_txs
760        .into_iter()
761        .map(|index_offset| Ok(tables.tx_blobs().get(&(first_tx_index + index_offset))?.0))
762        .collect::<DbResult<_>>()?;
763
764    Ok(BlockchainResponse::TxsInBlock(Some(TxsInBlock {
765        block,
766        txs,
767    })))
768}
769
770/// [`BlockchainReadRequest::AltBlocksInChain`]
771fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult {
772    // Prepare tx/tables in `ThreadLocal`.
773    let env_inner = env.env_inner();
774    let tx_ro = thread_local(env);
775    let tables = thread_local(env);
776
777    // Get the history of this alt-chain.
778    let history = {
779        let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
780        let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
781        get_alt_chain_history_ranges(0..usize::MAX, chain_id, tables.alt_chain_infos())?
782    };
783
784    // Get all the blocks until we join the main-chain.
785    let blocks = history
786        .par_iter()
787        .rev()
788        .skip(1)
789        .flat_map(|(chain_id, range)| {
790            let Chain::Alt(chain_id) = chain_id else {
791                panic!("Should not have main chain blocks here we skipped last range");
792            };
793
794            range.clone().into_par_iter().map(|height| {
795                let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
796                let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
797
798                get_alt_block(
799                    &AltBlockHeight {
800                        chain_id: (*chain_id).into(),
801                        height,
802                    },
803                    tables,
804                )
805            })
806        })
807        .collect::<DbResult<_>>()?;
808
809    Ok(BlockchainResponse::AltBlocksInChain(blocks))
810}
811
/// [`BlockchainReadRequest::Block`]
///
/// # Panics
/// Not implemented yet: the response payload is built with [`todo!`],
/// so calling this handler always panics.
fn block(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
    Ok(BlockchainResponse::Block(todo!()))
}
816
/// [`BlockchainReadRequest::BlockByHash`]
///
/// # Panics
/// Not implemented yet: the response payload is built with [`todo!`],
/// so calling this handler always panics.
fn block_by_hash(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
    // NOTE(review): this builds the `Block` response variant — confirm that is
    // the intended response for a `BlockByHash` request.
    Ok(BlockchainResponse::Block(todo!()))
}
821
/// [`BlockchainReadRequest::TotalTxCount`]
///
/// # Panics
/// Not implemented yet: the response payload is built with [`todo!`],
/// so calling this handler always panics.
fn total_tx_count(env: &ConcreteEnv) -> ResponseResult {
    Ok(BlockchainResponse::TotalTxCount(todo!()))
}
826
/// [`BlockchainReadRequest::DatabaseSize`]
///
/// # Panics
/// Not implemented yet: both response fields are built with [`todo!`],
/// so calling this handler always panics.
fn database_size(env: &ConcreteEnv) -> ResponseResult {
    Ok(BlockchainResponse::DatabaseSize {
        database_size: todo!(),
        free_space: todo!(),
    })
}
834
/// [`BlockchainReadRequest::OutputHistogram`]
///
/// # Panics
/// Not implemented yet: the response payload is built with [`todo!`],
/// so calling this handler always panics.
fn output_histogram(env: &ConcreteEnv, input: OutputHistogramInput) -> ResponseResult {
    Ok(BlockchainResponse::OutputHistogram(todo!()))
}
839
/// [`BlockchainReadRequest::CoinbaseTxSum`]
///
/// # Panics
/// Not implemented yet: the response payload is built with [`todo!`],
/// so calling this handler always panics.
fn coinbase_tx_sum(env: &ConcreteEnv, height: usize, count: u64) -> ResponseResult {
    Ok(BlockchainResponse::CoinbaseTxSum(todo!()))
}
844
/// [`BlockchainReadRequest::AltChains`]
///
/// # Panics
/// Not implemented yet: the response payload is built with [`todo!`],
/// so calling this handler always panics.
fn alt_chains(env: &ConcreteEnv) -> ResponseResult {
    Ok(BlockchainResponse::AltChains(todo!()))
}
849
/// [`BlockchainReadRequest::AltChainCount`]
///
/// # Panics
/// Not implemented yet: the response payload is built with [`todo!`],
/// so calling this handler always panics.
fn alt_chain_count(env: &ConcreteEnv) -> ResponseResult {
    Ok(BlockchainResponse::AltChainCount(todo!()))
}