redb/tree_store/page_store/
header.rs

1use crate::transaction_tracker::TransactionId;
2use crate::tree_store::btree_base::BtreeHeader;
3use crate::tree_store::page_store::layout::{DatabaseLayout, RegionLayout};
4use crate::tree_store::page_store::page_manager::{
5    FILE_FORMAT_VERSION1, FILE_FORMAT_VERSION2, FILE_FORMAT_VERSION3, xxh3_checksum,
6};
7use crate::tree_store::{Checksum, PageNumber};
8use crate::{DatabaseError, Result, StorageError};
9use std::mem::size_of;
10
// Database layout:
//
// Super-header (header + commit slots)
// The super-header length is rounded up to the nearest full page size
//
// Header (first 64 bytes):
// 9 bytes: magic number
// 1 byte: god byte
// 2 bytes: padding
// 4 bytes: page size
// Definition of region
// 4 bytes: region header pages
// 4 bytes: region max data pages
// Current size of the database
// 4 bytes: number of full regions
// 4 bytes: data pages in partial trailing region
// 8 bytes: region tracker page number
// remaining bytes: padding
//
// Commit slot 0 (next 128 bytes):
// 1 byte: version
// 1 byte: != 0 if user root page is non-null
// 1 byte: != 0 if system root page is non-null
// 1 byte: != 0 if freed table root page is non-null
// 4 bytes: padding
// user root (serialized BtreeHeader, which includes the root page and root checksum)
// system root (serialized BtreeHeader)
// freed table root (serialized BtreeHeader)
// 8 bytes: last committed transaction id
// padding up to the slot checksum
// 16 bytes: slot checksum (the final bytes of the slot)
//
// Commit slot 1 (next 128 bytes):
// Same layout as slot 0
42
43// Inspired by PNG's magic number
pub(super) const MAGICNUMBER: [u8; 9] = [b'r', b'e', b'd', b'b', 0x1A, 0x0A, 0xA9, 0x0D, 0x0A];
// Byte offsets of the fixed header fields. Each offset is derived from the previous one, so the
// fields are packed back to back, starting with the god byte right after the magic number.
const GOD_BYTE_OFFSET: usize = MAGICNUMBER.len();
const PAGE_SIZE_OFFSET: usize = GOD_BYTE_OFFSET + size_of::<u8>() + 2; // +2 for padding
const REGION_HEADER_PAGES_OFFSET: usize = PAGE_SIZE_OFFSET + size_of::<u32>();
const REGION_MAX_DATA_PAGES_OFFSET: usize = REGION_HEADER_PAGES_OFFSET + size_of::<u32>();
const NUM_FULL_REGIONS_OFFSET: usize = REGION_MAX_DATA_PAGES_OFFSET + size_of::<u32>();
const TRAILING_REGION_DATA_PAGES_OFFSET: usize = NUM_FULL_REGIONS_OFFSET + size_of::<u32>();
const REGION_TRACKER_PAGE_NUMBER_OFFSET: usize =
    TRAILING_REGION_DATA_PAGES_OFFSET + size_of::<u32>();
// Size in bytes of one serialized commit slot
const TRANSACTION_SIZE: usize = 128;
// The two commit slots are stored contiguously; slot 0 begins at byte 64
const TRANSACTION_0_OFFSET: usize = 64;
const TRANSACTION_1_OFFSET: usize = TRANSACTION_0_OFFSET + TRANSACTION_SIZE;
// Total size of the serialized header: everything up to the end of commit slot 1
pub(super) const DB_HEADER_SIZE: usize = TRANSACTION_1_OFFSET + TRANSACTION_SIZE;

// God byte flags
// Set when commit slot 1 is the primary; clear when commit slot 0 is
const PRIMARY_BIT: u8 = 1;
// Mirrors DatabaseHeader::recovery_required
const RECOVERY_REQUIRED: u8 = 2;
// Mirrors DatabaseHeader::two_phase_commit
const TWO_PHASE_COMMIT: u8 = 4;

