redb/tree_store/page_store/
header.rs

1use crate::transaction_tracker::TransactionId;
2use crate::tree_store::btree_base::BtreeHeader;
3use crate::tree_store::page_store::layout::{DatabaseLayout, RegionLayout};
4use crate::tree_store::page_store::page_manager::{
5    xxh3_checksum, FILE_FORMAT_VERSION1, FILE_FORMAT_VERSION2,
6};
7use crate::tree_store::{Checksum, PageNumber};
8use crate::{DatabaseError, Result, StorageError};
9use std::mem::size_of;
10
11// Database layout:
12//
13// Super-header (header + commit slots)
14// The super-header length is rounded up to the nearest full page size
15//
// Header (first 64 bytes):
// 9 bytes: magic number
// 1 byte: god byte
// 2 bytes: padding
// 4 bytes: page size
// Definition of region
// 4 bytes: region header pages
// 4 bytes: region max data pages
// Current database layout
// 4 bytes: number of full regions
// 4 bytes: data pages in partial trailing region
// 8 bytes: region tracker page number
// (the remaining bytes of the 64 are unused and written as zero)
//
// Commit slot 0 (next 128 bytes):
// 1 byte: version
// 1 byte: != 0 if user root page is non-null
// 1 byte: != 0 if system root page is non-null
// 1 byte: != 0 if freed table root page is non-null
// 4 bytes: padding
// user root (serialized BtreeHeader)
// system root (serialized BtreeHeader)
// freed table root (serialized BtreeHeader)
// 8 bytes: last committed transaction id
// (zero padding, if any, up to the slot checksum)
// 16 bytes: slot checksum (always the last 16 bytes of the slot)
39//
40// Commit slot 1 (next 128 bytes):
41// Same layout as slot 0
42
43// Inspired by PNG's magic number
44pub(super) const MAGICNUMBER: [u8; 9] = [b'r', b'e', b'd', b'b', 0x1A, 0x0A, 0xA9, 0x0D, 0x0A];
45const GOD_BYTE_OFFSET: usize = MAGICNUMBER.len();
46const PAGE_SIZE_OFFSET: usize = GOD_BYTE_OFFSET + size_of::<u8>() + 2; // +2 for padding
47const REGION_HEADER_PAGES_OFFSET: usize = PAGE_SIZE_OFFSET + size_of::<u32>();
48const REGION_MAX_DATA_PAGES_OFFSET: usize = REGION_HEADER_PAGES_OFFSET + size_of::<u32>();
49const NUM_FULL_REGIONS_OFFSET: usize = REGION_MAX_DATA_PAGES_OFFSET + size_of::<u32>();
50const TRAILING_REGION_DATA_PAGES_OFFSET: usize = NUM_FULL_REGIONS_OFFSET + size_of::<u32>();
51const REGION_TRACKER_PAGE_NUMBER_OFFSET: usize =
52    TRAILING_REGION_DATA_PAGES_OFFSET + size_of::<u32>();
53const TRANSACTION_SIZE: usize = 128;
54const TRANSACTION_0_OFFSET: usize = 64;
55const TRANSACTION_1_OFFSET: usize = TRANSACTION_0_OFFSET + TRANSACTION_SIZE;
56pub(super) const DB_HEADER_SIZE: usize = TRANSACTION_1_OFFSET + TRANSACTION_SIZE;
57
58// God byte flags
59const PRIMARY_BIT: u8 = 1;
60const RECOVERY_REQUIRED: u8 = 2;
61const TWO_PHASE_COMMIT: u8 = 4;
62
63// Structure of each commit slot
64const VERSION_OFFSET: usize = 0;
65const USER_ROOT_NON_NULL_OFFSET: usize = size_of::<u8>();
66const SYSTEM_ROOT_NON_NULL_OFFSET: usize = USER_ROOT_NON_NULL_OFFSET + size_of::<u8>();
67const FREED_ROOT_NON_NULL_OFFSET: usize = SYSTEM_ROOT_NON_NULL_OFFSET + size_of::<u8>();
68const PADDING: usize = 4;
69
70const USER_ROOT_OFFSET: usize = FREED_ROOT_NON_NULL_OFFSET + size_of::<u8>() + PADDING;
71const SYSTEM_ROOT_OFFSET: usize = USER_ROOT_OFFSET + BtreeHeader::serialized_size();
72const FREED_ROOT_OFFSET: usize = SYSTEM_ROOT_OFFSET + BtreeHeader::serialized_size();
73const TRANSACTION_ID_OFFSET: usize = FREED_ROOT_OFFSET + BtreeHeader::serialized_size();
74const TRANSACTION_LAST_FIELD: usize = TRANSACTION_ID_OFFSET + size_of::<u64>();
75
76const SLOT_CHECKSUM_OFFSET: usize = TRANSACTION_SIZE - size_of::<Checksum>();
77
78pub(crate) const PAGE_SIZE: usize = 4096;
79
/// Decodes a little-endian `u32` from the first four bytes of `data`.
///
/// Bytes after the fourth are ignored. Panics if `data` holds fewer than four bytes.
fn get_u32(data: &[u8]) -> u32 {
    let mut raw = [0u8; size_of::<u32>()];
    raw.copy_from_slice(&data[..size_of::<u32>()]);
    u32::from_le_bytes(raw)
}
83
/// Decodes a little-endian `u64` from the first eight bytes of `data`.
///
/// Bytes after the eighth are ignored. Panics if `data` holds fewer than eight bytes.
fn get_u64(data: &[u8]) -> u64 {
    let mut raw = [0u8; size_of::<u64>()];
    raw.copy_from_slice(&data[..size_of::<u64>()]);
    u64::from_le_bytes(raw)
}
87
88#[derive(Copy, Clone)]
89pub(super) struct HeaderRepairInfo {
90    pub(super) invalid_magic_number: bool,
91    pub(super) primary_corrupted: bool,
92    pub(super) secondary_corrupted: bool,
93}
94
95#[derive(Clone)]
96pub(super) struct DatabaseHeader {
97    primary_slot: usize,
98    pub(super) recovery_required: bool,
99    pub(super) two_phase_commit: bool,
100    page_size: u32,
101    region_header_pages: u32,
102    region_max_data_pages: u32,
103    full_regions: u32,
104    trailing_partial_region_pages: u32,
105    region_tracker: PageNumber,
106    transaction_slots: [TransactionHeader; 2],
107}
108
109impl DatabaseHeader {
110    pub(super) fn new(
111        layout: DatabaseLayout,
112        transaction_id: TransactionId,
113        version: u8,
114        region_tracker: PageNumber,
115    ) -> Self {
116        #[allow(clippy::assertions_on_constants)]
117        {
118            assert!(TRANSACTION_LAST_FIELD <= SLOT_CHECKSUM_OFFSET);
119        }
120
121        let slot = TransactionHeader::new(transaction_id, version);
122        Self {
123            primary_slot: 0,
124            recovery_required: true,
125            two_phase_commit: false,
126            page_size: layout.full_region_layout().page_size(),
127            region_header_pages: layout.full_region_layout().get_header_pages(),
128            region_max_data_pages: layout.full_region_layout().num_pages(),
129            full_regions: layout.num_full_regions(),
130            trailing_partial_region_pages: layout
131                .trailing_region_layout()
132                .map(|x| x.num_pages())
133                .unwrap_or_default(),
134            region_tracker,
135            transaction_slots: [slot.clone(), slot],
136        }
137    }
138
139    pub(super) fn page_size(&self) -> u32 {
140        self.page_size
141    }
142
143    pub(super) fn layout(&self) -> DatabaseLayout {
144        let full_layout = RegionLayout::new(
145            self.region_max_data_pages,
146            self.region_header_pages,
147            self.page_size,
148        );
149        let trailing = if self.trailing_partial_region_pages > 0 {
150            Some(RegionLayout::new(
151                self.trailing_partial_region_pages,
152                self.region_header_pages,
153                self.page_size,
154            ))
155        } else {
156            None
157        };
158        DatabaseLayout::new(self.full_regions, full_layout, trailing)
159    }
160
161    pub(super) fn set_layout(&mut self, layout: DatabaseLayout) {
162        assert_eq!(
163            self.layout().full_region_layout(),
164            layout.full_region_layout()
165        );
166        if let Some(trailing) = layout.trailing_region_layout() {
167            assert_eq!(trailing.get_header_pages(), self.region_header_pages);
168            assert_eq!(trailing.page_size(), self.page_size);
169            self.trailing_partial_region_pages = trailing.num_pages();
170        } else {
171            self.trailing_partial_region_pages = 0;
172        }
173        self.full_regions = layout.num_full_regions();
174    }
175
176    pub(super) fn region_tracker(&self) -> PageNumber {
177        self.region_tracker
178    }
179
180    pub(super) fn set_region_tracker(&mut self, page: PageNumber) {
181        self.region_tracker = page;
182    }
183
184    pub(super) fn primary_slot(&self) -> &TransactionHeader {
185        &self.transaction_slots[self.primary_slot]
186    }
187
188    pub(super) fn secondary_slot(&self) -> &TransactionHeader {
189        &self.transaction_slots[self.primary_slot ^ 1]
190    }
191
192    pub(super) fn secondary_slot_mut(&mut self) -> &mut TransactionHeader {
193        &mut self.transaction_slots[self.primary_slot ^ 1]
194    }
195
196    pub(super) fn swap_primary_slot(&mut self) {
197        self.primary_slot ^= 1;
198    }
199
200    // Figure out which slot to use as the primary when starting a repair. The repair process might
201    // still switch to the other slot later, if the tree checksums turn out to be invalid.
202    //
203    // Returns true if we picked the original primary, or false if we swapped
204    pub(super) fn pick_primary_for_repair(
205        &mut self,
206        repair_info: HeaderRepairInfo,
207    ) -> Result<bool> {
208        // If the primary was written using 2-phase commit, it's guaranteed to be valid. Don't look
209        // at the secondary; even if it happens to have a valid checksum, Durability::Paranoid means
210        // we can't trust it
211        if self.two_phase_commit {
212            if repair_info.primary_corrupted {
213                return Err(StorageError::Corrupted(
214                    "Primary is corrupted despite 2-phase commit".to_string(),
215                ));
216            }
217            return Ok(true);
218        }
219
220        // Pick whichever slot is newer, assuming it has a valid checksum. This handles an edge case
221        // where we crash during fsync(), and the only data that got written to disk was the god byte
222        // update swapping the primary -- in that case, the primary contains a valid but out-of-date
223        // transaction, so we need to load from the secondary instead
224        if repair_info.primary_corrupted {
225            if repair_info.secondary_corrupted {
226                return Err(StorageError::Corrupted(
227                    "Both commit slots are corrupted".to_string(),
228                ));
229            }
230            self.swap_primary_slot();
231            return Ok(false);
232        }
233
234        let secondary_newer =
235            self.secondary_slot().transaction_id > self.primary_slot().transaction_id;
236        if secondary_newer && !repair_info.secondary_corrupted {
237            self.swap_primary_slot();
238            return Ok(false);
239        }
240
241        Ok(true)
242    }
243
244    // TODO: consider returning an Err with the repair info
245    pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, HeaderRepairInfo), DatabaseError> {
246        let invalid_magic_number = data[..MAGICNUMBER.len()] != MAGICNUMBER;
247
248        let primary_slot = usize::from(data[GOD_BYTE_OFFSET] & PRIMARY_BIT != 0);
249        let recovery_required = (data[GOD_BYTE_OFFSET] & RECOVERY_REQUIRED) != 0;
250        let two_phase_commit = (data[GOD_BYTE_OFFSET] & TWO_PHASE_COMMIT) != 0;
251        let page_size = get_u32(&data[PAGE_SIZE_OFFSET..]);
252        let region_header_pages = get_u32(&data[REGION_HEADER_PAGES_OFFSET..]);
253        let region_max_data_pages = get_u32(&data[REGION_MAX_DATA_PAGES_OFFSET..]);
254        let full_regions = get_u32(&data[NUM_FULL_REGIONS_OFFSET..]);
255        let trailing_data_pages = get_u32(&data[TRAILING_REGION_DATA_PAGES_OFFSET..]);
256        let region_tracker = PageNumber::from_le_bytes(
257            data[REGION_TRACKER_PAGE_NUMBER_OFFSET
258                ..(REGION_TRACKER_PAGE_NUMBER_OFFSET + PageNumber::serialized_size())]
259                .try_into()
260                .unwrap(),
261        );
262        let (slot0, slot0_corrupted) = TransactionHeader::from_bytes(
263            &data[TRANSACTION_0_OFFSET..(TRANSACTION_0_OFFSET + TRANSACTION_SIZE)],
264        )?;
265        let (slot1, slot1_corrupted) = TransactionHeader::from_bytes(
266            &data[TRANSACTION_1_OFFSET..(TRANSACTION_1_OFFSET + TRANSACTION_SIZE)],
267        )?;
268        let (primary_corrupted, secondary_corrupted) = if primary_slot == 0 {
269            (slot0_corrupted, slot1_corrupted)
270        } else {
271            (slot1_corrupted, slot0_corrupted)
272        };
273
274        let result = Self {
275            primary_slot,
276            recovery_required,
277            two_phase_commit,
278            page_size,
279            region_header_pages,
280            region_max_data_pages,
281            full_regions,
282            trailing_partial_region_pages: trailing_data_pages,
283            region_tracker,
284            transaction_slots: [slot0, slot1],
285        };
286        let repair = HeaderRepairInfo {
287            invalid_magic_number,
288            primary_corrupted,
289            secondary_corrupted,
290        };
291        Ok((result, repair))
292    }
293
294    pub(super) fn to_bytes(&self, include_magic_number: bool) -> [u8; DB_HEADER_SIZE] {
295        let mut result = [0; DB_HEADER_SIZE];
296        if include_magic_number {
297            result[..MAGICNUMBER.len()].copy_from_slice(&MAGICNUMBER);
298        }
299        result[GOD_BYTE_OFFSET] = self.primary_slot.try_into().unwrap();
300        if self.recovery_required {
301            result[GOD_BYTE_OFFSET] |= RECOVERY_REQUIRED;
302        }
303        if self.two_phase_commit {
304            result[GOD_BYTE_OFFSET] |= TWO_PHASE_COMMIT;
305        }
306        result[PAGE_SIZE_OFFSET..(PAGE_SIZE_OFFSET + size_of::<u32>())]
307            .copy_from_slice(&self.page_size.to_le_bytes());
308        result[REGION_HEADER_PAGES_OFFSET..(REGION_HEADER_PAGES_OFFSET + size_of::<u32>())]
309            .copy_from_slice(&self.region_header_pages.to_le_bytes());
310        result[REGION_MAX_DATA_PAGES_OFFSET..(REGION_MAX_DATA_PAGES_OFFSET + size_of::<u32>())]
311            .copy_from_slice(&self.region_max_data_pages.to_le_bytes());
312        result[NUM_FULL_REGIONS_OFFSET..(NUM_FULL_REGIONS_OFFSET + size_of::<u32>())]
313            .copy_from_slice(&self.full_regions.to_le_bytes());
314        result[TRAILING_REGION_DATA_PAGES_OFFSET
315            ..(TRAILING_REGION_DATA_PAGES_OFFSET + size_of::<u32>())]
316            .copy_from_slice(&self.trailing_partial_region_pages.to_le_bytes());
317        result[REGION_TRACKER_PAGE_NUMBER_OFFSET
318            ..(REGION_TRACKER_PAGE_NUMBER_OFFSET + PageNumber::serialized_size())]
319            .copy_from_slice(&self.region_tracker.to_le_bytes());
320        let slot0 = self.transaction_slots[0].to_bytes();
321        result[TRANSACTION_0_OFFSET..(TRANSACTION_0_OFFSET + slot0.len())].copy_from_slice(&slot0);
322        let slot1 = self.transaction_slots[1].to_bytes();
323        result[TRANSACTION_1_OFFSET..(TRANSACTION_1_OFFSET + slot1.len())].copy_from_slice(&slot1);
324
325        result
326    }
327}
328
329#[derive(Clone)]
330pub(super) struct TransactionHeader {
331    pub(super) version: u8,
332    pub(super) user_root: Option<BtreeHeader>,
333    pub(super) system_root: Option<BtreeHeader>,
334    pub(super) freed_root: Option<BtreeHeader>,
335    pub(super) transaction_id: TransactionId,
336}
337
338impl TransactionHeader {
339    fn new(transaction_id: TransactionId, version: u8) -> Self {
340        Self {
341            version,
342            user_root: None,
343            system_root: None,
344            freed_root: None,
345            transaction_id,
346        }
347    }
348
349    // Returned bool indicates whether the checksum was corrupted
350    pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, bool), DatabaseError> {
351        let version = data[VERSION_OFFSET];
352        match version {
353            FILE_FORMAT_VERSION1 => return Err(DatabaseError::UpgradeRequired(version)),
354            FILE_FORMAT_VERSION2 => {}
355            _ => {
356                return Err(StorageError::Corrupted(format!(
357                    "Expected file format version <= {FILE_FORMAT_VERSION2}, found {version}",
358                ))
359                .into())
360            }
361        }
362        let checksum = Checksum::from_le_bytes(
363            data[SLOT_CHECKSUM_OFFSET..(SLOT_CHECKSUM_OFFSET + size_of::<Checksum>())]
364                .try_into()
365                .unwrap(),
366        );
367        let corrupted = checksum != xxh3_checksum(&data[..SLOT_CHECKSUM_OFFSET]);
368
369        let user_root = if data[USER_ROOT_NON_NULL_OFFSET] != 0 {
370            Some(BtreeHeader::from_le_bytes(
371                data[USER_ROOT_OFFSET..(USER_ROOT_OFFSET + BtreeHeader::serialized_size())]
372                    .try_into()
373                    .unwrap(),
374            ))
375        } else {
376            None
377        };
378        let system_root = if data[SYSTEM_ROOT_NON_NULL_OFFSET] != 0 {
379            Some(BtreeHeader::from_le_bytes(
380                data[SYSTEM_ROOT_OFFSET..(SYSTEM_ROOT_OFFSET + BtreeHeader::serialized_size())]
381                    .try_into()
382                    .unwrap(),
383            ))
384        } else {
385            None
386        };
387        let freed_root = if data[FREED_ROOT_NON_NULL_OFFSET] != 0 {
388            Some(BtreeHeader::from_le_bytes(
389                data[FREED_ROOT_OFFSET..(FREED_ROOT_OFFSET + BtreeHeader::serialized_size())]
390                    .try_into()
391                    .unwrap(),
392            ))
393        } else {
394            None
395        };
396        let transaction_id = TransactionId::new(get_u64(&data[TRANSACTION_ID_OFFSET..]));
397
398        let result = Self {
399            version,
400            user_root,
401            system_root,
402            freed_root,
403            transaction_id,
404        };
405
406        Ok((result, corrupted))
407    }
408
409    pub(super) fn to_bytes(&self) -> [u8; TRANSACTION_SIZE] {
410        assert_eq!(self.version, FILE_FORMAT_VERSION2);
411        let mut result = [0; TRANSACTION_SIZE];
412        result[VERSION_OFFSET] = self.version;
413        if let Some(header) = self.user_root {
414            result[USER_ROOT_NON_NULL_OFFSET] = 1;
415            result[USER_ROOT_OFFSET..(USER_ROOT_OFFSET + BtreeHeader::serialized_size())]
416                .copy_from_slice(&header.to_le_bytes());
417        }
418        if let Some(header) = self.system_root {
419            result[SYSTEM_ROOT_NON_NULL_OFFSET] = 1;
420            result[SYSTEM_ROOT_OFFSET..(SYSTEM_ROOT_OFFSET + BtreeHeader::serialized_size())]
421                .copy_from_slice(&header.to_le_bytes());
422        }
423        if let Some(header) = self.freed_root {
424            result[FREED_ROOT_NON_NULL_OFFSET] = 1;
425            result[FREED_ROOT_OFFSET..(FREED_ROOT_OFFSET + BtreeHeader::serialized_size())]
426                .copy_from_slice(&header.to_le_bytes());
427        }
428        result[TRANSACTION_ID_OFFSET..(TRANSACTION_ID_OFFSET + size_of::<u64>())]
429            .copy_from_slice(&self.transaction_id.raw_id().to_le_bytes());
430        let checksum = xxh3_checksum(&result[..SLOT_CHECKSUM_OFFSET]);
431        result[SLOT_CHECKSUM_OFFSET..(SLOT_CHECKSUM_OFFSET + size_of::<Checksum>())]
432            .copy_from_slice(&checksum.to_le_bytes());
433
434        result
435    }
436}
437
#[cfg(test)]
mod test {
    use crate::backends::FileBackend;
    use crate::db::TableDefinition;
    use crate::tree_store::page_store::header::{
        GOD_BYTE_OFFSET, MAGICNUMBER, PAGE_SIZE, PRIMARY_BIT, RECOVERY_REQUIRED,
        TRANSACTION_0_OFFSET, TRANSACTION_1_OFFSET, TWO_PHASE_COMMIT, USER_ROOT_OFFSET,
    };
    use crate::tree_store::page_store::TransactionalMemory;
    #[cfg(not(target_os = "windows"))]
    use crate::StorageError;
    use crate::{Database, DatabaseError, ReadableTable};
    use std::fs::{File, OpenOptions};
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::mem::size_of;
    use std::path::Path;

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    // Opens the database file for raw reads and writes, bypassing the database API
    fn open_raw(path: impl AsRef<Path>) -> File {
        OpenOptions::new()
            .read(true)
            .write(true)
            .open(path)
            .unwrap()
    }

