// redb/tree_store/page_store/base.rs
use crate::tree_store::page_store::cached_file::WritablePage;
use crate::tree_store::page_store::page_manager::MAX_MAX_PAGE_ORDER;
use std::cmp::Ordering;
#[cfg(debug_assertions)]
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::mem;
use std::ops::Range;
use std::sync::Arc;
#[cfg(debug_assertions)]
use std::sync::Mutex;

pub(crate) const MAX_VALUE_LENGTH: usize = 3 * 1024 * 1024 * 1024;
pub(crate) const MAX_PAIR_LENGTH: usize = 3 * 1024 * 1024 * 1024 + 768 * 1024 * 1024;
pub(crate) const MAX_PAGE_INDEX: u32 = 0x000F_FFFF;

#[derive(Copy, Clone, Eq, PartialEq, Hash)]
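// On-disk format (derived from `to_le_bytes` below):
//   bits 0..20:  page index within the region
//   bits 20..40: region number
//   bits 40..59: reserved
//   bits 59..64: page order exponent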
pub(crate) struct PageNumber {
    pub(crate) region: u32,
    pub(crate) page_index: u32,
    pub(crate) page_order: u8,
}

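// Pages are ordered by region, then by their starting offset within the
// region (the page index normalized to order 0). Two distinct pages sharing a
// starting offset would overlap; the assertion below rules that out.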
impl Ord for PageNumber {
    fn cmp(&self, other: &Self) -> Ordering {
        match self.region.cmp(&other.region) {
            Ordering::Less => Ordering::Less,
            Ordering::Equal => {
                let self_order0 = self.page_index * 2u32.pow(self.page_order.into());
                let other_order0 = other.page_index * 2u32.pow(other.page_order.into());
                assert!(
                    self_order0 != other_order0 || self.page_order == other.page_order,
                    "{self:?} overlaps {other:?}, but is not equal"
                );
                self_order0.cmp(&other_order0)
            }
            Ordering::Greater => Ordering::Greater,
        }
    }
}

impl PartialOrd for PageNumber {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PageNumber {
    #[inline(always)]
    pub(crate) const fn serialized_size() -> usize {
        8
    }

    pub(crate) fn new(region: u32, page_index: u32, page_order: u8) -> Self {
        debug_assert!(region <= 0x000F_FFFF);
        debug_assert!(page_index <= MAX_PAGE_INDEX);
        debug_assert!(page_order <= MAX_MAX_PAGE_ORDER);
        Self {
            region,
            page_index,
            page_order,
        }
    }

    pub(crate) fn to_le_bytes(self) -> [u8; 8] {
        let mut temp = 0x000F_FFFF & u64::from(self.page_index);
        temp |= (0x000F_FFFF & u64::from(self.region)) << 20;
        temp |= (0b0001_1111 & u64::from(self.page_order)) << 59;
        temp.to_le_bytes()
    }

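    // Note that the index mask is narrowed by the page order: bits above the
    // valid index range for that order are ignored on deserialization
    // (exercised by the `reserved_bits` test below).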
    pub(crate) fn from_le_bytes(bytes: [u8; 8]) -> Self {
        let temp = u64::from_le_bytes(bytes);
        let order = (temp >> 59) as u8;
        let index = u32::try_from(temp & (0x000F_FFFF >> order)).unwrap();
        let region = ((temp >> 20) & 0x000F_FFFF) as u32;

        Self {
            region,
            page_index: index,
            page_order: order,
        }
    }

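    // Returns true if this page begins strictly before `other` in the file.
    // Asserts that the two pages do not share a starting offset, since
    // distinct pages with the same start would overlap.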
    pub(crate) fn is_before(&self, other: PageNumber) -> bool {
        if self.region < other.region {
            return true;
        }
        if self.region > other.region {
            return false;
        }
        let self_order0 = self.page_index * 2u32.pow(self.page_order.into());
        let other_order0 = other.page_index * 2u32.pow(other.page_order.into());
        assert_ne!(self_order0, other_order0, "{self:?} overlaps {other:?}");
        self_order0 < other_order0
    }

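    // Test helper: repeatedly splits this page in half until only order-0
    // pages remain, returning the full set of order-0 pages it covers.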
    #[cfg(test)]
    pub(crate) fn to_order0(self) -> Vec<PageNumber> {
        let mut pages = vec![self];
        loop {
            let mut progress = false;
            let mut new_pages = vec![];
            for page in pages {
                if page.page_order == 0 {
                    new_pages.push(page);
                } else {
                    progress = true;
                    new_pages.push(PageNumber::new(
                        page.region,
                        page.page_index * 2,
                        page.page_order - 1,
                    ));
                    new_pages.push(PageNumber::new(
                        page.region,
                        page.page_index * 2 + 1,
                        page.page_order - 1,
                    ));
                }
            }
            pages = new_pages;
            if !progress {
                break;
            }
        }

        pages
    }

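    // Absolute byte range of this page within the file: the data section
    // offset, plus the start of this page's region, plus the page's offset
    // past the region header.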
    pub(crate) fn address_range(
        &self,
        data_section_offset: u64,
        region_size: u64,
        region_pages_start: u64,
        page_size: u32,
    ) -> Range<u64> {
        let regional_start =
            region_pages_start + u64::from(self.page_index) * self.page_size_bytes(page_size);
        debug_assert!(regional_start < region_size);
        let region_base = u64::from(self.region) * region_size;
        let start = data_section_offset + region_base + regional_start;
        let end = start + self.page_size_bytes(page_size);
        start..end
    }

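    // A page of order N spans 2^N base pages.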
    pub(crate) fn page_size_bytes(&self, page_size: u32) -> u64 {
        let pages = 1u64 << self.page_order;
        pages * u64::from(page_size)
    }
}

impl Debug for PageNumber {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "r{}.{}/{}",
            self.region, self.page_index, self.page_order
        )
    }
}

pub(crate) trait Page {
    fn memory(&self) -> &[u8];

    fn get_page_number(&self) -> PageNumber;
}

pub struct PageImpl {
    pub(super) mem: Arc<[u8]>,
    pub(super) page_number: PageNumber,
    #[cfg(debug_assertions)]
    pub(super) open_pages: Arc<Mutex<HashMap<PageNumber, u64>>>,
}

impl PageImpl {
    pub(crate) fn to_arc(&self) -> Arc<[u8]> {
        self.mem.clone()
    }
}

impl Debug for PageImpl {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("PageImpl: page_number={:?}", self.page_number))
    }
}

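// In debug builds each clone of an open page bumps its count in `open_pages`
// (see `Clone` below); the count drops here, and the entry is removed once
// the last clone is gone.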
#[cfg(debug_assertions)]
impl Drop for PageImpl {
    fn drop(&mut self) {
        let mut open_pages = self.open_pages.lock().unwrap();
        let value = open_pages.get_mut(&self.page_number).unwrap();
        assert!(*value > 0);
        *value -= 1;
        if *value == 0 {
            open_pages.remove(&self.page_number);
        }
    }
}

impl Page for PageImpl {
    fn memory(&self) -> &[u8] {
        self.mem.as_ref()
    }

    fn get_page_number(&self) -> PageNumber {
        self.page_number
    }
}

impl Clone for PageImpl {
    fn clone(&self) -> Self {
        #[cfg(debug_assertions)]
        {
            *self
                .open_pages
                .lock()
                .unwrap()
                .get_mut(&self.page_number)
                .unwrap() += 1;
        }
        Self {
            mem: self.mem.clone(),
            page_number: self.page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_pages.clone(),
        }
    }
}

pub(crate) struct PageMut {
    pub(super) mem: WritablePage,
    pub(super) page_number: PageNumber,
    #[cfg(debug_assertions)]
    pub(super) open_pages: Arc<Mutex<HashSet<PageNumber>>>,
}

impl PageMut {
    pub(crate) fn memory_mut(&mut self) -> &mut [u8] {
        self.mem.mem_mut()
    }
}

impl Page for PageMut {
    fn memory(&self) -> &[u8] {
        self.mem.mem()
    }

    fn get_page_number(&self) -> PageNumber {
        self.page_number
    }
}

#[cfg(debug_assertions)]
impl Drop for PageMut {
    fn drop(&mut self) {
        assert!(self.open_pages.lock().unwrap().remove(&self.page_number));
    }
}

#[derive(Copy, Clone)]
pub(crate) enum PageHint {
    None,
    Clean,
}

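// Policy for tracking a set of live page numbers: `Ignore` drops all updates,
// `Track` records them (asserting on duplicate inserts and missing removals),
// and `Closed` panics on any further insert or remove.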
pub(crate) enum PageTrackerPolicy {
    Ignore,
    Track(HashSet<PageNumber>),
    Closed,
}

impl PageTrackerPolicy {
    pub(crate) fn new_tracking() -> Self {
        PageTrackerPolicy::Track(HashSet::new())
    }

    pub(crate) fn is_empty(&self) -> bool {
        match self {
            PageTrackerPolicy::Ignore | PageTrackerPolicy::Closed => true,
            PageTrackerPolicy::Track(x) => x.is_empty(),
        }
    }

    pub(super) fn remove(&mut self, page: PageNumber) {
        match self {
            PageTrackerPolicy::Ignore => {}
            PageTrackerPolicy::Track(x) => {
                assert!(x.remove(&page));
            }
            PageTrackerPolicy::Closed => {
                panic!("Page tracker is closed");
            }
        }
    }

    pub(super) fn insert(&mut self, page: PageNumber) {
        match self {
            PageTrackerPolicy::Ignore => {}
            PageTrackerPolicy::Track(x) => {
                assert!(x.insert(page));
            }
            PageTrackerPolicy::Closed => {
                panic!("Page tracker is closed");
            }
        }
    }

    pub(crate) fn close(&mut self) -> HashSet<PageNumber> {
        let old = mem::replace(self, PageTrackerPolicy::Closed);
        match old {
            PageTrackerPolicy::Ignore => HashSet::new(),
            PageTrackerPolicy::Track(x) => x,
            PageTrackerPolicy::Closed => {
                panic!("Page tracker is closed");
            }
        }
    }
}

#[cfg(test)]
mod test {
    use crate::tree_store::PageNumber;

    #[test]
    fn last_page() {
        let region_data_size = 2u64.pow(32);
        let page_size = 4096;
        let pages_per_region = region_data_size / page_size;
        let region_header_size = 2u64.pow(16);
        let last_page_index = pages_per_region - 1;
        let page_number = PageNumber::new(1, last_page_index.try_into().unwrap(), 0);
        page_number.address_range(
            4096,
            region_data_size + region_header_size,
            region_header_size,
            page_size.try_into().unwrap(),
        );
    }

    #[test]
    fn reserved_bits() {
        let page_number = PageNumber::new(0, 0, 12);
        let mut bytes = page_number.to_le_bytes();
        bytes[1] = 0xFF;
        let page_number2 = PageNumber::from_le_bytes(bytes);
        assert_eq!(page_number, page_number2);
    }
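
    // Minimal sketches (not part of the original test suite) exercising the
    // serialization round trip and the order-0 expansion helper above.
    #[test]
    fn round_trip_sketch() {
        let page_number = PageNumber::new(5, 7, 2);
        let bytes = page_number.to_le_bytes();
        assert_eq!(PageNumber::from_le_bytes(bytes), page_number);
    }

    #[test]
    fn to_order0_sketch() {
        // An order-2 page at index 2 covers order-0 indices 8..12.
        let page = PageNumber::new(0, 2, 2);
        let split = page.to_order0();
        assert_eq!(split.len(), 4);
        assert!(split.contains(&PageNumber::new(0, 8, 0)));
        assert!(split.contains(&PageNumber::new(0, 11, 0)));
    }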
}