use crate::db::TransactionGuard;
use crate::error::TableError;
use crate::multimap_table::{
    finalize_tree_and_subtree_checksums, multimap_btree_stats, verify_tree_and_subtree_checksums,
};
use crate::tree_store::btree::{btree_stats, UntypedBtreeMut};
use crate::tree_store::btree_base::BtreeHeader;
use crate::tree_store::page_store::{new_allocators, BuddyAllocator};
use crate::tree_store::{
    Btree, BtreeMut, BtreeRangeIter, InternalTableDefinition, PageHint, PageNumber, PagePath,
    RawBtree, TableType, TransactionalMemory,
};
use crate::types::{Key, MutInPlaceValue, TypeName, Value};
use crate::{DatabaseStats, Result};
use std::cmp::max;
use std::collections::{BTreeMap, HashMap};
use std::mem::size_of;
use std::ops::RangeFull;
use std::sync::{Arc, Mutex};

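// Key for entries in the freed-pages table: ordered by the transaction that freed the pages,
// then by pagination_id within that transaction (see the Key impl below).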
#[derive(Debug)]
pub(crate) struct FreedTableKey {
    pub(crate) transaction_id: u64,
    pub(crate) pagination_id: u64,
}

impl Value for FreedTableKey {
    type SelfType<'a> = FreedTableKey
    where
        Self: 'a;
    type AsBytes<'a> = [u8; 2 * size_of::<u64>()]
    where
        Self: 'a;

    fn fixed_width() -> Option<usize> {
        Some(2 * size_of::<u64>())
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self
    where
        Self: 'a,
    {
        let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
        let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
        Self {
            transaction_id,
            pagination_id,
        }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
    where
        Self: 'b,
    {
        let mut result = [0u8; 2 * size_of::<u64>()];
        result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
        result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
        result
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::FreedTableKey")
    }
}

impl Key for FreedTableKey {
    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
        let value1 = Self::from_bytes(data1);
        let value2 = Self::from_bytes(data2);

        match value1.transaction_id.cmp(&value2.transaction_id) {
            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
            std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
        }
    }
}

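// Serialized layout (see required_bytes/len/get below):
//   2 bytes: number of entries (little-endian u16)
//   entries * PageNumber::serialized_size() bytes: the page numbers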
#[derive(Debug)]
pub(crate) struct FreedPageList<'a> {
    data: &'a [u8],
}

impl<'a> FreedPageList<'a> {
    pub(crate) fn required_bytes(len: usize) -> usize {
        2 + PageNumber::serialized_size() * len
    }

    pub(crate) fn len(&self) -> usize {
        u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
    }

    pub(crate) fn get(&self, index: usize) -> PageNumber {
        let start = size_of::<u16>() + PageNumber::serialized_size() * index;
        PageNumber::from_le_bytes(
            self.data[start..(start + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        )
    }
}

#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct FreedPageListMut {
    data: [u8],
}

impl FreedPageListMut {
    pub(crate) fn push_back(&mut self, value: PageNumber) {
        let len = u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap());
        self.data[..size_of::<u16>()].copy_from_slice(&(len + 1).to_le_bytes());
        let len: usize = len.into();
        let start = size_of::<u16>() + PageNumber::serialized_size() * len;
        self.data[start..(start + PageNumber::serialized_size())]
            .copy_from_slice(&value.to_le_bytes());
    }

    pub(crate) fn clear(&mut self) {
        self.data[..size_of::<u16>()].fill(0);
    }
}

impl Value for FreedPageList<'_> {
    type SelfType<'a> = FreedPageList<'a>
    where
        Self: 'a;
    type AsBytes<'a> = &'a [u8]
    where
        Self: 'a;

    fn fixed_width() -> Option<usize> {
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        FreedPageList { data }
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
    where
        Self: 'b,
    {
        value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::FreedPageList")
    }
}

impl MutInPlaceValue for FreedPageList<'_> {
    type BaseRefType = FreedPageListMut;

    fn initialize(data: &mut [u8]) {
        assert!(data.len() >= 8);
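        // Zero out the header region, which contains the u16 length field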
        data[..8].fill(0);
    }

    fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
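        // SAFETY: FreedPageListMut is #[repr(transparent)] over [u8], so casting a byte slice
        // pointer to the wrapper type is sound.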
        unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut FreedPageListMut) }
    }
}

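// Iterator over the names of all tables of a single TableType, in table-tree order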
pub struct TableNameIter {
    inner: BtreeRangeIter<&'static str, InternalTableDefinition>,
    table_type: TableType,
}

impl Iterator for TableNameIter {
    type Item = Result<String>;

    fn next(&mut self) -> Option<Self::Item> {
        for entry in self.inner.by_ref() {
            match entry {
                Ok(entry) => {
                    if entry.value().get_type() == self.table_type {
                        return Some(Ok(entry.key().to_string()));
                    }
                }
                Err(err) => {
                    return Some(Err(err));
                }
            }
        }
        None
    }
}

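// Read-only view of the table-of-tables, which maps each table name to its InternalTableDefinition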
pub(crate) struct TableTree {
    tree: Btree<&'static str, InternalTableDefinition>,
}

impl TableTree {
    pub(crate) fn new(
        master_root: Option<BtreeHeader>,
        page_hint: PageHint,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
    ) -> Result<Self> {
        Ok(Self {
            tree: Btree::new(master_root, page_hint, guard, mem)?,
        })
    }

    pub(crate) fn transaction_guard(&self) -> &Arc<TransactionGuard> {
        self.tree.transaction_guard()
    }

    pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
        let iter = self.tree.range::<RangeFull, &str>(&(..))?;
        let iter = TableNameIter {
            inner: iter,
            table_type,
        };
        let mut result = vec![];
        for table in iter {
            result.push(table?);
        }
        Ok(result)
    }

    pub(crate) fn get_table_untyped(
        &self,
        name: &str,
        table_type: TableType,
    ) -> Result<Option<InternalTableDefinition>, TableError> {
        if let Some(guard) = self.tree.get(&name)? {
            let definition = guard.value();
            definition.check_match_untyped(table_type, name)?;
            Ok(Some(definition))
        } else {
            Ok(None)
        }
    }

    pub(crate) fn get_table<K: Key, V: Value>(
        &self,
        name: &str,
        table_type: TableType,
    ) -> Result<Option<InternalTableDefinition>, TableError> {
        Ok(
            if let Some(definition) = self.get_table_untyped(name, table_type)? {
                definition.check_match::<K, V>(table_type, name)?;
                Some(definition)
            } else {
                None
            },
        )
    }
}

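// Mutable view of the table-of-tables, used by write transactions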
pub(crate) struct TableTreeMut<'txn> {
    tree: BtreeMut<'txn, &'static str, InternalTableDefinition>,
    guard: Arc<TransactionGuard>,
    mem: Arc<TransactionalMemory>,
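    // Staged (root, length) updates, keyed by table name, that have not yet been written back
    // to the btree by flush_table_root_updates()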
    pending_table_updates: HashMap<String, (Option<BtreeHeader>, u64)>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
}

impl<'txn> TableTreeMut<'txn> {
    pub(crate) fn new(
        master_root: Option<BtreeHeader>,
        guard: Arc<TransactionGuard>,
        mem: Arc<TransactionalMemory>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    ) -> Self {
        Self {
            tree: BtreeMut::new(master_root, guard.clone(), mem.clone(), freed_pages.clone()),
            guard,
            mem,
            pending_table_updates: Default::default(),
            freed_pages,
        }
    }

    pub(crate) fn all_referenced_pages(&self) -> Result<Vec<BuddyAllocator>> {
        let mut result = new_allocators(self.mem.get_layout());

        self.visit_all_pages(|path| {
            let page = path.page_number();
            result[page.region as usize].record_alloc(page.page_index, page.page_order);
            Ok(())
        })?;

        Ok(result)
    }

    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
    where
        F: FnMut(&PagePath) -> Result,
    {
        self.tree.visit_all_pages(&mut visitor)?;

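        // Pages belonging to every normal table's btree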
        for entry in self.list_tables(TableType::Normal)? {
            let definition = self
                .get_table_untyped(&entry, TableType::Normal)
                .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
                .unwrap();
            definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
        }

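        // Pages belonging to every multimap table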
        for entry in self.list_tables(TableType::Multimap)? {
            let definition = self
                .get_table_untyped(&entry, TableType::Multimap)
                .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
                .unwrap();
            definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
        }

        Ok(())
    }

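    // Stages a new root (and length) for the named table; applied by flush_table_root_updates()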
    pub(crate) fn stage_update_table_root(
        &mut self,
        name: &str,
        table_root: Option<BtreeHeader>,
        length: u64,
    ) {
        self.pending_table_updates
            .insert(name.to_string(), (table_root, length));
    }

    pub(crate) fn clear_table_root_updates(&mut self) {
        self.pending_table_updates.clear();
    }

    pub(crate) fn verify_checksums(&self) -> Result<bool> {
        assert!(self.pending_table_updates.is_empty());
        if !self.tree.verify_checksum()? {
            return Ok(false);
        }

        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
            let entry = entry?;
            let definition = entry.value();
            match definition {
                InternalTableDefinition::Normal {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    if let Some(header) = table_root {
                        if !RawBtree::new(
                            Some(header),
                            fixed_key_size,
                            fixed_value_size,
                            self.mem.clone(),
                        )
                        .verify_checksum()?
                        {
                            return Ok(false);
                        }
                    }
                }
                InternalTableDefinition::Multimap {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    if !verify_tree_and_subtree_checksums(
                        table_root,
                        fixed_key_size,
                        fixed_value_size,
                        self.mem.clone(),
                    )? {
                        return Ok(false);
                    }
                }
            }
        }

        Ok(true)
    }

    pub(crate) fn flush_table_root_updates(&mut self) -> Result<&mut Self> {
        for (name, (new_root, new_length)) in self.pending_table_updates.drain() {
            let mut definition = self.tree.get(&name.as_str())?.unwrap().value();
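            // Nothing to flush if the root is unchanged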
            match definition {
                InternalTableDefinition::Normal { table_root, .. }
                | InternalTableDefinition::Multimap { table_root, .. } => {
                    if table_root == new_root {
                        continue;
                    }
                }
            }
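            // Finalize any dirty checksums and record the new root and length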
            match definition {
                InternalTableDefinition::Normal {
                    ref mut table_root,
                    ref mut table_length,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    let mut tree = UntypedBtreeMut::new(
                        new_root,
                        self.mem.clone(),
                        self.freed_pages.clone(),
                        fixed_key_size,
                        fixed_value_size,
                    );
                    *table_root = tree.finalize_dirty_checksums()?;
                    *table_length = new_length;
                }
                InternalTableDefinition::Multimap {
                    ref mut table_root,
                    ref mut table_length,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    *table_root = finalize_tree_and_subtree_checksums(
                        new_root,
                        fixed_key_size,
                        fixed_value_size,
                        self.mem.clone(),
                    )?;
                    *table_length = new_length;
                }
            }
            self.tree.insert(&name.as_str(), &definition)?;
        }
        Ok(self)
    }

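    // Creates a new (empty) table, lets `f` populate it, and then writes the finalized root and
    // length into the table tree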
    pub(crate) fn create_table_and_flush_table_root<K: Key + 'static, V: Value + 'static>(
        &mut self,
        name: &str,
        f: impl FnOnce(&mut BtreeMut<K, V>) -> Result,
    ) -> Result {
        assert!(self.pending_table_updates.is_empty());
        assert!(self.tree.get(&name)?.is_none());

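        // Insert a placeholder definition under the name before building the tree; it is
        // overwritten in place below once the real root is known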
        self.tree.insert(
            &name,
            &InternalTableDefinition::new::<K, V>(TableType::Normal, None, 0),
        )?;

        let mut tree: BtreeMut<K, V> = BtreeMut::new(
            None,
            self.guard.clone(),
            self.mem.clone(),
            self.freed_pages.clone(),
        );
        f(&mut tree)?;

        let table_root = tree.finalize_dirty_checksums()?;
        let table_length = tree.get_root().map(|x| x.length).unwrap_or_default();

        self.tree.insert_inplace(
            &name,
            &InternalTableDefinition::new::<K, V>(TableType::Normal, table_root, table_length),
        )?;

        Ok(())
    }

    pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
        self.tree.finalize_dirty_checksums()
    }

    pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
        let tree = TableTree::new(
            self.tree.get_root(),
            PageHint::None,
            self.guard.clone(),
            self.mem.clone(),
        )?;
        tree.list_tables(table_type)
    }

    pub(crate) fn get_table_untyped(
        &self,
        name: &str,
        table_type: TableType,
    ) -> Result<Option<InternalTableDefinition>, TableError> {
        let tree = TableTree::new(
            self.tree.get_root(),
            PageHint::None,
            self.guard.clone(),
            self.mem.clone(),
        )?;
        let mut result = tree.get_table_untyped(name, table_type);

        if let Ok(Some(definition)) = result.as_mut() {
            if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
                definition.set_header(*updated_root, *updated_length);
            }
        }

        result
    }

    pub(crate) fn get_table<K: Key, V: Value>(
        &self,
        name: &str,
        table_type: TableType,
    ) -> Result<Option<InternalTableDefinition>, TableError> {
        let tree = TableTree::new(
            self.tree.get_root(),
            PageHint::None,
            self.guard.clone(),
            self.mem.clone(),
        )?;
        let mut result = tree.get_table::<K, V>(name, table_type);

        if let Ok(Some(definition)) = result.as_mut() {
            if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
                definition.set_header(*updated_root, *updated_length);
            }
        }

        result
    }

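    // Deletes the named table, queueing all of its pages onto freed_pages. Returns true if the
    // table existed.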
    pub(crate) fn delete_table(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<bool, TableError> {
        if let Some(definition) = self.get_table_untyped(name, table_type)? {
            let mut freed_pages = self.freed_pages.lock().unwrap();
            definition.visit_all_pages(self.mem.clone(), |path| {
                freed_pages.push(path.page_number());
                Ok(())
            })?;
            drop(freed_pages);

            self.pending_table_updates.remove(name);

            let found = self.tree.remove(&name)?.is_some();
            return Ok(found);
        }

        Ok(false)
    }

    pub(crate) fn get_or_create_table<K: Key, V: Value>(
        &mut self,
        name: &str,
        table_type: TableType,
    ) -> Result<(Option<BtreeHeader>, u64), TableError> {
        let table = if let Some(found) = self.get_table::<K, V>(name, table_type)? {
            found
        } else {
            let table = InternalTableDefinition::new::<K, V>(table_type, None, 0);
            self.tree.insert(&name, &table)?;
            table
        };

        match table {
            InternalTableDefinition::Normal {
                table_root,
                table_length,
                ..
            }
            | InternalTableDefinition::Multimap {
                table_root,
                table_length,
                ..
            } => Ok((table_root, table_length)),
        }
    }

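    // Collects the paths of up to `n` referenced pages with the highest page numbers, scanning
    // every table (with pending root updates applied) as well as the table tree itself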
    pub(crate) fn highest_index_pages(
        &self,
        n: usize,
        output: &mut BTreeMap<PageNumber, PagePath>,
    ) -> Result {
        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
            let entry = entry?;
            let mut definition = entry.value();
            if let Some((updated_root, updated_length)) =
                self.pending_table_updates.get(entry.key())
            {
                definition.set_header(*updated_root, *updated_length);
            }

            definition.visit_all_pages(self.mem.clone(), |path| {
                output.insert(path.page_number(), path.clone());
                while output.len() > n {
                    output.pop_first();
                }
                Ok(())
            })?;
        }

        self.tree.visit_all_pages(|path| {
            output.insert(path.page_number(), path.clone());
            while output.len() > n {
                output.pop_first();
            }
            Ok(())
        })?;

        Ok(())
    }

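    // Moves the pages listed in `relocation_map` for every table's tree (and for the
    // table-of-tables itself), staging any changed table roots in pending_table_updates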
    pub(crate) fn relocate_tables(
        &mut self,
        relocation_map: &HashMap<PageNumber, PageNumber>,
    ) -> Result {
        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
            let entry = entry?;
            let mut definition = entry.value();
            if let Some((updated_root, updated_length)) =
                self.pending_table_updates.get(entry.key())
            {
                definition.set_header(*updated_root, *updated_length);
            }

            if let Some(new_root) = definition.relocate_tree(
                self.mem.clone(),
                self.freed_pages.clone(),
                relocation_map,
            )? {
                self.pending_table_updates
                    .insert(entry.key().to_string(), (new_root, definition.get_length()));
            }
        }

        self.tree.relocate(relocation_map)?;

        Ok(())
    }

    pub fn stats(&self) -> Result<DatabaseStats> {
        let master_tree_stats = self.tree.stats()?;
        let mut max_subtree_height = 0;
        let mut total_stored_bytes = 0;
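        // Count the master tree's leaf pages as branch pages, since they point at the per-table trees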
        let mut branch_pages = master_tree_stats.branch_pages + master_tree_stats.leaf_pages;
        let mut leaf_pages = 0;
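        // The master tree's stored bytes are bookkeeping rather than user data, so count them as metadata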
        let mut total_metadata_bytes =
            master_tree_stats.metadata_bytes + master_tree_stats.stored_leaf_bytes;
        let mut total_fragmented = master_tree_stats.fragmented_bytes;

        for entry in self.tree.range::<RangeFull, &str>(&(..))? {
            let entry = entry?;
            let mut definition = entry.value();
            if let Some((updated_root, length)) = self.pending_table_updates.get(entry.key()) {
                definition.set_header(*updated_root, *length);
            }
            match definition {
                InternalTableDefinition::Normal {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    let subtree_stats = btree_stats(
                        table_root.map(|x| x.root),
                        &self.mem,
                        fixed_key_size,
                        fixed_value_size,
                    )?;
                    max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
                    total_stored_bytes += subtree_stats.stored_leaf_bytes;
                    total_metadata_bytes += subtree_stats.metadata_bytes;
                    total_fragmented += subtree_stats.fragmented_bytes;
                    branch_pages += subtree_stats.branch_pages;
                    leaf_pages += subtree_stats.leaf_pages;
                }
                InternalTableDefinition::Multimap {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => {
                    let subtree_stats = multimap_btree_stats(
                        table_root.map(|x| x.root),
                        &self.mem,
                        fixed_key_size,
                        fixed_value_size,
                    )?;
                    max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
                    total_stored_bytes += subtree_stats.stored_leaf_bytes;
                    total_metadata_bytes += subtree_stats.metadata_bytes;
                    total_fragmented += subtree_stats.fragmented_bytes;
                    branch_pages += subtree_stats.branch_pages;
                    leaf_pages += subtree_stats.leaf_pages;
                }
            }
        }
        Ok(DatabaseStats {
            tree_height: master_tree_stats.tree_height + max_subtree_height,
            allocated_pages: self.mem.count_allocated_pages()?,
            leaf_pages,
            branch_pages,
            stored_leaf_bytes: total_stored_bytes,
            metadata_bytes: total_metadata_bytes,
            fragmented_bytes: total_fragmented,
            page_size: self.mem.get_page_size(),
        })
    }
}

#[cfg(test)]
mod test {
    use crate::tree_store::table_tree_base::InternalTableDefinition;
    use crate::types::TypeName;
    use crate::Value;

    #[test]
    fn round_trip() {
        let x = InternalTableDefinition::Multimap {
            table_root: None,
            table_length: 0,
            fixed_key_size: None,
            fixed_value_size: Some(5),
            key_alignment: 6,
            value_alignment: 7,
            key_type: TypeName::new("test::Key"),
            value_type: TypeName::new("test::Value"),
        };
        let y = InternalTableDefinition::from_bytes(InternalTableDefinition::as_bytes(&x).as_ref());
        assert_eq!(x, y);
    }
}