// redb/tree_store/page_store/region.rs
use crate::Result;
use crate::tree_store::PageNumber;
use crate::tree_store::page_store::bitmap::BtreeBitmap;
use crate::tree_store::page_store::buddy_allocator::BuddyAllocator;
use crate::tree_store::page_store::cached_file::PagedCachedFile;
use crate::tree_store::page_store::header::DatabaseHeader;
use crate::tree_store::page_store::layout::DatabaseLayout;
use crate::tree_store::page_store::page_manager::{INITIAL_REGIONS, MAX_MAX_PAGE_ORDER};
use crate::tree_store::page_store::xxh3_checksum;
use std::cmp::{self, max};
use std::mem::size_of;

const REGION_FORMAT_VERSION: u8 = 1;
const ALLOCATOR_LENGTH_OFFSET: usize = 4;
const ALLOCATOR_OFFSET: usize = ALLOCATOR_LENGTH_OFFSET + size_of::<u32>();

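// Tracks which regions have a free page at each allocation order: one bitmap
// per order, where a cleared bit means the region has a free page of at least
// that order.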
pub(crate) struct RegionTracker {
    order_trackers: Vec<BtreeBitmap>,
}

impl RegionTracker {
    pub(crate) fn new(regions: u32, orders: u8) -> Self {
        let mut data = vec![];
        for _ in 0..orders {
            data.push(BtreeBitmap::new(regions));
        }
        Self {
            order_trackers: data,
        }
    }

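    // Serialized format (integers little endian):
    // num_orders: u32
    // tracker_len: u32 -- length of each serialized BtreeBitmap
    // data: num_orders consecutive BtreeBitmaps, each tracker_len bytes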
    pub(super) fn to_vec(&self) -> Vec<u8> {
        let mut result = vec![];
        let orders: u32 = self.order_trackers.len().try_into().unwrap();
        let allocator_len: u32 = self.order_trackers[0].to_vec().len().try_into().unwrap();
        result.extend(orders.to_le_bytes());
        result.extend(allocator_len.to_le_bytes());
        for order in &self.order_trackers {
            result.extend(&order.to_vec());
        }
        result
    }

    pub(super) fn from_page(page: &[u8]) -> Self {
        let orders = u32::from_le_bytes(page[..size_of::<u32>()].try_into().unwrap());
        let allocator_len = u32::from_le_bytes(
            page[size_of::<u32>()..2 * size_of::<u32>()]
                .try_into()
                .unwrap(),
        ) as usize;
        let mut data = vec![];
        let mut start = 2 * size_of::<u32>();
        for _ in 0..orders {
            data.push(BtreeBitmap::from_bytes(
                &page[start..(start + allocator_len)],
            ));
            start += allocator_len;
        }

        Self {
            order_trackers: data,
        }
    }

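    // A minimal usage sketch (not from the original source), assuming a freshly
    // created tracker starts with every region marked full, as the explicit
    // `mark_free` calls in `Allocators::new` suggest:
    //
    //     let mut tracker = RegionTracker::new(8, 3);
    //     tracker.mark_free(2, 5); // region 5 gains a free order-2 page
    //     assert_eq!(tracker.find_free(2), Some(5));
    //     let restored = RegionTracker::from_page(&tracker.to_vec());
    //     assert_eq!(restored.find_free(2), Some(5));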
    pub(crate) fn find_free(&self, order: u8) -> Option<u32> {
        self.order_trackers[order as usize].find_first_unset()
    }

    pub(crate) fn mark_free(&mut self, order: u8, region: u32) {
        let order: usize = order.into();
        for i in 0..=order {
            self.order_trackers[i].clear(region);
        }
    }

    pub(crate) fn mark_full(&mut self, order: u8, region: u32) {
        let order: usize = order.into();
        assert!(order < self.order_trackers.len());
        for i in order..self.order_trackers.len() {
            self.order_trackers[i].set(region);
        }
    }

    fn expand(&mut self, new_capacity: u32) {
        let mut new_trackers = vec![];
        for order in 0..self.order_trackers.len() {
            let mut new_bitmap = BtreeBitmap::new(new_capacity);
            for region in 0..self.order_trackers[order].len() {
                if !self.order_trackers[order].get(region) {
                    new_bitmap.clear(region);
                }
            }
            new_trackers.push(new_bitmap);
        }

        self.order_trackers = new_trackers;
    }

    fn capacity(&self) -> u32 {
        self.order_trackers[0].capacity()
    }

    fn len(&self) -> u32 {
        self.order_trackers[0].len()
    }
}

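// The complete allocation state of the database: the region tracker plus one
// BuddyAllocator per region.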
pub(super) struct Allocators {
    pub(super) region_tracker: RegionTracker,
    pub(super) region_allocators: Vec<BuddyAllocator>,
}

impl Allocators {
    pub(super) fn new(layout: DatabaseLayout) -> Self {
        let mut region_allocators = vec![];
        let initial_regions = max(INITIAL_REGIONS, layout.num_regions());
        let mut region_tracker = RegionTracker::new(initial_regions, MAX_MAX_PAGE_ORDER + 1);
        for i in 0..layout.num_regions() {
            let region_layout = layout.region_layout(i);
            let allocator = BuddyAllocator::new(
                region_layout.num_pages(),
                layout.full_region_layout().num_pages(),
            );
            let max_order = allocator.get_max_order();
            region_tracker.mark_free(max_order, i);
            region_allocators.push(allocator);
        }

        Self {
            region_tracker,
            region_allocators,
        }
    }

    #[cfg(any(test, fuzzing))]
    pub(super) fn all_allocated(&self) -> Vec<PageNumber> {
        let mut pages = vec![];
        for (i, allocator) in self.region_allocators.iter().enumerate() {
            allocator.get_allocated_pages(i.try_into().unwrap(), &mut pages);
        }
        pages
    }

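    // XORs the checksums of the per-region allocators, so the result is
    // independent of region order. The region tracker is not hashed, presumably
    // because it can be rebuilt from the per-region allocator state.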
    pub(crate) fn xxh3_hash(&self) -> u128 {
        let mut result = 0;
        for allocator in &self.region_allocators {
            result ^= xxh3_checksum(&allocator.to_vec());
        }
        result
    }

    pub(super) fn from_bytes(header: &DatabaseHeader, storage: &PagedCachedFile) -> Result<Self> {
        let page_size = header.page_size();
        let region_header_size =
            header.layout().full_region_layout().get_header_pages() * page_size;
        let region_size = u64::from(header.layout().full_region_layout().num_pages())
            * u64::from(page_size)
            + u64::from(region_header_size);
        let range = header.region_tracker().address_range(
            page_size.into(),
            region_size,
            region_header_size.into(),
            page_size,
        );
        let len: usize = (range.end - range.start).try_into().unwrap();
        let region_tracker = storage.read_direct(range.start, len)?;
        let mut region_allocators = vec![];
        let layout = header.layout();
        for i in 0..layout.num_regions() {
            let base = layout.region_base_address(i);
            let header_len: usize = layout
                .region_layout(i)
                .data_section()
                .start
                .try_into()
                .unwrap();

            let mem = storage.read_direct(base, header_len)?;
            region_allocators.push(RegionHeader::deserialize(&mem));
        }

        Ok(Self {
            region_tracker: RegionTracker::from_page(&region_tracker),
            region_allocators,
        })
    }

    pub(super) fn flush_to(
        &self,
        region_tracker_page: PageNumber,
        layout: DatabaseLayout,
        storage: &PagedCachedFile,
    ) -> Result {
        let page_size = layout.full_region_layout().page_size();
        let region_header_size =
            u64::from(layout.full_region_layout().get_header_pages() * page_size);
        let region_size = u64::from(layout.full_region_layout().num_pages()) * u64::from(page_size)
            + region_header_size;
        let mut region_tracker_mem = {
            let range = region_tracker_page.address_range(
                page_size.into(),
                region_size,
                region_header_size,
                page_size,
            );
            let len: usize = (range.end - range.start).try_into().unwrap();
            storage.write(range.start, len, false)?
        };
        let tracker_bytes = self.region_tracker.to_vec();
        region_tracker_mem.mem_mut()[..tracker_bytes.len()].copy_from_slice(&tracker_bytes);

        assert_eq!(self.region_allocators.len(), layout.num_regions() as usize);
        for i in 0..layout.num_regions() {
            let base = layout.region_base_address(i);
            let len: usize = layout
                .region_layout(i)
                .data_section()
                .start
                .try_into()
                .unwrap();

            let mut mem = storage.write(base, len, false)?;
            RegionHeader::serialize(&self.region_allocators[i as usize], mem.mem_mut());
        }

        Ok(())
    }

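    // Grows or shrinks the per-region allocators (and the region tracker) to
    // match `new_layout`: dropped regions are marked full, while newly added or
    // extended regions are marked free at their highest free order.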
    pub(super) fn resize_to(&mut self, new_layout: DatabaseLayout) {
        let shrink = match (new_layout.num_regions() as usize).cmp(&self.region_allocators.len()) {
            cmp::Ordering::Less => true,
            cmp::Ordering::Equal => {
                let allocator = self.region_allocators.last().unwrap();
                let last_region = new_layout
                    .trailing_region_layout()
                    .unwrap_or_else(|| new_layout.full_region_layout());
                match last_region.num_pages().cmp(&allocator.len()) {
                    cmp::Ordering::Less => true,
                    cmp::Ordering::Equal => {
                        return;
                    }
                    cmp::Ordering::Greater => false,
                }
            }
            cmp::Ordering::Greater => false,
        };

        if shrink {
            for i in new_layout.num_regions()..(self.region_allocators.len().try_into().unwrap()) {
                self.region_tracker.mark_full(0, i);
            }
            self.region_allocators
                .drain((new_layout.num_regions() as usize)..);

            let last_region = new_layout
                .trailing_region_layout()
                .unwrap_or_else(|| new_layout.full_region_layout());
            let allocator = self.region_allocators.last_mut().unwrap();
            if allocator.len() > last_region.num_pages() {
                allocator.resize(last_region.num_pages());
            }
        } else {
            let old_num_regions = self.region_allocators.len();
            for i in 0..new_layout.num_regions() {
                let new_region = new_layout.region_layout(i);
                if (i as usize) < old_num_regions {
                    let allocator = &mut self.region_allocators[i as usize];
                    assert!(new_region.num_pages() >= allocator.len());
                    if new_region.num_pages() != allocator.len() {
                        allocator.resize(new_region.num_pages());
                        let highest_free = allocator.highest_free_order().unwrap();
                        self.region_tracker.mark_free(highest_free, i);
                    }
                } else {
                    let allocator = BuddyAllocator::new(
                        new_region.num_pages(),
                        new_layout.full_region_layout().num_pages(),
                    );
                    let highest_free = allocator.highest_free_order().unwrap();
                    if i >= self.region_tracker.len() {
                        self.region_tracker
                            .expand(self.region_tracker.capacity() * 2);
                    }
                    self.region_tracker.mark_free(highest_free, i);
                    self.region_allocators.push(allocator);
                }
            }
        }
    }
}

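// Layout of a region's header, as written by `RegionHeader::serialize`:
// 1 byte: REGION_FORMAT_VERSION
// 3 bytes: padding
// 4 bytes: length of the serialized BuddyAllocator (u32, little endian)
// n bytes: the serialized BuddyAllocator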
pub(crate) struct RegionHeader {}

impl RegionHeader {
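    // Number of pages needed to hold a region header: the 8 fixed header bytes
    // plus a worst-case (full region) serialized BuddyAllocator, rounded up to
    // whole pages.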
    pub(crate) fn header_pages_expensive(page_size: u32, pages_per_region: u32) -> u32 {
        let page_size = u64::from(page_size);
        let allocator = BuddyAllocator::new(pages_per_region, pages_per_region);
        let result = 8u64 + allocator.to_vec().len() as u64;
        result.div_ceil(page_size).try_into().unwrap()
    }

    fn serialize(allocator: &BuddyAllocator, output: &mut [u8]) {
        let serialized = allocator.to_vec();
        let len: u32 = serialized.len().try_into().unwrap();
        output[0] = REGION_FORMAT_VERSION;
        output[ALLOCATOR_LENGTH_OFFSET..(ALLOCATOR_LENGTH_OFFSET + size_of::<u32>())]
            .copy_from_slice(&len.to_le_bytes());
        output[ALLOCATOR_OFFSET..(ALLOCATOR_OFFSET + serialized.len())]
            .copy_from_slice(&serialized);
    }

    fn deserialize(data: &[u8]) -> BuddyAllocator {
        assert_eq!(REGION_FORMAT_VERSION, data[0]);
        let allocator_len = u32::from_le_bytes(
            data[ALLOCATOR_LENGTH_OFFSET..(ALLOCATOR_LENGTH_OFFSET + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize;
        BuddyAllocator::from_bytes(&data[ALLOCATOR_OFFSET..(ALLOCATOR_OFFSET + allocator_len)])
    }
}