    // Seeks to the god byte and reads it
    fn read_god_byte(file: &mut File) -> u8 {
        let mut byte = [0u8; 1];
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        file.read_exact(&mut byte).unwrap();
        byte[0]
    }

    // Seeks to the god byte and overwrites it
    fn write_god_byte(file: &mut File, god_byte: u8) {
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        file.write_all(&[god_byte]).unwrap();
    }

    // File offset of the commit slot that `god_byte` marks as primary
    fn primary_slot_offset(god_byte: u8) -> usize {
        if god_byte & PRIMARY_BIT == 0 {
            TRANSACTION_0_OFFSET
        } else {
            TRANSACTION_1_OFFSET
        }
    }

    // Zeroes the first 16 bytes of the user root in the slot at `slot_offset`. Those bytes are
    // covered by the slot checksum, so the slot no longer verifies if any of them was non-zero
    fn clobber_user_root(file: &mut File, slot_offset: usize) {
        file.seek(SeekFrom::Start((slot_offset + USER_ROOT_OFFSET) as u64))
            .unwrap();
        file.write_all(&[0; size_of::<u128>()]).unwrap();
    }

    // Asserts that a TransactionalMemory opened over `file` reports that it needs repair
    fn assert_needs_repair(file: File) {
        assert!(TransactionalMemory::new(
            Box::new(FileBackend::new(file).unwrap()),
            PAGE_SIZE,
            None,
            0,
            0
        )
        .unwrap()
        .needs_repair()
        .unwrap());
    }

