cuprate_database_service/
reader_threads.rs

1//! Reader thread-pool configuration and initiation.
2//!
3//! This module contains [`ReaderThreads`] which allow specifying the amount of
4//! reader threads for the [`rayon::ThreadPool`].
5//!
6//! It also contains [`init_thread_pool`] which initiates the thread-pool.
7
8//---------------------------------------------------------------------------------------------------- Import
9use std::{num::NonZeroUsize, sync::Arc};
10
11use rayon::ThreadPool;
12#[cfg(feature = "serde")]
13use serde::{Deserialize, Serialize};
14
15//---------------------------------------------------------------------------------------------------- init_thread_pool
16/// Initialize the reader thread-pool backed by `rayon`.
17pub fn init_thread_pool(reader_threads: ReaderThreads) -> Arc<ThreadPool> {
18    // How many reader threads to spawn?
19    let reader_count = reader_threads.as_threads().get();
20
21    Arc::new(
22        rayon::ThreadPoolBuilder::new()
23            .num_threads(reader_count)
24            .thread_name(|i| format!("{}::DatabaseReader({i})", module_path!()))
25            .build()
26            .unwrap(),
27    )
28}
29
30//---------------------------------------------------------------------------------------------------- ReaderThreads
31/// Amount of database reader threads to spawn.
32///
33/// This controls how many reader threads the [`DatabaseReadService`](crate::DatabaseReadService)
34/// thread-pool will spawn to receive and send requests/responses.
35///
36/// # Invariant
37/// The main function used to extract an actual
38/// usable thread count out of this is [`ReaderThreads::as_threads`].
39///
40/// This will always return at least 1, up until the amount of threads on the machine.
41#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd)]
42#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
43pub enum ReaderThreads {
44    #[default]
45    /// Spawn 1 reader thread per available thread on the machine.
46    ///
47    /// For example, a `32-thread` system will spawn
48    /// `32` reader threads using this setting.
49    OnePerThread,
50
51    /// Only spawn 1 reader thread.
52    One,
53
54    /// Spawn a specified amount of reader threads.
55    ///
56    /// Note that no matter how large this value, it will be
57    /// ultimately capped at the amount of system threads.
58    ///
59    /// # `0`
60    /// `ReaderThreads::Number(0)` represents "use maximum value",
61    /// as such, it is equal to [`ReaderThreads::OnePerThread`].
62    ///
63    /// ```rust
64    /// # use cuprate_database_service::*;
65    /// let reader_threads = ReaderThreads::from(0_usize);
66    /// assert!(matches!(reader_threads, ReaderThreads::OnePerThread));
67    /// ```
68    Number(usize),
69
70    /// Spawn a specified % of reader threads.
71    ///
72    /// This must be a value in-between `0.0..1.0`
73    /// where `1.0` represents [`ReaderThreads::OnePerThread`].
74    ///
75    /// # Example
76    /// For example, using a `16-core, 32-thread` Ryzen 5950x CPU:
77    ///
78    /// | Input                              | Total thread used |
79    /// |------------------------------------|-------------------|
80    /// | `ReaderThreads::Percent(0.0)`      | 32 (maximum value)
81    /// | `ReaderThreads::Percent(0.5)`      | 16
82    /// | `ReaderThreads::Percent(0.75)`     | 24
83    /// | `ReaderThreads::Percent(1.0)`      | 32
84    /// | `ReaderThreads::Percent(2.0)`      | 32 (saturating)
85    /// | `ReaderThreads::Percent(f32::NAN)` | 32 (non-normal default)
86    ///
87    /// # `0.0`
88    /// `ReaderThreads::Percent(0.0)` represents "use maximum value",
89    /// as such, it is equal to [`ReaderThreads::OnePerThread`].
90    ///
91    /// # Not quite `0.0`
92    /// If the thread count multiplied by the percentage ends up being
93    /// non-zero, but not 1 thread, the minimum value 1 will be returned.
94    ///
95    /// ```rust
96    /// # use cuprate_database_service::ReaderThreads;
97    /// assert_eq!(ReaderThreads::Percent(0.000000001).as_threads().get(), 1);
98    /// ```
99    Percent(f32),
100}
101
102impl ReaderThreads {
103    /// This converts [`ReaderThreads`] into a safe, usable
104    /// number representing how many threads to spawn.
105    ///
106    /// This function will always return a number in-between `1..=total_thread_count`.
107    ///
108    /// It uses [`cuprate_helper::thread::threads()`] internally to determine the total thread count.
109    ///
110    /// # Example
111    /// ```rust
112    /// use cuprate_database_service::ReaderThreads as R;
113    ///
114    /// let total_threads: std::num::NonZeroUsize =
115    ///     cuprate_helper::thread::threads();
116    ///
117    /// assert_eq!(R::OnePerThread.as_threads(), total_threads);
118    ///
119    /// assert_eq!(R::One.as_threads().get(), 1);
120    ///
121    /// assert_eq!(R::Number(0).as_threads(), total_threads);
122    /// assert_eq!(R::Number(1).as_threads().get(), 1);
123    /// assert_eq!(R::Number(usize::MAX).as_threads(), total_threads);
124    ///
125    /// assert_eq!(R::Percent(0.01).as_threads().get(), 1);
126    /// assert_eq!(R::Percent(0.0).as_threads(), total_threads);
127    /// assert_eq!(R::Percent(1.0).as_threads(), total_threads);
128    /// assert_eq!(R::Percent(f32::NAN).as_threads(), total_threads);
129    /// assert_eq!(R::Percent(f32::INFINITY).as_threads(), total_threads);
130    /// assert_eq!(R::Percent(f32::NEG_INFINITY).as_threads(), total_threads);
131    ///
132    /// // Percentage only works on more than 1 thread.
133    /// if total_threads.get() > 1 {
134    ///     assert_eq!(
135    ///         R::Percent(0.5).as_threads().get(),
136    ///         (total_threads.get() as f32 / 2.0) as usize,
137    ///     );
138    /// }
139    /// ```
140    //
141    // INVARIANT:
142    // LMDB will error if we input zero, so don't allow that.
143    // <https://github.com/LMDB/lmdb/blob/b8e54b4c31378932b69f1298972de54a565185b1/libraries/liblmdb/mdb.c#L4687>
144    pub fn as_threads(&self) -> NonZeroUsize {
145        let total_threads = cuprate_helper::thread::threads();
146
147        match self {
148            Self::OnePerThread => total_threads, // use all threads
149            Self::One => NonZeroUsize::MIN,      // one
150            Self::Number(n) => match NonZeroUsize::new(*n) {
151                Some(n) => std::cmp::min(n, total_threads), // saturate at total threads
152                None => total_threads,                      // 0 == maximum value
153            },
154
155            // We handle the casting loss.
156            #[expect(
157                clippy::cast_precision_loss,
158                clippy::cast_possible_truncation,
159                clippy::cast_sign_loss
160            )]
161            Self::Percent(f) => {
162                // If non-normal float, use the default (all threads).
163                if !f.is_normal() || !(0.0..=1.0).contains(f) {
164                    return total_threads;
165                }
166
167                // 0.0 == maximum value.
168                if *f == 0.0 {
169                    return total_threads;
170                }
171
172                // Calculate percentage of total threads.
173                let thread_percent = (total_threads.get() as f32) * f;
174                match NonZeroUsize::new(thread_percent as usize) {
175                    Some(n) => std::cmp::min(n, total_threads), // saturate at total threads.
176                    None => {
177                        // We checked for `0.0` above, so what this
178                        // being 0 means that the percentage was _so_
179                        // low it made our thread count something like
180                        // 0.99. In this case, just use 1 thread.
181                        NonZeroUsize::MIN
182                    }
183                }
184            }
185        }
186    }
187}
188
189impl<T: Into<usize>> From<T> for ReaderThreads {
190    /// Create a [`ReaderThreads::Number`].
191    ///
192    /// If `value` is `0`, this will return [`ReaderThreads::OnePerThread`].
193    fn from(value: T) -> Self {
194        let u: usize = value.into();
195        if u == 0 {
196            Self::OnePerThread
197        } else {
198            Self::Number(u)
199        }
200    }
201}