redb/tree_store/page_store/
base.rs

1use crate::tree_store::page_store::cached_file::WritablePage;
2use crate::tree_store::page_store::page_manager::MAX_MAX_PAGE_ORDER;
3use std::cmp::Ordering;
4#[cfg(debug_assertions)]
5use std::collections::HashMap;
6use std::collections::HashSet;
7use std::fmt::{Debug, Formatter};
8use std::mem;
9use std::ops::Range;
10use std::sync::Arc;
11#[cfg(debug_assertions)]
12use std::sync::Mutex;
13
// Upper bound (3 GiB) on the length of a single stored value.
// NOTE(review): enforcement is not visible in this file — presumably checked by callers.
pub(crate) const MAX_VALUE_LENGTH: usize = 3 * 1024 * 1024 * 1024;
// Upper bound (3.75 GiB) on the combined length of a key-value pair.
pub(crate) const MAX_PAIR_LENGTH: usize = 3 * 1024 * 1024 * 1024 + 768 * 1024 * 1024;
// The page index is a 20-bit field (see the on-disk format below); this is its maximum value.
pub(crate) const MAX_PAGE_INDEX: u32 = 0x000F_FFFF;
17
18// On-disk format is:
19// TODO: consider implementing an optimization in which we store the number of order-0 pages that
20// are actually used, in these reserved bits, so that the reads to the PagedCachedFile layer can avoid
21// reading all the zeros at the end of the page.
22// lowest 20bits: page index within the region. Only the lowest `20 - order_exponent` bits may be read.
23// The remaining bits are reserved for future use and must be ignored
24// second 20bits: region number
25// 19bits: reserved
26// highest 5bits: page order exponent
27//
28// Assuming a reasonable page size, like 4kiB, this allows for 4kiB * 2^20 * 2^20 = 4PiB of usable space
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub(crate) struct PageNumber {
    // Region number within the database file (20 usable bits).
    pub(crate) region: u32,
    // Page index within the region, in units of `2^page_order` base pages
    // (20 usable bits; only the lowest `20 - page_order` may be set).
    pub(crate) page_index: u32,
    // Buddy-allocator order: the page covers `2^page_order` base pages.
    pub(crate) page_order: u8,
}
35
36// PageNumbers are ordered as determined by their starting address in the database file
37impl Ord for PageNumber {
38    fn cmp(&self, other: &Self) -> Ordering {
39        match self.region.cmp(&other.region) {
40            Ordering::Less => Ordering::Less,
41            Ordering::Equal => {
42                let self_order0 = self.page_index * 2u32.pow(self.page_order.into());
43                let other_order0 = other.page_index * 2u32.pow(other.page_order.into());
44                assert!(
45                    self_order0 != other_order0 || self.page_order == other.page_order,
46                    "{self:?} overlaps {other:?}, but is not equal"
47                );
48                self_order0.cmp(&other_order0)
49            }
50            Ordering::Greater => Ordering::Greater,
51        }
52    }
53}
54
impl PartialOrd for PageNumber {
    // Delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
60
61impl PageNumber {
62    #[inline(always)]
63    pub(crate) const fn serialized_size() -> usize {
64        8
65    }
66
67    pub(crate) fn new(region: u32, page_index: u32, page_order: u8) -> Self {
68        debug_assert!(region <= 0x000F_FFFF);
69        debug_assert!(page_index <= MAX_PAGE_INDEX);
70        debug_assert!(page_order <= MAX_MAX_PAGE_ORDER);
71        Self {
72            region,
73            page_index,
74            page_order,
75        }
76    }
77
78    pub(crate) fn to_le_bytes(self) -> [u8; 8] {
79        let mut temp = 0x000F_FFFF & u64::from(self.page_index);
80        temp |= (0x000F_FFFF & u64::from(self.region)) << 20;
81        temp |= (0b0001_1111 & u64::from(self.page_order)) << 59;
82        temp.to_le_bytes()
83    }
84
85    pub(crate) fn from_le_bytes(bytes: [u8; 8]) -> Self {
86        let temp = u64::from_le_bytes(bytes);
87        let order = (temp >> 59) as u8;
88        let index = u32::try_from(temp & (0x000F_FFFF >> order)).unwrap();
89        let region = ((temp >> 20) & 0x000F_FFFF) as u32;
90
91        Self {
92            region,
93            page_index: index,
94            page_order: order,
95        }
96    }
97
98    // Returns true if this PageNumber is before the other PageNumber in the file layout
99    pub(crate) fn is_before(&self, other: PageNumber) -> bool {
100        if self.region < other.region {
101            return true;
102        }
103        if self.region > other.region {
104            return false;
105        }
106        let self_order0 = self.page_index * 2u32.pow(self.page_order.into());
107        let other_order0 = other.page_index * 2u32.pow(other.page_order.into());
108        assert_ne!(self_order0, other_order0, "{self:?} overlaps {other:?}");
109        self_order0 < other_order0
110    }
111
112    #[cfg(test)]
113    pub(crate) fn to_order0(self) -> Vec<PageNumber> {
114        let mut pages = vec![self];
115        loop {
116            let mut progress = false;
117            let mut new_pages = vec![];
118            for page in pages {
119                if page.page_order == 0 {
120                    new_pages.push(page);
121                } else {
122                    progress = true;
123                    new_pages.push(PageNumber::new(
124                        page.region,
125                        page.page_index * 2,
126                        page.page_order - 1,
127                    ));
128                    new_pages.push(PageNumber::new(
129                        page.region,
130                        page.page_index * 2 + 1,
131                        page.page_order - 1,
132                    ));
133                }
134            }
135            pages = new_pages;
136            if !progress {
137                break;
138            }
139        }
140
141        pages
142    }
143
144    pub(crate) fn address_range(
145        &self,
146        data_section_offset: u64,
147        region_size: u64,
148        region_pages_start: u64,
149        page_size: u32,
150    ) -> Range<u64> {
151        let regional_start =
152            region_pages_start + u64::from(self.page_index) * self.page_size_bytes(page_size);
153        debug_assert!(regional_start < region_size);
154        let region_base = u64::from(self.region) * region_size;
155        let start = data_section_offset + region_base + regional_start;
156        let end = start + self.page_size_bytes(page_size);
157        start..end
158    }
159
160    pub(crate) fn page_size_bytes(&self, page_size: u32) -> u64 {
161        let pages = 1u64 << self.page_order;
162        pages * u64::from(page_size)
163    }
164}
165
166impl Debug for PageNumber {
167    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
168        write!(
169            f,
170            "r{}.{}/{}",
171            self.region, self.page_index, self.page_order
172        )
173    }
174}
175
// Common read-only interface over the page handle types in this file
// (`PageImpl` and `PageMut`).
pub(crate) trait Page {
    // Read-only view of the page's bytes.
    fn memory(&self) -> &[u8];

