1use crate::db::TransactionGuard;
2use crate::error::TableError;
3use crate::multimap_table::{
4 finalize_tree_and_subtree_checksums, multimap_btree_stats, verify_tree_and_subtree_checksums,
5};
6use crate::tree_store::btree::{UntypedBtreeMut, btree_stats};
7use crate::tree_store::btree_base::BtreeHeader;
8use crate::tree_store::{
9 Btree, BtreeMut, BtreeRangeIter, InternalTableDefinition, PageHint, PageNumber, PagePath,
10 RawBtree, TableType, TransactionalMemory,
11};
12use crate::types::{Key, MutInPlaceValue, TypeName, Value};
13use crate::{DatabaseStats, Result};
14use std::cmp::max;
15use std::collections::{BTreeMap, HashMap};
16use std::mem::size_of;
17use std::ops::RangeFull;
18use std::sync::{Arc, Mutex};
19
20#[derive(Debug)]
21pub(crate) struct FreedTableKey {
22 pub(crate) transaction_id: u64,
23 pub(crate) pagination_id: u64,
24}
25
26impl Value for FreedTableKey {
27 type SelfType<'a>
28 = FreedTableKey
29 where
30 Self: 'a;
31 type AsBytes<'a>
32 = [u8; 2 * size_of::<u64>()]
33 where
34 Self: 'a;
35
36 fn fixed_width() -> Option<usize> {
37 Some(2 * size_of::<u64>())
38 }
39
40 fn from_bytes<'a>(data: &'a [u8]) -> Self
41 where
42 Self: 'a,
43 {
44 let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
45 let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
46 Self {
47 transaction_id,
48 pagination_id,
49 }
50 }
51
52 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
53 where
54 Self: 'b,
55 {
56 let mut result = [0u8; 2 * size_of::<u64>()];
57 result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
58 result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
59 result
60 }
61
62 fn type_name() -> TypeName {
63 TypeName::internal("redb::FreedTableKey")
64 }
65}
66
67impl Key for FreedTableKey {
68 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
69 let value1 = Self::from_bytes(data1);
70 let value2 = Self::from_bytes(data2);
71
72 match value1.transaction_id.cmp(&value2.transaction_id) {
73 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
74 std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
75 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
76 }
77 }
78}
79
80#[derive(Debug)]
84pub(crate) struct FreedPageList<'a> {
85 data: &'a [u8],
86}
87
88impl FreedPageList<'_> {
89 pub(crate) fn required_bytes(len: usize) -> usize {
90 2 + PageNumber::serialized_size() * len
91 }
92
93 pub(crate) fn len(&self) -> usize {
94 u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
95 }
96
97 pub(crate) fn get(&self, index: usize) -> PageNumber {
98 let start = size_of::<u16>() + PageNumber::serialized_size() * index;
99 PageNumber::from_le_bytes(
100 self.data[start..(start + PageNumber::serialized_size())]
101 .try_into()
102 .unwrap(),
103 )
104 }
105}
106
107#[derive(Debug)]
108#[repr(transparent)]
109pub(crate) struct FreedPageListMut {
110 data: [u8],
111}
112
113impl FreedPageListMut {
114 pub(crate) fn push_back(&mut self, value: PageNumber) {
115 let len = u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap());
116 self.data[..size_of::<u16>()].copy_from_slice(&(len + 1).to_le_bytes());
117 let len: usize = len.into();
118 let start = size_of::<u16>() + PageNumber::serialized_size() * len;
119 self.data[start..(start + PageNumber::serialized_size())]
120 .copy_from_slice(&value.to_le_bytes());
121 }
122
123 pub(crate) fn clear(&mut self) {
124 self.data[..size_of::<u16>()].fill(0);
125 }
126}
127
128impl Value for FreedPageList<'_> {
129 type SelfType<'a>
130 = FreedPageList<'a>
131 where
132 Self: 'a;
133 type AsBytes<'a>
134 = &'a [u8]
135 where
136 Self: 'a;
137
138 fn fixed_width() -> Option<usize> {
139 None
140 }
141
142 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
143 where
144 Self: 'a,
145 {
146 FreedPageList { data }
147 }
148
149 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
150 where
151 Self: 'b,
152 {
153 value.data
154 }
155
156 fn type_name() -> TypeName {
157 TypeName::internal("redb::FreedPageList")
158 }
159}
160
161impl MutInPlaceValue for FreedPageList<'_> {
162 type BaseRefType = FreedPageListMut;
163
164 fn initialize(data: &mut [u8]) {
165 assert!(data.len() >= 8);
166 data[..8].fill(0);
168 }
169
170 fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
171 unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut FreedPageListMut) }
172 }
173}
174
175pub struct TableNameIter {
176 inner: BtreeRangeIter<&'static str, InternalTableDefinition>,
177 table_type: TableType,
178}
179
180impl Iterator for TableNameIter {
181 type Item = Result<String>;
182
183 fn next(&mut self) -> Option<Self::Item> {
184 for entry in self.inner.by_ref() {
185 match entry {
186 Ok(entry) => {
187 if entry.value().get_type() == self.table_type {
188 return Some(Ok(entry.key().to_string()));
189 }
190 }
191 Err(err) => {
192 return Some(Err(err));
193 }
194 }
195 }
196 None
197 }
198}
199
200pub(crate) struct TableTree {
201 tree: Btree<&'static str, InternalTableDefinition>,
202 mem: Arc<TransactionalMemory>,
203}
204
205impl TableTree {
206 pub(crate) fn new(
207 master_root: Option<BtreeHeader>,
208 page_hint: PageHint,
209 guard: Arc<TransactionGuard>,
210 mem: Arc<TransactionalMemory>,
211 ) -> Result<Self> {
212 Ok(Self {
213 tree: Btree::new(master_root, page_hint, guard, mem.clone())?,
214 mem,
215 })
216 }
217
218 pub(crate) fn transaction_guard(&self) -> &Arc<TransactionGuard> {
219 self.tree.transaction_guard()
220 }
221
222 pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
224 let iter = self.tree.range::<RangeFull, &str>(&(..))?;
225 let iter = TableNameIter {
226 inner: iter,
227 table_type,
228 };
229 let mut result = vec![];
230 for table in iter {
231 result.push(table?);
232 }
233 Ok(result)
234 }
235
236 pub(crate) fn get_table_untyped(
237 &self,
238 name: &str,
239 table_type: TableType,
240 ) -> Result<Option<InternalTableDefinition>, TableError> {
241 if let Some(guard) = self.tree.get(&name)? {
242 let definition = guard.value();
243 definition.check_match_untyped(table_type, name)?;
244 Ok(Some(definition))
245 } else {
246 Ok(None)
247 }
248 }
249
250 pub(crate) fn get_table<K: Key, V: Value>(
252 &self,
253 name: &str,
254 table_type: TableType,
255 ) -> Result<Option<InternalTableDefinition>, TableError> {
256 Ok(
257 if let Some(definition) = self.get_table_untyped(name, table_type)? {
258 definition.check_match::<K, V>(table_type, name)?;
260 Some(definition)
261 } else {
262 None
263 },
264 )
265 }
266
267 pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
268 where
269 F: FnMut(&PagePath) -> Result,
270 {
271 self.tree.visit_all_pages(&mut visitor)?;
273
274 for entry in self.list_tables(TableType::Normal)? {
276 let definition = self
277 .get_table_untyped(&entry, TableType::Normal)
278 .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
279 .unwrap();
280 definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
281 }
282
283 for entry in self.list_tables(TableType::Multimap)? {
284 let definition = self
285 .get_table_untyped(&entry, TableType::Multimap)
286 .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
287 .unwrap();
288 definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
289 }
290
291 Ok(())
292 }
293}
294
295pub(crate) struct TableTreeMut<'txn> {
296 tree: BtreeMut<'txn, &'static str, InternalTableDefinition>,
297 guard: Arc<TransactionGuard>,
298 mem: Arc<TransactionalMemory>,
299 pending_table_updates: HashMap<String, (Option<BtreeHeader>, u64)>,
301 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
302}
303
304impl TableTreeMut<'_> {
305 pub(crate) fn new(
306 master_root: Option<BtreeHeader>,
307 guard: Arc<TransactionGuard>,
308 mem: Arc<TransactionalMemory>,
309 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
310 ) -> Self {
311 Self {
312 tree: BtreeMut::new(master_root, guard.clone(), mem.clone(), freed_pages.clone()),
313 guard,
314 mem,
315 pending_table_updates: Default::default(),
316 freed_pages,
317 }
318 }
319
320 pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
321 where
322 F: FnMut(&PagePath) -> Result,
323 {
324 self.tree.visit_all_pages(&mut visitor)?;
326
327 for entry in self.list_tables(TableType::Normal)? {
329 let definition = self
330 .get_table_untyped(&entry, TableType::Normal)
331 .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
332 .unwrap();
333 definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
334 }
335
336 for entry in self.list_tables(TableType::Multimap)? {
337 let definition = self
338 .get_table_untyped(&entry, TableType::Multimap)
339 .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
340 .unwrap();
341 definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?;
342 }
343
344 Ok(())
345 }
346
347 pub(crate) fn stage_update_table_root(
349 &mut self,
350 name: &str,
351 table_root: Option<BtreeHeader>,
352 length: u64,
353 ) {
354 self.pending_table_updates
355 .insert(name.to_string(), (table_root, length));
356 }
357
358 pub(crate) fn clear_table_root_updates(&mut self) {
359 self.pending_table_updates.clear();
360 }
361
362 pub(crate) fn verify_checksums(&self) -> Result<bool> {
363 assert!(self.pending_table_updates.is_empty());
364 if !self.tree.verify_checksum()? {
365 return Ok(false);
366 }
367
368 for entry in self.tree.range::<RangeFull, &str>(&(..))? {
369 let entry = entry?;
370 let definition = entry.value();
371 match definition {
372 InternalTableDefinition::Normal {
373 table_root,
374 fixed_key_size,
375 fixed_value_size,
376 ..
377 } => {
378 if let Some(header) = table_root {
379 if !RawBtree::new(
380 Some(header),
381 fixed_key_size,
382 fixed_value_size,
383 self.mem.clone(),
384 )
385 .verify_checksum()?
386 {
387 return Ok(false);
388 }
389 }
390 }
391 InternalTableDefinition::Multimap {
392 table_root,
393 fixed_key_size,
394 fixed_value_size,
395 ..
396 } => {
397 if !verify_tree_and_subtree_checksums(
398 table_root,
399 fixed_key_size,
400 fixed_value_size,
401 self.mem.clone(),
402 )? {
403 return Ok(false);
404 }
405 }
406 }
407 }
408
409 Ok(true)
410 }
411
412 pub(crate) fn flush_table_root_updates(&mut self) -> Result<&mut Self> {
413 for (name, (new_root, new_length)) in self.pending_table_updates.drain() {
414 let mut definition = self.tree.get(&name.as_str())?.unwrap().value();
416 match definition {
418 InternalTableDefinition::Normal { table_root, .. }
419 | InternalTableDefinition::Multimap { table_root, .. } => {
420 if table_root == new_root {
421 continue;
422 }
423 }
424 }
425 match definition {
427 InternalTableDefinition::Normal {
428 ref mut table_root,
429 ref mut table_length,
430 fixed_key_size,
431 fixed_value_size,
432 ..
433 } => {
434 let mut tree = UntypedBtreeMut::new(
435 new_root,
436 self.mem.clone(),
437 self.freed_pages.clone(),
438 fixed_key_size,
439 fixed_value_size,
440 );
441 *table_root = tree.finalize_dirty_checksums()?;
442 *table_length = new_length;
443 }
444 InternalTableDefinition::Multimap {
445 ref mut table_root,
446 ref mut table_length,
447 fixed_key_size,
448 fixed_value_size,
449 ..
450 } => {
451 *table_root = finalize_tree_and_subtree_checksums(
452 new_root,
453 fixed_key_size,
454 fixed_value_size,
455 self.mem.clone(),
456 )?;
457 *table_length = new_length;
458 }
459 }
460 self.tree.insert(&name.as_str(), &definition)?;
461 }
462 Ok(self)
463 }
464
465 pub(crate) fn create_table_and_flush_table_root<K: Key + 'static, V: Value + 'static>(
469 &mut self,
470 name: &str,
471 f: impl FnOnce(&mut BtreeMut<K, V>) -> Result,
472 ) -> Result {
473 assert!(self.pending_table_updates.is_empty());
474 assert!(self.tree.get(&name)?.is_none());
475
476 self.tree.insert(
478 &name,
479 &InternalTableDefinition::new::<K, V>(TableType::Normal, None, 0),
480 )?;
481
482 let mut tree: BtreeMut<K, V> = BtreeMut::new(
484 None,
485 self.guard.clone(),
486 self.mem.clone(),
487 self.freed_pages.clone(),
488 );
489 f(&mut tree)?;
490
491 let table_root = tree.finalize_dirty_checksums()?;
493 let table_length = tree.get_root().map(|x| x.length).unwrap_or_default();
494
495 self.tree.insert_inplace(
497 &name,
498 &InternalTableDefinition::new::<K, V>(TableType::Normal, table_root, table_length),
499 )?;
500
501 Ok(())
502 }
503
504 pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
505 self.tree.finalize_dirty_checksums()
506 }
507
508 pub(crate) fn list_tables(&self, table_type: TableType) -> Result<Vec<String>> {
510 let tree = TableTree::new(
511 self.tree.get_root(),
512 PageHint::None,
513 self.guard.clone(),
514 self.mem.clone(),
515 )?;
516 tree.list_tables(table_type)
517 }
518
519 pub(crate) fn get_table_untyped(
520 &self,
521 name: &str,
522 table_type: TableType,
523 ) -> Result<Option<InternalTableDefinition>, TableError> {
524 let tree = TableTree::new(
525 self.tree.get_root(),
526 PageHint::None,
527 self.guard.clone(),
528 self.mem.clone(),
529 )?;
530 let mut result = tree.get_table_untyped(name, table_type);
531
532 if let Ok(Some(definition)) = result.as_mut() {
533 if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
534 definition.set_header(*updated_root, *updated_length);
535 }
536 }
537
538 result
539 }
540
541 pub(crate) fn get_table<K: Key, V: Value>(
543 &self,
544 name: &str,
545 table_type: TableType,
546 ) -> Result<Option<InternalTableDefinition>, TableError> {
547 let tree = TableTree::new(
548 self.tree.get_root(),
549 PageHint::None,
550 self.guard.clone(),
551 self.mem.clone(),
552 )?;
553 let mut result = tree.get_table::<K, V>(name, table_type);
554
555 if let Ok(Some(definition)) = result.as_mut() {
556 if let Some((updated_root, updated_length)) = self.pending_table_updates.get(name) {
557 definition.set_header(*updated_root, *updated_length);
558 }
559 }
560
561 result
562 }
563
564 pub(crate) fn rename_table(
565 &mut self,
566 name: &str,
567 new_name: &str,
568 table_type: TableType,
569 ) -> Result<(), TableError> {
570 if let Some(definition) = self.get_table_untyped(name, table_type)? {
571 if self.get_table_untyped(new_name, table_type)?.is_some() {
572 return Err(TableError::TableExists(new_name.to_string()));
573 }
574 if let Some(update) = self.pending_table_updates.remove(name) {
575 self.pending_table_updates
576 .insert(new_name.to_string(), update);
577 }
578 assert!(self.tree.remove(&name)?.is_some());
579 assert!(self.tree.insert(&new_name, &definition)?.is_none());
580 } else {
581 return Err(TableError::TableDoesNotExist(name.to_string()));
582 }
583
584 Ok(())
585 }
586
587 pub(crate) fn delete_table(
588 &mut self,
589 name: &str,
590 table_type: TableType,
591 ) -> Result<bool, TableError> {
592 if let Some(definition) = self.get_table_untyped(name, table_type)? {
593 let mut freed_pages = self.freed_pages.lock().unwrap();
594 definition.visit_all_pages(self.mem.clone(), |path| {
595 freed_pages.push(path.page_number());
596 Ok(())
597 })?;
598 drop(freed_pages);
599
600 self.pending_table_updates.remove(name);
601
602 let found = self.tree.remove(&name)?.is_some();
603 return Ok(found);
604 }
605
606 Ok(false)
607 }
608
609 pub(crate) fn get_or_create_table<K: Key, V: Value>(
610 &mut self,
611 name: &str,
612 table_type: TableType,
613 ) -> Result<(Option<BtreeHeader>, u64), TableError> {
614 let table = if let Some(found) = self.get_table::<K, V>(name, table_type)? {
615 found
616 } else {
617 let table = InternalTableDefinition::new::<K, V>(table_type, None, 0);
618 self.tree.insert(&name, &table)?;
619 table
620 };
621
622 match table {
623 InternalTableDefinition::Normal {
624 table_root,
625 table_length,
626 ..
627 }
628 | InternalTableDefinition::Multimap {
629 table_root,
630 table_length,
631 ..
632 } => Ok((table_root, table_length)),
633 }
634 }
635
636 pub(crate) fn highest_index_pages(
639 &self,
640 n: usize,
641 output: &mut BTreeMap<PageNumber, PagePath>,
642 ) -> Result {
643 for entry in self.tree.range::<RangeFull, &str>(&(..))? {
644 let entry = entry?;
645 let mut definition = entry.value();
646 if let Some((updated_root, updated_length)) =
647 self.pending_table_updates.get(entry.key())
648 {
649 definition.set_header(*updated_root, *updated_length);
650 }
651
652 definition.visit_all_pages(self.mem.clone(), |path| {
653 output.insert(path.page_number(), path.clone());
654 while output.len() > n {
655 output.pop_first();
656 }
657 Ok(())
658 })?;
659 }
660
661 self.tree.visit_all_pages(|path| {
662 output.insert(path.page_number(), path.clone());
663 while output.len() > n {
664 output.pop_first();
665 }
666 Ok(())
667 })?;
668
669 Ok(())
670 }
671
672 pub(crate) fn relocate_tables(
673 &mut self,
674 relocation_map: &HashMap<PageNumber, PageNumber>,
675 ) -> Result {
676 for entry in self.tree.range::<RangeFull, &str>(&(..))? {
677 let entry = entry?;
678 let mut definition = entry.value();
679 if let Some((updated_root, updated_length)) =
680 self.pending_table_updates.get(entry.key())
681 {
682 definition.set_header(*updated_root, *updated_length);
683 }
684
685 if let Some(new_root) = definition.relocate_tree(
686 self.mem.clone(),
687 self.freed_pages.clone(),
688 relocation_map,
689 )? {
690 self.pending_table_updates
691 .insert(entry.key().to_string(), (new_root, definition.get_length()));
692 }
693 }
694
695 self.tree.relocate(relocation_map)?;
696
697 Ok(())
698 }
699
700 pub fn stats(&self) -> Result<DatabaseStats> {
701 let master_tree_stats = self.tree.stats()?;
702 let mut max_subtree_height = 0;
703 let mut total_stored_bytes = 0;
704 let mut branch_pages = master_tree_stats.branch_pages + master_tree_stats.leaf_pages;
706 let mut leaf_pages = 0;
707 let mut total_metadata_bytes =
709 master_tree_stats.metadata_bytes + master_tree_stats.stored_leaf_bytes;
710 let mut total_fragmented = master_tree_stats.fragmented_bytes;
711
712 for entry in self.tree.range::<RangeFull, &str>(&(..))? {
713 let entry = entry?;
714 let mut definition = entry.value();
715 if let Some((updated_root, length)) = self.pending_table_updates.get(entry.key()) {
716 definition.set_header(*updated_root, *length);
717 }
718 match definition {
719 InternalTableDefinition::Normal {
720 table_root,
721 fixed_key_size,
722 fixed_value_size,
723 ..
724 } => {
725 let subtree_stats = btree_stats(
726 table_root.map(|x| x.root),
727 &self.mem,
728 fixed_key_size,
729 fixed_value_size,
730 )?;
731 max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
732 total_stored_bytes += subtree_stats.stored_leaf_bytes;
733 total_metadata_bytes += subtree_stats.metadata_bytes;
734 total_fragmented += subtree_stats.fragmented_bytes;
735 branch_pages += subtree_stats.branch_pages;
736 leaf_pages += subtree_stats.leaf_pages;
737 }
738 InternalTableDefinition::Multimap {
739 table_root,
740 fixed_key_size,
741 fixed_value_size,
742 ..
743 } => {
744 let subtree_stats = multimap_btree_stats(
745 table_root.map(|x| x.root),
746 &self.mem,
747 fixed_key_size,
748 fixed_value_size,
749 )?;
750 max_subtree_height = max(max_subtree_height, subtree_stats.tree_height);
751 total_stored_bytes += subtree_stats.stored_leaf_bytes;
752 total_metadata_bytes += subtree_stats.metadata_bytes;
753 total_fragmented += subtree_stats.fragmented_bytes;
754 branch_pages += subtree_stats.branch_pages;
755 leaf_pages += subtree_stats.leaf_pages;
756 }
757 }
758 }
759 Ok(DatabaseStats {
760 tree_height: master_tree_stats.tree_height + max_subtree_height,
761 allocated_pages: self.mem.count_allocated_pages()?,
762 leaf_pages,
763 branch_pages,
764 stored_leaf_bytes: total_stored_bytes,
765 metadata_bytes: total_metadata_bytes,
766 fragmented_bytes: total_fragmented,
767 page_size: self.mem.get_page_size(),
768 })
769 }
770}
771
772#[cfg(test)]
773mod test {
774 use crate::Value;
775 use crate::tree_store::table_tree_base::InternalTableDefinition;
776 use crate::types::TypeName;
777
778 #[test]
779 fn round_trip() {
780 let x = InternalTableDefinition::Multimap {
781 table_root: None,
782 table_length: 0,
783 fixed_key_size: None,
784 fixed_value_size: Some(5),
785 key_alignment: 6,
786 value_alignment: 7,
787 key_type: TypeName::new("test::Key"),
788 value_type: TypeName::new("test::Value"),
789 };
790 let y = InternalTableDefinition::from_bytes(InternalTableDefinition::as_bytes(&x).as_ref());
791 assert_eq!(x, y);
792 }
793}