use crate::transaction_tracker::TransactionId;
use crate::transactions::{AllocatorStateKey, AllocatorStateTree};
use crate::tree_store::btree_base::{BtreeHeader, Checksum};
use crate::tree_store::page_store::base::{MAX_PAGE_INDEX, PageHint};
use crate::tree_store::page_store::buddy_allocator::BuddyAllocator;
use crate::tree_store::page_store::cached_file::PagedCachedFile;
use crate::tree_store::page_store::header::{DB_HEADER_SIZE, DatabaseHeader, MAGICNUMBER};
use crate::tree_store::page_store::layout::DatabaseLayout;
use crate::tree_store::page_store::region::{Allocators, RegionTracker};
use crate::tree_store::page_store::{PageImpl, PageMut, hash128_with_seed};
use crate::tree_store::{Page, PageNumber};
use crate::{CacheStats, StorageBackend};
use crate::{DatabaseError, Result, StorageError};
#[cfg(feature = "logging")]
use log::warn;
use std::cmp::{max, min};
#[cfg(debug_assertions)]
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::io::ErrorKind;
#[cfg(debug_assertions)]
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

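// Layout constants. A region's usable space is capped at 4GiB by default
// (`MAX_USABLE_REGION_SPACE`), pages are handed out in power-of-two orders up to
// `MAX_MAX_PAGE_ORDER`, and a newly created file reserves tracker capacity for
// `INITIAL_REGIONS` regions.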
const MAX_USABLE_REGION_SPACE: u64 = 4 * 1024 * 1024 * 1024;
pub(crate) const MAX_MAX_PAGE_ORDER: u8 = 20;
pub(super) const MIN_USABLE_PAGES: u32 = 10;
const MIN_DESIRED_USABLE_BYTES: u64 = 1024 * 1024;

pub(super) const INITIAL_REGIONS: u32 = 1000;

pub(crate) const FILE_FORMAT_VERSION1: u8 = 1;
pub(crate) const FILE_FORMAT_VERSION2: u8 = 2;

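// Smallest order `k` such that `2^k >= x`, e.g. ceil_log2(1) == 0, ceil_log2(3) == 2,
// ceil_log2(4) == 2.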
fn ceil_log2(x: usize) -> u8 {
    if x.is_power_of_two() {
        x.trailing_zeros().try_into().unwrap()
    } else {
        x.next_power_of_two().trailing_zeros().try_into().unwrap()
    }
}

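// 128-bit xxh3 with a fixed zero seed; this is the `Checksum` used by the page store.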
pub(crate) fn xxh3_checksum(data: &[u8]) -> Checksum {
    hash128_with_seed(data, 0)
}

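// In-memory copy of the database header plus the per-region buddy allocators and the
// region tracker. Guarded by the `state` mutex in `TransactionalMemory`.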
struct InMemoryState {
    header: DatabaseHeader,
    allocators: Allocators,
}

impl InMemoryState {
    fn from_bytes(header: DatabaseHeader, file: &PagedCachedFile) -> Result<Self> {
        let allocators = if header.recovery_required {
            Allocators::new(header.layout())
        } else {
            Allocators::from_bytes(&header, file)?
        };
        Ok(Self { header, allocators })
    }

    fn get_region(&self, region: u32) -> &BuddyAllocator {
        &self.allocators.region_allocators[region as usize]
    }

    fn get_region_mut(&mut self, region: u32) -> &mut BuddyAllocator {
        &mut self.allocators.region_allocators[region as usize]
    }

    fn get_region_tracker_mut(&mut self) -> &mut RegionTracker {
        &mut self.allocators.region_tracker
    }
}

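// Manages the database file as a sequence of regions of pages: hands out pages to
// transactions, tracks pages allocated since the last commit, and owns the paged,
// cached view of the underlying storage backend.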
pub(crate) struct TransactionalMemory {
    allocated_since_commit: Mutex<HashSet<PageNumber>>,
    needs_recovery: AtomicBool,
    storage: PagedCachedFile,
    state: Mutex<InMemoryState>,
    #[cfg(debug_assertions)]
    open_dirty_pages: Arc<Mutex<HashSet<PageNumber>>>,
    #[cfg(debug_assertions)]
    read_page_ref_counts: Arc<Mutex<HashMap<PageNumber, u64>>>,
    read_from_secondary: AtomicBool,
    page_size: u32,
    region_size: u64,
    region_header_with_padding_size: u64,
}

impl TransactionalMemory {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        requested_region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
    ) -> Result<Self, DatabaseError> {
        assert!(page_size.is_power_of_two() && page_size >= DB_HEADER_SIZE);

        let region_size = requested_region_size.unwrap_or(MAX_USABLE_REGION_SPACE);
        let region_size = min(
            region_size,
            (u64::from(MAX_PAGE_INDEX) + 1) * page_size as u64,
        );
        assert!(region_size.is_power_of_two());

        let storage = PagedCachedFile::new(
            file,
            page_size as u64,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;

        let initial_storage_len = storage.raw_file_len()?;

        let magic_number: [u8; MAGICNUMBER.len()] =
            if initial_storage_len >= MAGICNUMBER.len() as u64 {
                storage
                    .read_direct(0, MAGICNUMBER.len())?
                    .try_into()
                    .unwrap()
            } else {
                [0; MAGICNUMBER.len()]
            };

        if initial_storage_len > 0 {
            if magic_number != MAGICNUMBER {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        } else {
            if !allow_initialize {
                return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
            }
        }

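        // Empty or uninitialized file: lay out the initial regions, allocate the region
        // tracker page from region 0, and write the header twice. The first write omits
        // the magic number (`to_bytes(false)`); it is only included in the second write,
        // after everything else has been flushed, so a crash mid-initialization leaves a
        // file that still looks uninitialized rather than corrupt.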
        if magic_number != MAGICNUMBER {
            let region_tracker_required_bytes =
                RegionTracker::new(INITIAL_REGIONS, MAX_MAX_PAGE_ORDER + 1)
                    .to_vec()
                    .len();

            let size: u64 = max(
                MIN_DESIRED_USABLE_BYTES,
                page_size as u64 * u64::from(MIN_USABLE_PAGES),
            );
            let tracker_space =
                (page_size * region_tracker_required_bytes.div_ceil(page_size)) as u64;
            let starting_size = size + tracker_space;

            let layout = DatabaseLayout::calculate(
                starting_size,
                (region_size / u64::try_from(page_size).unwrap())
                    .try_into()
                    .unwrap(),
                page_size.try_into().unwrap(),
            );

            {
                let file_len = storage.raw_file_len()?;

                if file_len < layout.len() {
                    storage.resize(layout.len())?;
                }
            }

            let mut allocators = Allocators::new(layout);

            let tracker_page = {
                let tracker_required_pages =
                    allocators.region_tracker.to_vec().len().div_ceil(page_size);
                let required_order = ceil_log2(tracker_required_pages);
                let page_number = allocators.region_allocators[0]
                    .alloc(required_order)
                    .unwrap();
                PageNumber::new(0, page_number, required_order)
            };

            let mut header = DatabaseHeader::new(
                layout,
                TransactionId::new(0),
                FILE_FORMAT_VERSION2,
                tracker_page,
            );

            header.recovery_required = false;
            header.two_phase_commit = true;
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(false));
            allocators.flush_to(tracker_page, layout, &storage)?;

            storage.flush(false)?;
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }
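        // Read the header back (whether just written or pre-existing) and decide whether
        // the allocator state must be rebuilt: either the previous shutdown left
        // `recovery_required` set, or the file length no longer matches the recorded layout.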
        let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;

        assert_eq!(header.page_size() as usize, page_size);
        assert!(storage.raw_file_len()? >= header.layout().len());
        let needs_recovery =
            header.recovery_required || header.layout().len() != storage.raw_file_len()?;
        if needs_recovery {
            let layout = header.layout();
            let region_max_pages = layout.full_region_layout().num_pages();
            let region_header_pages = layout.full_region_layout().get_header_pages();
            header.set_layout(DatabaseLayout::recalculate(
                storage.raw_file_len()?,
                region_header_pages,
                region_max_pages,
                page_size.try_into().unwrap(),
            ));
            header.pick_primary_for_repair(repair_info)?;
            assert!(!repair_info.invalid_magic_number);
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }

        let layout = header.layout();
        assert_eq!(layout.len(), storage.raw_file_len()?);
        let region_size = layout.full_region_layout().len();
        let region_header_size = layout.full_region_layout().data_section().start;

        let state = InMemoryState::from_bytes(header, &storage)?;

        assert!(page_size >= DB_HEADER_SIZE);

        Ok(Self {
            allocated_since_commit: Mutex::new(HashSet::new()),
            needs_recovery: AtomicBool::new(needs_recovery),
            storage,
            state: Mutex::new(state),
            #[cfg(debug_assertions)]
            open_dirty_pages: Arc::new(Mutex::new(HashSet::new())),
            #[cfg(debug_assertions)]
            read_page_ref_counts: Arc::new(Mutex::new(HashMap::new())),
            read_from_secondary: AtomicBool::new(false),
            page_size: page_size.try_into().unwrap(),
            region_size,
            region_header_with_padding_size: region_header_size,
        })
    }

    pub(crate) fn cache_stats(&self) -> CacheStats {
        self.storage.cache_stats()
    }

    pub(crate) fn check_io_errors(&self) -> Result {
        self.storage.check_io_errors()
    }

    #[cfg(any(test, fuzzing))]
    pub(crate) fn all_allocated_pages(&self) -> Vec<PageNumber> {
        self.state.lock().unwrap().allocators.all_allocated()
    }

    #[cfg(any(test, fuzzing))]
    pub(crate) fn tracker_page(&self) -> PageNumber {
        self.state.lock().unwrap().header.region_tracker()
    }

    pub(crate) fn clear_read_cache(&self) {
        self.storage.invalidate_cache_all();
    }

    pub(crate) fn clear_cache_and_reload(&mut self) -> Result<bool, DatabaseError> {
        assert!(self.allocated_since_commit.lock().unwrap().is_empty());

        self.storage.flush(false)?;
        self.storage.invalidate_cache_all();

        let header_bytes = self.storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;
        let mut was_clean = true;
        if header.recovery_required {
            if !header.pick_primary_for_repair(repair_info)? {
                was_clean = false;
            }
            if repair_info.invalid_magic_number {
                return Err(StorageError::Corrupted("Invalid magic number".to_string()).into());
            }
            self.storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            self.storage.flush(false)?;
        }

        self.needs_recovery
            .store(header.recovery_required, Ordering::Release);
        self.state.lock().unwrap().header = header;

        Ok(was_clean)
    }

    pub(crate) fn begin_writable(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        assert!(!state.header.recovery_required);
        state.header.recovery_required = true;
        self.write_header(&state.header)?;
        self.storage.flush(false)
    }

    pub(crate) fn needs_repair(&self) -> Result<bool> {
        Ok(self.state.lock().unwrap().header.recovery_required)
    }

    pub(crate) fn used_two_phase_commit(&self) -> bool {
        self.state.lock().unwrap().header.two_phase_commit
    }

    pub(crate) fn allocator_hash(&self) -> u128 {
        self.state.lock().unwrap().allocators.xxh3_hash()
    }

    pub(crate) fn storage_failure(&self) -> bool {
        self.needs_recovery.load(Ordering::Acquire)
    }

    pub(crate) fn repair_primary_corrupted(&self) {
        let mut state = self.state.lock().unwrap();
        state.header.swap_primary_slot();
    }

    pub(crate) fn begin_repair(&self) -> Result<()> {
        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators::new(state.header.layout());

        Ok(())
    }

    pub(crate) fn mark_page_allocated(&self, page_number: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page_number.region;
        let allocator = state.get_region_mut(region_index);
        allocator.record_alloc(page_number.page_index, page_number.page_order);
    }

    fn write_header(&self, header: &DatabaseHeader) -> Result {
        self.storage
            .write(0, DB_HEADER_SIZE, true)?
            .mem_mut()
            .copy_from_slice(&header.to_bytes(true));

        Ok(())
    }

    pub(crate) fn end_repair(&self) -> Result<()> {
        self.allocate_region_tracker_page()?;

        let mut state = self.state.lock().unwrap();
        let tracker_page = state.header.region_tracker();
        state
            .allocators
            .flush_to(tracker_page, state.header.layout(), &self.storage)?;

        state.header.recovery_required = false;
        self.write_header(&state.header)?;
        let result = self.storage.flush(false);
        self.needs_recovery.store(false, Ordering::Release);

        result
    }

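    // The allocator-state methods below persist the in-memory allocators into a system
    // tree: `reserve_allocator_state` pre-inserts zeroed placeholders of the right sizes,
    // `try_save_allocator_state` fills them in place (so saving does not itself allocate
    // pages), and `load_allocator_state` restores them, provided the saved transaction id
    // still matches the last committed one (`is_valid_allocator_state`).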
    pub(crate) fn reserve_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        transaction_id: TransactionId,
    ) -> Result<u32> {
        let state = self.state.lock().unwrap();
        let layout = state.header.layout();
        let num_regions = layout.num_regions();
        let region_header_len = layout.full_region_layout().get_header_pages()
            * layout.full_region_layout().page_size();
        let region_tracker_len = state.allocators.region_tracker.to_vec().len();
        drop(state);

        for i in 0..num_regions {
            tree.insert(
                &AllocatorStateKey::Region(i),
                &vec![0; region_header_len as usize].as_ref(),
            )?;
        }

        tree.insert(
            &AllocatorStateKey::RegionTracker,
            &vec![0; region_tracker_len].as_ref(),
        )?;

        tree.insert(
            &AllocatorStateKey::TransactionId,
            &transaction_id.raw_id().to_le_bytes().as_ref(),
        )?;

        Ok(num_regions)
    }

    pub(crate) fn try_save_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        let state = self.state.lock().unwrap();
        if num_regions != state.header.layout().num_regions() {
            return Ok(false);
        }

        let tracker_page = state.header.region_tracker();
        drop(state);
        self.free(tracker_page);

        let result = self.try_save_allocator_state_inner(tree, num_regions);

        self.mark_page_allocated(tracker_page);

        result
    }

    fn try_save_allocator_state_inner(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        for i in 0..num_regions {
            let region_bytes =
                &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec();
            tree.insert_inplace(&AllocatorStateKey::Region(i), &region_bytes.as_ref())?;
        }

        let region_tracker_bytes = self
            .state
            .lock()
            .unwrap()
            .allocators
            .region_tracker
            .to_vec();
        tree.insert_inplace(
            &AllocatorStateKey::RegionTracker,
            &region_tracker_bytes.as_ref(),
        )?;

        Ok(true)
    }

    pub(crate) fn is_valid_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
        let transaction_id = TransactionId::new(u64::from_le_bytes(
            tree.get(&AllocatorStateKey::TransactionId)?
                .unwrap()
                .value()
                .try_into()
                .unwrap(),
        ));

        Ok(transaction_id == self.get_last_committed_transaction_id()?)
    }

    pub(crate) fn load_allocator_state(&self, tree: &AllocatorStateTree) -> Result {
        assert!(self.is_valid_allocator_state(tree)?);

        let mut region_allocators = vec![];
        for region in
            tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))?
        {
            region_allocators.push(BuddyAllocator::from_bytes(region?.value()));
        }

        let region_tracker = RegionTracker::from_page(
            tree.get(&AllocatorStateKey::RegionTracker)?
                .unwrap()
                .value(),
        );

        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators {
            region_tracker,
            region_allocators,
        };

        let layout = state.header.layout();
        state.allocators.resize_to(layout);
        drop(state);

        self.allocate_region_tracker_page()?;

        self.state.lock().unwrap().header.recovery_required = false;
        self.needs_recovery.store(false, Ordering::Release);

        Ok(())
    }

    pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
        let state = self.state.lock().unwrap();
        let allocator = state.get_region(page.region);

        allocator.is_allocated(page.page_index, page.page_order)
    }

    fn allocate_region_tracker_page(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        let tracker_len = state.allocators.region_tracker.to_vec().len();
        let tracker_page = state.header.region_tracker();

        let allocator = state.get_region_mut(tracker_page.region);
        if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
            || tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
        {
            drop(state);

            let new_tracker_page = self
                .allocate_non_transactional(tracker_len, false)?
                .get_page_number();

            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_tracker_page);
            self.write_header(&state.header)?;
            self.storage.flush(false)?;
        } else {
            allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
            drop(state);
        }

        Ok(())
    }

    pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
        let state = self.state.lock().unwrap();
        let region_tracker_size = state
            .header
            .region_tracker()
            .page_size_bytes(self.page_size);
        let old_tracker_page = state.header.region_tracker();
        drop(state);
        let new_page =
            self.allocate_non_transactional(region_tracker_size.try_into().unwrap(), true)?;
        if new_page.get_page_number().is_before(old_tracker_page) {
            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_page.get_page_number());
            drop(state);
            self.free(old_tracker_page);
            Ok(true)
        } else {
            let new_page_number = new_page.get_page_number();
            drop(new_page);
            self.free(new_page_number);
            Ok(false)
        }
    }

    pub(crate) fn pages_allocated_since_raw_state(
        &self,
        old_states: &[BuddyAllocator],
    ) -> Vec<PageNumber> {
        let mut result = vec![];
        let state = self.state.lock().unwrap();

        for i in 0..state.header.layout().num_regions() {
            let current_state = state.get_region(i);
            if let Some(old_state) = old_states.get(i as usize) {
                current_state.difference(i, old_state, &mut result);
            } else {
                current_state.get_allocated_pages(i, &mut result);
            }
        }

        result.retain(|x| *x != state.header.region_tracker());

        result
    }

    pub(crate) fn get_raw_allocator_states(&self) -> Vec<Vec<u8>> {
        let state = self.state.lock().unwrap();

        let mut regional_allocators = vec![];
        for i in 0..state.header.layout().num_regions() {
            regional_allocators.push(state.get_region(i).make_state_for_savepoint());
        }

        regional_allocators
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        let result = self.commit_inner(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            eventual,
            two_phase,
        );
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    #[allow(clippy::too_many_arguments)]
    fn commit_inner(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        let mut state = self.state.lock().unwrap();
        let shrunk = Self::try_shrink(&mut state)?;
        let mut header = state.header.clone();
        drop(state);

        let old_transaction_id = header.secondary_slot().transaction_id;
        let secondary = header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        self.write_header(&header)?;

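        // Two-phase mode flushes the new secondary slot before it is promoted to primary,
        // at the cost of an extra fsync; in one-phase mode both header writes are covered
        // by the single flush below.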
        if two_phase {
            self.storage.flush(eventual)?;
        }

        header.swap_primary_slot();
        header.two_phase_commit = two_phase;

        self.write_header(&header)?;
        self.storage.flush(eventual)?;

        if shrunk {
            let result = self.storage.resize(header.layout().len());
            if result.is_err() {
                self.needs_recovery.store(true, Ordering::Release);
                return result;
            }
        }
        self.allocated_since_commit.lock().unwrap().clear();

        let mut state = self.state.lock().unwrap();
        assert_eq!(
            state.header.secondary_slot().transaction_id,
            old_transaction_id
        );
        state.header = header;
        self.read_from_secondary.store(false, Ordering::Release);
        drop(state);

        Ok(())
    }

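    // A non-durable commit only updates the in-memory secondary slot and issues a write
    // barrier instead of a flush; readers are redirected to the secondary slot via
    // `read_from_secondary` until a durable commit promotes it.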
    pub(crate) fn non_durable_commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
    ) -> Result {
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        self.allocated_since_commit.lock().unwrap().clear();
        self.storage.write_barrier()?;

        let mut state = self.state.lock().unwrap();
        let secondary = state.header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        self.read_from_secondary.store(true, Ordering::Release);

        Ok(())
    }

    pub(crate) fn rollback_uncommitted_writes(&self) -> Result {
        let result = self.rollback_uncommitted_writes_inner();
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    fn rollback_uncommitted_writes_inner(&self) -> Result {
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(
                dirty_pages.is_empty(),
                "Dirty pages outstanding: {dirty_pages:?}"
            );
        }
        assert!(!self.needs_recovery.load(Ordering::Acquire));
        let mut state = self.state.lock().unwrap();
        let mut guard = self.allocated_since_commit.lock().unwrap();
        for page_number in guard.iter() {
            let region_index = page_number.region;
            state
                .get_region_tracker_mut()
                .mark_free(page_number.page_order, region_index);
            state
                .get_region_mut(region_index)
                .free(page_number.page_index, page_number.page_order);

            let address = page_number.address_range(
                self.page_size.into(),
                self.region_size,
                self.region_header_with_padding_size,
                self.page_size,
            );
            let len: usize = (address.end - address.start).try_into().unwrap();
            self.storage.invalidate_cache(address.start, len);
            self.storage.cancel_pending_write(address.start, len);
        }
        guard.clear();

        Ok(())
    }

    pub(crate) fn get_page(&self, page_number: PageNumber) -> Result<PageImpl> {
        self.get_page_extended(page_number, PageHint::None)
    }

    pub(crate) fn get_page_extended(
        &self,
        page_number: PageNumber,
        hint: PageHint,
    ) -> Result<PageImpl> {
        let range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (range.end - range.start).try_into().unwrap();
        let mem = self.storage.read(range.start, len, hint)?;

        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(!dirty_pages.contains(&page_number), "{page_number:?}");
            *(self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .entry(page_number)
                .or_default()) += 1;
            drop(dirty_pages);
        }

        Ok(PageImpl {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.read_page_ref_counts.clone(),
        })
    }

    pub(crate) fn get_page_mut(&self, page_number: PageNumber) -> Result<PageMut> {
        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number)
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        let mem = self.storage.write(address_range.start, len, false)?;

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    pub(crate) fn get_version(&self) -> u8 {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().version
        } else {
            state.header.primary_slot().version
        }
    }

    pub(crate) fn get_data_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().user_root
        } else {
            state.header.primary_slot().user_root
        }
    }

    pub(crate) fn get_system_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().system_root
        } else {
            state.header.primary_slot().system_root
        }
    }

    pub(crate) fn get_freed_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().freed_root
        } else {
            state.header.primary_slot().freed_root
        }
    }

    pub(crate) fn get_last_committed_transaction_id(&self) -> Result<TransactionId> {
        let state = self.state.lock()?;
        if self.read_from_secondary.load(Ordering::Acquire) {
            Ok(state.header.secondary_slot().transaction_id)
        } else {
            Ok(state.header.primary_slot().transaction_id)
        }
    }

    pub(crate) fn free(&self, page: PageNumber) {
        self.allocated_since_commit.lock().unwrap().remove(&page);
        self.free_helper(page);
    }

    fn free_helper(&self, page: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page.region;
        state
            .get_region_mut(region_index)
            .free(page.page_index, page.page_order);
        state
            .get_region_tracker_mut()
            .mark_free(page.page_order, region_index);

        let address_range = page.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        self.storage.invalidate_cache(address_range.start, len);
        self.storage.cancel_pending_write(address_range.start, len);
    }

    pub(crate) fn free_if_uncommitted(&self, page: PageNumber) -> bool {
        if self.allocated_since_commit.lock().unwrap().remove(&page) {
            self.free_helper(page);
            true
        } else {
            false
        }
    }

    pub(crate) fn uncommitted(&self, page: PageNumber) -> bool {
        self.allocated_since_commit.lock().unwrap().contains(&page)
    }

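    // Core allocation path: round the request up to a power-of-two order, ask the region
    // tracker for a region with a free slot of that order, and grow the file if none is
    // found. In debug builds the new page is poisoned with 0xFF.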
    pub(crate) fn allocate_helper(
        &self,
        allocation_size: usize,
        lowest: bool,
        transactional: bool,
    ) -> Result<PageMut> {
        let required_pages = allocation_size.div_ceil(self.get_page_size());
        let required_order = ceil_log2(required_pages);

        let mut state = self.state.lock().unwrap();

        let page_number = if let Some(page_number) =
            Self::allocate_helper_retry(&mut state, required_order, lowest)?
        {
            page_number
        } else {
            self.grow(&mut state, required_order)?;
            Self::allocate_helper_retry(&mut state, required_order, lowest)?.unwrap()
        };

        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number),
                "Allocated a page that is still referenced! {page_number:?}"
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        if transactional {
            self.allocated_since_commit
                .lock()
                .unwrap()
                .insert(page_number);
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();

        #[allow(unused_mut)]
        let mut mem = self.storage.write(address_range.start, len, true)?;
        debug_assert!(mem.mem().len() >= allocation_size);

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));

            mem.mem_mut().fill(0xFF);
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    fn allocate_helper_retry(
        state: &mut InMemoryState,
        required_order: u8,
        lowest: bool,
    ) -> Result<Option<PageNumber>> {
        loop {
            let Some(candidate_region) = state.get_region_tracker_mut().find_free(required_order)
            else {
                return Ok(None);
            };
            let region = state.get_region_mut(candidate_region);
            let r = if lowest {
                region.alloc_lowest(required_order)
            } else {
                region.alloc(required_order)
            };
            if let Some(page) = r {
                return Ok(Some(PageNumber::new(
                    candidate_region,
                    page,
                    required_order,
                )));
            }
            state
                .get_region_tracker_mut()
                .mark_full(required_order, candidate_region);
        }
    }

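    // Shrink the file at commit time only when at least half of the last region's pages
    // are free at its tail; the last region is reduced by all of its pages if it is
    // completely free (and not the only region), otherwise by half of the trailing free pages.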
    fn try_shrink(state: &mut InMemoryState) -> Result<bool> {
        let layout = state.header.layout();
        let last_region_index = layout.num_regions() - 1;
        let last_allocator = state.get_region(last_region_index);
        let trailing_free = last_allocator.trailing_free_pages();
        let last_allocator_len = last_allocator.len();
        if trailing_free < last_allocator_len / 2 {
            return Ok(false);
        }
        let reduce_by = if layout.num_regions() > 1 && trailing_free == last_allocator_len {
            trailing_free
        } else {
            trailing_free / 2
        };

        let mut new_layout = layout;
        new_layout.reduce_last_region(reduce_by);
        state.allocators.resize_to(new_layout);
        assert!(new_layout.len() <= layout.len());
        state.header.set_layout(new_layout);

        Ok(true)
    }

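    // Growth policy: once at least one region is full, fill out the trailing region
    // (adding another full region if that is not enough headroom); otherwise roughly
    // double the usable space. The new layout is applied only if the file resize succeeds.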
    fn grow(&self, state: &mut InMemoryState, required_order_allocation: u8) -> Result<()> {
        let layout = state.header.layout();
        let required_growth =
            2u64.pow(required_order_allocation.into()) * u64::from(state.header.page_size());
        let max_region_size = u64::from(state.header.layout().full_region_layout().num_pages())
            * u64::from(state.header.page_size());
        let next_desired_size = if layout.num_full_regions() > 0 {
            if let Some(trailing) = layout.trailing_region_layout() {
                if 2 * required_growth < max_region_size - trailing.usable_bytes() {
                    layout.usable_bytes() + (max_region_size - trailing.usable_bytes())
                } else {
                    layout.usable_bytes() + 2 * max_region_size - trailing.usable_bytes()
                }
            } else {
                layout.usable_bytes() + max_region_size
            }
        } else {
            max(
                layout.usable_bytes() * 2,
                layout.usable_bytes() + required_growth * 2,
            )
        };
        let new_layout = DatabaseLayout::calculate(
            next_desired_size,
            state.header.layout().full_region_layout().num_pages(),
            self.page_size,
        );
        assert!(new_layout.len() >= layout.len());

        let result = self.storage.resize(new_layout.len());
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
            return result;
        }

        state.allocators.resize_to(new_layout);
        state.header.set_layout(new_layout);
        Ok(())
    }

    pub(crate) fn allocate(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, false, true)
    }

    pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, true, true)
    }

    fn allocate_non_transactional(&self, allocation_size: usize, lowest: bool) -> Result<PageMut> {
        self.allocate_helper(allocation_size, lowest, false)
    }

    pub(crate) fn count_allocated_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_allocated_pages());
        }

        Ok(count)
    }

    pub(crate) fn count_free_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_free_pages());
        }

        Ok(count)
    }

    pub(crate) fn get_page_size(&self) -> usize {
        self.page_size.try_into().unwrap()
    }
}

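// On a clean drop (no panic, no outstanding failure) the allocator state is flushed back
// to the file and `recovery_required` is cleared, so the next open can skip the repair scan.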
impl Drop for TransactionalMemory {
    fn drop(&mut self) {
        if thread::panicking() || self.needs_recovery.load(Ordering::Acquire) {
            return;
        }

        let tracker_page = self.state.lock().unwrap().header.region_tracker();
        self.free(tracker_page);
        if self.allocate_region_tracker_page().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        let mut state = self.state.lock().unwrap();
        if state
            .allocators
            .flush_to(
                state.header.region_tracker(),
                state.header.layout(),
                &self.storage,
            )
            .is_err()
        {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) {
            state.header.recovery_required = false;
            let _ = self.write_header(&state.header);
            let _ = self.storage.flush(false);
        }
    }
}

#[cfg(test)]
mod test {
    use crate::tree_store::page_store::page_manager::INITIAL_REGIONS;
    use crate::{Database, TableDefinition};

    #[test]
    fn out_of_regions() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");
        let page_size = 1024;
        let big_value = vec![0u8; 5 * page_size];

        let db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..=INITIAL_REGIONS {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();
        drop(db);

        let mut db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .open(tmpfile.path())
            .unwrap();
        assert!(db.check_integrity().unwrap());
    }

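    // Small sanity check of the order computation used by `allocate_helper`: sizes round
    // up to the next power of two.
    #[test]
    fn ceil_log2_examples() {
        use super::ceil_log2;
        assert_eq!(ceil_log2(1), 0);
        assert_eq!(ceil_log2(2), 1);
        assert_eq!(ceil_log2(3), 2);
        assert_eq!(ceil_log2(4), 2);
        assert_eq!(ceil_log2(5), 3);
        assert_eq!(ceil_log2(1024), 10);
    }
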
    #[test]
    #[cfg(panic = "unwind")]
    fn panic() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");

        let _ = std::panic::catch_unwind(|| {
            let db = Database::create(&tmpfile).unwrap();
            let txn = db.begin_write().unwrap();
            txn.open_table(table_definition).unwrap();
            panic!();
        });

        let mut db = Database::open(tmpfile).unwrap();
        assert!(db.check_integrity().unwrap());
    }
}