    #[test]
    fn repair_allocator_checksums() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("hello", "world").unwrap();
        }
        write_txn.commit().unwrap();

        // Start a read to be sure the previous write isn't garbage collected
        let read_txn = db.begin_read().unwrap();

        let mut write_txn = db.begin_write().unwrap();
        {
            // We want this to be the last commit before the database is closed, so it needs to
            // use quick-repair -- otherwise, Database::drop() will generate its own quick-repair
            // commit on shutdown
            write_txn.set_quick_repair(true);
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("hello", "world2").unwrap();
        }
        write_txn.commit().unwrap();
        drop(read_txn);
        drop(db);

        let mut file = open_raw(tmpfile.path());

        // Flag the file as needing recovery, without 2-phase commit
        let god_byte = (read_god_byte(&mut file) | RECOVERY_REQUIRED) & !TWO_PHASE_COMMIT;
        write_god_byte(&mut file, god_byte);

        // Damage the primary slot to simulate a failure during commit
        clobber_user_root(&mut file, primary_slot_offset(god_byte));

        assert_needs_repair(file);

        // Reopening must fall back to the first commit
        #[allow(unused_mut)]
        let mut db2 = Database::create(tmpfile.path()).unwrap();
        let write_txn = db2.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            assert_eq!(table.get("hello").unwrap().unwrap().value(), "world");
            table.insert("hello2", "world2").unwrap();
        }
        write_txn.commit().unwrap();

        // Locks are exclusive on Windows, so we can't concurrently overwrite the file
        #[cfg(not(target_os = "windows"))]
        {
            let mut file = open_raw(tmpfile.path());
            let god_byte = read_god_byte(&mut file);

            // Damage the primary slot to simulate a failure during commit
            clobber_user_root(&mut file, primary_slot_offset(god_byte));

            assert!(!db2.check_integrity().unwrap());

            // Damage both slots to simulate corruption. The god byte is read again first, as
            // the original sequence did, although its value is not needed here
            let _ = read_god_byte(&mut file);
            for slot_offset in [TRANSACTION_0_OFFSET, TRANSACTION_1_OFFSET] {
                clobber_user_root(&mut file, slot_offset);
            }

            assert!(matches!(
                db2.check_integrity().unwrap_err(),
                DatabaseError::Storage(StorageError::Corrupted(_))
            ));
        }
    }

    #[test]
    fn repair_empty() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        drop(db);

        let mut file = open_raw(tmpfile.path());

        // Only the recovery flag is set; the 2-phase commit bit is left as written
        let god_byte = read_god_byte(&mut file) | RECOVERY_REQUIRED;
        write_god_byte(&mut file, god_byte);

        assert_needs_repair(file);

        Database::open(tmpfile.path()).unwrap();
    }

    #[test]
    fn abort_repair() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        drop(db);

        let mut file = open_raw(tmpfile.path());

        let god_byte = (read_god_byte(&mut file) | RECOVERY_REQUIRED) & !TWO_PHASE_COMMIT;
        write_god_byte(&mut file, god_byte);

        assert_needs_repair(file);

        // A repair callback that aborts must surface as RepairAborted from open()
        let err = Database::builder()
            .set_repair_callback(|handle| handle.abort())
            .open(tmpfile.path())
            .unwrap_err();
        assert!(matches!(err, DatabaseError::RepairAborted));
    }

    #[test]
    fn repair_insert_reserve_regression() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();

        let def: TableDefinition<&str, &[u8]> = TableDefinition::new("x");

        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(def).unwrap();
            let mut value = table.insert_reserve("hello", 5).unwrap();
            value.as_mut().copy_from_slice(b"world");
        }
        write_txn.commit().unwrap();

        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(def).unwrap();
            let mut value = table.insert_reserve("hello2", 5).unwrap();
            value.as_mut().copy_from_slice(b"world");
        }
        write_txn.commit().unwrap();

        drop(db);

        let mut file = open_raw(tmpfile.path());

        let god_byte = read_god_byte(&mut file) | RECOVERY_REQUIRED;
        write_god_byte(&mut file, god_byte);

        assert_needs_repair(file);

        Database::open(tmpfile.path()).unwrap();
    }

    #[test]
    fn magic_number() {
        // Test compliance with some, but not all, provisions recommended by
        // IETF Memo "Care and Feeding of Magic Numbers"

        // Test that magic number is not valid utf-8
        #[allow(invalid_from_utf8)]
        {
            assert!(std::str::from_utf8(&MAGICNUMBER).is_err());
        }
        // Test there is an octet with high-bit set
        assert!(MAGICNUMBER.iter().any(|byte| byte & 0x80 != 0));
        // Test there is a non-printable ASCII character
        assert!(MAGICNUMBER
            .iter()
            .any(|byte| !(0x20..=0x7E).contains(byte)));
        // Test there is a printable ASCII character
        assert!(MAGICNUMBER.iter().any(|byte| (0x20..=0x7E).contains(byte)));
        // Test there is a printable ISO-8859 that's non-ASCII printable
        assert!(MAGICNUMBER.iter().any(|byte| *byte >= 0xA0));
        // Test there is an ISO-8859 control character other than 0x09, 0x0A, 0x0C, 0x0D
        assert!(MAGICNUMBER.iter().any(|byte| *byte < 0x09
            || *byte == 0x0B
            || (0x0E..=0x1F).contains(byte)
            || (0x7F..=0x9F).contains(byte)));
    }
}