// Structure of each commit slot
// All offsets below are relative to the start of the slot
const VERSION_OFFSET: usize = 0;
// One flag byte per root; a non-zero value means the corresponding root is present
const USER_ROOT_NON_NULL_OFFSET: usize = size_of::<u8>();
const SYSTEM_ROOT_NON_NULL_OFFSET: usize = USER_ROOT_NON_NULL_OFFSET + size_of::<u8>();
const FREED_ROOT_NON_NULL_OFFSET: usize = SYSTEM_ROOT_NON_NULL_OFFSET + size_of::<u8>();
// Unused bytes between the flag bytes and the first root
const PADDING: usize = 4;

// The three roots are serialized BtreeHeaders stored one after another
const USER_ROOT_OFFSET: usize = FREED_ROOT_NON_NULL_OFFSET + size_of::<u8>() + PADDING;
const SYSTEM_ROOT_OFFSET: usize = USER_ROOT_OFFSET + BtreeHeader::serialized_size();
const FREED_ROOT_OFFSET: usize = SYSTEM_ROOT_OFFSET + BtreeHeader::serialized_size();
const TRANSACTION_ID_OFFSET: usize = FREED_ROOT_OFFSET + BtreeHeader::serialized_size();
// End (exclusive) of the last fixed field in a slot. DatabaseHeader::new asserts that this does
// not run past SLOT_CHECKSUM_OFFSET
const TRANSACTION_LAST_FIELD: usize = TRANSACTION_ID_OFFSET + size_of::<u64>();

// The slot checksum occupies the final bytes of the slot and covers every byte before it
const SLOT_CHECKSUM_OFFSET: usize = TRANSACTION_SIZE - size_of::<Checksum>();

