cuprate_database_service/reader_threads.rs
1//! Reader thread-pool configuration and initiation.
2//!
3//! This module contains [`ReaderThreads`] which allow specifying the amount of
4//! reader threads for the [`rayon::ThreadPool`].
5//!
6//! It also contains [`init_thread_pool`] which initiates the thread-pool.
7
8//---------------------------------------------------------------------------------------------------- Import
9use std::{num::NonZeroUsize, sync::Arc};
10
11use rayon::ThreadPool;
12#[cfg(feature = "serde")]
13use serde::{Deserialize, Serialize};
14
15//---------------------------------------------------------------------------------------------------- init_thread_pool
16/// Initialize the reader thread-pool backed by `rayon`.
17pub fn init_thread_pool(reader_threads: ReaderThreads) -> Arc<ThreadPool> {
18 // How many reader threads to spawn?
19 let reader_count = reader_threads.as_threads().get();
20
21 Arc::new(
22 rayon::ThreadPoolBuilder::new()
23 .num_threads(reader_count)
24 .thread_name(|i| format!("{}::DatabaseReader({i})", module_path!()))
25 .build()
26 .unwrap(),
27 )
28}
29
30//---------------------------------------------------------------------------------------------------- ReaderThreads
31/// Amount of database reader threads to spawn.
32///
33/// This controls how many reader threads the [`DatabaseReadService`](crate::DatabaseReadService)
34/// thread-pool will spawn to receive and send requests/responses.
35///
36/// # Invariant
37/// The main function used to extract an actual
38/// usable thread count out of this is [`ReaderThreads::as_threads`].
39///
40/// This will always return at least 1, up until the amount of threads on the machine.
41#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd)]
42#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
43pub enum ReaderThreads {
44 #[default]
45 /// Spawn 1 reader thread per available thread on the machine.
46 ///
47 /// For example, a `32-thread` system will spawn
48 /// `32` reader threads using this setting.
49 OnePerThread,
50
51 /// Only spawn 1 reader thread.
52 One,
53
54 /// Spawn a specified amount of reader threads.
55 ///
56 /// Note that no matter how large this value, it will be
57 /// ultimately capped at the amount of system threads.
58 ///
59 /// # `0`
60 /// `ReaderThreads::Number(0)` represents "use maximum value",
61 /// as such, it is equal to [`ReaderThreads::OnePerThread`].
62 ///
63 /// ```rust
64 /// # use cuprate_database_service::*;
65 /// let reader_threads = ReaderThreads::from(0_usize);
66 /// assert!(matches!(reader_threads, ReaderThreads::OnePerThread));
67 /// ```
68 Number(usize),
69
70 /// Spawn a specified % of reader threads.
71 ///
72 /// This must be a value in-between `0.0..1.0`
73 /// where `1.0` represents [`ReaderThreads::OnePerThread`].
74 ///
75 /// # Example
76 /// For example, using a `16-core, 32-thread` Ryzen 5950x CPU:
77 ///
78 /// | Input | Total thread used |
79 /// |------------------------------------|-------------------|
80 /// | `ReaderThreads::Percent(0.0)` | 32 (maximum value)
81 /// | `ReaderThreads::Percent(0.5)` | 16
82 /// | `ReaderThreads::Percent(0.75)` | 24
83 /// | `ReaderThreads::Percent(1.0)` | 32
84 /// | `ReaderThreads::Percent(2.0)` | 32 (saturating)
85 /// | `ReaderThreads::Percent(f32::NAN)` | 32 (non-normal default)
86 ///
87 /// # `0.0`
88 /// `ReaderThreads::Percent(0.0)` represents "use maximum value",
89 /// as such, it is equal to [`ReaderThreads::OnePerThread`].
90 ///
91 /// # Not quite `0.0`
92 /// If the thread count multiplied by the percentage ends up being
93 /// non-zero, but not 1 thread, the minimum value 1 will be returned.
94 ///
95 /// ```rust
96 /// # use cuprate_database_service::ReaderThreads;
97 /// assert_eq!(ReaderThreads::Percent(0.000000001).as_threads().get(), 1);
98 /// ```
99 Percent(f32),
100}
101
102impl ReaderThreads {
103 /// This converts [`ReaderThreads`] into a safe, usable
104 /// number representing how many threads to spawn.
105 ///
106 /// This function will always return a number in-between `1..=total_thread_count`.
107 ///
108 /// It uses [`cuprate_helper::thread::threads()`] internally to determine the total thread count.
109 ///
110 /// # Example
111 /// ```rust
112 /// use cuprate_database_service::ReaderThreads as R;
113 ///
114 /// let total_threads: std::num::NonZeroUsize =
115 /// cuprate_helper::thread::threads();
116 ///
117 /// assert_eq!(R::OnePerThread.as_threads(), total_threads);
118 ///
119 /// assert_eq!(R::One.as_threads().get(), 1);
120 ///
121 /// assert_eq!(R::Number(0).as_threads(), total_threads);
122 /// assert_eq!(R::Number(1).as_threads().get(), 1);
123 /// assert_eq!(R::Number(usize::MAX).as_threads(), total_threads);
124 ///
125 /// assert_eq!(R::Percent(0.01).as_threads().get(), 1);
126 /// assert_eq!(R::Percent(0.0).as_threads(), total_threads);
127 /// assert_eq!(R::Percent(1.0).as_threads(), total_threads);
128 /// assert_eq!(R::Percent(f32::NAN).as_threads(), total_threads);
129 /// assert_eq!(R::Percent(f32::INFINITY).as_threads(), total_threads);
130 /// assert_eq!(R::Percent(f32::NEG_INFINITY).as_threads(), total_threads);
131 ///
132 /// // Percentage only works on more than 1 thread.
133 /// if total_threads.get() > 1 {
134 /// assert_eq!(
135 /// R::Percent(0.5).as_threads().get(),
136 /// (total_threads.get() as f32 / 2.0) as usize,
137 /// );
138 /// }
139 /// ```
140 //
141 // INVARIANT:
142 // LMDB will error if we input zero, so don't allow that.
143 // <https://github.com/LMDB/lmdb/blob/b8e54b4c31378932b69f1298972de54a565185b1/libraries/liblmdb/mdb.c#L4687>
144 pub fn as_threads(&self) -> NonZeroUsize {
145 let total_threads = cuprate_helper::thread::threads();
146
147 match self {
148 Self::OnePerThread => total_threads, // use all threads
149 Self::One => NonZeroUsize::MIN, // one
150 Self::Number(n) => match NonZeroUsize::new(*n) {
151 Some(n) => std::cmp::min(n, total_threads), // saturate at total threads
152 None => total_threads, // 0 == maximum value
153 },
154
155 // We handle the casting loss.
156 #[expect(
157 clippy::cast_precision_loss,
158 clippy::cast_possible_truncation,
159 clippy::cast_sign_loss
160 )]
161 Self::Percent(f) => {
162 // If non-normal float, use the default (all threads).
163 if !f.is_normal() || !(0.0..=1.0).contains(f) {
164 return total_threads;
165 }
166
167 // 0.0 == maximum value.
168 if *f == 0.0 {
169 return total_threads;
170 }
171
172 // Calculate percentage of total threads.
173 let thread_percent = (total_threads.get() as f32) * f;
174 match NonZeroUsize::new(thread_percent as usize) {
175 Some(n) => std::cmp::min(n, total_threads), // saturate at total threads.
176 None => {
177 // We checked for `0.0` above, so what this
178 // being 0 means that the percentage was _so_
179 // low it made our thread count something like
180 // 0.99. In this case, just use 1 thread.
181 NonZeroUsize::MIN
182 }
183 }
184 }
185 }
186 }
187}
188
189impl<T: Into<usize>> From<T> for ReaderThreads {
190 /// Create a [`ReaderThreads::Number`].
191 ///
192 /// If `value` is `0`, this will return [`ReaderThreads::OnePerThread`].
193 fn from(value: T) -> Self {
194 let u: usize = value.into();
195 if u == 0 {
196 Self::OnePerThread
197 } else {
198 Self::Number(u)
199 }
200 }
201}