    fn get_page_number(&self) -> PageNumber;
}
181
// Immutable handle to a page's bytes; cloning shares the same `Arc`-backed memory.
pub struct PageImpl {
    pub(super) mem: Arc<[u8]>,
    pub(super) page_number: PageNumber,
    // Debug-only count of outstanding handles per page, shared with the page
    // store (decremented on drop, incremented on clone).
    #[cfg(debug_assertions)]
    pub(super) open_pages: Arc<Mutex<HashMap<PageNumber, u64>>>,
}
188
189impl PageImpl {
190    pub(crate) fn to_arc(&self) -> Arc<[u8]> {
191        self.mem.clone()
192    }
193}
194
195impl Debug for PageImpl {
196    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
197        f.write_fmt(format_args!("PageImpl: page_number={:?}", self.page_number))
198    }
199}
200
201#[cfg(debug_assertions)]
202impl Drop for PageImpl {
203    fn drop(&mut self) {
204        let mut open_pages = self.open_pages.lock().unwrap();
205        let value = open_pages.get_mut(&self.page_number).unwrap();
206        assert!(*value > 0);
207        *value -= 1;
208        if *value == 0 {
209            open_pages.remove(&self.page_number);
210        }
211    }
212}
213
214impl Page for PageImpl {
215    fn memory(&self) -> &[u8] {
216        self.mem.as_ref()
217    }
218
219    fn get_page_number(&self) -> PageNumber {
220        self.page_number
221    }
222}
223
224impl Clone for PageImpl {
225    fn clone(&self) -> Self {
226        #[cfg(debug_assertions)]
227        {
228            *self
229                .open_pages
230                .lock()
231                .unwrap()
232                .get_mut(&self.page_number)
233                .unwrap() += 1;
234        }
235        Self {
236            mem: self.mem.clone(),
237            page_number: self.page_number,
238            #[cfg(debug_assertions)]
239            open_pages: self.open_pages.clone(),
240        }
241    }
242}
243
// Writable handle to a page, backed by the cached-file layer's `WritablePage`.
pub(crate) struct PageMut {
    pub(super) mem: WritablePage,
    pub(super) page_number: PageNumber,
    // Debug-only set of pages currently open for writing; membership is
    // asserted on drop, so each page may be registered at most once.
    #[cfg(debug_assertions)]
    pub(super) open_pages: Arc<Mutex<HashSet<PageNumber>>>,
}
250
impl PageMut {
    // Mutable view of the page's bytes for in-place modification.
    pub(crate) fn memory_mut(&mut self) -> &mut [u8] {
        self.mem.mem_mut()
    }
}
256
impl Page for PageMut {
    // Read-only view, delegated to the underlying `WritablePage`.
    fn memory(&self) -> &[u8] {
        self.mem.mem()
    }