// NOTE(review): presumably the default page size; the tests in this file pass it to
// TransactionalMemory::new -- confirm against other callers
pub(crate) const PAGE_SIZE: usize = 4096;
79
// Decodes a little-endian u32 from the first four bytes of `data`; any bytes after that are
// ignored. Panics if `data` holds fewer than four bytes.
fn get_u32(data: &[u8]) -> u32 {
    let mut raw = [0u8; size_of::<u32>()];
    raw.copy_from_slice(&data[..size_of::<u32>()]);
    u32::from_le_bytes(raw)
}
83
// Decodes a little-endian u64 from the first eight bytes of `data`; any bytes after that are
// ignored. Panics if `data` holds fewer than eight bytes.
fn get_u64(data: &[u8]) -> u64 {
    let mut raw = [0u8; size_of::<u64>()];
    raw.copy_from_slice(&data[..size_of::<u64>()]);
    u64::from_le_bytes(raw)
}
87
// Problems noticed by DatabaseHeader::from_bytes while parsing. They are reported alongside the
// parsed header instead of as an error, so the caller decides how to proceed.
#[derive(Copy, Clone)]
pub(super) struct HeaderRepairInfo {
    // The leading bytes of the header did not equal MAGICNUMBER
    pub(super) invalid_magic_number: bool,
    // The stored checksum of the slot selected by the god byte did not match its contents
    pub(super) primary_corrupted: bool,
    // The stored checksum of the other slot did not match its contents
    pub(super) secondary_corrupted: bool,
}
94
// In-memory form of the database super-header: the god byte flags, the region geometry, and the
// two commit slots.
#[derive(Clone)]
pub(super) struct DatabaseHeader {
    // Index (0 or 1) into `transaction_slots` of the current primary slot
    primary_slot: usize,
    // God byte flag RECOVERY_REQUIRED
    pub(super) recovery_required: bool,
    // God byte flag TWO_PHASE_COMMIT
    pub(super) two_phase_commit: bool,
    page_size: u32,
    // Geometry of a full region
    region_header_pages: u32,
    region_max_data_pages: u32,
    full_regions: u32,
    // Zero means there is no trailing partial region
    trailing_partial_region_pages: u32,
    // Accessors assert that the primary slot is not FILE_FORMAT_VERSION3 before touching this
    region_tracker: PageNumber,
    transaction_slots: [TransactionHeader; 2],
}
108
109impl DatabaseHeader {
110    pub(super) fn new(
111        layout: DatabaseLayout,
112        transaction_id: TransactionId,
113        version: u8,
114        region_tracker: PageNumber,
115    ) -> Self {
116        #[allow(clippy::assertions_on_constants)]
117        {
118            assert!(TRANSACTION_LAST_FIELD <= SLOT_CHECKSUM_OFFSET);
119        }
120
121        let slot = TransactionHeader::new(transaction_id, version);
122        Self {
123            primary_slot: 0,
124            recovery_required: true,
125            two_phase_commit: false,
126            page_size: layout.full_region_layout().page_size(),
127            region_header_pages: layout.full_region_layout().get_header_pages(),
128            region_max_data_pages: layout.full_region_layout().num_pages(),
129            full_regions: layout.num_full_regions(),
130            trailing_partial_region_pages: layout
131                .trailing_region_layout()
132                .map(|x| x.num_pages())
133                .unwrap_or_default(),
134            region_tracker,
135            transaction_slots: [slot.clone(), slot],
136        }
137    }
138
139    pub(super) fn page_size(&self) -> u32 {
140        self.page_size
141    }
142
143    pub(super) fn layout(&self) -> DatabaseLayout {
144        let full_layout = RegionLayout::new(
145            self.region_max_data_pages,
146            self.region_header_pages,
147            self.page_size,
148        );
149        let trailing = if self.trailing_partial_region_pages > 0 {
150            Some(RegionLayout::new(
151                self.trailing_partial_region_pages,
152                self.region_header_pages,
153                self.page_size,
154            ))
155        } else {
156            None
157        };
158        DatabaseLayout::new(self.full_regions, full_layout, trailing)
159    }
160
161    pub(super) fn set_layout(&mut self, layout: DatabaseLayout) {
162        assert_eq!(
163            self.layout().full_region_layout(),
164            layout.full_region_layout()
165        );
166        if let Some(trailing) = layout.trailing_region_layout() {
167            assert_eq!(trailing.get_header_pages(), self.region_header_pages);
168            assert_eq!(trailing.page_size(), self.page_size);
169            self.trailing_partial_region_pages = trailing.num_pages();
170        } else {
171            self.trailing_partial_region_pages = 0;
172        }
173        self.full_regions = layout.num_full_regions();
174    }
175
176    pub(super) fn region_tracker(&self) -> PageNumber {
177        assert_ne!(self.primary_slot().version, FILE_FORMAT_VERSION3);
178        self.region_tracker
179    }
180
181    pub(super) fn set_region_tracker(&mut self, page: PageNumber) {
182        assert_ne!(self.primary_slot().version, FILE_FORMAT_VERSION3);
183        self.region_tracker = page;
184    }
185
186    pub(super) fn primary_slot(&self) -> &TransactionHeader {
187        &self.transaction_slots[self.primary_slot]
188    }
189
190    pub(super) fn secondary_slot(&self) -> &TransactionHeader {
191        &self.transaction_slots[self.primary_slot ^ 1]
192    }
193
194    pub(super) fn secondary_slot_mut(&mut self) -> &mut TransactionHeader {
195        &mut self.transaction_slots[self.primary_slot ^ 1]
196    }
197
198    pub(super) fn swap_primary_slot(&mut self) {
199        self.primary_slot ^= 1;
200    }
201
202    // Figure out which slot to use as the primary when starting a repair. The repair process might
203    // still switch to the other slot later, if the tree checksums turn out to be invalid.
204    //
205    // Returns true if we picked the original primary, or false if we swapped
206    pub(super) fn pick_primary_for_repair(
207        &mut self,
208        repair_info: HeaderRepairInfo,
209    ) -> Result<bool> {
210        // If the primary was written using 2-phase commit, it's guaranteed to be valid. Don't look
211        // at the secondary; even if it happens to have a valid checksum, Durability::Paranoid means
212        // we can't trust it
213        if self.two_phase_commit {
214            if repair_info.primary_corrupted {
215                return Err(StorageError::Corrupted(
216                    "Primary is corrupted despite 2-phase commit".to_string(),
217                ));
218            }
219            return Ok(true);
220        }
221
222        // Pick whichever slot is newer, assuming it has a valid checksum. This handles an edge case
223        // where we crash during fsync(), and the only data that got written to disk was the god byte
224        // update swapping the primary -- in that case, the primary contains a valid but out-of-date
225        // transaction, so we need to load from the secondary instead
226        if repair_info.primary_corrupted {
227            if repair_info.secondary_corrupted {
228                return Err(StorageError::Corrupted(
229                    "Both commit slots are corrupted".to_string(),
230                ));
231            }
232            self.swap_primary_slot();
233            return Ok(false);
234        }
235
236        let secondary_newer =
237            self.secondary_slot().transaction_id > self.primary_slot().transaction_id;
238        if secondary_newer && !repair_info.secondary_corrupted {
239            self.swap_primary_slot();
240            return Ok(false);
241        }
242
243        Ok(true)
244    }
245
246    // TODO: consider returning an Err with the repair info
247    pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, HeaderRepairInfo), DatabaseError> {
248        let invalid_magic_number = data[..MAGICNUMBER.len()] != MAGICNUMBER;
249
250        let primary_slot = usize::from(data[GOD_BYTE_OFFSET] & PRIMARY_BIT != 0);
251        let recovery_required = (data[GOD_BYTE_OFFSET] & RECOVERY_REQUIRED) != 0;
252        let two_phase_commit = (data[GOD_BYTE_OFFSET] & TWO_PHASE_COMMIT) != 0;
253        let page_size = get_u32(&data[PAGE_SIZE_OFFSET..]);
254        let region_header_pages = get_u32(&data[REGION_HEADER_PAGES_OFFSET..]);
255        let region_max_data_pages = get_u32(&data[REGION_MAX_DATA_PAGES_OFFSET..]);
256        let full_regions = get_u32(&data[NUM_FULL_REGIONS_OFFSET..]);
257        let trailing_data_pages = get_u32(&data[TRAILING_REGION_DATA_PAGES_OFFSET..]);
258        let region_tracker = PageNumber::from_le_bytes(
259            data[REGION_TRACKER_PAGE_NUMBER_OFFSET
260                ..(REGION_TRACKER_PAGE_NUMBER_OFFSET + PageNumber::serialized_size())]
261                .try_into()
262                .unwrap(),
263        );
264        let (slot0, slot0_corrupted) = TransactionHeader::from_bytes(
265            &data[TRANSACTION_0_OFFSET..(TRANSACTION_0_OFFSET + TRANSACTION_SIZE)],
266        )?;
267        let (slot1, slot1_corrupted) = TransactionHeader::from_bytes(
268            &data[TRANSACTION_1_OFFSET..(TRANSACTION_1_OFFSET + TRANSACTION_SIZE)],
269        )?;
270        let (primary_corrupted, secondary_corrupted) = if primary_slot == 0 {
271            (slot0_corrupted, slot1_corrupted)
272        } else {
273            (slot1_corrupted, slot0_corrupted)
274        };
275
276        let result = Self {
277            primary_slot,
278            recovery_required,
279            two_phase_commit,
280            page_size,
281            region_header_pages,
282            region_max_data_pages,
283            full_regions,
284            trailing_partial_region_pages: trailing_data_pages,
285            region_tracker,
286            transaction_slots: [slot0, slot1],
287        };
288        let repair = HeaderRepairInfo {
289            invalid_magic_number,
290            primary_corrupted,
291            secondary_corrupted,
292        };
293        Ok((result, repair))
294    }
295
296    pub(super) fn to_bytes(&self, include_magic_number: bool) -> [u8; DB_HEADER_SIZE] {
297        let mut result = [0; DB_HEADER_SIZE];
298        if include_magic_number {
299            result[..MAGICNUMBER.len()].copy_from_slice(&MAGICNUMBER);
300        }
301        result[GOD_BYTE_OFFSET] = self.primary_slot.try_into().unwrap();
302        if self.recovery_required {
303            result[GOD_BYTE_OFFSET] |= RECOVERY_REQUIRED;
304        }
305        if self.two_phase_commit {
306            result[GOD_BYTE_OFFSET] |= TWO_PHASE_COMMIT;
307        }
308        result[PAGE_SIZE_OFFSET..(PAGE_SIZE_OFFSET + size_of::<u32>())]
309            .copy_from_slice(&self.page_size.to_le_bytes());
310        result[REGION_HEADER_PAGES_OFFSET..(REGION_HEADER_PAGES_OFFSET + size_of::<u32>())]
311            .copy_from_slice(&self.region_header_pages.to_le_bytes());
312        result[REGION_MAX_DATA_PAGES_OFFSET..(REGION_MAX_DATA_PAGES_OFFSET + size_of::<u32>())]
313            .copy_from_slice(&self.region_max_data_pages.to_le_bytes());
314        result[NUM_FULL_REGIONS_OFFSET..(NUM_FULL_REGIONS_OFFSET + size_of::<u32>())]
315            .copy_from_slice(&self.full_regions.to_le_bytes());
316        result[TRAILING_REGION_DATA_PAGES_OFFSET
317            ..(TRAILING_REGION_DATA_PAGES_OFFSET + size_of::<u32>())]
318            .copy_from_slice(&self.trailing_partial_region_pages.to_le_bytes());
319        result[REGION_TRACKER_PAGE_NUMBER_OFFSET
320            ..(REGION_TRACKER_PAGE_NUMBER_OFFSET + PageNumber::serialized_size())]
321            .copy_from_slice(&self.region_tracker.to_le_bytes());
322        let slot0 = self.transaction_slots[0].to_bytes();
323        result[TRANSACTION_0_OFFSET..(TRANSACTION_0_OFFSET + slot0.len())].copy_from_slice(&slot0);
324        let slot1 = self.transaction_slots[1].to_bytes();
325        result[TRANSACTION_1_OFFSET..(TRANSACTION_1_OFFSET + slot1.len())].copy_from_slice(&slot1);
326
327        result
328    }
329}
330
// In-memory form of one commit slot
#[derive(Clone)]
pub(super) struct TransactionHeader {
    // File format version byte stored at the start of the slot
    pub(super) version: u8,
    // Each root is None when its non-null flag byte in the slot is zero
    pub(super) user_root: Option<BtreeHeader>,
    pub(super) system_root: Option<BtreeHeader>,
    pub(super) freed_root: Option<BtreeHeader>,
    pub(super) transaction_id: TransactionId,
}
339
340impl TransactionHeader {
341    fn new(transaction_id: TransactionId, version: u8) -> Self {
342        Self {
343            version,
344            user_root: None,
345            system_root: None,
346            freed_root: None,
347            transaction_id,
348        }
349    }
350
351    // Returned bool indicates whether the checksum was corrupted
352    pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, bool), DatabaseError> {
353        let version = data[VERSION_OFFSET];
354        match version {
355            FILE_FORMAT_VERSION1 => {
356                return Err(DatabaseError::UpgradeRequired(version));
357            }
358            FILE_FORMAT_VERSION2 | FILE_FORMAT_VERSION3 => {}
359            _ => {
360                return Err(StorageError::Corrupted(format!(
361                    "Expected file format version <= {FILE_FORMAT_VERSION3}, found {version}",
362                ))
363                .into());
364            }
365        }
366        let checksum = Checksum::from_le_bytes(
367            data[SLOT_CHECKSUM_OFFSET..(SLOT_CHECKSUM_OFFSET + size_of::<Checksum>())]
368                .try_into()
369                .unwrap(),
370        );
371        let corrupted = checksum != xxh3_checksum(&data[..SLOT_CHECKSUM_OFFSET]);
372
373        let user_root = if data[USER_ROOT_NON_NULL_OFFSET] != 0 {
374            Some(BtreeHeader::from_le_bytes(
375                data[USER_ROOT_OFFSET..(USER_ROOT_OFFSET + BtreeHeader::serialized_size())]
376                    .try_into()
377                    .unwrap(),
378            ))
379        } else {
380            None
381        };
382        let system_root = if data[SYSTEM_ROOT_NON_NULL_OFFSET] != 0 {
383            Some(BtreeHeader::from_le_bytes(
384                data[SYSTEM_ROOT_OFFSET..(SYSTEM_ROOT_OFFSET + BtreeHeader::serialized_size())]
385                    .try_into()
386                    .unwrap(),
387            ))
388        } else {
389            None
390        };
391        let freed_root = if data[FREED_ROOT_NON_NULL_OFFSET] != 0 {
392            Some(BtreeHeader::from_le_bytes(
393                data[FREED_ROOT_OFFSET..(FREED_ROOT_OFFSET + BtreeHeader::serialized_size())]
394                    .try_into()
395                    .unwrap(),
396            ))
397        } else {
398            None
399        };
400        let transaction_id = TransactionId::new(get_u64(&data[TRANSACTION_ID_OFFSET..]));
401
402        let result = Self {
403            version,
404            user_root,
405            system_root,
406            freed_root,
407            transaction_id,
408        };
409
410        Ok((result, corrupted))
411    }
412
413    pub(super) fn to_bytes(&self) -> [u8; TRANSACTION_SIZE] {
414        assert!(self.version == FILE_FORMAT_VERSION2 || self.version == FILE_FORMAT_VERSION3);
415        let mut result = [0; TRANSACTION_SIZE];
416        result[VERSION_OFFSET] = self.version;
417        if let Some(header) = self.user_root {
418            result[USER_ROOT_NON_NULL_OFFSET] = 1;
419            result[USER_ROOT_OFFSET..(USER_ROOT_OFFSET + BtreeHeader::serialized_size())]
420                .copy_from_slice(&header.to_le_bytes());
421        }
422        if let Some(header) = self.system_root {
423            result[SYSTEM_ROOT_NON_NULL_OFFSET] = 1;
424            result[SYSTEM_ROOT_OFFSET..(SYSTEM_ROOT_OFFSET + BtreeHeader::serialized_size())]
425                .copy_from_slice(&header.to_le_bytes());
426        }
427        if let Some(header) = self.freed_root {
428            result[FREED_ROOT_NON_NULL_OFFSET] = 1;
429            result[FREED_ROOT_OFFSET..(FREED_ROOT_OFFSET + BtreeHeader::serialized_size())]
430                .copy_from_slice(&header.to_le_bytes());
431        }
432        result[TRANSACTION_ID_OFFSET..(TRANSACTION_ID_OFFSET + size_of::<u64>())]
433            .copy_from_slice(&self.transaction_id.raw_id().to_le_bytes());
434        let checksum = xxh3_checksum(&result[..SLOT_CHECKSUM_OFFSET]);
435        result[SLOT_CHECKSUM_OFFSET..(SLOT_CHECKSUM_OFFSET + size_of::<Checksum>())]
436            .copy_from_slice(&checksum.to_le_bytes());
437
438        result
439    }
440}
441
// Tests for header parsing and repair. Most of them close a database, edit the header bytes of
// the file directly, and then check how reopening reacts.
#[cfg(test)]
mod test {
    #[cfg(not(target_os = "windows"))]
    use crate::StorageError;
    use crate::backends::FileBackend;
    use crate::db::TableDefinition;
    use crate::tree_store::page_store::TransactionalMemory;
    use crate::tree_store::page_store::header::{
        GOD_BYTE_OFFSET, MAGICNUMBER, PAGE_SIZE, PRIMARY_BIT, RECOVERY_REQUIRED,
        TRANSACTION_0_OFFSET, TRANSACTION_1_OFFSET, TWO_PHASE_COMMIT, USER_ROOT_OFFSET,
    };
    use crate::{Database, DatabaseError, ReadableTable};
    use std::fs::OpenOptions;
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::mem::size_of;

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    // Damages the primary commit slot and checks that opening the database falls back to the
    // previous commit; then checks that damaging both slots is reported as corruption.
    #[test]
    fn repair_allocator_checksums() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("hello", "world").unwrap();
        }
        write_txn.commit().unwrap();

        // Start a read to be sure the previous write isn't garbage collected
        let read_txn = db.begin_read().unwrap();

        let mut write_txn = db.begin_write().unwrap();
        {
            // We want this to be the last commit before the database is closed, so it needs to
            // use quick-repair -- otherwise, Database::drop() will generate its own quick-repair
            // commit on shutdown
            write_txn.set_quick_repair(true);
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("hello", "world2").unwrap();
        }
        write_txn.commit().unwrap();
        drop(read_txn);
        drop(db);

        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(tmpfile.path())
            .unwrap();

        // Rewrite the god byte in place: request recovery and clear the two-phase-commit flag
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        let mut buffer = [0u8; 1];
        file.read_exact(&mut buffer).unwrap();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        buffer[0] |= RECOVERY_REQUIRED;
        buffer[0] &= !TWO_PHASE_COMMIT;
        file.write_all(&buffer).unwrap();

        // Zero 16 bytes at the start of the primary slot's user root to simulate a failure during
        // commit. Those bytes are covered by the slot checksum, which is left untouched
        let primary_slot_offset = if buffer[0] & PRIMARY_BIT == 0 {
            TRANSACTION_0_OFFSET
        } else {
            TRANSACTION_1_OFFSET
        };
        file.seek(SeekFrom::Start(
            (primary_slot_offset + USER_ROOT_OFFSET) as u64,
        ))
        .unwrap();
        file.write_all(&[0; size_of::<u128>()]).unwrap();

        assert!(
            TransactionalMemory::new(
                Box::new(FileBackend::new(file).unwrap()),
                false,
                PAGE_SIZE,
                None,
                0,
                0,
                false,
            )
            .unwrap()
            .needs_repair()
            .unwrap()
        );

        // The mutable binding is only needed by check_integrity() in the non-Windows block below
        #[allow(unused_mut)]
        let mut db2 = Database::create(tmpfile.path()).unwrap();
        let write_txn = db2.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            // The value from the first commit is visible, not "world2" from the damaged one
            assert_eq!(table.get("hello").unwrap().unwrap().value(), "world");
            table.insert("hello2", "world2").unwrap();
        }
        write_txn.commit().unwrap();

        // Locks are exclusive on Windows, so we can't concurrently overwrite the file
        #[cfg(not(target_os = "windows"))]
        {
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .open(tmpfile.path())
                .unwrap();
            file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
            let mut buffer = [0u8; 1];
            file.read_exact(&mut buffer).unwrap();

            // Zero the start of the primary slot's user root again, this time while db2 is open
            let primary_slot_offset = if buffer[0] & PRIMARY_BIT == 0 {
                TRANSACTION_0_OFFSET
            } else {
                TRANSACTION_1_OFFSET
            };
            file.seek(SeekFrom::Start(
                (primary_slot_offset + USER_ROOT_OFFSET) as u64,
            ))
            .unwrap();
            file.write_all(&[0; size_of::<u128>()]).unwrap();

            assert!(!db2.check_integrity().unwrap());

            // Damage both slots to simulate corruption
            // NOTE(review): the god byte read into `buffer` here is not used afterwards
            file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
            let mut buffer = [0u8; 1];
            file.read_exact(&mut buffer).unwrap();

            file.seek(SeekFrom::Start(
                (TRANSACTION_0_OFFSET + USER_ROOT_OFFSET) as u64,
            ))
            .unwrap();
            file.write_all(&[0; size_of::<u128>()]).unwrap();
            file.seek(SeekFrom::Start(
                (TRANSACTION_1_OFFSET + USER_ROOT_OFFSET) as u64,
            ))
            .unwrap();
            file.write_all(&[0; size_of::<u128>()]).unwrap();

            assert!(matches!(
                db2.check_integrity().unwrap_err(),
                DatabaseError::Storage(StorageError::Corrupted(_))
            ));
        }
    }

    // A database with no user commits that is flagged as needing recovery can still be opened
    #[test]
    fn repair_empty() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        drop(db);

        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(tmpfile.path())
            .unwrap();

        // Set the recovery-required flag in the god byte
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        let mut buffer = [0u8; 1];
        file.read_exact(&mut buffer).unwrap();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        buffer[0] |= RECOVERY_REQUIRED;
        file.write_all(&buffer).unwrap();

        assert!(
            TransactionalMemory::new(
                Box::new(FileBackend::new(file).unwrap()),
                false,
                PAGE_SIZE,
                None,
                0,
                0,
                false,
            )
            .unwrap()
            .needs_repair()
            .unwrap()
        );

        Database::open(tmpfile.path()).unwrap();
    }

    // A repair callback that aborts makes open() fail with DatabaseError::RepairAborted
    #[test]
    fn abort_repair() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        drop(db);

        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(tmpfile.path())
            .unwrap();

        // Request recovery and clear the two-phase-commit flag in the god byte
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        let mut buffer = [0u8; 1];
        file.read_exact(&mut buffer).unwrap();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        buffer[0] |= RECOVERY_REQUIRED;
        buffer[0] &= !TWO_PHASE_COMMIT;
        file.write_all(&buffer).unwrap();

        assert!(
            TransactionalMemory::new(
                Box::new(FileBackend::new(file).unwrap()),
                false,
                PAGE_SIZE,
                None,
                0,
                0,
                false,
            )
            .unwrap()
            .needs_repair()
            .unwrap()
        );

        let err = Database::builder()
            .set_repair_callback(|handle| handle.abort())
            .open(tmpfile.path())
            .unwrap_err();
        assert!(matches!(err, DatabaseError::RepairAborted));
    }

    // Regression test: a database populated through insert_reserve() and then flagged as
    // needing recovery can be reopened
    #[test]
    fn repair_insert_reserve_regression() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();

        let def: TableDefinition<&str, &[u8]> = TableDefinition::new("x");

        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(def).unwrap();
            let mut value = table.insert_reserve("hello", 5).unwrap();
            value.as_mut().copy_from_slice(b"world");
        }
        write_txn.commit().unwrap();

        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(def).unwrap();
            let mut value = table.insert_reserve("hello2", 5).unwrap();
            value.as_mut().copy_from_slice(b"world");
        }
        write_txn.commit().unwrap();

        drop(db);

        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(tmpfile.path())
            .unwrap();

        // Set the recovery-required flag in the god byte
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        let mut buffer = [0u8; 1];
        file.read_exact(&mut buffer).unwrap();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        buffer[0] |= RECOVERY_REQUIRED;
        file.write_all(&buffer).unwrap();

        assert!(
            TransactionalMemory::new(
                Box::new(FileBackend::new(file).unwrap()),
                false,
                PAGE_SIZE,
                None,
                0,
                0,
                false,
            )
            .unwrap()
            .needs_repair()
            .unwrap()
        );

        Database::open(tmpfile.path()).unwrap();
    }

    // Checks byte-level properties of MAGICNUMBER
    #[test]
    fn magic_number() {
        // Test compliance with some, but not all, provisions recommended by
        // IETF Memo "Care and Feeding of Magic Numbers"

        // Test that magic number is not valid utf-8
        #[allow(invalid_from_utf8)]
        {
            assert!(std::str::from_utf8(&MAGICNUMBER).is_err());
        }
        // Test there is a octet with high-bit set
        assert!(MAGICNUMBER.iter().any(|x| *x & 0x80 != 0));
        // Test there is a non-printable ASCII character
        assert!(MAGICNUMBER.iter().any(|x| *x < 0x20 || *x > 0x7E));
        // Test there is a printable ASCII character
        assert!(MAGICNUMBER.iter().any(|x| *x >= 0x20 && *x <= 0x7E));
        // Test there is a printable ISO-8859 that's non-ASCII printable
        assert!(MAGICNUMBER.iter().any(|x| *x >= 0xA0));
        // Test there is a ISO-8859 control character other than 0x09, 0x0A, 0x0C, 0x0D
        assert!(MAGICNUMBER.iter().any(|x| *x < 0x09
            || *x == 0x0B
            || (0x0E <= *x && *x <= 0x1F)
            || (0x7F <= *x && *x <= 0x9F)));
    }
}