redb/tree_store/page_store/page_manager.rs

use crate::transaction_tracker::TransactionId;
use crate::transactions::{AllocatorStateKey, AllocatorStateTree};
use crate::tree_store::btree_base::{BtreeHeader, Checksum};
use crate::tree_store::page_store::base::{PageHint, MAX_PAGE_INDEX};
use crate::tree_store::page_store::buddy_allocator::BuddyAllocator;
use crate::tree_store::page_store::cached_file::PagedCachedFile;
use crate::tree_store::page_store::header::{DatabaseHeader, DB_HEADER_SIZE, MAGICNUMBER};
use crate::tree_store::page_store::layout::DatabaseLayout;
use crate::tree_store::page_store::region::{Allocators, RegionTracker};
use crate::tree_store::page_store::{hash128_with_seed, PageImpl, PageMut};
use crate::tree_store::{Page, PageNumber};
use crate::StorageBackend;
use crate::{DatabaseError, Result, StorageError};
#[cfg(feature = "logging")]
use log::warn;
use std::cmp::{max, min};
#[cfg(debug_assertions)]
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(debug_assertions)]
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

// Regions have a maximum size of 4GiB. A `4GiB - overhead` value is the largest that can be represented,
// because the leaf node format uses 32-bit offsets
const MAX_USABLE_REGION_SPACE: u64 = 4 * 1024 * 1024 * 1024;
// TODO: remove this constant?
pub(crate) const MAX_MAX_PAGE_ORDER: u8 = 20;
pub(super) const MIN_USABLE_PAGES: u32 = 10;
const MIN_DESIRED_USABLE_BYTES: u64 = 1024 * 1024;

pub(super) const INITIAL_REGIONS: u32 = 1000; // Enough for a 4TiB database

// Original file format. No lengths stored with btrees
pub(crate) const FILE_FORMAT_VERSION1: u8 = 1;
// New file format. All btrees have a separate length stored in their header for constant time access
pub(crate) const FILE_FORMAT_VERSION2: u8 = 2;

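// Rounds up to the nearest power-of-two exponent, e.g. ceil_log2(1) == 0, ceil_log2(3) == 2, ceil_log2(4) == 2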
fn ceil_log2(x: usize) -> u8 {
    if x.is_power_of_two() {
        x.trailing_zeros().try_into().unwrap()
    } else {
        x.next_power_of_two().trailing_zeros().try_into().unwrap()
    }
}

pub(crate) fn xxh3_checksum(data: &[u8]) -> Checksum {
    hash128_with_seed(data, 0)
}

struct InMemoryState {
    header: DatabaseHeader,
    allocators: Allocators,
}

impl InMemoryState {
    fn from_bytes(header: DatabaseHeader, file: &PagedCachedFile) -> Result<Self> {
        let allocators = if header.recovery_required {
            Allocators::new(header.layout())
        } else {
            Allocators::from_bytes(&header, file)?
        };
        Ok(Self { header, allocators })
    }

    fn get_region(&self, region: u32) -> &BuddyAllocator {
        &self.allocators.region_allocators[region as usize]
    }

    fn get_region_mut(&mut self, region: u32) -> &mut BuddyAllocator {
        &mut self.allocators.region_allocators[region as usize]
    }

    fn get_region_tracker_mut(&mut self) -> &mut RegionTracker {
        &mut self.allocators.region_tracker
    }
}

pub(crate) struct TransactionalMemory {
    // Pages allocated since the last commit
    // TODO: maybe this should be moved to WriteTransaction?
    allocated_since_commit: Mutex<HashSet<PageNumber>>,
    // True if the allocator state was corrupted when the file was opened
    // TODO: maybe we can remove this flag now that CheckedBackend exists?
    needs_recovery: AtomicBool,
    storage: PagedCachedFile,
    state: Mutex<InMemoryState>,
    // The number of PageMut which are outstanding
    #[cfg(debug_assertions)]
    open_dirty_pages: Arc<Mutex<HashSet<PageNumber>>>,
    // Reference counts of PageImpls that are outstanding
    #[cfg(debug_assertions)]
    read_page_ref_counts: Arc<Mutex<HashMap<PageNumber, u64>>>,
    // Indicates that a non-durable commit has been made, so reads should be served from the secondary meta page
    read_from_secondary: AtomicBool,
    page_size: u32,
    // We store these separately from the layout because they're static, and accessed on the get_page()
    // code path where there is no locking
    region_size: u64,
    region_header_with_padding_size: u64,
}

impl TransactionalMemory {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        requested_region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
    ) -> Result<Self, DatabaseError> {
        assert!(page_size.is_power_of_two() && page_size >= DB_HEADER_SIZE);

        let region_size = requested_region_size.unwrap_or(MAX_USABLE_REGION_SPACE);
        let region_size = min(
            region_size,
            (u64::from(MAX_PAGE_INDEX) + 1) * page_size as u64,
        );
        assert!(region_size.is_power_of_two());

        let storage = PagedCachedFile::new(
            file,
            page_size as u64,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;

        let magic_number: [u8; MAGICNUMBER.len()] =
            if storage.raw_file_len()? >= MAGICNUMBER.len() as u64 {
                storage
                    .read_direct(0, MAGICNUMBER.len())?
                    .try_into()
                    .unwrap()
            } else {
                [0; MAGICNUMBER.len()]
            };

        if magic_number != MAGICNUMBER {
            let region_tracker_required_bytes =
                RegionTracker::new(INITIAL_REGIONS, MAX_MAX_PAGE_ORDER + 1)
                    .to_vec()
                    .len();

            // Make sure that there is enough room to allocate the region tracker into a page
            let size: u64 = max(
                MIN_DESIRED_USABLE_BYTES,
                page_size as u64 * u64::from(MIN_USABLE_PAGES),
            );
            let tracker_space =
                (page_size * ((region_tracker_required_bytes + page_size - 1) / page_size)) as u64;
            let starting_size = size + tracker_space;

            let layout = DatabaseLayout::calculate(
                starting_size,
                (region_size / u64::try_from(page_size).unwrap())
                    .try_into()
                    .unwrap(),
                page_size.try_into().unwrap(),
            );

            {
                let file_len = storage.raw_file_len()?;

                if file_len < layout.len() {
                    storage.resize(layout.len())?;
                }
            }

            let mut allocators = Allocators::new(layout);

            // Allocate the region tracker in the zeroth region
            let tracker_page = {
                let tracker_required_pages =
                    (allocators.region_tracker.to_vec().len() + page_size - 1) / page_size;
                let required_order = ceil_log2(tracker_required_pages);
                let page_number = allocators.region_allocators[0]
                    .alloc(required_order)
                    .unwrap();
                PageNumber::new(0, page_number, required_order)
            };

            let mut header = DatabaseHeader::new(
                layout,
                TransactionId::new(0),
                FILE_FORMAT_VERSION2,
                tracker_page,
            );

            header.recovery_required = false;
            header.two_phase_commit = true;
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(false));
            allocators.flush_to(tracker_page, layout, &storage)?;

            storage.flush(false)?;
            // Write the magic number only after the data structure is initialized and written to disk
            // to ensure that it's crash safe
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }
        let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;

        assert_eq!(header.page_size() as usize, page_size);
        assert!(storage.raw_file_len()? >= header.layout().len());
        let needs_recovery =
            header.recovery_required || header.layout().len() != storage.raw_file_len()?;
        if needs_recovery {
            let layout = header.layout();
            let region_max_pages = layout.full_region_layout().num_pages();
            let region_header_pages = layout.full_region_layout().get_header_pages();
            header.set_layout(DatabaseLayout::recalculate(
                storage.raw_file_len()?,
                region_header_pages,
                region_max_pages,
                page_size.try_into().unwrap(),
            ));
            header.pick_primary_for_repair(repair_info)?;
            assert!(!repair_info.invalid_magic_number);
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }

        let layout = header.layout();
        assert_eq!(layout.len(), storage.raw_file_len()?);
        let region_size = layout.full_region_layout().len();
        let region_header_size = layout.full_region_layout().data_section().start;

        let state = InMemoryState::from_bytes(header, &storage)?;

        assert!(page_size >= DB_HEADER_SIZE);

        Ok(Self {
            allocated_since_commit: Mutex::new(HashSet::new()),
            needs_recovery: AtomicBool::new(needs_recovery),
            storage,
            state: Mutex::new(state),
            #[cfg(debug_assertions)]
            open_dirty_pages: Arc::new(Mutex::new(HashSet::new())),
            #[cfg(debug_assertions)]
            read_page_ref_counts: Arc::new(Mutex::new(HashMap::new())),
            read_from_secondary: AtomicBool::new(false),
            page_size: page_size.try_into().unwrap(),
            region_size,
            region_header_with_padding_size: region_header_size,
        })
    }

    pub(crate) fn check_io_errors(&self) -> Result {
        self.storage.check_io_errors()
    }

    #[cfg(any(test, fuzzing))]
    pub(crate) fn all_allocated_pages(&self) -> Vec<PageNumber> {
        self.state.lock().unwrap().allocators.all_allocated()
    }

    #[cfg(any(test, fuzzing))]
    pub(crate) fn tracker_page(&self) -> PageNumber {
        self.state.lock().unwrap().header.region_tracker()
    }

    pub(crate) fn clear_read_cache(&self) {
        self.storage.invalidate_cache_all();
    }

    pub(crate) fn clear_cache_and_reload(&mut self) -> Result<bool, DatabaseError> {
        assert!(self.allocated_since_commit.lock().unwrap().is_empty());

        self.storage.flush(false)?;
        self.storage.invalidate_cache_all();

        let header_bytes = self.storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;
        // TODO: This ends up always being true because this is called from check_integrity() once the db is already open
        // TODO: Also we should recheck the layout
        let mut was_clean = true;
        if header.recovery_required {
            if !header.pick_primary_for_repair(repair_info)? {
                was_clean = false;
            }
            if repair_info.invalid_magic_number {
                return Err(StorageError::Corrupted("Invalid magic number".to_string()).into());
            }
            self.storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            self.storage.flush(false)?;
        }

        self.needs_recovery
            .store(header.recovery_required, Ordering::Release);
        self.state.lock().unwrap().header = header;

        Ok(was_clean)
    }

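    // Mark the file as dirty before any pages are modified. recovery_required stays set until the
    // allocator state is known to be safely persisted again (see end_repair(), load_allocator_state()
    // and Drop), so a crash while the database is open forces a repair on the next open.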
    pub(crate) fn begin_writable(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        assert!(!state.header.recovery_required);
        state.header.recovery_required = true;
        self.write_header(&state.header)?;
        self.storage.flush(false)
    }

    pub(crate) fn needs_repair(&self) -> Result<bool> {
        Ok(self.state.lock().unwrap().header.recovery_required)
    }

    pub(crate) fn used_two_phase_commit(&self) -> bool {
        self.state.lock().unwrap().header.two_phase_commit
    }

    pub(crate) fn allocator_hash(&self) -> u128 {
        self.state.lock().unwrap().allocators.xxh3_hash()
    }

    // TODO: need a clearer distinction between this and needs_repair()
    pub(crate) fn storage_failure(&self) -> bool {
        self.needs_recovery.load(Ordering::Acquire)
    }

    pub(crate) fn repair_primary_corrupted(&self) {
        let mut state = self.state.lock().unwrap();
        state.header.swap_primary_slot();
    }

    pub(crate) fn begin_repair(&self) -> Result<()> {
        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators::new(state.header.layout());

        Ok(())
    }

    pub(crate) fn mark_page_allocated(&self, page_number: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page_number.region;
        let allocator = state.get_region_mut(region_index);
        allocator.record_alloc(page_number.page_index, page_number.page_order);
    }

    fn write_header(&self, header: &DatabaseHeader) -> Result {
        self.storage
            .write(0, DB_HEADER_SIZE, true)?
            .mem_mut()
            .copy_from_slice(&header.to_bytes(true));

        Ok(())
    }

    pub(crate) fn end_repair(&self) -> Result<()> {
        self.allocate_region_tracker_page()?;

        let mut state = self.state.lock().unwrap();
        let tracker_page = state.header.region_tracker();
        state
            .allocators
            .flush_to(tracker_page, state.header.layout(), &self.storage)?;

        state.header.recovery_required = false;
        self.write_header(&state.header)?;
        let result = self.storage.flush(false);
        self.needs_recovery.store(false, Ordering::Release);

        result
    }

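    // The allocator state is persisted in two steps: reserve_allocator_state() inserts zero-filled
    // placeholders of the right sizes, and try_save_allocator_state() later overwrites them with
    // insert_inplace(), so that recording the state does not itself change the allocators being recorded.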
    pub(crate) fn reserve_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        transaction_id: TransactionId,
    ) -> Result<u32> {
        let state = self.state.lock().unwrap();
        let layout = state.header.layout();
        let num_regions = layout.num_regions();
        let region_header_len = layout.full_region_layout().get_header_pages()
            * layout.full_region_layout().page_size();
        let region_tracker_len = state.allocators.region_tracker.to_vec().len();
        drop(state);

        for i in 0..num_regions {
            tree.insert(
                &AllocatorStateKey::Region(i),
                &vec![0; region_header_len as usize].as_ref(),
            )?;
        }

        tree.insert(
            &AllocatorStateKey::RegionTracker,
            &vec![0; region_tracker_len].as_ref(),
        )?;

        tree.insert(
            &AllocatorStateKey::TransactionId,
            &transaction_id.raw_id().to_le_bytes().as_ref(),
        )?;

        Ok(num_regions)
    }

    // Returns true on success, or false if the number of regions has changed
    pub(crate) fn try_save_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        // Has the number of regions changed since reserve_allocator_state() was called?
        let state = self.state.lock().unwrap();
        if num_regions != state.header.layout().num_regions() {
            return Ok(false);
        }

        // Temporarily free the region tracker page, because we don't want to include it in our
        // recorded allocations
        let tracker_page = state.header.region_tracker();
        drop(state);
        self.free(tracker_page);

        let result = self.try_save_allocator_state_inner(tree, num_regions);

        // Restore the region tracker page
        self.mark_page_allocated(tracker_page);

        result
    }

    fn try_save_allocator_state_inner(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        for i in 0..num_regions {
            let region_bytes =
                &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec();
            tree.insert_inplace(&AllocatorStateKey::Region(i), &region_bytes.as_ref())?;
        }

        let region_tracker_bytes = self
            .state
            .lock()
            .unwrap()
            .allocators
            .region_tracker
            .to_vec();
        tree.insert_inplace(
            &AllocatorStateKey::RegionTracker,
            &region_tracker_bytes.as_ref(),
        )?;

        Ok(true)
    }

    // Returns true if the allocator state table is up to date, or false if it's stale
    pub(crate) fn is_valid_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
        // See if this is stale allocator state left over from a previous transaction. That won't
        // happen during normal operation, since WriteTransaction::commit() always updates the
        // allocator state table before calling TransactionalMemory::commit(), but there are also
        // a few places where TransactionalMemory::commit() is called directly without using a
        // WriteTransaction. When that happens, any existing allocator state table will be left
        // in place but is no longer valid. (And even if there were no such calls today, it would
        // be an easy mistake to make! So it's good that we check.)
        let transaction_id = TransactionId::new(u64::from_le_bytes(
            tree.get(&AllocatorStateKey::TransactionId)?
                .unwrap()
                .value()
                .try_into()
                .unwrap(),
        ));

        Ok(transaction_id == self.get_last_committed_transaction_id()?)
    }

    pub(crate) fn load_allocator_state(&self, tree: &AllocatorStateTree) -> Result {
        assert!(self.is_valid_allocator_state(tree)?);

        // Load the allocator state
        let mut region_allocators = vec![];
        for region in
            tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))?
        {
            region_allocators.push(BuddyAllocator::from_bytes(region?.value()));
        }

        let region_tracker = RegionTracker::from_page(
            tree.get(&AllocatorStateKey::RegionTracker)?
                .unwrap()
                .value(),
        );

        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators {
            region_tracker,
            region_allocators,
        };

        // Resize the allocators to match the current file size
        let layout = state.header.layout();
        state.allocators.resize_to(layout);
        drop(state);

        // Allocate a page for the region tracker
        self.allocate_region_tracker_page()?;

        self.state.lock().unwrap().header.recovery_required = false;
        self.needs_recovery.store(false, Ordering::Release);

        Ok(())
    }

    pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
        let state = self.state.lock().unwrap();
        let allocator = state.get_region(page.region);

        allocator.is_allocated(page.page_index, page.page_order)
    }

    // Allocate a page for the region tracker. If possible, this will pick the same page that
    // was used last time; otherwise it'll pick a new page and update the database header to
    // match
    fn allocate_region_tracker_page(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        let tracker_len = state.allocators.region_tracker.to_vec().len();
        let tracker_page = state.header.region_tracker();

        let allocator = state.get_region_mut(tracker_page.region);
        // Pick a new tracker page, if the old one was overwritten or is too small
        if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
            || tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
        {
            drop(state);

            let new_tracker_page = self
                .allocate_non_transactional(tracker_len, false)?
                .get_page_number();

            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_tracker_page);
            self.write_header(&state.header)?;
            self.storage.flush(false)?;
        } else {
            // The old page is available, so just mark it as allocated
            allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
            drop(state);
        }

        Ok(())
    }

    // Relocates the region tracker to a lower page, if possible
    // Returns true if the page was moved
    pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
        let state = self.state.lock().unwrap();
        let region_tracker_size = state
            .header
            .region_tracker()
            .page_size_bytes(self.page_size);
        let old_tracker_page = state.header.region_tracker();
        // allocate acquires this lock, so we need to drop it
        drop(state);
        let new_page =
            self.allocate_non_transactional(region_tracker_size.try_into().unwrap(), true)?;
        if new_page.get_page_number().is_before(old_tracker_page) {
            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_page.get_page_number());
            drop(state);
            self.free(old_tracker_page);
            Ok(true)
        } else {
            let new_page_number = new_page.get_page_number();
            drop(new_page);
            self.free(new_page_number);
            Ok(false)
        }
    }

    pub(crate) fn get_raw_allocator_states(&self) -> Vec<Vec<u8>> {
        let state = self.state.lock().unwrap();

        let mut regional_allocators = vec![];
        for i in 0..state.header.layout().num_regions() {
            regional_allocators.push(state.get_region(i).make_state_for_savepoint());
        }

        regional_allocators
    }

    // Commit all outstanding changes and make them visible as the primary
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        let result = self.commit_inner(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            eventual,
            two_phase,
        );
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    #[allow(clippy::too_many_arguments)]
    fn commit_inner(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        // All mutable pages must be dropped; this ensures that when a transaction completes,
        // no more writes can happen to the pages it allocated. Thus it is safe to make them visible
        // to future read transactions
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        let mut state = self.state.lock().unwrap();
        // Trim surplus file space, before finalizing the commit
        let shrunk = Self::try_shrink(&mut state)?;
        // Copy the header so that we can release the state lock, while we flush the file
        let mut header = state.header.clone();
        drop(state);

        let old_transaction_id = header.secondary_slot().transaction_id;
        let secondary = header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        self.write_header(&header)?;

        // Use 2-phase commit, if checksums are disabled
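        // Flushing here makes the secondary slot fully durable before it is promoted to primary below,
        // so the primary never references partially written data even when checksums are unavailable.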
        if two_phase {
            self.storage.flush(eventual)?;
        }

        // Make our new commit the primary, and record whether it was a 2-phase commit.
        // These two bits need to be written atomically
        header.swap_primary_slot();
        header.two_phase_commit = two_phase;

        // Write the new header to disk
        self.write_header(&header)?;
        self.storage.flush(eventual)?;

        if shrunk {
            let result = self.storage.resize(header.layout().len());
            if result.is_err() {
                // TODO: it would be nice to have a more cohesive approach to setting this.
                // we do it in commit() & rollback() on failure, but there are probably other places that need it
                self.needs_recovery.store(true, Ordering::Release);
                return result;
            }
        }
        self.allocated_since_commit.lock().unwrap().clear();

        let mut state = self.state.lock().unwrap();
        assert_eq!(
            state.header.secondary_slot().transaction_id,
            old_transaction_id
        );
        state.header = header;
        self.read_from_secondary.store(false, Ordering::Release);
        // Hold lock until read_from_secondary is set to false, so that the new primary state is read.
        // TODO: maybe we can remove the whole read_from_secondary flag?
        drop(state);

        Ok(())
    }

    // Make changes visible, without a durability guarantee
    pub(crate) fn non_durable_commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
    ) -> Result {
        // All mutable pages must be dropped; this ensures that when a transaction completes,
        // no more writes can happen to the pages it allocated. Thus it is safe to make them visible
        // to future read transactions
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        self.allocated_since_commit.lock().unwrap().clear();
        self.storage.write_barrier()?;

        let mut state = self.state.lock().unwrap();
        let secondary = state.header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        // TODO: maybe we can remove this flag and just update the in-memory DatabaseHeader state?
        self.read_from_secondary.store(true, Ordering::Release);

        Ok(())
    }

    pub(crate) fn rollback_uncommitted_writes(&self) -> Result {
        let result = self.rollback_uncommitted_writes_inner();
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    fn rollback_uncommitted_writes_inner(&self) -> Result {
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(
                dirty_pages.is_empty(),
                "Dirty pages outstanding: {dirty_pages:?}"
            );
        }
        assert!(!self.needs_recovery.load(Ordering::Acquire));
        let mut state = self.state.lock().unwrap();
        let mut guard = self.allocated_since_commit.lock().unwrap();
        for page_number in guard.iter() {
            let region_index = page_number.region;
            state
                .get_region_tracker_mut()
                .mark_free(page_number.page_order, region_index);
            state
                .get_region_mut(region_index)
                .free(page_number.page_index, page_number.page_order);

            let address = page_number.address_range(
                self.page_size.into(),
                self.region_size,
                self.region_header_with_padding_size,
                self.page_size,
            );
            let len: usize = (address.end - address.start).try_into().unwrap();
            self.storage.invalidate_cache(address.start, len);
            self.storage.cancel_pending_write(address.start, len);
        }
        guard.clear();

        Ok(())
    }

    // TODO: make all callers explicitly provide a hint
    pub(crate) fn get_page(&self, page_number: PageNumber) -> Result<PageImpl> {
        self.get_page_extended(page_number, PageHint::None)
    }

    pub(crate) fn get_page_extended(
        &self,
        page_number: PageNumber,
        hint: PageHint,
    ) -> Result<PageImpl> {
        let range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (range.end - range.start).try_into().unwrap();
        let mem = self.storage.read(range.start, len, hint)?;

        // We must not retrieve an immutable reference to a page which already has a mutable ref to it
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(!dirty_pages.contains(&page_number), "{page_number:?}");
            *(self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .entry(page_number)
                .or_default()) += 1;
            drop(dirty_pages);
        }

        Ok(PageImpl {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.read_page_ref_counts.clone(),
        })
    }

    // NOTE: the caller must ensure that the read cache has been invalidated or stale reads may occur
    pub(crate) fn get_page_mut(&self, page_number: PageNumber) -> Result<PageMut> {
        #[cfg(debug_assertions)]
        {
            assert!(!self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .contains_key(&page_number));
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        let mem = self.storage.write(address_range.start, len, false)?;

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    pub(crate) fn get_version(&self) -> u8 {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().version
        } else {
            state.header.primary_slot().version
        }
    }

    pub(crate) fn get_data_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().user_root
        } else {
            state.header.primary_slot().user_root
        }
    }

    pub(crate) fn get_system_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().system_root
        } else {
            state.header.primary_slot().system_root
        }
    }

    pub(crate) fn get_freed_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().freed_root
        } else {
            state.header.primary_slot().freed_root
        }
    }

    pub(crate) fn get_layout(&self) -> DatabaseLayout {
        self.state.lock().unwrap().header.layout()
    }

    pub(crate) fn get_last_committed_transaction_id(&self) -> Result<TransactionId> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            Ok(state.header.secondary_slot().transaction_id)
        } else {
            Ok(state.header.primary_slot().transaction_id)
        }
    }

    pub(crate) fn free(&self, page: PageNumber) {
        self.allocated_since_commit.lock().unwrap().remove(&page);
        self.free_helper(page);
    }

    fn free_helper(&self, page: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page.region;
        // Free in the regional allocator
        state
            .get_region_mut(region_index)
            .free(page.page_index, page.page_order);
        // Ensure that the region is marked as having free space
        state
            .get_region_tracker_mut()
            .mark_free(page.page_order, region_index);

        let address_range = page.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        self.storage.invalidate_cache(address_range.start, len);
        self.storage.cancel_pending_write(address_range.start, len);
    }

    // Frees the page if it was allocated since the last commit. Returns true if the page was freed
    pub(crate) fn free_if_uncommitted(&self, page: PageNumber) -> bool {
        if self.allocated_since_commit.lock().unwrap().remove(&page) {
            self.free_helper(page);
            true
        } else {
            false
        }
    }

    // Page has not been committed
    pub(crate) fn uncommitted(&self, page: PageNumber) -> bool {
        self.allocated_since_commit.lock().unwrap().contains(&page)
    }

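    // Allocates a page big enough for `allocation_size` bytes: the request is rounded up to whole
    // pages and then to a power-of-two order for the buddy allocator, growing the file if no region
    // currently has space.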
    pub(crate) fn allocate_helper(
        &self,
        allocation_size: usize,
        lowest: bool,
        transactional: bool,
    ) -> Result<PageMut> {
        let required_pages = (allocation_size + self.get_page_size() - 1) / self.get_page_size();
        let required_order = ceil_log2(required_pages);

        let mut state = self.state.lock().unwrap();

        let page_number = if let Some(page_number) =
            Self::allocate_helper_retry(&mut state, required_order, lowest)?
        {
            page_number
        } else {
            self.grow(&mut state, required_order)?;
            Self::allocate_helper_retry(&mut state, required_order, lowest)?.unwrap()
        };

        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number),
                "Allocated a page that is still referenced! {page_number:?}"
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        if transactional {
            self.allocated_since_commit
                .lock()
                .unwrap()
                .insert(page_number);
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();

        #[allow(unused_mut)]
        let mut mem = self.storage.write(address_range.start, len, true)?;
        debug_assert!(mem.mem().len() >= allocation_size);

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));

            // Poison the memory in debug mode to help detect uninitialized reads
            mem.mem_mut().fill(0xFF);
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    fn allocate_helper_retry(
        state: &mut InMemoryState,
        required_order: u8,
        lowest: bool,
    ) -> Result<Option<PageNumber>> {
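        // Ask the region tracker for a candidate region with free space at this order; if the
        // candidate turns out to be full, mark it and try the next one. Returns None once no
        // region has space, in which case the caller grows the file.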
        loop {
            let Some(candidate_region) = state.get_region_tracker_mut().find_free(required_order)
            else {
                return Ok(None);
            };
            let region = state.get_region_mut(candidate_region);
            let r = if lowest {
                region.alloc_lowest(required_order)
            } else {
                region.alloc(required_order)
            };
            if let Some(page) = r {
                return Ok(Some(PageNumber::new(
                    candidate_region,
                    page,
                    required_order,
                )));
            }
            // Mark the region, if it's full
            state
                .get_region_tracker_mut()
                .mark_full(required_order, candidate_region);
        }
    }

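    // Shrink heuristic: only act when at least half of the last region's pages are trailing free.
    // The whole region is released if it is completely empty (and is not the only region);
    // otherwise half of the trailing free pages are released.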
    fn try_shrink(state: &mut InMemoryState) -> Result<bool> {
        let layout = state.header.layout();
        let last_region_index = layout.num_regions() - 1;
        let last_allocator = state.get_region(last_region_index);
        let trailing_free = last_allocator.trailing_free_pages();
        let last_allocator_len = last_allocator.len();
        if trailing_free < last_allocator_len / 2 {
            return Ok(false);
        }
        let reduce_by = if layout.num_regions() > 1 && trailing_free == last_allocator_len {
            trailing_free
        } else {
            trailing_free / 2
        };

        let mut new_layout = layout;
        new_layout.reduce_last_region(reduce_by);
        state.allocators.resize_to(new_layout);
        assert!(new_layout.len() <= layout.len());
        state.header.set_layout(new_layout);

        Ok(true)
    }

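    // Growth heuristic: while the database is smaller than one full region, roughly double the usable
    // space (and at least add twice the requested allocation); once at least one full region exists,
    // fill out the trailing region and add at most one new full region per call.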
    fn grow(&self, state: &mut InMemoryState, required_order_allocation: u8) -> Result<()> {
        let layout = state.header.layout();
        let required_growth =
            2u64.pow(required_order_allocation.into()) * u64::from(state.header.page_size());
        let max_region_size = u64::from(state.header.layout().full_region_layout().num_pages())
            * u64::from(state.header.page_size());
        let next_desired_size = if layout.num_full_regions() > 0 {
            if let Some(trailing) = layout.trailing_region_layout() {
                if 2 * required_growth < max_region_size - trailing.usable_bytes() {
                    // Fill out the trailing region
                    layout.usable_bytes() + (max_region_size - trailing.usable_bytes())
                } else {
                    // Fill out trailing & Grow by 1 region
                    layout.usable_bytes() + 2 * max_region_size - trailing.usable_bytes()
                }
            } else {
                // Grow by 1 region
                layout.usable_bytes() + max_region_size
            }
        } else {
            max(
                layout.usable_bytes() * 2,
                layout.usable_bytes() + required_growth * 2,
            )
        };
        let new_layout = DatabaseLayout::calculate(
            next_desired_size,
            state.header.layout().full_region_layout().num_pages(),
            self.page_size,
        );
        assert!(new_layout.len() >= layout.len());

        let result = self.storage.resize(new_layout.len());
        if result.is_err() {
            // TODO: it would be nice to have a more cohesive approach to setting this.
            // we do it in commit() & rollback() on failure, but there are probably other places that need it
            self.needs_recovery.store(true, Ordering::Release);
            return result;
        }

        state.allocators.resize_to(new_layout);
        state.header.set_layout(new_layout);
        Ok(())
    }

    pub(crate) fn allocate(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, false, true)
    }

    pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, true, true)
    }

    // Allocate a page not associated with any transaction. The page is immediately considered committed,
    // and won't be rolled back if an abort happens. This is only used for the region tracker
    fn allocate_non_transactional(&self, allocation_size: usize, lowest: bool) -> Result<PageMut> {
        self.allocate_helper(allocation_size, lowest, false)
    }

    pub(crate) fn count_allocated_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_allocated_pages());
        }

        Ok(count)
    }

    pub(crate) fn get_page_size(&self) -> usize {
        self.page_size.try_into().unwrap()
    }
}

impl Drop for TransactionalMemory {
    fn drop(&mut self) {
        if thread::panicking() || self.needs_recovery.load(Ordering::Acquire) {
            return;
        }

        // Reallocate the region tracker page, which will grow it if necessary
        let tracker_page = self.state.lock().unwrap().header.region_tracker();
        self.free(tracker_page);
        if self.allocate_region_tracker_page().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        let mut state = self.state.lock().unwrap();
        if state
            .allocators
            .flush_to(
                state.header.region_tracker(),
                state.header.layout(),
                &self.storage,
            )
            .is_err()
        {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) {
            state.header.recovery_required = false;
            let _ = self.write_header(&state.header);
            let _ = self.storage.flush(false);
        }
    }
}

#[cfg(test)]
mod test {
    use crate::tree_store::page_store::page_manager::INITIAL_REGIONS;
    use crate::{Database, TableDefinition};

    // Test that the region tracker expansion code works by adding more data than fits into the initial max regions
    #[test]
    fn out_of_regions() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");
        let page_size = 1024;
        let big_value = vec![0u8; 5 * page_size];

        let db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..=INITIAL_REGIONS {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();
        drop(db);

        let mut db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .open(tmpfile.path())
            .unwrap();
        assert!(db.check_integrity().unwrap());
    }
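
    // A minimal sanity check for ceil_log2, added for illustration (not part of the original test suite)
    #[test]
    fn ceil_log2_rounds_up() {
        use super::ceil_log2;
        assert_eq!(ceil_log2(1), 0);
        assert_eq!(ceil_log2(2), 1);
        assert_eq!(ceil_log2(3), 2);
        assert_eq!(ceil_log2(4), 2);
        assert_eq!(ceil_log2(5), 3);
    }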

    // Make sure the database remains consistent after a panic
    #[test]
    #[cfg(panic = "unwind")]
    fn panic() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");

        let _ = std::panic::catch_unwind(|| {
            let db = Database::create(&tmpfile).unwrap();
            let txn = db.begin_write().unwrap();
            txn.open_table(table_definition).unwrap();
            panic!();
        });

        let mut db = Database::open(tmpfile).unwrap();
        assert!(db.check_integrity().unwrap());
    }
}