    fn get_page_number(&self) -> PageNumber {
        self.page_number
    }
}
266
// Debug-only bookkeeping: a writable page must be registered in `open_pages`
// exactly once; `remove` returning false indicates a double-drop or a page
// that was never registered.
#[cfg(debug_assertions)]
impl Drop for PageMut {
    fn drop(&mut self) {
        assert!(self.open_pages.lock().unwrap().remove(&self.page_number));
    }
}
273
// Hint about the expected state of a page being read.
// NOTE(review): `Clean` presumably means the cached/on-disk copy is known to be
// up to date — confirm against the consumer in cached_file.rs.
#[derive(Copy, Clone)]
pub(crate) enum PageHint {
    None,
    Clean,
}
279
// Policy for recording which pages are open.
pub(crate) enum PageTrackerPolicy {
    // Tracking disabled: insert/remove are no-ops.
    Ignore,
    // Tracking enabled: the set holds every currently-open page.
    Track(HashSet<PageNumber>),
    // Terminal state set by `close()`: any further insert/remove panics.
    Closed,
}
285
286impl PageTrackerPolicy {
287    pub(crate) fn new_tracking() -> Self {
288        PageTrackerPolicy::Track(HashSet::new())
289    }
290
291    pub(crate) fn is_empty(&self) -> bool {
292        match self {
293            PageTrackerPolicy::Ignore | PageTrackerPolicy::Closed => true,
294            PageTrackerPolicy::Track(x) => x.is_empty(),
295        }
296    }
297
298    pub(super) fn remove(&mut self, page: PageNumber) {
299        match self {
300            PageTrackerPolicy::Ignore => {}
301            PageTrackerPolicy::Track(x) => {
302                assert!(x.remove(&page));
303            }
304            PageTrackerPolicy::Closed => {
305                panic!("Page tracker is closed");
306            }
307        }
308    }
309
310    pub(super) fn insert(&mut self, page: PageNumber) {
311        match self {
312            PageTrackerPolicy::Ignore => {}
313            PageTrackerPolicy::Track(x) => {
314                assert!(x.insert(page));
315            }
316            PageTrackerPolicy::Closed => {
317                panic!("Page tracker is closed");
318            }
319        }
320    }
321
322    pub(crate) fn close(&mut self) -> HashSet<PageNumber> {
323        let old = mem::replace(self, PageTrackerPolicy::Closed);
324        match old {
325            PageTrackerPolicy::Ignore => HashSet::new(),
326            PageTrackerPolicy::Track(x) => x,
327            PageTrackerPolicy::Closed => {
328                panic!("Page tracker is closed");
329            }
330        }
331    }
332}
333
#[cfg(test)]
mod test {
    use crate::tree_store::PageNumber;

    // The highest-index order-0 page in a region must yield a valid address
    // range (exercises the `debug_assert` inside `address_range`).
    #[test]
    fn last_page() {
        let region_data_size = 1u64 << 32;
        let page_size = 4096u64;
        let region_header_size = 1u64 << 16;
        let last_index = region_data_size / page_size - 1;
        let page = PageNumber::new(1, last_index.try_into().unwrap(), 0);
        page.address_range(
            4096,
            region_data_size + region_header_size,
            region_header_size,
            page_size.try_into().unwrap(),
        );
    }

    // Reserved bits in the serialized form must be ignored when deserializing:
    // corrupting a byte inside the index field's unused high bits (order 12
    // leaves only 8 usable index bits) must not change the decoded value.
    #[test]
    fn reserved_bits() {
        let original = PageNumber::new(0, 0, 12);
        let mut serialized = original.to_le_bytes();
        serialized[1] = 0xFF;
        assert_eq!(original, PageNumber::from_le_bytes(serialized));
    }
}