cuprate_database_service/
reader_threads.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
//! Reader thread-pool configuration and initiation.
//!
//! This module contains [`ReaderThreads`] which allow specifying the amount of
//! reader threads for the [`rayon::ThreadPool`].
//!
//! It also contains [`init_thread_pool`] which initiates the thread-pool.

//---------------------------------------------------------------------------------------------------- Import
use std::{num::NonZeroUsize, sync::Arc};

use rayon::ThreadPool;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

//---------------------------------------------------------------------------------------------------- init_thread_pool
/// Initialize the reader thread-pool backed by `rayon`.
pub fn init_thread_pool(reader_threads: ReaderThreads) -> Arc<ThreadPool> {
    // How many reader threads to spawn?
    let reader_count = reader_threads.as_threads().get();

    Arc::new(
        rayon::ThreadPoolBuilder::new()
            .num_threads(reader_count)
            .thread_name(|i| format!("{}::DatabaseReader({i})", module_path!()))
            .build()
            .unwrap(),
    )
}

//---------------------------------------------------------------------------------------------------- ReaderThreads
/// Amount of database reader threads to spawn.
///
/// This controls how many reader threads the [`DatabaseReadService`](crate::DatabaseReadService)
/// thread-pool will spawn to receive and send requests/responses.
///
/// # Invariant
/// The main function used to extract an actual
/// usable thread count out of this is [`ReaderThreads::as_threads`].
///
/// This will always return at least 1, up until the amount of threads on the machine.
#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ReaderThreads {
    #[default]
    /// Spawn 1 reader thread per available thread on the machine.
    ///
    /// For example, a `32-thread` system will spawn
    /// `32` reader threads using this setting.
    OnePerThread,

    /// Only spawn 1 reader thread.
    One,

    /// Spawn a specified amount of reader threads.
    ///
    /// Note that no matter how large this value, it will be
    /// ultimately capped at the amount of system threads.
    ///
    /// # `0`
    /// `ReaderThreads::Number(0)` represents "use maximum value",
    /// as such, it is equal to [`ReaderThreads::OnePerThread`].
    ///
    /// ```rust
    /// # use cuprate_database_service::*;
    /// let reader_threads = ReaderThreads::from(0_usize);
    /// assert!(matches!(reader_threads, ReaderThreads::OnePerThread));
    /// ```
    Number(usize),

    /// Spawn a specified % of reader threads.
    ///
    /// This must be a value in-between `0.0..1.0`
    /// where `1.0` represents [`ReaderThreads::OnePerThread`].
    ///
    /// # Example
    /// For example, using a `16-core, 32-thread` Ryzen 5950x CPU:
    ///
    /// | Input                              | Total thread used |
    /// |------------------------------------|-------------------|
    /// | `ReaderThreads::Percent(0.0)`      | 32 (maximum value)
    /// | `ReaderThreads::Percent(0.5)`      | 16
    /// | `ReaderThreads::Percent(0.75)`     | 24
    /// | `ReaderThreads::Percent(1.0)`      | 32
    /// | `ReaderThreads::Percent(2.0)`      | 32 (saturating)
    /// | `ReaderThreads::Percent(f32::NAN)` | 32 (non-normal default)
    ///
    /// # `0.0`
    /// `ReaderThreads::Percent(0.0)` represents "use maximum value",
    /// as such, it is equal to [`ReaderThreads::OnePerThread`].
    ///
    /// # Not quite `0.0`
    /// If the thread count multiplied by the percentage ends up being
    /// non-zero, but not 1 thread, the minimum value 1 will be returned.
    ///
    /// ```rust
    /// # use cuprate_database_service::ReaderThreads;
    /// assert_eq!(ReaderThreads::Percent(0.000000001).as_threads().get(), 1);
    /// ```
    Percent(f32),
}

impl ReaderThreads {
    /// This converts [`ReaderThreads`] into a safe, usable
    /// number representing how many threads to spawn.
    ///
    /// This function will always return a number in-between `1..=total_thread_count`.
    ///
    /// It uses [`cuprate_helper::thread::threads()`] internally to determine the total thread count.
    ///
    /// # Example
    /// ```rust
    /// use cuprate_database_service::ReaderThreads as R;
    ///
    /// let total_threads: std::num::NonZeroUsize =
    ///     cuprate_helper::thread::threads();
    ///
    /// assert_eq!(R::OnePerThread.as_threads(), total_threads);
    ///
    /// assert_eq!(R::One.as_threads().get(), 1);
    ///
    /// assert_eq!(R::Number(0).as_threads(), total_threads);
    /// assert_eq!(R::Number(1).as_threads().get(), 1);
    /// assert_eq!(R::Number(usize::MAX).as_threads(), total_threads);
    ///
    /// assert_eq!(R::Percent(0.01).as_threads().get(), 1);
    /// assert_eq!(R::Percent(0.0).as_threads(), total_threads);
    /// assert_eq!(R::Percent(1.0).as_threads(), total_threads);
    /// assert_eq!(R::Percent(f32::NAN).as_threads(), total_threads);
    /// assert_eq!(R::Percent(f32::INFINITY).as_threads(), total_threads);
    /// assert_eq!(R::Percent(f32::NEG_INFINITY).as_threads(), total_threads);
    ///
    /// // Percentage only works on more than 1 thread.
    /// if total_threads.get() > 1 {
    ///     assert_eq!(
    ///         R::Percent(0.5).as_threads().get(),
    ///         (total_threads.get() as f32 / 2.0) as usize,
    ///     );
    /// }
    /// ```
    //
    // INVARIANT:
    // LMDB will error if we input zero, so don't allow that.
    // <https://github.com/LMDB/lmdb/blob/b8e54b4c31378932b69f1298972de54a565185b1/libraries/liblmdb/mdb.c#L4687>
    pub fn as_threads(&self) -> NonZeroUsize {
        let total_threads = cuprate_helper::thread::threads();

        match self {
            Self::OnePerThread => total_threads, // use all threads
            Self::One => NonZeroUsize::MIN,      // one
            Self::Number(n) => match NonZeroUsize::new(*n) {
                Some(n) => std::cmp::min(n, total_threads), // saturate at total threads
                None => total_threads,                      // 0 == maximum value
            },

            // We handle the casting loss.
            #[expect(
                clippy::cast_precision_loss,
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss
            )]
            Self::Percent(f) => {
                // If non-normal float, use the default (all threads).
                if !f.is_normal() || !(0.0..=1.0).contains(f) {
                    return total_threads;
                }

                // 0.0 == maximum value.
                if *f == 0.0 {
                    return total_threads;
                }

                // Calculate percentage of total threads.
                let thread_percent = (total_threads.get() as f32) * f;
                match NonZeroUsize::new(thread_percent as usize) {
                    Some(n) => std::cmp::min(n, total_threads), // saturate at total threads.
                    None => {
                        // We checked for `0.0` above, so what this
                        // being 0 means that the percentage was _so_
                        // low it made our thread count something like
                        // 0.99. In this case, just use 1 thread.
                        NonZeroUsize::MIN
                    }
                }
            }
        }
    }
}

impl<T: Into<usize>> From<T> for ReaderThreads {
    /// Create a [`ReaderThreads::Number`].
    ///
    /// If `value` is `0`, this will return [`ReaderThreads::OnePerThread`].
    fn from(value: T) -> Self {
        let u: usize = value.into();
        if u == 0 {
            Self::OnePerThread
        } else {
            Self::Number(u)
        }
    }
}