use crate::transaction_tracker::TransactionId;
use crate::transactions::{AllocatorStateKey, AllocatorStateTree};
use crate::tree_store::btree_base::{BtreeHeader, Checksum};
use crate::tree_store::page_store::base::{PageHint, MAX_PAGE_INDEX};
use crate::tree_store::page_store::buddy_allocator::BuddyAllocator;
use crate::tree_store::page_store::cached_file::PagedCachedFile;
use crate::tree_store::page_store::header::{DatabaseHeader, DB_HEADER_SIZE, MAGICNUMBER};
use crate::tree_store::page_store::layout::DatabaseLayout;
use crate::tree_store::page_store::region::{Allocators, RegionTracker};
use crate::tree_store::page_store::{hash128_with_seed, PageImpl, PageMut};
use crate::tree_store::{Page, PageNumber};
use crate::StorageBackend;
use crate::{DatabaseError, Result, StorageError};
#[cfg(feature = "logging")]
use log::warn;
use std::cmp::{max, min};
#[cfg(debug_assertions)]
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(debug_assertions)]
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

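// Regions are limited to 4GiB of usable space; the effective region size is further capped
// to (MAX_PAGE_INDEX + 1) pages in TransactionalMemory::new() below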
const MAX_USABLE_REGION_SPACE: u64 = 4 * 1024 * 1024 * 1024;
pub(crate) const MAX_MAX_PAGE_ORDER: u8 = 20;
pub(super) const MIN_USABLE_PAGES: u32 = 10;
const MIN_DESIRED_USABLE_BYTES: u64 = 1024 * 1024;

pub(super) const INITIAL_REGIONS: u32 = 1000;

pub(crate) const FILE_FORMAT_VERSION1: u8 = 1;
pub(crate) const FILE_FORMAT_VERSION2: u8 = 2;

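// Returns the smallest exponent e such that 2^e >= x, i.e. ceil(log2(x))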
fn ceil_log2(x: usize) -> u8 {
    if x.is_power_of_two() {
        x.trailing_zeros().try_into().unwrap()
    } else {
        x.next_power_of_two().trailing_zeros().try_into().unwrap()
    }
}

pub(crate) fn xxh3_checksum(data: &[u8]) -> Checksum {
    hash128_with_seed(data, 0)
}

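// In-memory copy of the database header and allocator state, guarded by a single Mutex in
// TransactionalMemory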
struct InMemoryState {
    header: DatabaseHeader,
    allocators: Allocators,
}

impl InMemoryState {
    fn from_bytes(header: DatabaseHeader, file: &PagedCachedFile) -> Result<Self> {
        let allocators = if header.recovery_required {
            Allocators::new(header.layout())
        } else {
            Allocators::from_bytes(&header, file)?
        };
        Ok(Self { header, allocators })
    }

    fn get_region(&self, region: u32) -> &BuddyAllocator {
        &self.allocators.region_allocators[region as usize]
    }

    fn get_region_mut(&mut self, region: u32) -> &mut BuddyAllocator {
        &mut self.allocators.region_allocators[region as usize]
    }

    fn get_region_tracker_mut(&mut self) -> &mut RegionTracker {
        &mut self.allocators.region_tracker
    }
}

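// Manages the database file: page allocation, reads and writes through the page cache, and
// the double-slot header commit protocol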
pub(crate) struct TransactionalMemory {
    // Pages allocated since the last commit
    allocated_since_commit: Mutex<HashSet<PageNumber>>,
    // Set when a storage failure or inconsistency is detected; forces repair on the next open
    needs_recovery: AtomicBool,
    storage: PagedCachedFile,
    state: Mutex<InMemoryState>,
    // Outstanding mutable pages (debug builds only)
    #[cfg(debug_assertions)]
    open_dirty_pages: Arc<Mutex<HashSet<PageNumber>>>,
    // Reference counts of outstanding immutable pages (debug builds only)
    #[cfg(debug_assertions)]
    read_page_ref_counts: Arc<Mutex<HashMap<PageNumber, u64>>>,
    // Set after a non-durable commit: reads must be served from the secondary slot
    read_from_secondary: AtomicBool,
    page_size: u32,
    // Stored separately from the layout because they are static and on the get_page() hot path
    region_size: u64,
    region_header_with_padding_size: u64,
}

impl TransactionalMemory {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        requested_region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
    ) -> Result<Self, DatabaseError> {
        assert!(page_size.is_power_of_two() && page_size >= DB_HEADER_SIZE);

        let region_size = requested_region_size.unwrap_or(MAX_USABLE_REGION_SPACE);
        let region_size = min(
            region_size,
            (u64::from(MAX_PAGE_INDEX) + 1) * page_size as u64,
        );
        assert!(region_size.is_power_of_two());

        let storage = PagedCachedFile::new(
            file,
            page_size as u64,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;

        let magic_number: [u8; MAGICNUMBER.len()] =
            if storage.raw_file_len()? >= MAGICNUMBER.len() as u64 {
                storage
                    .read_direct(0, MAGICNUMBER.len())?
                    .try_into()
                    .unwrap()
            } else {
                [0; MAGICNUMBER.len()]
            };

        if magic_number != MAGICNUMBER {
            let region_tracker_required_bytes =
                RegionTracker::new(INITIAL_REGIONS, MAX_MAX_PAGE_ORDER + 1)
                    .to_vec()
                    .len();

            // Make sure that there is enough room to allocate the region tracker into a page
            let size: u64 = max(
                MIN_DESIRED_USABLE_BYTES,
                page_size as u64 * u64::from(MIN_USABLE_PAGES),
            );
            let tracker_space =
                (page_size * ((region_tracker_required_bytes + page_size - 1) / page_size)) as u64;
            let starting_size = size + tracker_space;

            let layout = DatabaseLayout::calculate(
                starting_size,
                (region_size / u64::try_from(page_size).unwrap())
                    .try_into()
                    .unwrap(),
                page_size.try_into().unwrap(),
            );

            {
                let file_len = storage.raw_file_len()?;

                if file_len < layout.len() {
                    storage.resize(layout.len())?;
                }
            }

            let mut allocators = Allocators::new(layout);

            // Allocate the region tracker in the zeroth region
            let tracker_page = {
                let tracker_required_pages =
                    (allocators.region_tracker.to_vec().len() + page_size - 1) / page_size;
                let required_order = ceil_log2(tracker_required_pages);
                let page_number = allocators.region_allocators[0]
                    .alloc(required_order)
                    .unwrap();
                PageNumber::new(0, page_number, required_order)
            };

            let mut header = DatabaseHeader::new(
                layout,
                TransactionId::new(0),
                FILE_FORMAT_VERSION2,
                tracker_page,
            );

            header.recovery_required = false;
            header.two_phase_commit = true;
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(false));
            allocators.flush_to(tracker_page, layout, &storage)?;

            storage.flush(false)?;
            // Write the magic number only after the data structures have been initialized and
            // flushed to disk, so that initialization is crash safe
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }
        let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;

        assert_eq!(header.page_size() as usize, page_size);
        assert!(storage.raw_file_len()? >= header.layout().len());
        let needs_recovery =
            header.recovery_required || header.layout().len() != storage.raw_file_len()?;
        if needs_recovery {
            let layout = header.layout();
            let region_max_pages = layout.full_region_layout().num_pages();
            let region_header_pages = layout.full_region_layout().get_header_pages();
            header.set_layout(DatabaseLayout::recalculate(
                storage.raw_file_len()?,
                region_header_pages,
                region_max_pages,
                page_size.try_into().unwrap(),
            ));
            header.pick_primary_for_repair(repair_info)?;
            assert!(!repair_info.invalid_magic_number);
            storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            storage.flush(false)?;
        }

        let layout = header.layout();
        assert_eq!(layout.len(), storage.raw_file_len()?);
        let region_size = layout.full_region_layout().len();
        let region_header_size = layout.full_region_layout().data_section().start;

        let state = InMemoryState::from_bytes(header, &storage)?;

        assert!(page_size >= DB_HEADER_SIZE);

        Ok(Self {
            allocated_since_commit: Mutex::new(HashSet::new()),
            needs_recovery: AtomicBool::new(needs_recovery),
            storage,
            state: Mutex::new(state),
            #[cfg(debug_assertions)]
            open_dirty_pages: Arc::new(Mutex::new(HashSet::new())),
            #[cfg(debug_assertions)]
            read_page_ref_counts: Arc::new(Mutex::new(HashMap::new())),
            read_from_secondary: AtomicBool::new(false),
            page_size: page_size.try_into().unwrap(),
            region_size,
            region_header_with_padding_size: region_header_size,
        })
    }

    pub(crate) fn check_io_errors(&self) -> Result {
        self.storage.check_io_errors()
    }

    #[cfg(any(test, fuzzing))]
    pub(crate) fn all_allocated_pages(&self) -> Vec<PageNumber> {
        self.state.lock().unwrap().allocators.all_allocated()
    }

    #[cfg(any(test, fuzzing))]
    pub(crate) fn tracker_page(&self) -> PageNumber {
        self.state.lock().unwrap().header.region_tracker()
    }

    pub(crate) fn clear_read_cache(&self) {
        self.storage.invalidate_cache_all();
    }

    pub(crate) fn clear_cache_and_reload(&mut self) -> Result<bool, DatabaseError> {
        assert!(self.allocated_since_commit.lock().unwrap().is_empty());

        self.storage.flush(false)?;
        self.storage.invalidate_cache_all();

        let header_bytes = self.storage.read_direct(0, DB_HEADER_SIZE)?;
        let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes)?;
        // If the database was not shut down cleanly, pick a valid primary slot and report
        // that the reload was not clean
        let mut was_clean = true;
        if header.recovery_required {
            if !header.pick_primary_for_repair(repair_info)? {
                was_clean = false;
            }
            if repair_info.invalid_magic_number {
                return Err(StorageError::Corrupted("Invalid magic number".to_string()).into());
            }
            self.storage
                .write(0, DB_HEADER_SIZE, true)?
                .mem_mut()
                .copy_from_slice(&header.to_bytes(true));
            self.storage.flush(false)?;
        }

        self.needs_recovery
            .store(header.recovery_required, Ordering::Release);
        self.state.lock().unwrap().header = header;

        Ok(was_clean)
    }

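    // Called when writes begin: marks the file as dirty so that an unclean shutdown will
    // trigger repair on the next open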
    pub(crate) fn begin_writable(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        assert!(!state.header.recovery_required);
        state.header.recovery_required = true;
        self.write_header(&state.header)?;
        self.storage.flush(false)
    }

    pub(crate) fn needs_repair(&self) -> Result<bool> {
        Ok(self.state.lock().unwrap().header.recovery_required)
    }

    pub(crate) fn used_two_phase_commit(&self) -> bool {
        self.state.lock().unwrap().header.two_phase_commit
    }

    pub(crate) fn allocator_hash(&self) -> u128 {
        self.state.lock().unwrap().allocators.xxh3_hash()
    }

    // True if a storage failure has occurred and recovery will be required
    pub(crate) fn storage_failure(&self) -> bool {
        self.needs_recovery.load(Ordering::Acquire)
    }

    pub(crate) fn repair_primary_corrupted(&self) {
        let mut state = self.state.lock().unwrap();
        state.header.swap_primary_slot();
    }

    pub(crate) fn begin_repair(&self) -> Result<()> {
        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators::new(state.header.layout());

        Ok(())
    }

    pub(crate) fn mark_page_allocated(&self, page_number: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page_number.region;
        let allocator = state.get_region_mut(region_index);
        allocator.record_alloc(page_number.page_index, page_number.page_order);
    }

    fn write_header(&self, header: &DatabaseHeader) -> Result {
        self.storage
            .write(0, DB_HEADER_SIZE, true)?
            .mem_mut()
            .copy_from_slice(&header.to_bytes(true));

        Ok(())
    }

    pub(crate) fn end_repair(&self) -> Result<()> {
        self.allocate_region_tracker_page()?;

        let mut state = self.state.lock().unwrap();
        let tracker_page = state.header.region_tracker();
        state
            .allocators
            .flush_to(tracker_page, state.header.layout(), &self.storage)?;

        state.header.recovery_required = false;
        self.write_header(&state.header)?;
        let result = self.storage.flush(false);
        self.needs_recovery.store(false, Ordering::Release);

        result
    }

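    // Reserves zeroed placeholder entries in the allocator state table, so that
    // try_save_allocator_state() can later write the real state with insert_inplace()
    // without performing any new allocations. Returns the number of regions reserved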
    pub(crate) fn reserve_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        transaction_id: TransactionId,
    ) -> Result<u32> {
        let state = self.state.lock().unwrap();
        let layout = state.header.layout();
        let num_regions = layout.num_regions();
        let region_header_len = layout.full_region_layout().get_header_pages()
            * layout.full_region_layout().page_size();
        let region_tracker_len = state.allocators.region_tracker.to_vec().len();
        drop(state);

        for i in 0..num_regions {
            tree.insert(
                &AllocatorStateKey::Region(i),
                &vec![0; region_header_len as usize].as_ref(),
            )?;
        }

        tree.insert(
            &AllocatorStateKey::RegionTracker,
            &vec![0; region_tracker_len].as_ref(),
        )?;

        tree.insert(
            &AllocatorStateKey::TransactionId,
            &transaction_id.raw_id().to_le_bytes().as_ref(),
        )?;

        Ok(num_regions)
    }

    // Returns true on success, or false if the reserved space is no longer usable
    pub(crate) fn try_save_allocator_state(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        let state = self.state.lock().unwrap();
        // Has the number of regions changed since reserve_allocator_state() was called?
        if num_regions != state.header.layout().num_regions() {
            return Ok(false);
        }

        // Temporarily free the region tracker page, so that it is not included in the saved
        // allocator state. It is re-allocated when the state is loaded
        let tracker_page = state.header.region_tracker();
        drop(state);
        self.free(tracker_page);

        let result = self.try_save_allocator_state_inner(tree, num_regions);

        // Restore the region tracker page allocation
        self.mark_page_allocated(tracker_page);

        result
    }

    fn try_save_allocator_state_inner(
        &self,
        tree: &mut AllocatorStateTree,
        num_regions: u32,
    ) -> Result<bool> {
        for i in 0..num_regions {
            let region_bytes =
                &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec();
            tree.insert_inplace(&AllocatorStateKey::Region(i), &region_bytes.as_ref())?;
        }

        let region_tracker_bytes = self
            .state
            .lock()
            .unwrap()
            .allocators
            .region_tracker
            .to_vec();
        tree.insert_inplace(
            &AllocatorStateKey::RegionTracker,
            &region_tracker_bytes.as_ref(),
        )?;

        Ok(true)
    }

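    // The saved allocator state is only usable if it was written by the last committed
    // transaction; anything older is stale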
    pub(crate) fn is_valid_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
        let transaction_id = TransactionId::new(u64::from_le_bytes(
            tree.get(&AllocatorStateKey::TransactionId)?
                .unwrap()
                .value()
                .try_into()
                .unwrap(),
        ));

        Ok(transaction_id == self.get_last_committed_transaction_id()?)
    }

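    // Restores the allocator state saved by try_save_allocator_state(), instead of
    // rebuilding the allocators through a full repair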
    pub(crate) fn load_allocator_state(&self, tree: &AllocatorStateTree) -> Result {
        assert!(self.is_valid_allocator_state(tree)?);

        // Load the per-region allocators
        let mut region_allocators = vec![];
        for region in
            tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))?
        {
            region_allocators.push(BuddyAllocator::from_bytes(region?.value()));
        }

        let region_tracker = RegionTracker::from_page(
            tree.get(&AllocatorStateKey::RegionTracker)?
                .unwrap()
                .value(),
        );

        let mut state = self.state.lock().unwrap();
        state.allocators = Allocators {
            region_tracker,
            region_allocators,
        };

        // Resize the allocators to match the current file layout
        let layout = state.header.layout();
        state.allocators.resize_to(layout);
        drop(state);

        // Allocate a page for the region tracker, which was freed before the state was saved
        self.allocate_region_tracker_page()?;

        self.state.lock().unwrap().header.recovery_required = false;
        self.needs_recovery.store(false, Ordering::Release);

        Ok(())
    }

    pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
        let state = self.state.lock().unwrap();
        let allocator = state.get_region(page.region);

        allocator.is_allocated(page.page_index, page.page_order)
    }

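    // Ensures that a page is allocated for the region tracker. If the tracker's page is in
    // use by other data, or too small to hold the tracker, a new page is allocated and
    // recorded in the header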
    fn allocate_region_tracker_page(&self) -> Result {
        let mut state = self.state.lock().unwrap();
        let tracker_len = state.allocators.region_tracker.to_vec().len();
        let tracker_page = state.header.region_tracker();

        let allocator = state.get_region_mut(tracker_page.region);
        if allocator.is_allocated(tracker_page.page_index, tracker_page.page_order)
            || tracker_page.page_size_bytes(self.page_size) < tracker_len as u64
        {
            // The tracker's page is unusable; allocate a new one
            drop(state);

            let new_tracker_page = self
                .allocate_non_transactional(tracker_len, false)?
                .get_page_number();

            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_tracker_page);
            self.write_header(&state.header)?;
            self.storage.flush(false)?;
        } else {
            // The old page is still free; just mark it as allocated again
            allocator.record_alloc(tracker_page.page_index, tracker_page.page_order);
            drop(state);
        }

        Ok(())
    }

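    // Tries to move the region tracker to a lower page, so that the end of the file can be
    // freed up (e.g. during compaction). Returns true if the tracker was moved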
    pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
        let state = self.state.lock().unwrap();
        let region_tracker_size = state
            .header
            .region_tracker()
            .page_size_bytes(self.page_size);
        let old_tracker_page = state.header.region_tracker();
        drop(state);
        // Allocate the lowest free page that can hold the tracker
        let new_page =
            self.allocate_non_transactional(region_tracker_size.try_into().unwrap(), true)?;
        if new_page.get_page_number().is_before(old_tracker_page) {
            let mut state = self.state.lock().unwrap();
            state.header.set_region_tracker(new_page.get_page_number());
            drop(state);
            self.free(old_tracker_page);
            Ok(true)
        } else {
            let new_page_number = new_page.get_page_number();
            drop(new_page);
            self.free(new_page_number);
            Ok(false)
        }
    }

    pub(crate) fn get_raw_allocator_states(&self) -> Vec<Vec<u8>> {
        let state = self.state.lock().unwrap();

        let mut regional_allocators = vec![];
        for i in 0..state.header.layout().num_regions() {
            regional_allocators.push(state.get_region(i).make_state_for_savepoint());
        }

        regional_allocators
    }

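    // Commits by writing the new roots into the secondary header slot and then promoting it
    // to primary. When two_phase is set, the secondary slot is flushed before the promotion,
    // so the promoted slot is guaranteed durable before it becomes the primary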
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        let result = self.commit_inner(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            eventual,
            two_phase,
        );
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    #[allow(clippy::too_many_arguments)]
    fn commit_inner(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
        eventual: bool,
        two_phase: bool,
    ) -> Result {
        // All mutable pages must be dropped before a commit, so that no further writes can
        // occur to the data being flushed
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        let mut state = self.state.lock().unwrap();
        let shrunk = Self::try_shrink(&mut state)?;
        // Work on a copy of the header, so the in-memory state is only updated on success
        let mut header = state.header.clone();
        drop(state);

        let old_transaction_id = header.secondary_slot().transaction_id;
        let secondary = header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        self.write_header(&header)?;

        // In 2-phase commit, make the secondary slot durable before promoting it
        if two_phase {
            self.storage.flush(eventual)?;
        }

        // Swap the primary slot, making the new commit visible
        header.swap_primary_slot();
        header.two_phase_commit = two_phase;

        self.write_header(&header)?;
        self.storage.flush(eventual)?;

        if shrunk {
            let result = self.storage.resize(header.layout().len());
            if result.is_err() {
                // The file may now be inconsistent with the committed layout
                self.needs_recovery.store(true, Ordering::Release);
                return result;
            }
        }
        self.allocated_since_commit.lock().unwrap().clear();

        let mut state = self.state.lock().unwrap();
        assert_eq!(
            state.header.secondary_slot().transaction_id,
            old_transaction_id
        );
        state.header = header;
        self.read_from_secondary.store(false, Ordering::Release);
        drop(state);

        Ok(())
    }

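    // Commits without durability: the new roots are published to the in-memory secondary
    // slot only. A write barrier is issued, but no fsync, and reads are served from the
    // secondary slot until the next durable commit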
    pub(crate) fn non_durable_commit(
        &self,
        data_root: Option<BtreeHeader>,
        system_root: Option<BtreeHeader>,
        freed_root: Option<BtreeHeader>,
        transaction_id: TransactionId,
    ) -> Result {
        // There must be no outstanding mutable pages
        #[cfg(debug_assertions)]
        debug_assert!(self.open_dirty_pages.lock().unwrap().is_empty());
        assert!(!self.needs_recovery.load(Ordering::Acquire));

        self.allocated_since_commit.lock().unwrap().clear();
        self.storage.write_barrier()?;

        let mut state = self.state.lock().unwrap();
        let secondary = state.header.secondary_slot_mut();
        secondary.transaction_id = transaction_id;
        secondary.user_root = data_root;
        secondary.system_root = system_root;
        secondary.freed_root = freed_root;

        // Serve future reads from the secondary slot, since the primary was not updated
        self.read_from_secondary.store(true, Ordering::Release);

        Ok(())
    }

    pub(crate) fn rollback_uncommitted_writes(&self) -> Result {
        let result = self.rollback_uncommitted_writes_inner();
        if result.is_err() {
            self.needs_recovery.store(true, Ordering::Release);
        }
        result
    }

    fn rollback_uncommitted_writes_inner(&self) -> Result {
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(
                dirty_pages.is_empty(),
                "Dirty pages outstanding: {dirty_pages:?}"
            );
        }
        assert!(!self.needs_recovery.load(Ordering::Acquire));
        let mut state = self.state.lock().unwrap();
        let mut guard = self.allocated_since_commit.lock().unwrap();
        for page_number in guard.iter() {
            let region_index = page_number.region;
            state
                .get_region_tracker_mut()
                .mark_free(page_number.page_order, region_index);
            state
                .get_region_mut(region_index)
                .free(page_number.page_index, page_number.page_order);

            let address = page_number.address_range(
                self.page_size.into(),
                self.region_size,
                self.region_header_with_padding_size,
                self.page_size,
            );
            let len: usize = (address.end - address.start).try_into().unwrap();
            self.storage.invalidate_cache(address.start, len);
            self.storage.cancel_pending_write(address.start, len);
        }
        guard.clear();

        Ok(())
    }

    pub(crate) fn get_page(&self, page_number: PageNumber) -> Result<PageImpl> {
        self.get_page_extended(page_number, PageHint::None)
    }

    pub(crate) fn get_page_extended(
        &self,
        page_number: PageNumber,
        hint: PageHint,
    ) -> Result<PageImpl> {
        let range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (range.end - range.start).try_into().unwrap();
        let mem = self.storage.read(range.start, len, hint)?;

        // We must not retrieve an immutable reference to a page which already has a mutable ref to it
        #[cfg(debug_assertions)]
        {
            let dirty_pages = self.open_dirty_pages.lock().unwrap();
            debug_assert!(!dirty_pages.contains(&page_number), "{page_number:?}");
            *(self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .entry(page_number)
                .or_default()) += 1;
            drop(dirty_pages);
        }

        Ok(PageImpl {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.read_page_ref_counts.clone(),
        })
    }

    pub(crate) fn get_page_mut(&self, page_number: PageNumber) -> Result<PageMut> {
        // The page must not have any outstanding references, mutable or immutable
        #[cfg(debug_assertions)]
        {
            assert!(!self
                .read_page_ref_counts
                .lock()
                .unwrap()
                .contains_key(&page_number));
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        let mem = self.storage.write(address_range.start, len, false)?;

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    pub(crate) fn get_version(&self) -> u8 {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().version
        } else {
            state.header.primary_slot().version
        }
    }

    pub(crate) fn get_data_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().user_root
        } else {
            state.header.primary_slot().user_root
        }
    }

    pub(crate) fn get_system_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().system_root
        } else {
            state.header.primary_slot().system_root
        }
    }

    pub(crate) fn get_freed_root(&self) -> Option<BtreeHeader> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            state.header.secondary_slot().freed_root
        } else {
            state.header.primary_slot().freed_root
        }
    }

    pub(crate) fn get_layout(&self) -> DatabaseLayout {
        self.state.lock().unwrap().header.layout()
    }

    pub(crate) fn get_last_committed_transaction_id(&self) -> Result<TransactionId> {
        let state = self.state.lock().unwrap();
        if self.read_from_secondary.load(Ordering::Acquire) {
            Ok(state.header.secondary_slot().transaction_id)
        } else {
            Ok(state.header.primary_slot().transaction_id)
        }
    }

    pub(crate) fn free(&self, page: PageNumber) {
        self.allocated_since_commit.lock().unwrap().remove(&page);
        self.free_helper(page);
    }

    fn free_helper(&self, page: PageNumber) {
        let mut state = self.state.lock().unwrap();
        let region_index = page.region;
        state
            .get_region_mut(region_index)
            .free(page.page_index, page.page_order);
        state
            .get_region_tracker_mut()
            .mark_free(page.page_order, region_index);

        let address_range = page.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();
        self.storage.invalidate_cache(address_range.start, len);
        self.storage.cancel_pending_write(address_range.start, len);
    }

    // Frees the page if it was allocated since the last commit. Returns true if it was freed
    pub(crate) fn free_if_uncommitted(&self, page: PageNumber) -> bool {
        if self.allocated_since_commit.lock().unwrap().remove(&page) {
            self.free_helper(page);
            true
        } else {
            false
        }
    }

    // Page has not been committed
    pub(crate) fn uncommitted(&self, page: PageNumber) -> bool {
        self.allocated_since_commit.lock().unwrap().contains(&page)
    }

    pub(crate) fn allocate_helper(
        &self,
        allocation_size: usize,
        lowest: bool,
        transactional: bool,
    ) -> Result<PageMut> {
        let required_pages = (allocation_size + self.get_page_size() - 1) / self.get_page_size();
        let required_order = ceil_log2(required_pages);

        let mut state = self.state.lock().unwrap();

        let page_number = if let Some(page_number) =
            Self::allocate_helper_retry(&mut state, required_order, lowest)?
        {
            page_number
        } else {
            // No free page of the required order; grow the file
            self.grow(&mut state, required_order)?;
            Self::allocate_helper_retry(&mut state, required_order, lowest)?.unwrap()
        };

        #[cfg(debug_assertions)]
        {
            assert!(
                !self
                    .read_page_ref_counts
                    .lock()
                    .unwrap()
                    .contains_key(&page_number),
                "Allocated a page that is still referenced! {page_number:?}"
            );
            assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
        }

        if transactional {
            self.allocated_since_commit
                .lock()
                .unwrap()
                .insert(page_number);
        }

        let address_range = page_number.address_range(
            self.page_size.into(),
            self.region_size,
            self.region_header_with_padding_size,
            self.page_size,
        );
        let len: usize = (address_range.end - address_range.start)
            .try_into()
            .unwrap();

        #[allow(unused_mut)]
        let mut mem = self.storage.write(address_range.start, len, true)?;
        debug_assert!(mem.mem().len() >= allocation_size);

        #[cfg(debug_assertions)]
        {
            assert!(self.open_dirty_pages.lock().unwrap().insert(page_number));

            // Poison the memory in debug mode to help detect uninitialized reads
            mem.mem_mut().fill(0xFF);
        }

        Ok(PageMut {
            mem,
            page_number,
            #[cfg(debug_assertions)]
            open_pages: self.open_dirty_pages.clone(),
        })
    }

    fn allocate_helper_retry(
        state: &mut InMemoryState,
        required_order: u8,
        lowest: bool,
    ) -> Result<Option<PageNumber>> {
        loop {
            let Some(candidate_region) = state.get_region_tracker_mut().find_free(required_order)
            else {
                return Ok(None);
            };
            let region = state.get_region_mut(candidate_region);
            let r = if lowest {
                region.alloc_lowest(required_order)
            } else {
                region.alloc(required_order)
            };
            if let Some(page) = r {
                return Ok(Some(PageNumber::new(
                    candidate_region,
                    page,
                    required_order,
                )));
            }
            // Mark the region as full for this order and try another
            state
                .get_region_tracker_mut()
                .mark_full(required_order, candidate_region);
        }
    }

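    // Shrinks the layout if at least half of the last region's pages are free at its end.
    // The file itself is resized later, after the shrunken layout has been committed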
    fn try_shrink(state: &mut InMemoryState) -> Result<bool> {
        let layout = state.header.layout();
        let last_region_index = layout.num_regions() - 1;
        let last_allocator = state.get_region(last_region_index);
        let trailing_free = last_allocator.trailing_free_pages();
        let last_allocator_len = last_allocator.len();
        if trailing_free < last_allocator_len / 2 {
            return Ok(false);
        }
        let reduce_by = if layout.num_regions() > 1 && trailing_free == last_allocator_len {
            trailing_free
        } else {
            trailing_free / 2
        };

        let mut new_layout = layout;
        new_layout.reduce_last_region(reduce_by);
        state.allocators.resize_to(new_layout);
        assert!(new_layout.len() <= layout.len());
        state.header.set_layout(new_layout);

        Ok(true)
    }

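    // Grows the file enough to satisfy an allocation of the given order: fill out the
    // trailing region, add a new region, or double the usable space, as appropriate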
    fn grow(&self, state: &mut InMemoryState, required_order_allocation: u8) -> Result<()> {
        let layout = state.header.layout();
        let required_growth =
            2u64.pow(required_order_allocation.into()) * u64::from(state.header.page_size());
        let max_region_size = u64::from(state.header.layout().full_region_layout().num_pages())
            * u64::from(state.header.page_size());
        let next_desired_size = if layout.num_full_regions() > 0 {
            if let Some(trailing) = layout.trailing_region_layout() {
                if 2 * required_growth < max_region_size - trailing.usable_bytes() {
                    // Fill out the trailing region
                    layout.usable_bytes() + (max_region_size - trailing.usable_bytes())
                } else {
                    // Fill out the trailing region and add a new region
                    layout.usable_bytes() + 2 * max_region_size - trailing.usable_bytes()
                }
            } else {
                // Add a new region
                layout.usable_bytes() + max_region_size
            }
        } else {
            max(
                layout.usable_bytes() * 2,
                layout.usable_bytes() + required_growth * 2,
            )
        };
        let new_layout = DatabaseLayout::calculate(
            next_desired_size,
            state.header.layout().full_region_layout().num_pages(),
            self.page_size,
        );
        assert!(new_layout.len() >= layout.len());

        let result = self.storage.resize(new_layout.len());
        if result.is_err() {
            // The file may now be inconsistent with the in-memory layout
            self.needs_recovery.store(true, Ordering::Release);
            return result;
        }

        state.allocators.resize_to(new_layout);
        state.header.set_layout(new_layout);
        Ok(())
    }

    pub(crate) fn allocate(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, false, true)
    }

    pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result<PageMut> {
        self.allocate_helper(allocation_size, true, true)
    }

    // Allocate a page that is not associated with any transaction. It is immediately
    // considered committed and will not be rolled back on abort. Used for the region tracker
    fn allocate_non_transactional(&self, allocation_size: usize, lowest: bool) -> Result<PageMut> {
        self.allocate_helper(allocation_size, lowest, false)
    }

    pub(crate) fn count_allocated_pages(&self) -> Result<u64> {
        let state = self.state.lock().unwrap();
        let mut count = 0u64;
        for i in 0..state.header.layout().num_regions() {
            count += u64::from(state.get_region(i).count_allocated_pages());
        }

        Ok(count)
    }

    pub(crate) fn get_page_size(&self) -> usize {
        self.page_size.try_into().unwrap()
    }
}

impl Drop for TransactionalMemory {
    fn drop(&mut self) {
        if thread::panicking() || self.needs_recovery.load(Ordering::Acquire) {
            return;
        }

        // Make sure the region tracker page is allocated and large enough, then persist the
        // allocator state so that no repair is needed on the next open
        let tracker_page = self.state.lock().unwrap().header.region_tracker();
        self.free(tracker_page);
        if self.allocate_region_tracker_page().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        let mut state = self.state.lock().unwrap();
        if state
            .allocators
            .flush_to(
                state.header.region_tracker(),
                state.header.layout(),
                &self.storage,
            )
            .is_err()
        {
            #[cfg(feature = "logging")]
            warn!("Failure while flushing allocator state. Repair required at restart.");
            return;
        }

        if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) {
            state.header.recovery_required = false;
            let _ = self.write_header(&state.header);
            let _ = self.storage.flush(false);
        }
    }
}

#[cfg(test)]
mod test {
    use crate::tree_store::page_store::page_manager::INITIAL_REGIONS;
    use crate::{Database, TableDefinition};

    #[test]
    fn out_of_regions() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");
        let page_size = 1024;
        let big_value = vec![0u8; 5 * page_size];

        let db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..=INITIAL_REGIONS {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();
        drop(db);

        let mut db = Database::builder()
            .set_region_size((8 * page_size).try_into().unwrap())
            .set_page_size(page_size)
            .open(tmpfile.path())
            .unwrap();
        assert!(db.check_integrity().unwrap());
    }
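
    // Added sanity checks for the ceil_log2() helper defined in this module: the allocation
    // order for a page count must be the smallest power-of-two exponent that covers it
    #[test]
    fn ceil_log2_examples() {
        assert_eq!(super::ceil_log2(1), 0);
        assert_eq!(super::ceil_log2(2), 1);
        assert_eq!(super::ceil_log2(3), 2);
        assert_eq!(super::ceil_log2(4), 2);
        assert_eq!(super::ceil_log2(5), 3);
        assert_eq!(super::ceil_log2(1024), 10);
    }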

    #[test]
    #[cfg(panic = "unwind")]
    fn panic() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u32, &[u8]> = TableDefinition::new("x");

        let _ = std::panic::catch_unwind(|| {
            let db = Database::create(&tmpfile).unwrap();
            let txn = db.begin_write().unwrap();
            txn.open_table(table_definition).unwrap();
            panic!();
        });

        let mut db = Database::open(tmpfile).unwrap();
        assert!(db.check_integrity().unwrap());
    }
}