redb/tree_store/page_store/base.rs

use crate::tree_store::page_store::cached_file::WritablePage;
use crate::tree_store::page_store::page_manager::MAX_MAX_PAGE_ORDER;
use std::cmp::Ordering;
#[cfg(debug_assertions)]
use std::collections::HashMap;
#[cfg(debug_assertions)]
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;
#[cfg(debug_assertions)]
use std::sync::Mutex;

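// Maximum value size (3GiB), maximum key-value pair size (3.75GiB), and the
// 20-bit cap on page indices imposed by the packed PageNumber encoding below.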
pub(crate) const MAX_VALUE_LENGTH: usize = 3 * 1024 * 1024 * 1024;
pub(crate) const MAX_PAIR_LENGTH: usize = 3 * 1024 * 1024 * 1024 + 768 * 1024 * 1024;
pub(crate) const MAX_PAGE_INDEX: u32 = 0x000F_FFFF;

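// Identifies a page by region, index within the region, and allocation order.
// An order-N page covers 2^N contiguous order-0 pages.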
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub(crate) struct PageNumber {
    pub(crate) region: u32,
    pub(crate) page_index: u32,
    pub(crate) page_order: u8,
}

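// Orders pages by region, then by position within the region measured in
// order-0 units. Distinct pages that overlap (same position, different order)
// have no meaningful ordering, so comparing them is asserted against.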
impl Ord for PageNumber {
    fn cmp(&self, other: &Self) -> Ordering {
        match self.region.cmp(&other.region) {
            Ordering::Less => Ordering::Less,
            Ordering::Equal => {
                let self_order0 = self.page_index * 2u32.pow(self.page_order.into());
                let other_order0 = other.page_index * 2u32.pow(other.page_order.into());
                assert!(
                    self_order0 != other_order0 || self.page_order == other.page_order,
                    "{self:?} overlaps {other:?}, but is not equal"
                );
                self_order0.cmp(&other_order0)
            }
            Ordering::Greater => Ordering::Greater,
        }
    }
}

impl PartialOrd for PageNumber {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PageNumber {
    #[inline(always)]
    pub(crate) const fn serialized_size() -> usize {
        8
    }

    pub(crate) fn new(region: u32, page_index: u32, page_order: u8) -> Self {
        debug_assert!(region <= 0x000F_FFFF);
        debug_assert!(page_index <= MAX_PAGE_INDEX);
        debug_assert!(page_order <= MAX_MAX_PAGE_ORDER);
        Self {
            region,
            page_index,
            page_order,
        }
    }

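    // Pack into 64 bits: page index in bits 0..20, region in bits 20..40,
    // order in bits 59..64. The remaining bits are unused by this encoding.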
    pub(crate) fn to_le_bytes(self) -> [u8; 8] {
        let mut temp = 0x000F_FFFF & u64::from(self.page_index);
        temp |= (0x000F_FFFF & u64::from(self.region)) << 20;
        temp |= (0b0001_1111 & u64::from(self.page_order)) << 59;
        temp.to_le_bytes()
    }

    pub(crate) fn from_le_bytes(bytes: [u8; 8]) -> Self {
        let temp = u64::from_le_bytes(bytes);
        let index = (temp & 0x000F_FFFF) as u32;
        let region = ((temp >> 20) & 0x000F_FFFF) as u32;
        let order = (temp >> 59) as u8;

        Self {
            region,
            page_index: index,
            page_order: order,
        }
    }

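    // Returns true if this page lies strictly before `other` in file order.
    // Overlapping (including equal) pages trip the assertion below.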
    pub(crate) fn is_before(&self, other: PageNumber) -> bool {
        if self.region < other.region {
            return true;
        }
        if self.region > other.region {
            return false;
        }
        let self_order0 = self.page_index * 2u32.pow(self.page_order.into());
        let other_order0 = other.page_index * 2u32.pow(other.page_order.into());
        assert_ne!(self_order0, other_order0, "{self:?} overlaps {other:?}");
        self_order0 < other_order0
    }

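    // Test helper: expand this page into the order-0 pages it covers, by
    // repeatedly splitting each page into its two half-order children.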
    #[cfg(test)]
    pub(crate) fn to_order0(self) -> Vec<PageNumber> {
        let mut pages = vec![self];
        loop {
            let mut progress = false;
            let mut new_pages = vec![];
            for page in pages {
                if page.page_order == 0 {
                    new_pages.push(page);
                } else {
                    progress = true;
                    new_pages.push(PageNumber::new(
                        page.region,
                        page.page_index * 2,
                        page.page_order - 1,
                    ));
                    new_pages.push(PageNumber::new(
                        page.region,
                        page.page_index * 2 + 1,
                        page.page_order - 1,
                    ));
                }
            }
            pages = new_pages;
            if !progress {
                break;
            }
        }

        pages
    }

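    // Byte range of this page within the file: skip the data section offset,
    // locate the region (region * region_size), then the page within the
    // region's page array (region_pages_start + index * page size).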
    pub(crate) fn address_range(
        &self,
        data_section_offset: u64,
        region_size: u64,
        region_pages_start: u64,
        page_size: u32,
    ) -> Range<u64> {
        let regional_start =
            region_pages_start + u64::from(self.page_index) * self.page_size_bytes(page_size);
        debug_assert!(regional_start < region_size);
        let region_base = u64::from(self.region) * region_size;
        let start = data_section_offset + region_base + regional_start;
        let end = start + self.page_size_bytes(page_size);
        start..end
    }

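    // Size of this page in bytes: an order-N page spans 2^N base pages.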
    pub(crate) fn page_size_bytes(&self, page_size: u32) -> u64 {
        let pages = 1u64 << self.page_order;
        pages * u64::from(page_size)
    }
}

impl Debug for PageNumber {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "r{}.{}/{}",
            self.region, self.page_index, self.page_order
        )
    }
}

pub(crate) trait Page {
    fn memory(&self) -> &[u8];

    fn get_page_number(&self) -> PageNumber;
}

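// Read-only handle to a page backed by reference-counted memory. In debug
// builds, a shared map counts the open handles per page so that leaked page
// references can be detected.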
pub struct PageImpl {
    pub(super) mem: Arc<[u8]>,
    pub(super) page_number: PageNumber,
    #[cfg(debug_assertions)]
    pub(super) open_pages: Arc<Mutex<HashMap<PageNumber, u64>>>,
}

impl PageImpl {
    pub(crate) fn to_arc(&self) -> Arc<[u8]> {
        self.mem.clone()
    }
}

impl Debug for PageImpl {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("PageImpl: page_number={:?}", self.page_number))
    }
}

#[cfg(debug_assertions)]
impl Drop for PageImpl {
    fn drop(&mut self) {
        let mut open_pages = self.open_pages.lock().unwrap();
        let value = open_pages.get_mut(&self.page_number).unwrap();
        assert!(*value > 0);
        *value -= 1;
        if *value == 0 {
            open_pages.remove(&self.page_number);
        }
    }
}

impl Page for PageImpl {
    fn memory(&self) -> &[u8] {
        self.mem.as_ref()
    }

    fn get_page_number(&self) -> PageNumber {
        self.page_number
    }
}

impl Clone for PageImpl {
    fn clone(&self) -> Self {
        #[cfg(debug_assertions)]
        {
            *self
                .open_pages
                .lock()
                .unwrap()
                .get_mut(&self.page_number)
                .unwrap() += 1;
        }
        Self {
            mem: self.mem.clone(),
            page_number: self.page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_pages.clone(),
        }
    }
}

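// Writable handle to a page. In debug builds each open mutable page is
// recorded in a shared set and removed again when the handle is dropped.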
pub(crate) struct PageMut {
    pub(super) mem: WritablePage,
    pub(super) page_number: PageNumber,
    #[cfg(debug_assertions)]
    pub(super) open_pages: Arc<Mutex<HashSet<PageNumber>>>,
}

impl PageMut {
    pub(crate) fn memory_mut(&mut self) -> &mut [u8] {
        self.mem.mem_mut()
    }
}

impl Page for PageMut {
    fn memory(&self) -> &[u8] {
        self.mem.mem()
    }

    fn get_page_number(&self) -> PageNumber {
        self.page_number
    }
}

#[cfg(debug_assertions)]
impl Drop for PageMut {
    fn drop(&mut self) {
        assert!(self.open_pages.lock().unwrap().remove(&self.page_number));
    }
}

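// Hint supplied when reading a page: Clean indicates the caller knows the
// page has no pending modifications, so cached data can be trusted as-is.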
#[derive(Copy, Clone)]
pub(crate) enum PageHint {
    None,
    Clean,
}

#[cfg(test)]
mod test {
    use crate::tree_store::PageNumber;

    // Exercises address_range() on the last page of a region; the
    // debug_assert inside address_range() verifies that the page still falls
    // within the region's bounds.
    #[test]
    fn last_page() {
        let region_data_size = 2u64.pow(32);
        let page_size = 4096;
        let pages_per_region = region_data_size / page_size;
        let region_header_size = 2u64.pow(16);
        let last_page_index = pages_per_region - 1;
        let page_number = PageNumber::new(1, last_page_index.try_into().unwrap(), 0);
        page_number.address_range(
            4096,
            region_data_size + region_header_size,
            region_header_size,
            page_size.try_into().unwrap(),
        );
    }
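
    // Sketch of a round-trip check for the packed serialization above:
    // to_le_bytes() followed by from_le_bytes() should reproduce the same
    // PageNumber. Orders are kept small here on the assumption that they
    // stay within MAX_MAX_PAGE_ORDER, which new() debug-asserts.
    #[test]
    fn page_number_round_trip() {
        for page_number in [
            PageNumber::new(0, 0, 0),
            PageNumber::new(1, 2, 3),
            PageNumber::new(0x000F_FFFF, 0x000F_FFFF, 7),
        ] {
            let decoded = PageNumber::from_le_bytes(page_number.to_le_bytes());
            assert_eq!(page_number, decoded);
        }
    }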
}