redb/tree_store/
table_tree_base.rs

1use crate::multimap_table::{relocate_subtrees, UntypedMultiBtree};
2use crate::tree_store::{
3    BtreeHeader, PageNumber, PagePath, TransactionalMemory, UntypedBtree, UntypedBtreeMut,
4};
5use crate::{Key, Result, TableError, TypeName, Value};
6use std::collections::HashMap;
7use std::mem::size_of;
8use std::sync::{Arc, Mutex};
9
10// Forward compatibility feature in case alignment can be supported in the future
11// See https://github.com/cberner/redb/issues/360
12const ALIGNMENT: usize = 1;
13
14#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
15pub(crate) enum TableType {
16    Normal,
17    Multimap,
18}
19
20impl TableType {
21    fn is_legacy(value: u8) -> bool {
22        value == 1 || value == 2
23    }
24}
25
26#[allow(clippy::from_over_into)]
27impl Into<u8> for TableType {
28    fn into(self) -> u8 {
29        match self {
30            // 1 & 2 were used in the v1 file format
31            // TableType::Normal => 1,
32            // TableType::Multimap => 2,
33            TableType::Normal => 3,
34            TableType::Multimap => 4,
35        }
36    }
37}
38
39impl From<u8> for TableType {
40    fn from(value: u8) -> Self {
41        match value {
42            3 => TableType::Normal,
43            4 => TableType::Multimap,
44            _ => unreachable!(),
45        }
46    }
47}
48
49#[derive(Clone, PartialEq, Debug)]
50pub(crate) enum InternalTableDefinition {
51    Normal {
52        table_root: Option<BtreeHeader>,
53        table_length: u64,
54        fixed_key_size: Option<usize>,
55        fixed_value_size: Option<usize>,
56        key_alignment: usize,
57        value_alignment: usize,
58        key_type: TypeName,
59        value_type: TypeName,
60    },
61    Multimap {
62        table_root: Option<BtreeHeader>,
63        table_length: u64,
64        fixed_key_size: Option<usize>,
65        fixed_value_size: Option<usize>,
66        key_alignment: usize,
67        value_alignment: usize,
68        key_type: TypeName,
69        value_type: TypeName,
70    },
71}
72
73impl InternalTableDefinition {
74    pub(super) fn new<K: Key, V: Value>(
75        table_type: TableType,
76        table_root: Option<BtreeHeader>,
77        table_length: u64,
78    ) -> Self {
79        match table_type {
80            TableType::Normal => InternalTableDefinition::Normal {
81                table_root,
82                table_length,
83                fixed_key_size: K::fixed_width(),
84                fixed_value_size: V::fixed_width(),
85                key_alignment: ALIGNMENT,
86                value_alignment: ALIGNMENT,
87                key_type: K::type_name(),
88                value_type: V::type_name(),
89            },
90            TableType::Multimap => InternalTableDefinition::Multimap {
91                table_root,
92                table_length,
93                fixed_key_size: K::fixed_width(),
94                fixed_value_size: V::fixed_width(),
95                key_alignment: ALIGNMENT,
96                value_alignment: ALIGNMENT,
97                key_type: K::type_name(),
98                value_type: V::type_name(),
99            },
100        }
101    }
102
103    pub(super) fn set_header(&mut self, root: Option<BtreeHeader>, length: u64) {
104        match self {
105            InternalTableDefinition::Normal {
106                table_root,
107                table_length,
108                ..
109            }
110            | InternalTableDefinition::Multimap {
111                table_root,
112                table_length,
113                ..
114            } => {
115                *table_root = root;
116                *table_length = length;
117            }
118        }
119    }
120
121    pub(super) fn check_match_untyped(
122        &self,
123        table_type: TableType,
124        name: &str,
125    ) -> Result<(), TableError> {
126        if self.get_type() != table_type {
127            return if self.get_type() == TableType::Multimap {
128                Err(TableError::TableIsMultimap(name.to_string()))
129            } else {
130                Err(TableError::TableIsNotMultimap(name.to_string()))
131            };
132        }
133        if self.private_get_key_alignment() != ALIGNMENT {
134            return Err(TableError::TypeDefinitionChanged {
135                name: self.private_key_type(),
136                alignment: self.private_get_key_alignment(),
137                width: self.private_get_fixed_key_size(),
138            });
139        }
140        if self.private_get_value_alignment() != ALIGNMENT {
141            return Err(TableError::TypeDefinitionChanged {
142                name: self.private_value_type(),
143                alignment: self.private_get_value_alignment(),
144                width: self.private_get_fixed_value_size(),
145            });
146        }
147
148        Ok(())
149    }
150
151    pub(super) fn check_match<K: Key, V: Value>(
152        &self,
153        table_type: TableType,
154        name: &str,
155    ) -> Result<(), TableError> {
156        self.check_match_untyped(table_type, name)?;
157
158        if self.private_key_type() != K::type_name() || self.private_value_type() != V::type_name()
159        {
160            return Err(TableError::TableTypeMismatch {
161                table: name.to_string(),
162                key: self.private_key_type(),
163                value: self.private_value_type(),
164            });
165        }
166        if self.private_get_fixed_key_size() != K::fixed_width() {
167            return Err(TableError::TypeDefinitionChanged {
168                name: K::type_name(),
169                alignment: self.private_get_key_alignment(),
170                width: self.private_get_fixed_key_size(),
171            });
172        }
173        if self.private_get_fixed_value_size() != V::fixed_width() {
174            return Err(TableError::TypeDefinitionChanged {
175                name: V::type_name(),
176                alignment: self.private_get_value_alignment(),
177                width: self.private_get_fixed_value_size(),
178            });
179        }
180
181        Ok(())
182    }
183
184    pub(crate) fn visit_all_pages<'a, F>(&self, mem: Arc<TransactionalMemory>, visitor: F) -> Result
185    where
186        F: FnMut(&PagePath) -> Result + 'a,
187    {
188        match self {
189            InternalTableDefinition::Normal {
190                table_root,
191                fixed_key_size,
192                fixed_value_size,
193                ..
194            } => {
195                let tree = UntypedBtree::new(*table_root, mem, *fixed_key_size, *fixed_value_size);
196                tree.visit_all_pages(visitor)?;
197            }
198            InternalTableDefinition::Multimap {
199                table_root,
200                fixed_key_size,
201                fixed_value_size,
202                ..
203            } => {
204                let tree =
205                    UntypedMultiBtree::new(*table_root, mem, *fixed_key_size, *fixed_value_size);
206                tree.visit_all_pages(visitor)?;
207            }
208        }
209
210        Ok(())
211    }
212
213    pub(crate) fn relocate_tree(
214        &mut self,
215        mem: Arc<TransactionalMemory>,
216        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
217        relocation_map: &HashMap<PageNumber, PageNumber>,
218    ) -> Result<Option<Option<BtreeHeader>>> {
219        let original_root = self.private_get_root();
220        let relocated_root = match self {
221            InternalTableDefinition::Normal { table_root, .. } => *table_root,
222            InternalTableDefinition::Multimap {
223                table_root,
224                fixed_key_size,
225                fixed_value_size,
226                ..
227            } => {
228                if let Some(header) = table_root {
229                    let (page_number, checksum) = relocate_subtrees(
230                        (header.root, header.checksum),
231                        *fixed_key_size,
232                        *fixed_value_size,
233                        mem.clone(),
234                        freed_pages.clone(),
235                        relocation_map,
236                    )?;
237                    Some(BtreeHeader::new(page_number, checksum, header.length))
238                } else {
239                    None
240                }
241            }
242        };
243        let mut tree = UntypedBtreeMut::new(
244            relocated_root,
245            mem,
246            freed_pages,
247            self.private_get_fixed_key_size(),
248            self.private_get_fixed_value_size(),
249        );
250        tree.relocate(relocation_map)?;
251        if tree.get_root() != original_root {
252            self.set_header(tree.get_root(), self.get_length());
253            Ok(Some(tree.get_root()))
254        } else {
255            Ok(None)
256        }
257    }
258
259    fn private_get_root(&self) -> Option<BtreeHeader> {
260        match self {
261            InternalTableDefinition::Normal { table_root, .. }
262            | InternalTableDefinition::Multimap { table_root, .. } => *table_root,
263        }
264    }
265
266    pub(crate) fn get_length(&self) -> u64 {
267        match self {
268            InternalTableDefinition::Normal { table_length, .. }
269            | InternalTableDefinition::Multimap { table_length, .. } => *table_length,
270        }
271    }
272
273    fn private_get_fixed_key_size(&self) -> Option<usize> {
274        match self {
275            InternalTableDefinition::Normal { fixed_key_size, .. }
276            | InternalTableDefinition::Multimap { fixed_key_size, .. } => *fixed_key_size,
277        }
278    }
279
280    fn private_get_fixed_value_size(&self) -> Option<usize> {
281        match self {
282            InternalTableDefinition::Normal {
283                fixed_value_size, ..
284            }
285            | InternalTableDefinition::Multimap {
286                fixed_value_size, ..
287            } => *fixed_value_size,
288        }
289    }
290
291    fn private_get_key_alignment(&self) -> usize {
292        match self {
293            InternalTableDefinition::Normal { key_alignment, .. }
294            | InternalTableDefinition::Multimap { key_alignment, .. } => *key_alignment,
295        }
296    }
297
298    fn private_get_value_alignment(&self) -> usize {
299        match self {
300            InternalTableDefinition::Normal {
301                value_alignment, ..
302            }
303            | InternalTableDefinition::Multimap {
304                value_alignment, ..
305            } => *value_alignment,
306        }
307    }
308
309    pub(crate) fn get_type(&self) -> TableType {
310        match self {
311            InternalTableDefinition::Normal { .. } => TableType::Normal,
312            InternalTableDefinition::Multimap { .. } => TableType::Multimap,
313        }
314    }
315
316    fn private_key_type(&self) -> TypeName {
317        match self {
318            InternalTableDefinition::Normal { key_type, .. }
319            | InternalTableDefinition::Multimap { key_type, .. } => key_type.clone(),
320        }
321    }
322
323    fn private_value_type(&self) -> TypeName {
324        match self {
325            InternalTableDefinition::Normal { value_type, .. }
326            | InternalTableDefinition::Multimap { value_type, .. } => value_type.clone(),
327        }
328    }
329}
330
331impl Value for InternalTableDefinition {
332    type SelfType<'a> = InternalTableDefinition;
333    type AsBytes<'a> = Vec<u8>;
334
335    fn fixed_width() -> Option<usize> {
336        None
337    }
338
339    fn from_bytes<'a>(data: &'a [u8]) -> Self
340    where
341        Self: 'a,
342    {
343        debug_assert!(data.len() > 22);
344        let mut offset = 0;
345        let legacy = TableType::is_legacy(data[offset]);
346        assert!(!legacy);
347        let table_type = TableType::from(data[offset]);
348        offset += 1;
349
350        let table_length = u64::from_le_bytes(
351            data[offset..(offset + size_of::<u64>())]
352                .try_into()
353                .unwrap(),
354        );
355        offset += size_of::<u64>();
356
357        let non_null = data[offset] != 0;
358        offset += 1;
359        let table_root = if non_null {
360            Some(BtreeHeader::from_le_bytes(
361                data[offset..(offset + BtreeHeader::serialized_size())]
362                    .try_into()
363                    .unwrap(),
364            ))
365        } else {
366            None
367        };
368        offset += BtreeHeader::serialized_size();
369
370        let non_null = data[offset] != 0;
371        offset += 1;
372        let fixed_key_size = if non_null {
373            let fixed = u32::from_le_bytes(
374                data[offset..(offset + size_of::<u32>())]
375                    .try_into()
376                    .unwrap(),
377            ) as usize;
378            Some(fixed)
379        } else {
380            None
381        };
382        offset += size_of::<u32>();
383
384        let non_null = data[offset] != 0;
385        offset += 1;
386        let fixed_value_size = if non_null {
387            let fixed = u32::from_le_bytes(
388                data[offset..(offset + size_of::<u32>())]
389                    .try_into()
390                    .unwrap(),
391            ) as usize;
392            Some(fixed)
393        } else {
394            None
395        };
396        offset += size_of::<u32>();
397        let key_alignment = u32::from_le_bytes(
398            data[offset..(offset + size_of::<u32>())]
399                .try_into()
400                .unwrap(),
401        ) as usize;
402        offset += size_of::<u32>();
403        let value_alignment = u32::from_le_bytes(
404            data[offset..(offset + size_of::<u32>())]
405                .try_into()
406                .unwrap(),
407        ) as usize;
408        offset += size_of::<u32>();
409
410        let key_type_len = u32::from_le_bytes(
411            data[offset..(offset + size_of::<u32>())]
412                .try_into()
413                .unwrap(),
414        ) as usize;
415        offset += size_of::<u32>();
416        let key_type = TypeName::from_bytes(&data[offset..(offset + key_type_len)]);
417        offset += key_type_len;
418        let value_type = TypeName::from_bytes(&data[offset..]);
419
420        match table_type {
421            TableType::Normal => InternalTableDefinition::Normal {
422                table_root,
423                table_length,
424                fixed_key_size,
425                fixed_value_size,
426                key_alignment,
427                value_alignment,
428                key_type,
429                value_type,
430            },
431            TableType::Multimap => InternalTableDefinition::Multimap {
432                table_root,
433                table_length,
434                fixed_key_size,
435                fixed_value_size,
436                key_alignment,
437                value_alignment,
438                key_type,
439                value_type,
440            },
441        }
442    }
443
444    // Be careful if you change this serialization format! The InternalTableDefinition for
445    // a given table needs to have a consistent serialized length, regardless of the table
446    // contents, so that create_table_and_flush_table_root() can update the allocator state
447    // table without causing more allocations
448    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Vec<u8>
449    where
450        Self: 'b,
451    {
452        let mut result = vec![value.get_type().into()];
453        result.extend_from_slice(&value.get_length().to_le_bytes());
454        if let Some(header) = value.private_get_root() {
455            result.push(1);
456            result.extend_from_slice(&header.to_le_bytes());
457        } else {
458            result.push(0);
459            result.extend_from_slice(&[0; BtreeHeader::serialized_size()]);
460        }
461        if let Some(fixed) = value.private_get_fixed_key_size() {
462            result.push(1);
463            result.extend_from_slice(&u32::try_from(fixed).unwrap().to_le_bytes());
464        } else {
465            result.push(0);
466            result.extend_from_slice(&[0; size_of::<u32>()]);
467        }
468        if let Some(fixed) = value.private_get_fixed_value_size() {
469            result.push(1);
470            result.extend_from_slice(&u32::try_from(fixed).unwrap().to_le_bytes());
471        } else {
472            result.push(0);
473            result.extend_from_slice(&[0; size_of::<u32>()]);
474        }
475        result.extend_from_slice(
476            &u32::try_from(value.private_get_key_alignment())
477                .unwrap()
478                .to_le_bytes(),
479        );
480        result.extend_from_slice(
481            &u32::try_from(value.private_get_value_alignment())
482                .unwrap()
483                .to_le_bytes(),
484        );
485        let key_type_bytes = value.private_key_type().to_bytes();
486        result.extend_from_slice(&u32::try_from(key_type_bytes.len()).unwrap().to_le_bytes());
487        result.extend_from_slice(&key_type_bytes);
488        result.extend_from_slice(&value.private_value_type().to_bytes());
489
490        result
491    }
492
493    fn type_name() -> TypeName {
494        TypeName::internal("redb::InternalTableDefinition")
495    }
496}