redb/tree_store/page_store/page_manager.rs

use crate::transaction_tracker::TransactionId;
use crate::transactions::{AllocatorStateKey, AllocatorStateTree};
use crate::tree_store::btree_base::{BtreeHeader, Checksum};
use crate::tree_store::page_store::base::{MAX_PAGE_INDEX, PageHint};
use crate::tree_store::page_store::buddy_allocator::BuddyAllocator;
use crate::tree_store::page_store::cached_file::PagedCachedFile;
use crate::tree_store::page_store::header::{DB_HEADER_SIZE, DatabaseHeader, MAGICNUMBER};
use crate::tree_store::page_store::layout::DatabaseLayout;
use crate::tree_store::page_store::region::{Allocators, RegionTracker};
use crate::tree_store::page_store::{PageImpl, PageMut, hash128_with_seed};
use crate::tree_store::{Page, PageNumber};
use crate::{CacheStats, StorageBackend};
use crate::{DatabaseError, Result, StorageError};
#[cfg(feature = "logging")]
use log::warn;
use std::cmp::{max, min};
#[cfg(debug_assertions)]
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::io::ErrorKind;
#[cfg(debug_assertions)]
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

// Regions have a maximum size of 4GiB. A `4GiB - overhead` value is the largest that can be represented,
// because the leaf node format uses 32-bit offsets
const MAX_USABLE_REGION_SPACE: u64 = 4 * 1024 * 1024 * 1024;
// TODO: remove this constant?
pub(crate) const MAX_MAX_PAGE_ORDER: u8 = 20;
pub(super) const MIN_USABLE_PAGES: u32 = 10;
const MIN_DESIRED_USABLE_BYTES: u64 = 1024 * 1024;

pub(super) const INITIAL_REGIONS: u32 = 1000; // Enough for a 4TiB database

// Original file format. No lengths stored with btrees
pub(crate) const FILE_FORMAT_VERSION1: u8 = 1;
// New file format. All btrees have a separate length stored in their header for constant-time access
pub(crate) const FILE_FORMAT_VERSION2: u8 = 2;

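// Returns the smallest order such that 2^order >= x; e.g. ceil_log2(5) == 3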
fn ceil_log2(x: usize) -> u8 {
    if x.is_power_of_two() {
        x.trailing_zeros().try_into().unwrap()
    } else {
        x.next_power_of_two().trailing_zeros().try_into().unwrap()
    }
}

pub(crate) fn xxh3_checksum(data: &[u8]) -> Checksum {
    hash128_with_seed(data, 0)
}

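// In-memory copy of the database header and allocator state, guarded by the `state` mutex
// in `TransactionalMemory`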
struct InMemoryState {
    header: DatabaseHeader,
    allocators: Allocators,
}

impl InMemoryState {
    fn from_bytes(header: DatabaseHeader, file: &PagedCachedFile) -> Result<Self> {
        let allocators = if header.recovery_required {
            Allocators::new(header.layout())
        } else {
            Allocators::from_bytes(&header, file)?
        };
        Ok(Self { header, allocators })
    }

    fn get_region(&self, region: u32) -> &BuddyAllocator {
        &self.allocators.region_allocators[region as usize]
    }

    fn get_region_mut(&mut self, region: u32) -> &mut BuddyAllocator {
        &mut self.allocators.region_allocators[region as usize]
    }

    fn get_region_tracker_mut(&mut self) -> &mut RegionTracker {
        &mut self.allocators.region_tracker
    }
}

pub(crate) struct TransactionalMemory {
    // Pages allocated since the last commit
    // TODO: maybe this should be moved to WriteTransaction?
    allocated_since_commit: Mutex<HashSet<PageNumber>>,
    // True if the allocator state was corrupted when the file was opened
    // TODO: maybe we can remove this flag now that CheckedBackend exists?
    needs_recovery: AtomicBool,
    storage: PagedCachedFile,
    state: Mutex<InMemoryState>,
    // The number of PageMut which are outstanding
    #[cfg(debug_assertions)]
    open_dirty_pages: Arc<Mutex<HashSet<PageNumber>>>,
    // Reference counts of PageImpls that are outstanding
    #[cfg(debug_assertions)]
    read_page_ref_counts: Arc<Mutex<HashMap<PageNumber, u64>>>,
    // Indicates that a non-durable commit has been made, so reads should be served from the secondary meta page
    read_from_secondary: AtomicBool,
    page_size: u32,
    // We store these separately from the layout because they're static, and accessed on the get_page()
    // code path where there is no locking
    region_size: u64,
    region_header_with_padding_size: u64,
}

impl TransactionalMemory {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        file: Box<dyn StorageBackend>,
        // Allow initializing a new database in an empty file
        allow_initialize: bool,
        page_size: usize,
        requested_region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
    ) -> Result<Self, DatabaseError> {
        assert!(page_size.is_power_of_two() && page_size >= DB_HEADER_SIZE);

        let region_size = requested_region_size.unwrap_or(MAX_USABLE_REGION_SPACE);
        let region_size = min(
            region_size,
            (u64::from(MAX_PAGE_INDEX) + 1) * page_size as u64,
        );
        assert!(region_size.is_power_of_two());

        let storage = PagedCachedFile::new(
            file,
            page_size as u64,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;

        let initial_storage_len = storage.raw_file_len()?;

        let magic_number: [u8; MAGICNUMBER.len()] =
            if initial_storage_len >= MAGICNUMBER.len() as u64 {
                storage
                    .read_direct(0, MAGICNUMBER.len())?
                    .try_into()
                    .unwrap()
            } else {
                [0; MAGICNUMBER.len()]
            };

        if initial_storage_len > 0 {
            // File already exists; check that the magic number matches
            if magic_number != MAGICNUMBER {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        } else {
            // File is empty, check that we're allowed to initialize a new database (i.e. the caller is Database::create() and not open())
            if !allow_initialize {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        }

        if magic_number != MAGICNUMBER {
            let region_tracker_required_bytes =
                RegionTracker::new(INITIAL_REGIONS, MAX_MAX_PAGE_ORDER + 1)
                    .to_vec()
                    .len();

            // Make sure that there is enough room to allocate the region tracker into a page
            let size: u64 = max(
                MIN_DESIRED_USABLE_BYTES,
                page_size as u64 * u64::from(MIN_USABLE_PAGES),
            );
            let tracker_space =
                (page_size * region_tracker_required_bytes.div_ceil(page_size)) as u64;
            let starting_size = size + tracker_space;

            let layout = DatabaseLayout::calculate(
                starting_size,
                (region_size / u64::try_from(page_size).unwrap())
                    .try_into()
                    .unwrap(),
                page_size.try_into().unwrap(),
            );

            {
                let file_len = storage.raw_file_len()?;

                if file_len < layout.len() {
                    storage.resize(layout.len())?;
                }
            }

            let mut allocators = Allocators::new(layout);

            // Allocate the region tracker in the zeroth region
            let tracker_page = {
                let tracker_required_pages =
                    allocators.region_tracker.to_vec().len().div_ceil(page_size);
                let required_order = ceil_log2(tracker_required_pages);
                let page_number = allocators.region_allocators[0]
                    .alloc(required_order)
                    .unwrap();
                PageNumber::new(0, page_number, required_order)
            };

            let mut header = DatabaseHeader::new(
                layout,
                TransactionId::new(0),
                FILE_FORMAT_VERSION2,
                tracker_page,
            );

            header.recovery_required = false;
            header.two_phase_commit = true;
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(false));
            allocators.flush_to(tracker_page, layout, &storage)?;

            storage.flush(false)?;
            // Write the magic number only after the data structure is initialized and written to disk
            // to ensure that it's crash safe
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }
        let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;

        assert_eq!(header.page_size() as usize, page_size);
        assert!(storage.raw_file_len()? >= header.layout().len());
        let needs_recovery =
            header.recovery_required || header.layout().len() != storage.raw_file_len()?;
        if needs_recovery {
            let layout = header.layout();
            let region_max_pages = layout.full_region_layout().num_pages();
            let region_header_pages = layout.full_region_layout().get_header_pages();
            header.set_layout(DatabaseLayout::recalculate(
                storage.raw_file_len()?,
                region_header_pages,
                region_max_pages,
                page_size.try_into().unwrap(),
            ));
            header.pick_primary_for_repair(repair_info)?;
            assert!(!repair_info.invalid_magic_number);
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }

        let layout = header.layout();
        assert_eq!(layout.len(), storage.raw_file_len()?);
        let region_size = layout.full_region_layout().len();
        let region_header_size = layout.full_region_layout().data_section().start;

        let state = InMemoryState::from_bytes(header, &storage)?;

        assert!(page_size >= DB_HEADER_SIZE);

        Ok(Self {
            allocated_since_commit: Mutex::new(HashSet::new()),
            needs_recovery: AtomicBool::new(needs_recovery),
            storage,
            state: Mutex::new(state),
            #[cfg(debug_assertions)]
            open_dirty_pages: Arc::new(Mutex::new(HashSet::new())),
            #[cfg(debug_assertions)]
            read_page_ref_counts: Arc::new(Mutex::new(HashMap::new())),
            read_from_secondary: AtomicBool::new(false),
            page_size: page_size.try_into().unwrap(),
            region_size,
            region_header_with_padding_size: region_header_size,
        })
    }

    pub(crate) fn cache_stats(&self) -> CacheStats {
        self.storage.cache_stats()
    }

    pub(crate) fn check_io_errors(&self) -> Result {
        self.storage.check_io_errors()
    }

    #[cfg(any(test, fuzzing))]
    pub(crate) fn all_allocated_pages(&self) -> Vec<PageNumber> {
        self.state.lock().unwrap().allocators.all_allocated()
    }

    #[cfg(any(test, fuzzing))]
    pub(crate) fn tracker_page(&self) -> PageNumber {
        self.state.lock().unwrap().header.region_tracker()
    }

    pub(crate) fn clear_read_cache(&self) {
        self.storage.invalidate_cache_all();
    }

    pub(crate) fn clear_cache_and_reload(&mut self) -> Result<bool, DatabaseError> {
        assert!(self.allocated_since_commit.lock().unwrap().is_empty());

        self.storage.flush(false)?;
        self.storage.invalidate_cache_all();

        let header_bytes = self.storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;
        // TODO: This ends up always being true because this is called from check_integrity() once the db is already open
        // TODO: Also we should recheck the layout
        let mut was_clean = true;
        if header.recovery_required {
            if !header.pick_primary_for_repair(repair_info)? {
                was_clean = false;
            }
            if repair_info.invalid_magic_number {
                return Err(StorageError::Corrupted("Invalid magic number".to_string()).into());
            }
            self.storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            self.storage.flush(false)?;
        }

        self.needs_recovery
            .store(header.recovery_required, Ordering::Release);
        self.state.lock().unwrap().header = header;

        Ok(was_clean)
    }

    pub(crate) fn begin_writable(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        assert!(!state.header.recovery_required);
        state.header.recovery_required = true;
        self.write_header(&state.header)?;
        self.storage.flush(false)
    }

    pub(crate) fn needs_repair(&self) -> Result<bool> {
        Ok(self.state.lock().unwrap().header.recovery_required)
    }

    pub(crate) fn used_two_phase_commit(&self) -> bool {
        self.state.lock().unwrap().header.two_phase_commit
    }

    pub(crate) fn allocator_hash(&self) -> u128 {
        self.state.lock().unwrap().allocators.xxh3_hash()
    }

    // TODO: need a clearer distinction between this and needs_repair()
    pub(crate) fn storage_failure(&self) -> bool {
        self.needs_recovery.load(Ordering::Acquire)
    }

    pub(crate) fn repair_primary_corrupted(&self) {
        let mut state = self.state.lock().unwrap();
        state.header.swap_primary_slot();
    }

    pub(crate) fn begin_repair(&self) -> Result<()> {
        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators::new(state.header.layout());

        Ok(())
    }

    pub(crate) fn mark_page_allocated(&self, page_number: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page_number.region;
        let allocator = state.get_region_mut(region_index);
        allocator.record_alloc(page_number.page_index, page_number.page_order);
    }

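    // Writes `header` to the header area at the start of the file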
    fn write_header(&self, header: &DatabaseHeader) -> Result {
        self.storage
            .write(0, DB_HEADER_SIZE, true)?
            .mem_mut()
            .copy_from_slice(&header.to_bytes(true));

        Ok(())
    }

    pub(crate) fn end_repair(&self) -> Result<()> {
        self.allocate_region_tracker_page()?;

        let mut state = self.state.lock().unwrap();
        let tracker_page = state.header.region_tracker();
        state
            .allocators
            .flush_to(tracker_page, state.header.layout(), &self.storage)?;

        state.header.recovery_required = false;
        self.write_header(&state.header)?;
        let result = self.storage.flush(false);
        self.needs_recovery.store(false, Ordering::Release);

        result
    }

    pub(crate) fn reserve_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        transaction_id: TransactionId,
    ) -> Result<u32> {
        let state = self.state.lock().unwrap();
        let layout = state.header.layout();
        let num_regions = layout.num_regions();
        let region_header_len = layout.full_region_layout().get_header_pages()
            * layout.full_region_layout().page_size();
        let region_tracker_len = state.allocators.region_tracker.to_vec().len();
        drop(state);

        for i in 0..num_regions {
            tree.insert(
                &AllocatorStateKey::Region(i),
                &vec![0; region_header_len as usize].as_ref(),
            )?;
        }

        tree.insert(
            &AllocatorStateKey::RegionTracker,
            &vec![0; region_tracker_len].as_ref(),
        )?;

        tree.insert(
            &AllocatorStateKey::TransactionId,
            &transaction_id.raw_id().to_le_bytes().as_ref(),
        )?;

        Ok(num_regions)
    }

    // Returns true on success, or false if the number of regions has changed
    pub(crate) fn try_save_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        // Has the number of regions changed since reserve_allocator_state() was called?
        let state = self.state.lock().unwrap();
        if num_regions != state.header.layout().num_regions() {
            return Ok(false);
        }

        // Temporarily free the region tracker page, because we don't want to include it in our
        // recorded allocations
        let tracker_page = state.header.region_tracker();
        drop(state);
        self.free(tracker_page);

        let result = self.try_save_allocator_state_inner(tree, num_regions);

        // Restore the region tracker page
        self.mark_page_allocated(tracker_page);

        result
    }

    fn try_save_allocator_state_inner(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        for i in 0..num_regions {
            let region_bytes =
                &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec();
            tree.insert_inplace(&AllocatorStateKey::Region(i), &region_bytes.as_ref())?;
        }

        let region_tracker_bytes = self
            .state
            .lock()
            .unwrap()
            .allocators
            .region_tracker
            .to_vec();
        tree.insert_inplace(
            &AllocatorStateKey::RegionTracker,
            &region_tracker_bytes.as_ref(),
        )?;

        Ok(true)
    }

    // Returns true if the allocator state table is up to date, or false if it's stale
    pub(crate) fn is_valid_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
        // See if this is stale allocator state left over from a previous transaction. That won't
        // happen during normal operation, since WriteTransaction::commit() always updates the
        // allocator state table before calling TransactionalMemory::commit(), but there are also
        // a few places where TransactionalMemory::commit() is called directly without using a
        // WriteTransaction. When that happens, any existing allocator state table will be left
        // in place but is no longer valid. (And even if there were no such calls today, it would
        // be an easy mistake to make! So it's good that we check.)
        let transaction_id = TransactionId::new(u64::from_le_bytes(
            tree.get(&AllocatorStateKey::TransactionId)?
                .unwrap()
                .value()
                .try_into()
                .unwrap(),
        ));

        Ok(transaction_id == self.get_last_committed_transaction_id()?)
    }

    pub(crate) fn load_allocator_state(&self, tree: &AllocatorStateTree) -> Result {
        assert!(self.is_valid_allocator_state(tree)?);

        // Load the allocator state
        let mut region_allocators = vec![];
        for region in
            tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))?
        {
            region_allocators.push(BuddyAllocator::from_bytes(region?.value()));
        }

        let region_tracker = RegionTracker::from_page(
            tree.get(&AllocatorStateKey::RegionTracker)?
                .unwrap()
                .value(),
        );

        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators {
            region_tracker,
            region_allocators,
        };

        // Resize the allocators to match the current file size
        let layout = state.header.layout();
        state.allocators.resize_to(layout);
        drop(state);

        // Allocate a page for the region tracker
        self.allocate_region_tracker_page()?;

        self.state.lock().unwrap().header.recovery_required = false;
        self.needs_recovery.store(false, Ordering::Release);

        Ok(())
    }

    pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
        let state = self.state.lock().unwrap();
        let allocator = state.get_region(page.region);

        allocator.is_allocated(page.page_index, page.page_order)
    }

    // Allocate a page for the region tracker. If possible, this will pick the same page that
    // was used last time; otherwise it'll pick a new page and update the database header to
    // match
    fn allocate_region_tracker_page(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        let tracker_len = state.allocators.region_tracker.to_vec().len();
        let tracker_page = state.header.region_tracker();

        let allocator = state.get_region_mut(tracker_page.region);
        // Pick a new tracker page, if the old one was overwritten or is too small
        if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
            || tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
        {
            drop(state);

            let new_tracker_page = self
                .allocate_non_transactional(tracker_len, false)?
                .get_page_number();

            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_tracker_page);
            self.write_header(&state.header)?;
            self.storage.flush(false)?;
        } else {
            // The old page is available, so just mark it as allocated
            allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
            drop(state);
        }

        Ok(())
    }

    // Relocates the region tracker to a lower page, if possible
    // Returns true if the page was moved
    pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
        let state = self.state.lock().unwrap();
        let region_tracker_size = state
            .header
            .region_tracker()
            .page_size_bytes(self.page_size);
        let old_tracker_page = state.header.region_tracker();
        // allocate acquires this lock, so we need to drop it
        drop(state);
        let new_page =
            self.allocate_non_transactional(region_tracker_size.try_into().unwrap(), true)?;
        if new_page.get_page_number().is_before(old_tracker_page) {
            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_page.get_page_number());
            drop(state);
            self.free(old_tracker_page);
            Ok(true)
        } else {
            let new_page_number = new_page.get_page_number();
            drop(new_page);
            self.free(new_page_number);
            Ok(false)
        }
    }

    // Diffs `old_states`, which must be derived from get_raw_allocator_states(), against
    // the currently allocated set of pages
    pub(crate) fn pages_allocated_since_raw_state(
        &self,
        old_states: &[BuddyAllocator],
    ) -> Vec<PageNumber> {
        let mut result = vec![];
        let state = self.state.lock().unwrap();

        for i in 0..state.header.layout().num_regions() {
            let current_state = state.get_region(i);
            if let Some(old_state) = old_states.get(i as usize) {
                current_state.difference(i, old_state, &mut result);
            } else {
                // This region didn't exist, so everything is newly allocated
                current_state.get_allocated_pages(i, &mut result);
            }
        }

        // Don't include the region tracker, since we manage that internally to the TransactionalMemory.
        // Otherwise restoring a savepoint would free it.
        result.retain(|x| *x != state.header.region_tracker());

        result
    }

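    // Captures a snapshot of each region's allocator state, for later use with
    // pages_allocated_since_raw_state()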
    pub(crate) fn get_raw_allocator_states(&self) -> Vec<Vec<u8>> {
        let state = self.state.lock().unwrap();

        let mut regional_allocators = vec![];
        for i in 0..state.header.layout().num_regions() {
            regional_allocators.push(state.get_region(i).make_state_for_savepoint());
        }

        regional_allocators
    }

    // Commit all outstanding changes and make them visible as the primary
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        let result = self.commit_inner(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            eventual,
            two_phase,
        );
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    #[allow(clippy::too_many_arguments)]
    fn commit_inner(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        // All mutable pages must be dropped; this ensures that when a transaction completes
        // no more writes can happen to the pages it allocated. Thus it is safe to make them visible
        // to future read transactions
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        let mut state = self.state.lock().unwrap();
        // Trim surplus file space before finalizing the commit
        let shrunk = Self::try_shrink(&mut state)?;
        // Copy the header so that we can release the state lock while we flush the file
        let mut header = state.header.clone();
        drop(state);

        let old_transaction_id = header.secondary_slot().transaction_id;
        let secondary = header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        self.write_header(&header)?;

        // Use 2-phase commit if checksums are disabled
        if two_phase {
            self.storage.flush(eventual)?;
        }

        // Make our new commit the primary, and record whether it was a 2-phase commit.
        // These two bits need to be written atomically
        header.swap_primary_slot();
        header.two_phase_commit = two_phase;

        // Write the new header to disk
        self.write_header(&header)?;
        self.storage.flush(eventual)?;

        if shrunk {
            let result = self.storage.resize(header.layout().len());
            if result.is_err() {
                // TODO: it would be nice to have a more cohesive approach to setting this.
                // we do it in commit() & rollback() on failure, but there are probably other places that need it
                self.needs_recovery.store(true, Ordering::Release);
                return result;
            }
        }
        self.allocated_since_commit.lock().unwrap().clear();

        let mut state = self.state.lock().unwrap();
        assert_eq!(
            state.header.secondary_slot().transaction_id,
            old_transaction_id
        );
        state.header = header;
        self.read_from_secondary.store(false, Ordering::Release);
        // Hold lock until read_from_secondary is set to false, so that the new primary state is read.
        // TODO: maybe we can remove the whole read_from_secondary flag?
        drop(state);

        Ok(())
    }

    // Make changes visible, without a durability guarantee
    pub(crate) fn non_durable_commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
    ) -> Result {
        // All mutable pages must be dropped; this ensures that when a transaction completes
        // no more writes can happen to the pages it allocated. Thus it is safe to make them visible
        // to future read transactions
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        self.allocated_since_commit.lock().unwrap().clear();
        self.storage.write_barrier()?;

        let mut state = self.state.lock().unwrap();
        let secondary = state.header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        // TODO: maybe we can remove this flag and just update the in-memory DatabaseHeader state?
        self.read_from_secondary.store(true, Ordering::Release);

        Ok(())
    }

    pub(crate) fn rollback_uncommitted_writes(&self) -> Result {
        let result = self.rollback_uncommitted_writes_inner();
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    fn rollback_uncommitted_writes_inner(&self) -> Result {
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(
                dirty_pages.is_empty(),
                "Dirty pages outstanding: {dirty_pages:?}"
            );
        }
        assert!(!self.needs_recovery.load(Ordering::Acquire));
        let mut state = self.state.lock().unwrap();
        let mut guard = self.allocated_since_commit.lock().unwrap();
        for page_number in guard.iter() {
            let region_index = page_number.region;
            state
                .get_region_tracker_mut()
                .mark_free(page_number.page_order, region_index);
            state
                .get_region_mut(region_index)
                .free(page_number.page_index, page_number.page_order);

            let address = page_number.address_range(
                self.page_size.into(),
                self.region_size,
                self.region_header_with_padding_size,
                self.page_size,
            );
            let len: usize = (address.end - address.start).try_into().unwrap();
            self.storage.invalidate_cache(address.start, len);
            self.storage.cancel_pending_write(address.start, len);
        }
        guard.clear();

        Ok(())
    }

    // TODO: make all callers explicitly provide a hint
    pub(crate) fn get_page(&self, page_number: PageNumber) -> Result<PageImpl> {
        self.get_page_extended(page_number, PageHint::None)
    }

    pub(crate) fn get_page_extended(
        &self,
        page_number: PageNumber,
        hint: PageHint,
    ) -> Result<PageImpl> {
        let range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (range.end - range.start).try_into().unwrap();
        let mem = self.storage.read(range.start, len, hint)?;

        // We must not retrieve an immutable reference to a page which already has a mutable ref to it
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(!dirty_pages.contains(&page_number), "{page_number:?}");
            *(self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .entry(page_number)
                .or_default()) += 1;
            drop(dirty_pages);
        }

        Ok(PageImpl {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.read_page_ref_counts.clone(),
        })
    }

    // NOTE: the caller must ensure that the read cache has been invalidated or stale reads may occur
    pub(crate) fn get_page_mut(&self, page_number: PageNumber) -> Result<PageMut> {
        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number)
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        let mem = self.storage.write(address_range.start, len, false)?;

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    pub(crate) fn get_version(&self) -> u8 {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().version
        } else {
            state.header.primary_slot().version
        }
    }

    pub(crate) fn get_data_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().user_root
        } else {
            state.header.primary_slot().user_root
        }
    }

    pub(crate) fn get_system_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().system_root
        } else {
            state.header.primary_slot().system_root
        }
    }

    pub(crate) fn get_freed_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().freed_root
        } else {
            state.header.primary_slot().freed_root
        }
    }

    pub(crate) fn get_last_committed_transaction_id(&self) -> Result<TransactionId> {
        let state = self.state.lock()?;
        if self.read_from_secondary.load(Ordering::Acquire) {
            Ok(state.header.secondary_slot().transaction_id)
        } else {
            Ok(state.header.primary_slot().transaction_id)
        }
    }

    pub(crate) fn free(&self, page: PageNumber) {
        self.allocated_since_commit.lock().unwrap().remove(&page);
        self.free_helper(page);
    }

    fn free_helper(&self, page: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page.region;
        // Free in the regional allocator
        state
            .get_region_mut(region_index)
            .free(page.page_index, page.page_order);
        // Ensure that the region is marked as having free space
        state
            .get_region_tracker_mut()
            .mark_free(page.page_order, region_index);

        let address_range = page.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        self.storage.invalidate_cache(address_range.start, len);
        self.storage.cancel_pending_write(address_range.start, len);
    }

    // Frees the page if it was allocated since the last commit. Returns true if the page was freed
    pub(crate) fn free_if_uncommitted(&self, page: PageNumber) -> bool {
        if self.allocated_since_commit.lock().unwrap().remove(&page) {
            self.free_helper(page);
            true
        } else {
            false
        }
    }

    // Returns true if the page was allocated since the last commit, i.e. has not been committed
    pub(crate) fn uncommitted(&self, page: PageNumber) -> bool {
        self.allocated_since_commit.lock().unwrap().contains(&page)
    }

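    // Allocates a page of at least `allocation_size` bytes, growing the database if necessary.
    // If `transactional` is true, the page is tracked in `allocated_since_commit` so that it can
    // be freed again if the transaction is rolled back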
    pub(crate) fn allocate_helper(
        &self,
        allocation_size: usize,
        lowest: bool,
        transactional: bool,
    ) -> Result<PageMut> {
        let required_pages = allocation_size.div_ceil(self.get_page_size());
        let required_order = ceil_log2(required_pages);

        let mut state = self.state.lock().unwrap();

        let page_number = if let Some(page_number) =
            Self::allocate_helper_retry(&mut state, required_order, lowest)?
        {
            page_number
        } else {
            self.grow(&mut state, required_order)?;
            Self::allocate_helper_retry(&mut state, required_order, lowest)?.unwrap()
        };

        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number),
                "Allocated a page that is still referenced! {page_number:?}"
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        if transactional {
            self.allocated_since_commit
                .lock()
                .unwrap()
                .insert(page_number);
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();

        #[allow(unused_mut)]
        let mut mem = self.storage.write(address_range.start, len, true)?;
        debug_assert!(mem.mem().len() >= allocation_size);

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));

            // Poison the memory in debug mode to help detect uninitialized reads
            mem.mem_mut().fill(0xFF);
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

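    // Repeatedly asks the region tracker for a region with free space of the required order and
    // tries to allocate from it, marking regions full as they are exhausted. Returns None if no
    // region can satisfy the request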
    fn allocate_helper_retry(
        state: &mut InMemoryState,
        required_order: u8,
        lowest: bool,
    ) -> Result<Option<PageNumber>> {
        loop {
            let Some(candidate_region) = state.get_region_tracker_mut().find_free(required_order)
            else {
                return Ok(None);
            };
            let region = state.get_region_mut(candidate_region);
            let r = if lowest {
                region.alloc_lowest(required_order)
            } else {
                region.alloc(required_order)
            };
            if let Some(page) = r {
                return Ok(Some(PageNumber::new(
                    candidate_region,
                    page,
                    required_order,
                )));
            }
            // The region couldn't satisfy this order, so mark it full and try another region
            state
                .get_region_tracker_mut()
                .mark_full(required_order, candidate_region);
        }
    }

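    // Shrinks the last region if at least half of its pages are trailing free pages.
    // Returns true if the layout was reduced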
    fn try_shrink(state: &mut InMemoryState) -> Result<bool> {
        let layout = state.header.layout();
        let last_region_index = layout.num_regions() - 1;
        let last_allocator = state.get_region(last_region_index);
        let trailing_free = last_allocator.trailing_free_pages();
        let last_allocator_len = last_allocator.len();
        if trailing_free < last_allocator_len / 2 {
            return Ok(false);
        }
        let reduce_by = if layout.num_regions() > 1 && trailing_free == last_allocator_len {
            trailing_free
        } else {
            trailing_free / 2
        };

        let mut new_layout = layout;
        new_layout.reduce_last_region(reduce_by);
        state.allocators.resize_to(new_layout);
        assert!(new_layout.len() <= layout.len());
        state.header.set_layout(new_layout);

        Ok(true)
    }

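    // Grows the file so that a subsequent allocation of the given order can succeed: either fills
    // out the trailing region, adds a new region, or (while the database is still a single region)
    // roughly doubles the usable size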
    fn grow(&self, state: &mut InMemoryState, required_order_allocation: u8) -> Result<()> {
        let layout = state.header.layout();
        let required_growth =
            2u64.pow(required_order_allocation.into()) * u64::from(state.header.page_size());
        let max_region_size = u64::from(state.header.layout().full_region_layout().num_pages())
            * u64::from(state.header.page_size());
        let next_desired_size = if layout.num_full_regions() > 0 {
            if let Some(trailing) = layout.trailing_region_layout() {
                if 2 * required_growth < max_region_size - trailing.usable_bytes() {
                    // Fill out the trailing region
                    layout.usable_bytes() + (max_region_size - trailing.usable_bytes())
                } else {
                    // Fill out trailing & grow by 1 region
                    layout.usable_bytes() + 2 * max_region_size - trailing.usable_bytes()
                }
            } else {
                // Grow by 1 region
                layout.usable_bytes() + max_region_size
            }
        } else {
            max(
                layout.usable_bytes() * 2,
                layout.usable_bytes() + required_growth * 2,
            )
        };
        let new_layout = DatabaseLayout::calculate(
            next_desired_size,
            state.header.layout().full_region_layout().num_pages(),
            self.page_size,
        );
        assert!(new_layout.len() >= layout.len());

        let result = self.storage.resize(new_layout.len());
        if result.is_err() {
            // TODO: it would be nice to have a more cohesive approach to setting this.
            // we do it in commit() & rollback() on failure, but there are probably other places that need it
            self.needs_recovery.store(true, Ordering::Release);
            return result;
        }

        state.allocators.resize_to(new_layout);
        state.header.set_layout(new_layout);
        Ok(())
    }

    pub(crate) fn allocate(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, false, true)
    }

    pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, true, true)
    }

    // Allocate a page not associated with any transaction. The page is immediately considered committed,
    // and won't be rolled back if an abort happens. This is only used for the region tracker
    fn allocate_non_transactional(&self, allocation_size: usize, lowest: bool) -> Result<PageMut> {
        self.allocate_helper(allocation_size, lowest, false)
    }

    pub(crate) fn count_allocated_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_allocated_pages());
        }

        Ok(count)
    }

    pub(crate) fn count_free_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_free_pages());
        }

        Ok(count)
    }

    pub(crate) fn get_page_size(&self) -> usize {
        self.page_size.try_into().unwrap()
    }
}

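// On clean shutdown, persist the allocator state and clear the recovery flag so that the next
// open does not need to run repair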
impl Drop for TransactionalMemory {
    fn drop(&mut self) {
        if thread::panicking() || self.needs_recovery.load(Ordering::Acquire) {
            return;
        }

        // Reallocate the region tracker page, which will grow it if necessary
        let tracker_page = self.state.lock().unwrap().header.region_tracker();
        self.free(tracker_page);
        if self.allocate_region_tracker_page().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        let mut state = self.state.lock().unwrap();
        if state
            .allocators
            .flush_to(
                state.header.region_tracker(),
                state.header.layout(),
                &self.storage,
            )
            .is_err()
        {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) {
            state.header.recovery_required = false;
            let _ = self.write_header(&state.header);
            let _ = self.storage.flush(false);
        }
    }
}

#[cfg(test)]
mod test {
    use crate::tree_store::page_store::page_manager::INITIAL_REGIONS;
    use crate::{Database, TableDefinition};

    // Test that the region tracker expansion code works, by adding more data than fits into the initial max regions
    #[test]
    fn out_of_regions() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");
        let page_size = 1024;
        let big_value = vec![0u8; 5 * page_size];

        let db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..=INITIAL_REGIONS {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();
        drop(db);

        let mut db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .open(tmpfile.path())
            .unwrap();
        assert!(db.check_integrity().unwrap());
    }

    // Make sure the database remains consistent after a panic
    #[test]
    #[cfg(panic = "unwind")]
    fn panic() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");

        let _ = std::panic::catch_unwind(|| {
            let db = Database::create(&tmpfile).unwrap();
            let txn = db.begin_write().unwrap();
            txn.open_table(table_definition).unwrap();
            panic!();
        });

        let mut db = Database::open(tmpfile).unwrap();
        assert!(db.check_integrity().unwrap());
    }
}