From affbc7707df1662052515a29f43c7f4052d563a6 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 27 Oct 2025 17:18:19 -0400 Subject: [PATCH 1/3] Implement spilling to disk for page cache --- core/storage/btree.rs | 280 +++++++++++------- core/storage/page_cache.rs | 332 +++++++++++++++------ core/storage/pager.rs | 507 +++++++++++++++++++++++++++------ core/storage/sqlite3_ondisk.rs | 53 ++++ 4 files changed, 892 insertions(+), 280 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index f200c78f6e..5c1fd01fb6 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -209,7 +209,7 @@ pub enum OverwriteCellState { } struct BalanceContext { - pages_to_balance_new: [Option>; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE], + pages_to_balance_new: [Option; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE], sibling_count_new: usize, cell_array: CellArray, old_cell_count_per_page_cumulative: [u16; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE], @@ -319,6 +319,43 @@ enum PayloadOverflowWithOffset { }, } +#[derive(Debug)] +pub struct PinGuard(PageRef); +impl PinGuard { + pub fn new(p: PageRef) -> Self { + p.pin(); + Self(p) + } +} + +// Since every Drop will unpin, every clone +// needs to add to the pin count +impl Clone for PinGuard { + fn clone(&self) -> Self { + self.0.pin(); + Self(self.0.clone()) + } +} + +impl PinGuard { + pub fn to_page(&self) -> PageRef { + self.0.clone() + } +} + +impl Drop for PinGuard { + fn drop(&mut self) { + self.0.try_unpin(); + } +} + +impl std::ops::Deref for PinGuard { + type Target = PageRef; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + #[derive(Clone, Debug)] pub enum BTreeKey<'a> { TableRowId((i64, Option<&'a ImmutableRecord>)), @@ -365,7 +402,7 @@ impl BTreeKey<'_> { #[derive(Debug, Clone)] struct BalanceInfo { /// Old pages being balanced. We can have maximum 3 pages being balanced at the same time. - pages_to_balance: [Option; MAX_SIBLING_PAGES_TO_BALANCE], + pages_to_balance: [Option; MAX_SIBLING_PAGES_TO_BALANCE], /// Bookkeeping of the rightmost pointer so the offset::BTREE_RIGHTMOST_PTR can be updated. rightmost_pointer: *mut u8, /// Divider cells of old pages. We can have maximum 2 divider cells because of 3 pages. 
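The `PinGuard` added above is a plain RAII pin: constructing it pins the page, every clone pins again, and every drop unpins, so a page stays ineligible for eviction (and, with the spill logic added later in this patch, for spilling) exactly while at least one guard is alive. A minimal stand-alone sketch of that bookkeeping, using a simplified `Page` with a bare counter as a stand-in for the crate's `PageRef` (which packs its pin count into `pin_flags`):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Stand-in for PageRef: only the pin counter matters for this sketch.
struct Page {
    pin_count: AtomicUsize,
}

struct PinGuard(Arc<Page>);

impl PinGuard {
    fn new(p: Arc<Page>) -> Self {
        p.pin_count.fetch_add(1, Ordering::AcqRel); // pin on construction
        Self(p)
    }
}

impl Clone for PinGuard {
    fn clone(&self) -> Self {
        // every clone pins again, so every Drop stays balanced
        self.0.pin_count.fetch_add(1, Ordering::AcqRel);
        Self(self.0.clone())
    }
}

impl Drop for PinGuard {
    fn drop(&mut self) {
        self.0.pin_count.fetch_sub(1, Ordering::AcqRel); // unpin on drop
    }
}

fn main() {
    let page = Arc::new(Page { pin_count: AtomicUsize::new(0) });
    let a = PinGuard::new(page.clone());
    let b = a.clone();
    assert_eq!(page.pin_count.load(Ordering::Acquire), 2);
    drop(b);
    drop(a);
    // back to zero: the page is evictable/spillable again
    assert_eq!(page.pin_count.load(Ordering::Acquire), 0);
}

This is why `BalanceInfo::pages_to_balance` and the new-sibling array above now hold `PinGuard` instead of bare page references: the balance state machine can yield for I/O without the cache spilling those pages out from under it.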
@@ -745,7 +782,7 @@ impl BTreeCursor { let state = self.is_empty_table_state.borrow().clone(); match state { EmptyTableState::Start => { - let (page, c) = self.pager.read_page(self.root_page)?; + let (page, c) = return_if_io!(self.pager.read_page(self.root_page)); *self.is_empty_table_state.borrow_mut() = EmptyTableState::ReadPage { page }; if let Some(c) = c { io_yield_one!(c); @@ -786,7 +823,7 @@ impl BTreeCursor { if let Some(rightmost_pointer) = rightmost_pointer { let past_rightmost_pointer = cell_count as i32 + 1; self.stack.set_cell_index(past_rightmost_pointer); - let (page, c) = self.read_page(rightmost_pointer as i64)?; + let (page, c) = return_if_io!(self.read_page(rightmost_pointer as i64)); self.stack.push_backwards(page); if let Some(c) = c { io_yield_one!(c); @@ -858,7 +895,7 @@ impl BTreeCursor { self.stack.retreat(); } - let (mem_page, c) = self.read_page(left_child_page as i64)?; + let (mem_page, c) = return_if_io!(self.read_page(left_child_page as i64)); self.stack.push_backwards(mem_page); if let Some(c) = c { io_yield_one!(c); @@ -877,7 +914,7 @@ impl BTreeCursor { ) -> Result> { loop { if self.read_overflow_state.borrow().is_none() { - let (page, c) = self.read_page(start_next_page as i64)?; + let (page, c) = return_if_io!(self.read_page(start_next_page as i64)); *self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow { payload: payload.to_vec(), next_page: start_next_page, @@ -909,7 +946,7 @@ impl BTreeCursor { *remaining_to_read -= to_read; if *remaining_to_read != 0 && next != 0 { - let (new_page, c) = self.pager.read_page(next as i64)?; + let (new_page, c) = return_if_io!(self.pager.read_page(next as i64)); *page = new_page; *next_page = next; if let Some(c) = c { @@ -1078,8 +1115,7 @@ impl BTreeCursor { let pages_to_skip = offset / overflow_size as u32; let page_offset = offset % overflow_size as u32; // Read page - let (page, c) = self.read_page(first_overflow_page.unwrap() as i64)?; - + let (page, c) = return_if_io!(self.read_page(first_overflow_page.unwrap() as i64)); self.state = CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { next_page: page, @@ -1089,7 +1125,6 @@ impl BTreeCursor { buffer_offset: bytes_processed as usize, is_write, }); - if let Some(c) = c { io_yield_one!(c); } @@ -1140,8 +1175,7 @@ impl BTreeCursor { } pages_left_to_skip -= 1; - let (next_page, c) = self.read_page(next as i64)?; - + let (next_page, c) = return_if_io!(self.read_page(next as i64)); self.state = CursorState::ReadWritePayload( PayloadOverflowWithOffset::SkipOverflowPages { next_page, @@ -1152,7 +1186,6 @@ impl BTreeCursor { is_write, }, ); - if let Some(c) = c { io_yield_one!(c); } @@ -1207,9 +1240,8 @@ impl BTreeCursor { // Load next page current_offset = 0; // Reset offset for new page - let (next_page, c) = self.read_page(next as i64)?; + let (next_page, c) = return_if_io!(self.read_page(next as i64)); page = next_page; - self.state = CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { remaining_to_read, @@ -1218,7 +1250,6 @@ impl BTreeCursor { buffer_offset, is_write, }); - // Return IO to allow other operations if let Some(c) = c { io_yield_one!(c); } @@ -1330,7 +1361,8 @@ impl BTreeCursor { (Some(right_most_pointer), false) => { // do rightmost self.stack.advance(); - let (mem_page, c) = self.read_page(right_most_pointer as i64)?; + let (mem_page, c) = + return_if_io!(self.read_page(right_most_pointer as i64)); self.stack.push(mem_page); if let Some(c) = c { io_yield_one!(c); @@ -1371,7 +1403,7 @@ impl BTreeCursor 
{ } let left_child_page = contents.cell_interior_read_left_child_page(cell_idx); - let (mem_page, c) = self.read_page(left_child_page as i64)?; + let (mem_page, c) = return_if_io!(self.read_page(left_child_page as i64)); self.stack.push(mem_page); if let Some(c) = c { io_yield_one!(c); @@ -1398,14 +1430,14 @@ impl BTreeCursor { /// Move the cursor to the root page of the btree. #[instrument(skip_all, level = Level::DEBUG)] - fn move_to_root(&mut self) -> Result> { + fn move_to_root(&mut self) -> Result>> { self.seek_state = CursorSeekState::Start; self.going_upwards = false; tracing::trace!(root_page = self.root_page); - let (mem_page, c) = self.read_page(self.root_page)?; + let (mem_page, c) = return_if_io!(self.read_page(self.root_page)); self.stack.clear(); self.stack.push(mem_page); - Ok(c) + Ok(IOResult::Done(c)) } /// Move the cursor to the rightmost record in the btree. @@ -1426,7 +1458,7 @@ impl BTreeCursor { } } let rightmost_page_id = *rightmost_page_id; - let c = self.move_to_root()?; + let c = return_if_io!(self.move_to_root()); self.move_to_right_state = (MoveToRightState::ProcessPage, rightmost_page_id); if let Some(c) = c { io_yield_one!(c); @@ -1448,13 +1480,13 @@ impl BTreeCursor { match contents.rightmost_pointer() { Some(right_most_pointer) => { self.stack.set_cell_index(contents.cell_count() as i32 + 1); - let (mem_page, c) = self.read_page(right_most_pointer as i64)?; + let (mem_page, c) = + return_if_io!(self.read_page(right_most_pointer as i64)); self.stack.push(mem_page); if let Some(c) = c { io_yield_one!(c); } } - None => { unreachable!("interior page should have a rightmost pointer"); } @@ -1526,7 +1558,7 @@ impl BTreeCursor { .unwrap() .cell_interior_read_left_child_page(nearest_matching_cell); self.stack.set_cell_index(nearest_matching_cell as i32); - let (mem_page, c) = self.read_page(left_child_page as i64)?; + let (mem_page, c) = return_if_io!(self.read_page(left_child_page as i64)); self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), @@ -1544,7 +1576,8 @@ impl BTreeCursor { .rightmost_pointer() { Some(right_most_pointer) => { - let (mem_page, c) = self.read_page(right_most_pointer as i64)?; + let (mem_page, c) = + return_if_io!(self.read_page(right_most_pointer as i64)); self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), @@ -1690,7 +1723,8 @@ impl BTreeCursor { .rightmost_pointer() { Some(right_most_pointer) => { - let (mem_page, c) = self.read_page(right_most_pointer as i64)?; + let (mem_page, c) = + return_if_io!(self.read_page(right_most_pointer as i64)); self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), @@ -1737,7 +1771,7 @@ impl BTreeCursor { ); } - let (mem_page, c) = self.read_page(*left_child_page as i64)?; + let (mem_page, c) = return_if_io!(self.read_page(*left_child_page as i64)); self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), @@ -2241,7 +2275,7 @@ impl BTreeCursor { MoveToState::Start => { self.move_to_state = MoveToState::MoveToPage; if matches!(self.seek_state, CursorSeekState::Start) { - let c = self.move_to_root()?; + let c = return_if_io!(self.move_to_root()); if let Some(c) = c { io_yield_one!(c); } @@ -2360,7 +2394,7 @@ impl BTreeCursor { ref mut fill_cell_payload_state, } => { return_if_io!(fill_cell_payload( - page, + &PinGuard::new(page.clone()), bkey.maybe_rowid(), 
new_payload, *cell_idx, @@ -2655,7 +2689,6 @@ impl BTreeCursor { parent_contents.write_rightmost_ptr(new_rightmost_leaf.get().id as u32); self.pager.add_dirty(parent)?; self.pager.add_dirty(&new_rightmost_leaf)?; - // Continue balance from the parent page (inserting the new divider cell may have overflowed the parent) self.stack.pop(); @@ -2741,7 +2774,7 @@ impl BTreeCursor { page_to_balance_idx ); // Part 1: Find the sibling pages to balance - let mut pages_to_balance: [Option>; MAX_SIBLING_PAGES_TO_BALANCE] = + let mut pages_to_balance: [Option; MAX_SIBLING_PAGES_TO_BALANCE] = [const { None }; MAX_SIBLING_PAGES_TO_BALANCE]; turso_assert!( page_to_balance_idx <= parent_contents.cell_count(), @@ -2832,12 +2865,17 @@ impl BTreeCursor { self.pager.io.drain()?; return Err(e); } - Ok((page, c)) => { - pages_to_balance[i].replace(page); + Ok(IOResult::Done((page, c))) => { + pages_to_balance[i].replace(PinGuard::new(page)); if let Some(c) = c { group.add(&c); } } + Ok(IOResult::IO(IOCompletions::Single(c))) => { + // we cannot just group.add here because this completion + // needs to be awaited before proceeding to read the page + io_yield_one!(c); + } } if i == 0 { break; @@ -2934,14 +2972,14 @@ impl BTreeCursor { } } // Start balancing. - let parent_page = self.stack.top_ref(); + let parent_page = PinGuard::new(self.stack.top_ref().clone()); let parent_contents = parent_page.get_contents(); // 1. Collect cell data from divider cells, and count the total number of cells to be distributed. // The count includes: all cells and overflow cells from the sibling pages, and divider cells from the parent page, // excluding the rightmost divider, which will not be dropped from the parent; instead it will be updated at the end. let mut total_cells_to_redistribute = 0; - let pages_to_balance_new: [Option>; + let pages_to_balance_new: [Option; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE] = [const { None }; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE]; for i in (0..balance_info.sibling_count).rev() { @@ -3112,7 +3150,6 @@ impl BTreeCursor { } total_cells_inserted += cells_inserted; } - turso_assert!( cell_array.cell_payloads.capacity() == cells_capacity_start, "calculation of max cells was wrong" @@ -3424,13 +3461,12 @@ impl BTreeCursor { turso_assert!(page.is_dirty(), "sibling page must be already marked dirty"); pages_to_balance_new[*i].replace(page.clone()); } else { - // FIXME: handle page cache is full let page = return_if_io!(pager.do_allocate_page( page_type, 0, BtreePageAllocMode::Any )); - pages_to_balance_new[*i].replace(page); + pages_to_balance_new[*i].replace(PinGuard::new(page)); // Since this page didn't exist before, we can set it to cells length as it // marks them as empty since it is a prefix sum of cells. 
old_cell_count_per_page_cumulative[*i] = @@ -3464,7 +3500,7 @@ impl BTreeCursor { .get_contents() .page_type(); let parent_is_root = !self.stack.has_parent(); - let parent_page = self.stack.top_ref(); + let parent_page = PinGuard::new(self.stack.top_ref().clone()); let parent_contents = parent_page.get_contents(); let mut sibling_count_new = *sibling_count_new; let is_table_leaf = matches!(page_type, PageType::TableLeaf); @@ -3490,7 +3526,7 @@ impl BTreeCursor { if *new_id != page.get().id { page.get().id = *new_id; self.pager - .upsert_page_in_cache(*new_id, page.clone(), true)?; + .upsert_page_in_cache(*new_id, page.0.clone(), true)?; } } @@ -3831,7 +3867,7 @@ impl BTreeCursor { #[cfg(debug_assertions)] BTreeCursor::post_balance_non_root_validation( - parent_page, + &parent_page, balance_info, parent_contents, pages_to_balance_new, @@ -3873,7 +3909,7 @@ impl BTreeCursor { let balance_info = balance_info.borrow(); let balance_info = balance_info.as_ref().expect("must be balancing"); let page = balance_info.pages_to_balance[*curr_page].as_ref().unwrap(); - return_if_io!(self.pager.free_page(Some(page.clone()), page.get().id)); + return_if_io!(self.pager.free_page(Some(page.0.clone()), page.get().id)); *sub_state = BalanceSubState::FreePages { curr_page: *curr_page + 1, sibling_count_new: *sibling_count_new, @@ -3940,7 +3976,7 @@ impl BTreeCursor { parent_page: &PageRef, balance_info: &BalanceInfo, parent_contents: &mut PageContent, - pages_to_balance_new: &[Option; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE], + pages_to_balance_new: &[Option; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE], page_type: PageType, is_table_leaf: bool, cells_debug: &mut [Vec], @@ -4338,7 +4374,6 @@ impl BTreeCursor { let root = self.stack.top(); let root_contents = root.get_contents(); - // FIXME: handle page cache is full let child = return_if_io!(self.pager.do_allocate_page( root_contents.page_type(), 0, @@ -4405,6 +4440,7 @@ impl BTreeCursor { self.stack.clear(); self.stack.push(root); self.stack.set_cell_index(0); // leave parent pointing at the rightmost pointer (in this case 0, as there are no cells), since we will be balancing the rightmost child page. 
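+        // Also push the newly allocated child so the cursor ends up on the page that the
+        // following balance pass will actually operate on.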
+ self.stack.push(child.clone()); Ok(IOResult::Done(())) } @@ -4447,7 +4483,7 @@ impl BTreeCursor { self.overflow_state = OverflowState::Start; return Err(LimboError::Corrupt("Invalid overflow page number".into())); } - let (page, c) = self.read_page(next_page as i64)?; + let (page, c) = return_if_io!(self.read_page(next_page as i64)); self.overflow_state = OverflowState::ProcessPage { next_page: page }; if let Some(c) = c { io_yield_one!(c); @@ -4479,7 +4515,7 @@ impl BTreeCursor { self.overflow_state = OverflowState::Start; return Err(LimboError::Corrupt("Invalid overflow page number".into())); } - let (page, c) = self.read_page(next as i64)?; + let (page, c) = return_if_io!(self.read_page(next as i64)); self.overflow_state = OverflowState::ProcessPage { next_page: page }; if let Some(c) = c { io_yield_one!(c); @@ -4515,7 +4551,7 @@ impl BTreeCursor { /// The destruction order would be: [4',4,5,2,6,7,3,1] fn destroy_btree_contents(&mut self, keep_root: bool) -> Result>> { if let CursorState::None = &self.state { - let c = self.move_to_root()?; + let c = return_if_io!(self.move_to_root()); self.state = CursorState::Destroy(DestroyInfo { state: DestroyState::Start, }); @@ -4570,7 +4606,8 @@ impl BTreeCursor { // Non-leaf page which has processed all children but not it's potential right child (false, n) if n == contents.cell_count() as i32 => { if let Some(rightmost) = contents.rightmost_pointer() { - let (rightmost_page, c) = self.read_page(rightmost as i64)?; + let (rightmost_page, c) = + return_if_io!(self.read_page(rightmost as i64)); self.stack.push(rightmost_page); let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", @@ -4630,7 +4667,8 @@ impl BTreeCursor { BTreeCell::IndexInteriorCell(cell) => cell.left_child_page, _ => panic!("expected interior cell"), }; - let (child_page, c) = self.read_page(child_page_id as i64)?; + let (child_page, c) = + return_if_io!(self.read_page(child_page_id as i64)); self.stack.push(child_page); let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", @@ -4648,8 +4686,9 @@ impl BTreeCursor { match cell { // For an index interior cell, clear the left child page now that overflow pages have been cleared BTreeCell::IndexInteriorCell(index_int_cell) => { - let (child_page, c) = - self.read_page(index_int_cell.left_child_page as i64)?; + let (child_page, c) = return_if_io!( + self.read_page(index_int_cell.left_child_page as i64) + ); self.stack.push(child_page); let destroy_info = self .state @@ -4743,7 +4782,7 @@ impl BTreeCursor { } => { { return_if_io!(fill_cell_payload( - page, + &PinGuard::new(page.clone()), *rowid, new_payload, cell_idx, @@ -4868,7 +4907,7 @@ impl BTreeCursor { } } - pub fn read_page(&self, page_idx: i64) -> Result<(PageRef, Option)> { + pub fn read_page(&self, page_idx: i64) -> Result)>> { btree_read_page(&self.pager, page_idx) } @@ -5482,7 +5521,7 @@ impl CursorTrait for BTreeCursor { let state = self.count_state; match state { CountState::Start => { - let c = self.move_to_root()?; + let c = return_if_io!(self.move_to_root()); self.count_state = CountState::Loop; if let Some(c) = c { io_yield_one!(c); @@ -5508,7 +5547,7 @@ impl CursorTrait for BTreeCursor { loop { if !self.stack.has_parent() { // All pages of the b-tree have been visited. 
Return successfully - let c = self.move_to_root()?; + let c = return_if_io!(self.move_to_root()); self.count_state = CountState::Finish; if let Some(c) = c { io_yield_one!(c); @@ -5541,8 +5580,8 @@ impl CursorTrait for BTreeCursor { // should be safe as contents is not a leaf page let right_most_pointer = contents.rightmost_pointer().unwrap(); self.stack.advance(); - let (mem_page, c) = self.read_page(right_most_pointer as i64)?; - self.stack.push(mem_page); + let (child, c) = return_if_io!(self.read_page(right_most_pointer as i64)); + self.stack.push(child); if let Some(c) = c { io_yield_one!(c); } @@ -5560,8 +5599,9 @@ impl CursorTrait for BTreeCursor { .. }) => { self.stack.advance(); - let (mem_page, c) = self.read_page(left_child_page as i64)?; - self.stack.push(mem_page); + let (child, c) = + return_if_io!(self.read_page(left_child_page as i64)); + self.stack.push(child); if let Some(c) = c { io_yield_one!(c); } @@ -5594,7 +5634,7 @@ impl CursorTrait for BTreeCursor { match self.rewind_state { RewindState::Start => { self.rewind_state = RewindState::NextRecord; - let c = self.move_to_root()?; + let c = return_if_io!(self.move_to_root()); if let Some(c) = c { io_yield_one!(c); } @@ -5652,7 +5692,7 @@ impl CursorTrait for BTreeCursor { loop { match self.seek_end_state { SeekEndState::Start => { - let c = self.move_to_root()?; + let c = return_if_io!(self.move_to_root()); self.seek_end_state = SeekEndState::ProcessPage; if let Some(c) = c { io_yield_one!(c); @@ -5671,7 +5711,8 @@ impl CursorTrait for BTreeCursor { match contents.rightmost_pointer() { Some(right_most_pointer) => { self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior - let (child, c) = self.read_page(right_most_pointer as i64)?; + let (child, c) = + return_if_io!(self.read_page(right_most_pointer as i64)); self.stack.push(child); if let Some(c) = c { io_yield_one!(c); @@ -5910,14 +5951,18 @@ pub fn integrity_check( }; let page = match state.page.take() { Some(page) => page, - None => { - let (page, c) = btree_read_page(pager, page_idx)?; - state.page = Some(page); - if let Some(c) = c { + None => match btree_read_page(pager, page_idx)? { + IOResult::Done((page, c)) => { + state.page = Some(page); + if let Some(c) = c { + io_yield_one!(c); + } + state.page.take().expect("page should be present") + } + IOResult::IO(IOCompletions::Single(c)) => { io_yield_one!(c); } - state.page.take().expect("page should be present") - } + }, }; turso_assert!(page.is_loaded(), "page should be loaded"); state.page_stack.pop(); @@ -6176,7 +6221,7 @@ pub fn integrity_check( pub fn btree_read_page( pager: &Arc, page_idx: i64, -) -> Result<(Arc, Option)> { +) -> Result, Option)>> { pager.read_page(page_idx) } @@ -7538,7 +7583,7 @@ pub enum FillCellPayloadState { /// If this is None, we will copy data into the cell payload on the btree page. /// Also: to safely form a chain of overflow pages, the current page must be pinned to the page cache /// so that e.g. a spilling operation does not evict it to disk. - current_overflow_page: Option, + current_overflow_page: Option, }, } @@ -7556,7 +7601,7 @@ pub enum CopyDataState { /// may require I/O. 
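/// The destination page is passed as a `PinGuard` so it stays pinned across every resumable state
/// below; the explicit pin()/unpin() bookkeeping that used to bracket this function is gone.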
#[allow(clippy::too_many_arguments)] fn fill_cell_payload( - page: &PageRef, + page: &PinGuard, int_key: Option, cell_payload: &mut Vec, cell_idx: usize, @@ -7571,7 +7616,6 @@ fn fill_cell_payload( let record_buf = record.get_payload(); match fill_cell_payload_state { FillCellPayloadState::Start => { - page.pin(); // We need to pin this page because we will be accessing its contents after fill_cell_payload is done. let page_contents = page.get_contents(); let page_type = page_contents.page_type(); @@ -7649,9 +7693,6 @@ fn fill_cell_payload( } if record_offset_slice.len() - amount_to_copy == 0 { - let cur_page = current_overflow_page.as_ref().expect("we must have overflowed if the remaining payload fits on the current page"); - cur_page.unpin(); // We can safely unpin the current overflow page now. - // Everything copied. break Ok(IOResult::Done(())); } *state = CopyDataState::AllocateOverflowPage; @@ -7659,20 +7700,14 @@ fn fill_cell_payload( } CopyDataState::AllocateOverflowPage => { let new_overflow_page = match pager.allocate_overflow_page() { - Ok(IOResult::Done(new_overflow_page)) => new_overflow_page, + Ok(IOResult::Done(new_overflow_page)) => { + PinGuard::new(new_overflow_page) + } Ok(IOResult::IO(io_result)) => return Ok(IOResult::IO(io_result)), Err(e) => { - if let Some(cur_page) = current_overflow_page { - cur_page.unpin(); - } break Err(e); } }; - new_overflow_page.pin(); // Pin the current overflow page so the cache won't evict it because we need this page to be in memory for the next iteration of FillCellPayloadState::CopyData. - if let Some(prev_page) = current_overflow_page { - prev_page.unpin(); // We can safely unpin the previous overflow page now. - } - turso_assert!( new_overflow_page.is_loaded(), "new overflow page is not loaded" @@ -7706,7 +7741,6 @@ fn fill_cell_payload( } } }; - page.unpin(); result } /// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages. @@ -7861,7 +7895,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &page, + &PinGuard::new(page.clone()), Some(id as i64), &mut payload, pos, @@ -7970,10 +8004,19 @@ mod tests { fn validate_btree(pager: Arc, page_idx: i64) -> (usize, bool) { let num_columns = 5; let cursor = BTreeCursor::new_table(pager.clone(), page_idx, num_columns); - let (page, _c) = cursor.read_page(page_idx).unwrap(); - while page.is_locked() { - pager.io.step().unwrap(); - } + let page = match cursor.read_page(page_idx).unwrap() { + IOResult::IO(IOCompletions::Single(c)) => { + pager.io.wait_for_completion(c); + // TODO + return validate_btree(pager.clone(), page_idx); + } + IOResult::Done((page, _c)) => { + while page.is_locked() { + pager.io.step().unwrap(); + } + page + } + }; // Pin page in order to not drop it in between page.set_dirty(); @@ -7990,11 +8033,17 @@ mod tests { BTreeCell::TableInteriorCell(TableInteriorCell { left_child_page, .. 
}) => { - let (child_page, _c) = cursor.read_page(left_child_page as i64).unwrap(); - while child_page.is_locked() { - pager.io.step().unwrap(); + match cursor.read_page(left_child_page as i64).unwrap() { + IOResult::Done((child_page, c)) => { + while child_page.is_locked() { + pager.io.step().unwrap(); + } + child_pages.push(child_page); + } + IOResult::IO(IOCompletions::Single(c)) => { + pager.io.wait_for_completion(c); + } } - child_pages.push(child_page); if left_child_page == page.get().id as u32 { valid = false; tracing::error!( @@ -8047,7 +8096,10 @@ mod tests { } let first_page_type = child_pages.first_mut().map(|p| { if !p.is_loaded() { - let (new_page, _c) = pager.read_page(p.get().id as i64).unwrap(); + let IOResult::Done((new_page, _c)) = pager.read_page(p.get().id as i64).unwrap() + else { + panic!(); + }; *p = new_page; } while p.is_locked() { @@ -8058,7 +8110,11 @@ mod tests { if let Some(child_type) = first_page_type { for page in child_pages.iter_mut().skip(1) { if !page.is_loaded() { - let (new_page, _c) = pager.read_page(page.get().id as i64).unwrap(); + let IOResult::Done((new_page, _c)) = + pager.read_page(page.get().id as i64).unwrap() + else { + panic!(); + }; *page = new_page; } while page.is_locked() { @@ -8081,7 +8137,9 @@ mod tests { let num_columns = 5; let cursor = BTreeCursor::new_table(pager.clone(), page_idx, num_columns); - let (page, _c) = cursor.read_page(page_idx).unwrap(); + let IOResult::Done((page, _c)) = cursor.read_page(page_idx).unwrap() else { + panic!(); + }; while page.is_locked() { pager.io.step().unwrap(); } @@ -8597,7 +8655,9 @@ mod tests { pager.deref(), ) .unwrap(); - let c = cursor.move_to_root().unwrap(); + let IOResult::Done(c) = cursor.move_to_root().unwrap() else { + panic!(); + }; if let Some(c) = c { pager.io.wait_for_completion(c).unwrap(); } @@ -8808,7 +8868,9 @@ mod tests { } } - let c = cursor.move_to_root().unwrap(); + let IOResult::Done(c) = cursor.move_to_root().unwrap() else { + panic!(); + }; if let Some(c) = c { pager.io.wait_for_completion(c).unwrap(); } @@ -9132,7 +9194,9 @@ mod tests { .write_page(current_page, buf.clone(), &IOContext::default(), c)?; pager.io.step()?; - let (page, _c) = cursor.read_page(current_page as i64)?; + let IOResult::Done((page, _c)) = cursor.read_page(current_page as i64)? else { + panic!(); + }; while page.is_locked() { cursor.pager.io.step()?; } @@ -9192,7 +9256,9 @@ mod tests { let trunk_page_id = freelist_trunk_page; if trunk_page_id > 0 { // Verify trunk page structure - let (trunk_page, _c) = cursor.read_page(trunk_page_id as i64)?; + let IOResult::Done((trunk_page, _c)) = cursor.read_page(trunk_page_id as i64)? 
else { + panic!(); + }; let contents = trunk_page.get_contents(); // Read number of leaf pages in trunk let n_leaf = contents.read_u32_no_offset(4); @@ -9626,7 +9692,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &page, + &PinGuard::new(page), Some(i as i64), &mut payload, cell_idx, @@ -9708,7 +9774,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &page, + &PinGuard::new(page), Some(i), &mut payload, cell_idx, @@ -10081,7 +10147,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &page, + &PinGuard::new(page), Some(0), &mut payload, 0, @@ -10167,7 +10233,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &page, + &PinGuard::new(page), Some(0), &mut payload, 0, @@ -10573,7 +10639,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &page, + &PinGuard::new(page), Some(cell_idx as i64), &mut payload, cell_idx as usize, diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index e8ac4e657a..4db123eeb4 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -3,13 +3,14 @@ use rustc_hash::FxHashMap; use std::sync::{atomic::Ordering, Arc}; use tracing::trace; -use crate::turso_assert; +use crate::{ + storage::{btree::PinGuard, sqlite3_ondisk::DatabaseHeader}, + turso_assert, +}; use super::pager::PageRef; -/// FIXME: https://github.com/tursodatabase/turso/issues/1661 -const DEFAULT_PAGE_CACHE_SIZE_IN_PAGES_MAKE_ME_SMALLER_ONCE_WAL_SPILL_IS_IMPLEMENTED: usize = - 100000; +const DEFAULT_PAGE_CACHE_SIZE_IN_PAGES: usize = 1000; #[derive(Debug, Copy, Eq, Hash, PartialEq, Clone)] #[repr(transparent)] @@ -17,6 +18,7 @@ pub struct PageCacheKey(usize); const CLEAR: u8 = 0; const REF_MAX: u8 = 3; +const HOT_PAGE_THRESHOLD: u64 = 5; /// An entry in the page cache. /// @@ -96,6 +98,8 @@ pub enum CacheError { ActiveRefs, #[error("Page cache is full")] Full, + #[error("Page cache is full, can spill to disk")] + FullCanSpill, #[error("key already exists")] KeyExists, } @@ -106,12 +110,25 @@ pub enum CacheResizeResult { PendingEvictions, } +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum EvictionMode { + /// only evict if ref_bit is CLEAR, otherwise decrement. + Sieve, + /// evict clean & unpinned & unlocked page, still in eviction order. + Aggressive, +} + impl PageCacheKey { pub fn new(pgno: usize) -> Self { Self(pgno) } } +#[inline] +fn evictable_clean(page: &PageRef) -> bool { + !page.is_dirty() && !page.is_locked() && !page.is_pinned() && Arc::strong_count(page).eq(&1) +} + impl PageCache { pub fn new(capacity: usize) -> Self { assert!(capacity > 0); @@ -151,17 +168,220 @@ impl PageCache { } } + #[inline] + /// Returns true if there is at least one unpinned dirty page that can be spilled to disk + pub fn has_spillable(&self) -> bool { + if self.len() == 0 { + return false; + } + let mut cur = self.queue.front(); + while let Some(entry) = cur.get() { + let p = &entry.page; + if Self::can_spill_page(p) { + return true; + } + cur.move_next(); + } + false + } + + #[inline] + fn can_spill_page(p: &PageRef) -> bool { + p.is_dirty() + && !p.is_pinned() + && !p.is_locked() + // Don't spill if someone else still holds a reference to the page. + && Arc::strong_count(p) == 1 + // Don't spill while there are pending overflow cells not written into the page body yet. 
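+            // (overflow cells exist only in memory, not in the serialized page body,
+            // so spilling now would lose them)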
+ && p.get() + .contents + .as_ref() + .is_some_and(|c| c.overflow_cells.is_empty()) + && p.get().id.ne(&DatabaseHeader::PAGE_ID) // never spill page 1 + // dirty_gen is increased every time `mark_dirty` is called on a PageRef, dont spill hot pages + && p.dirty_gen() <= HOT_PAGE_THRESHOLD + } + + /// Collect candidates for spilling to disk when cache under pressure + pub fn compute_spill_candidates(&mut self) -> Vec { + const SPILL_BATCH: usize = 128; + if self.len() == 0 || self.clock_hand.is_null() { + return Vec::new(); + } + let mut out = Vec::with_capacity(SPILL_BATCH); + let start = self.clock_hand; + let mut ptr = start; + loop { + let entry = unsafe { &mut *ptr }; + let page = &entry.page; + // On the first pass, only spill if ref_bit is CLEAR, we that we can ensure spilling the coldest pages. + // For the second pass, we can be more lenient and accept pages with ref_bit < REF_MAX. + if Self::can_spill_page(page) { + out.push(PinGuard::new(page.clone())); + if out.len() >= SPILL_BATCH { + break; + } + } + let mut cur = unsafe { self.queue.cursor_mut_from_ptr(ptr) }; + cur.move_next(); + if let Some(next) = cur.get() { + ptr = next as *const _ as *mut PageCacheEntry; + } else if let Some(front) = self.queue.front_mut().get() { + ptr = front as *const _ as *mut PageCacheEntry; + } else { + break; + } + if ptr == start { + break; + } + } + out + } + + #[inline] + /// Evict up to `target` clean, unpinned pages + pub fn evict_clean_aggressive(&mut self, target: usize) -> Result { + self.evict(target, EvictionMode::Aggressive) + } + + fn evict(&mut self, target: usize, mode: EvictionMode) -> Result { + if self.len() == 0 || target == 0 { + return Err(CacheError::InternalError( + "Cannot evict from empty cache".into(), + )); + } + let default_pred = |entry: &mut PageCacheEntry| { + if entry.ref_bit == CLEAR { + true // Evict + } else { + entry.decrement_ref(); + false // Give second chance + } + }; + let evicted = match mode { + EvictionMode::Sieve => { + let max_rounds = REF_MAX as usize + 1; + let mut total_evicted = 0; + for _ in 0..max_rounds { + if total_evicted >= target { + break; + } + let need = target - total_evicted; + let evicted_this_round = self.evict_pass(need, default_pred)?; + total_evicted += evicted_this_round; + // If we made no progress, we're stuck + if evicted_this_round == 0 { + break; + } + } + total_evicted + } + EvictionMode::Aggressive => { + // Two-passes, first we try to respect ref_bits, and if we don't get enough + // from the first pass, the second pass we evict all evictible still in eviction order. + let mut evicted = 0; + evicted += self.evict_pass(target, default_pred)?; + if evicted < target { + let remaining = target - evicted; + evicted += self.evict_pass(remaining, |_| true)?; + } + evicted + } + }; + if evicted >= target { + Ok(evicted) + } else { + Err(if self.has_spillable() { + CacheError::FullCanSpill + } else { + CacheError::Full + }) + } + } + + #[inline] + /// Performs a single eviction pass with a custom predicate. + /// Walks the ring once, evicting clean pages that match the predicate. + /// Returns number of pages evicted. 
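+    /// evict() drives this with the SIEVE second-chance predicate, and with an always-true
+    /// predicate for the second pass of Aggressive eviction.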
+ fn evict_pass(&mut self, target: usize, should_evict: F) -> Result + where + F: Fn(&mut PageCacheEntry) -> bool, + { + if self.clock_hand.is_null() || target == 0 { + return Ok(0); + } + + let mut evicted = 0usize; + let start = self.clock_hand; + let mut ptr = start; + loop { + let entry = unsafe { &mut *ptr }; + // Get next pointer before potential eviction + let next_ptr = { + let mut cursor = unsafe { self.queue.cursor_mut_from_ptr(ptr) }; + cursor.move_next(); + + if let Some(next) = cursor.get() { + next as *const _ as *mut PageCacheEntry + } else if let Some(front) = self.queue.front_mut().get() { + front as *const _ as *mut PageCacheEntry + } else { + std::ptr::null_mut() + } + }; + // Now safe to evict current entry if it matches criteria + if evictable_clean(&entry.page) && should_evict(entry) { + // Handle clock hand before eviction + if self.clock_hand == ptr { + self.clock_hand = if next_ptr != ptr && !next_ptr.is_null() { + next_ptr + } else { + // Find any other entry or null + if let Some(first) = self.queue.front().get() { + first as *const _ as *mut PageCacheEntry + } else { + std::ptr::null_mut() + } + }; + } + // Evict the entry + let key = entry.key; + self.map.remove(&key); + entry.page.clear_loaded(); + let _ = entry.page.get().contents.take(); + unsafe { + let mut cursor = self.queue.cursor_mut_from_ptr(ptr); + cursor.remove(); + } + evicted += 1; + if evicted >= target { + break; + } + } + // Move to next entry + if next_ptr.is_null() || next_ptr == start { + break; // Either we have done a full rotation or empty + } + ptr = next_ptr; + } + Ok(evicted) + } + pub fn contains_key(&self, key: &PageCacheKey) -> bool { self.map.contains_key(key) } #[inline] - pub fn insert(&mut self, key: PageCacheKey, value: PageRef) -> Result<(), CacheError> { + pub fn insert(&mut self, key: PageCacheKey, value: PageRef) -> Result { self._insert(key, value, false) } #[inline] - pub fn upsert_page(&mut self, key: PageCacheKey, value: PageRef) -> Result<(), CacheError> { + pub fn upsert_page( + &mut self, + key: PageCacheKey, + value: PageRef, + ) -> Result { self._insert(key, value, true) } @@ -170,13 +390,15 @@ impl PageCache { key: PageCacheKey, value: PageRef, update_in_place: bool, - ) -> Result<(), CacheError> { + ) -> Result { trace!("insert(key={:?})", key); if let Some(&entry_ptr) = self.map.get(&key) { let entry = unsafe { &mut *entry_ptr }; let p = &entry.page; + // if the existing page is neither loaded nor locked, this means that it's garbage + // from an aborted read_page operation. We can evict it and insert the new page. 
if !p.is_loaded() && !p.is_locked() { // evict, then continue with fresh insert self._delete(key, true)?; @@ -185,7 +407,7 @@ impl PageCache { entry.bump_ref(); if update_in_place { entry.page = value; - return Ok(()); + return Ok(entry.page.clone()); } else { turso_assert!( Arc::ptr_eq(&entry.page, &value), @@ -197,9 +419,15 @@ impl PageCache { } // Key doesn't exist, proceed with new entry - self.make_room_for(1)?; + match self.make_room_for(1) { + Err(CacheError::Full) => { + self.evict(1, EvictionMode::Aggressive)?; + } + Err(e) => return Err(e), + Ok(()) => {} + } - let entry = PageCacheEntry::new(key, value); + let entry = PageCacheEntry::new(key, value.clone()); if self.clock_hand.is_null() { // First entry - just push it @@ -221,7 +449,7 @@ impl PageCache { } } - Ok(()) + Ok(value) } fn _delete(&mut self, key: PageCacheKey, clean_page: bool) -> Result<(), CacheError> { @@ -323,10 +551,9 @@ impl PageCache { } // Evict entries one by one until we're at new capacity - while new_cap < self.len() { - if self.evict_one().is_err() { - return CacheResizeResult::PendingEvictions; - } + let need = self.len().saturating_sub(new_cap); + if self.evict(need, EvictionMode::Sieve).is_err() { + return CacheResizeResult::PendingEvictions; } self.capacity = new_cap; @@ -348,81 +575,16 @@ impl PageCache { if n > self.capacity { return Err(CacheError::Full); } - let available = self.capacity - self.len(); + let available = self.capacity.saturating_sub(self.len()); if n <= available { return Ok(()); } let need = n - available; - for _ in 0..need { - self.evict_one()?; - } + self.evict(need, EvictionMode::Sieve)?; Ok(()) } - /// Evicts a single page using the SIEVE algorithm - /// Unlike make_room_for(), this ignores capacity and always tries to evict one page - fn evict_one(&mut self) -> Result<(), CacheError> { - if self.len() == 0 { - return Err(CacheError::InternalError( - "Cannot evict from empty cache".into(), - )); - } - - let mut examined = 0usize; - let max_examinations = self.len().saturating_mul(REF_MAX as usize + 1); - - while examined < max_examinations { - // Clock hand should never be null here since we checked len() > 0 - assert!( - !self.clock_hand.is_null(), - "clock hand is null but cache has {} entries", - self.len() - ); - - let entry_ptr = self.clock_hand; - let entry = unsafe { &mut *entry_ptr }; - let key = entry.key; - let page = &entry.page; - - let evictable = !page.is_dirty() && !page.is_locked() && !page.is_pinned(); - - if evictable && entry.ref_bit == CLEAR { - // Evict this entry - self.advance_clock_hand(); - // Check if clock hand wrapped back to the same entry (meaning this is the only/last entry) - if self.clock_hand == entry_ptr { - self.clock_hand = std::ptr::null_mut(); - } - - self.map.remove(&key); - - // Clean the page - page.clear_loaded(); - let _ = page.get().contents.take(); - - // Remove from queue - unsafe { - let mut cursor = self.queue.cursor_mut_from_ptr(entry_ptr); - cursor.remove(); - } - - return Ok(()); - } else if evictable { - // Decrement ref bit and continue - entry.decrement_ref(); - self.advance_clock_hand(); - examined += 1; - } else { - // Skip unevictable page - self.advance_clock_hand(); - examined += 1; - } - } - - Err(CacheError::Full) - } - pub fn clear(&mut self, clear_dirty: bool) -> Result<(), CacheError> { // Check all pages are clean for &entry_ptr in self.map.values() { @@ -473,7 +635,7 @@ impl PageCache { i, entry.key, page.get().flags.load(Ordering::SeqCst), - page.get().pin_count.load(Ordering::SeqCst), + 
page.get().pin_flags.load(Ordering::SeqCst), entry.ref_bit, ); cursor.move_next(); @@ -538,9 +700,7 @@ impl PageCache { impl Default for PageCache { fn default() -> Self { - PageCache::new( - DEFAULT_PAGE_CACHE_SIZE_IN_PAGES_MAKE_ME_SMALLER_ONCE_WAL_SPILL_IS_IMPLEMENTED, - ) + PageCache::new(DEFAULT_PAGE_CACHE_SIZE_IN_PAGES) } } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 465f0393e7..f9b9b6cbfc 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1,3 +1,5 @@ +use crate::storage::btree::PinGuard; +use crate::storage::sqlite3_ondisk::begin_write_btree_page_with_snapshot; use crate::storage::subjournal::Subjournal; use crate::storage::wal::IOV_MAX; use crate::storage::{ @@ -14,7 +16,7 @@ use crate::{ io::CompletionGroup, return_if_io, turso_assert, types::WalFrameInfo, Completion, Connection, IOResult, LimboError, Result, TransactionState, }; -use crate::{io_yield_one, CompletionError, IOContext, OpenFlags, IO}; +use crate::{io_yield_one, Buffer, CompletionError, IOContext, OpenFlags, IO}; use parking_lot::{Mutex, RwLock}; use roaring::RoaringBitmap; use std::cell::{RefCell, UnsafeCell}; @@ -61,11 +63,14 @@ impl HeaderRef { if !pager.db_state.get().is_initialized() { return Err(LimboError::Page1NotAlloc); } - - let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID as i64)?; - *pager.header_ref_state.write() = HeaderRefState::CreateHeader { page }; - if let Some(c) = c { - io_yield_one!(c); + match pager.read_page(DatabaseHeader::PAGE_ID as i64)? { + IOResult::Done((page, c)) => { + *pager.header_ref_state.write() = HeaderRefState::CreateHeader { page }; + if let Some(c) = c { + io_yield_one!(c); + } + } + IOResult::IO(c) => return Ok(IOResult::IO(c)), } } HeaderRefState::CreateHeader { page } => { @@ -102,10 +107,14 @@ impl HeaderRefMut { return Err(LimboError::Page1NotAlloc); } - let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID as i64)?; - *pager.header_ref_state.write() = HeaderRefState::CreateHeader { page }; - if let Some(c) = c { - io_yield_one!(c); + match pager.read_page(DatabaseHeader::PAGE_ID as i64)? { + IOResult::Done((page, c)) => { + *pager.header_ref_state.write() = HeaderRefState::CreateHeader { page }; + if let Some(c) = c { + io_yield_one!(c); + } + } + IOResult::IO(c) => return Ok(IOResult::IO(c)), } } HeaderRefState::CreateHeader { page } => { @@ -143,7 +152,11 @@ pub struct PageInner { /// /// Note that [PageCache::clear] evicts the pages even if pinned, so as long as /// we clear the page cache on errors, pins will not 'leak'. - pub pin_count: AtomicUsize, + /// + /// Packed { dirty_gen (upper bits) | pin_count (lower PIN_BITS) } + /// dirty_gen is used to track 'generations' of dirty pages, when we spill to disk we can + /// have TOCTOU issues if a page is dirtied again after being spilled but before being evicted. 
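+    /// On a 64-bit target this leaves 16 bits for the pin count (PIN_BITS) and 48 bits for the
+    /// dirty generation; see pack_pin_flags / unpack_pin_flags.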
+ pub pin_flags: AtomicUsize, /// The WAL frame number this page was loaded from (0 if loaded from main DB file) /// This tracks which version of the page we have in memory pub wal_tag: AtomicU64, @@ -173,6 +186,23 @@ pub fn unpack_tag_pair(tag: u64) -> (u64, u32) { (frame, epoch) } +const PIN_BITS: usize = 16; // up to 65535 nested pins +const PIN_MASK: usize = (1usize << PIN_BITS) - 1; +const GEN_SHIFT: usize = PIN_BITS; +const GEN_MAX: usize = usize::MAX >> GEN_SHIFT; + +#[inline] +fn pack_pin_flags(pin: usize, gen: usize) -> usize { + debug_assert!(pin <= PIN_MASK); + debug_assert!(gen <= GEN_MAX); + (gen << GEN_SHIFT) | pin +} + +#[inline] +fn unpack_pin_flags(val: usize) -> (usize /*pin*/, usize /*gen*/) { + (val & PIN_MASK, val >> GEN_SHIFT) +} + #[derive(Debug)] pub struct Page { pub inner: UnsafeCell, @@ -202,7 +232,7 @@ impl Page { flags: AtomicUsize::new(0), contents: None, id: id as usize, - pin_count: AtomicUsize::new(0), + pin_flags: AtomicUsize::new(0), wal_tag: AtomicU64::new(TAG_UNSET), }), } @@ -222,11 +252,11 @@ impl Page { } pub fn set_locked(&self) { - self.get().flags.fetch_or(PAGE_LOCKED, Ordering::Acquire); + self.get().flags.fetch_or(PAGE_LOCKED, Ordering::AcqRel); } pub fn clear_locked(&self) { - self.get().flags.fetch_and(!PAGE_LOCKED, Ordering::Release); + self.get().flags.fetch_and(!PAGE_LOCKED, Ordering::AcqRel); } pub fn is_dirty(&self) -> bool { @@ -236,13 +266,26 @@ impl Page { pub fn set_dirty(&self) { tracing::debug!("set_dirty(page={})", self.get().id); self.clear_wal_tag(); - self.get().flags.fetch_or(PAGE_DIRTY, Ordering::Release); + self.get().flags.fetch_or(PAGE_DIRTY, Ordering::AcqRel); + self.bump_gen(); + } + + #[inline] + fn set_dirty_gen(&self, new_gen: usize) { + self.get() + .pin_flags + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |old| { + let (pin, _gen) = unpack_pin_flags(old); + Some(pack_pin_flags(pin, new_gen)) + }) + .ok(); } pub fn clear_dirty(&self) { tracing::debug!("clear_dirty(page={})", self.get().id); - self.get().flags.fetch_and(!PAGE_DIRTY, Ordering::Release); + self.get().flags.fetch_and(!PAGE_DIRTY, Ordering::AcqRel); self.clear_wal_tag(); + self.set_dirty_gen(0); } pub fn is_loaded(&self) -> bool { @@ -250,12 +293,12 @@ impl Page { } pub fn set_loaded(&self) { - self.get().flags.fetch_or(PAGE_LOADED, Ordering::Release); + self.get().flags.fetch_or(PAGE_LOADED, Ordering::AcqRel); } pub fn clear_loaded(&self) { tracing::debug!("clear loaded {}", self.get().id); - self.get().flags.fetch_and(!PAGE_LOADED, Ordering::Release); + self.get().flags.fetch_and(!PAGE_LOADED, Ordering::AcqRel); } pub fn is_index(&self) -> bool { @@ -265,41 +308,66 @@ impl Page { } } + #[inline] /// Increment the pin count by 1. A pin count >0 means the page is pinned and not eligible for eviction from the page cache. pub fn pin(&self) { - self.get().pin_count.fetch_add(1, Ordering::SeqCst); - } - - /// Decrement the pin count by 1. If the count reaches 0, the page is no longer - /// pinned and is eligible for eviction from the page cache. - pub fn unpin(&self) { - let was_pinned = self.try_unpin(); - - turso_assert!( - was_pinned, - "Attempted to unpin page {} that was not pinned", - self.get().id - ); + self.get() + .pin_flags + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |old| { + let (pin, gen) = unpack_pin_flags(old); + if pin == PIN_MASK { + None + } else { + Some(pack_pin_flags(pin + 1, gen)) + } + }) + .expect("pin overflow"); } - /// Try to decrement the pin count by 1, but do nothing if it was already 0. 
- /// Returns true if the pin count was decremented. + #[inline] pub fn try_unpin(&self) -> bool { self.get() - .pin_count - .fetch_update(Ordering::Release, Ordering::SeqCst, |current| { - if current == 0 { + .pin_flags + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |old| { + let (pin, gen) = unpack_pin_flags(old); + if pin == 0 { None } else { - Some(current - 1) + Some(pack_pin_flags(pin - 1, gen)) } }) .is_ok() } - /// Returns true if the page is pinned and thus not eligible for eviction from the page cache. + #[inline] + pub fn unpin(&self) { + let ok = self.try_unpin(); + turso_assert!( + ok, + "Attempted to unpin page {} that was not pinned", + self.get().id + ); + } + + #[inline] pub fn is_pinned(&self) -> bool { - self.get().pin_count.load(Ordering::Acquire) > 0 + (self.get().pin_flags.load(Ordering::Acquire) & PIN_MASK) != 0 + } + + #[inline] + pub fn dirty_gen(&self) -> u64 { + (self.get().pin_flags.load(Ordering::Acquire) >> GEN_SHIFT) as u64 + } + + #[inline] + fn bump_gen(&self) { + let _ = self + .get() + .pin_flags + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |old| { + let (pin, gen) = unpack_pin_flags(old); + Some(pack_pin_flags(pin, gen.wrapping_add(1))) + }); } #[inline] @@ -551,6 +619,8 @@ pub struct Pager { pub(crate) io_ctx: RwLock, /// encryption is an opt-in feature. we will enable it only if the flag is passed enable_encryption: AtomicBool, + spill_info: RwLock, + spill_state: AtomicSpillState, } // SAFETY: This needs to be audited for thread safety. @@ -578,6 +648,40 @@ pub enum PagerCommitResult { Rollback, } +#[derive(Debug)] +struct SpillItem { + page: PinGuard, + id: usize, + // None for WAL path; Some for ephemeral. Set to true by IO completion if we actually cleared dirty. + cleared: Option>, +} +#[derive(Default, Debug)] +struct SpillInfo { + idx: usize, + just_submitted: usize, + items: Vec, + completion: Option, +} +impl SpillInfo { + fn clear(&mut self) { + self.idx = 0; + self.just_submitted = 0; + self.items.clear(); + self.completion = None; + } +} + +#[derive(Debug, AtomicEnum, Default, Clone, Copy)] +enum SpillState { + #[default] + Idle, + PrepareWal, + PrepareWalSync, + SubmitBatch, + WaitBatch, + DrainDirtySet, +} + #[derive(Debug, Clone)] enum AllocatePageState { Start, @@ -587,19 +691,23 @@ enum AllocatePageState { /// - If there are more trunk pages, use the current first trunk page as the new allocation, /// and set the next trunk page as the database's "first freelist trunk page". SearchAvailableFreeListLeaf { - trunk_page: PageRef, + trunk_page: PinGuard, current_db_size: u32, }, /// If a freelist leaf is found, reuse it for the page allocation and remove it from the trunk page. ReuseFreelistLeaf { - trunk_page: PageRef, - leaf_page: PageRef, + trunk_page: PinGuard, + leaf_page: PinGuard, number_of_freelist_leaves: u32, }, /// If a suitable freelist leaf is not found, allocate an entirely new page. 
AllocateNewPage { current_db_size: u32, }, + FinishAllocPage { + page: PinGuard, + new_db_size: u32, + }, } #[derive(Clone)] @@ -668,6 +776,8 @@ impl Pager { }), io_ctx: RwLock::new(IOContext::default()), enable_encryption: AtomicBool::new(false), + spill_info: RwLock::new(SpillInfo::default()), + spill_state: AtomicSpillState::new(SpillState::default()), }) } @@ -995,7 +1105,7 @@ impl Pager { ptrmap_pg_no ); - let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as i64)?; + let (ptrmap_page, c) = return_if_io!(self.read_page(ptrmap_pg_no as i64)); self.vacuum_state.write().ptrmap_get_state = PtrMapGetState::Deserialize { ptrmap_page, offset_in_ptrmap_page, @@ -1102,7 +1212,7 @@ impl Pager { offset_in_ptrmap_page ); - let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as i64)?; + let (ptrmap_page, c) = return_if_io!(self.read_page(ptrmap_pg_no as i64)); self.vacuum_state.write().ptrmap_put_state = PtrMapPutState::Deserialize { ptrmap_page, offset_in_ptrmap_page, @@ -1268,7 +1378,6 @@ impl Pager { /// Allocate a new overflow page. /// This is done when a cell overflows and new space is needed. - // FIXME: handle no room in page cache pub fn allocate_overflow_page(&self) -> Result> { let page = return_if_io!(self.allocate_page()); tracing::debug!("Pager::allocate_overflow_page(id={})", page.get().id); @@ -1283,7 +1392,6 @@ impl Pager { /// Allocate a new page to the btree via the pager. /// This marks the page as dirty and writes the page header. - // FIXME: handle no room in page cache pub fn do_allocate_page( &self, page_type: PageType, @@ -1562,7 +1670,7 @@ impl Pager { /// Reads a page from the database. #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn read_page(&self, page_idx: i64) -> Result<(PageRef, Option)> { + pub fn read_page(&self, page_idx: i64) -> Result)>> { assert!(page_idx >= 0, "pages in pager should be positive, negative might indicate unallocated pages from mvcc or any other nasty bug"); tracing::debug!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write(); @@ -1574,7 +1682,7 @@ impl Pager { "attempted to read page {page_idx} but got page {}", page.get().id ); - return Ok((page.clone(), None)); + return Ok(IOResult::Done((page.clone(), None))); } let (page, c) = self.read_page_no_cache(page_idx, None, false)?; turso_assert!( @@ -1582,8 +1690,12 @@ impl Pager { "attempted to read page {page_idx} but got page {}", page.get().id ); - self.cache_insert(page_idx as usize, page.clone(), &mut page_cache)?; - Ok((page, Some(c))) + match self.cache_insert(page_idx as usize, page.clone(), &mut page_cache)? { + IOResult::Done(page) => Ok(IOResult::Done((page, Some(c)))), + IOResult::IO(IOCompletions::Single(c)) => { + io_yield_one!(c); + } + } } fn begin_read_disk_page( @@ -1608,18 +1720,217 @@ impl Pager { page_idx: usize, page: PageRef, page_cache: &mut PageCache, - ) -> Result<()> { - let page_key = PageCacheKey::new(page_idx); - match page_cache.insert(page_key, page.clone()) { - Ok(_) => {} + ) -> Result> { + let key = PageCacheKey::new(page_idx); + // If a spill is already in flight, advance it first and propagate IO. 
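+        // spill_step() is a resumable state machine (see SpillState); drive any in-flight spill
+        // back to Idle before attempting the insert.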
+ while !matches!(self.spill_state.get(), SpillState::Idle) { + return_if_io!(self.spill_step()); + } + // Try to insert now that we’re either idle or a previous step completed + match page_cache.insert(key, page.clone()) { + Ok(existing) => Ok(IOResult::Done(existing)), Err(CacheError::KeyExists) => { unreachable!("Page should not exist in cache after get() miss") } - Err(e) => return Err(e.into()), + // The page cache will only return FullCanSpill if PageCache::has_spillable(), + Err(CacheError::FullCanSpill) => { + // if we are unable to aggressively evict a clean page, than we initialize spilling + if page_cache.evict_clean_aggressive(1).is_err() { + let candidates = page_cache.compute_spill_candidates(); + if matches!(self.spill_state.get(), SpillState::Idle) { + self.spill_start(candidates)?; + } + match self.spill_step()? { + IOResult::IO(c) => Ok(IOResult::IO(c)), + IOResult::Done(_) => { + // single retry; if still FullCanSpill, fall back to branch + match page_cache.insert(key, page.clone()) { + Ok(p) => Ok(IOResult::Done(p)), + Err(CacheError::FullCanSpill) => Err(LimboError::Busy), + Err(e) => Err(e.into()), + } + } + } + } else { + page_cache + .insert(key, page.clone()) + .map(IOResult::Done) + .map_err(Into::into) + } + } + Err(e) => Err(e.into()), + } + } + + fn spill_start(&self, candidates: Vec) -> Result<()> { + if candidates.is_empty() { + return Ok(()); + } + let mut si = self.spill_info.write(); + + if !matches!(self.spill_state.get(), SpillState::Idle) && !si.items.is_empty() { + si.items.extend(candidates.into_iter().map(|pg| SpillItem { + id: pg.get().id, + page: pg, + cleared: None, + })); + return Ok(()); } + *si = SpillInfo { + items: candidates + .into_iter() + .map(|pg| SpillItem { + id: pg.get().id, + page: pg, + cleared: None, + }) + .collect(), + idx: 0, + just_submitted: 0, + completion: None, + }; + if self.wal.is_some() { + self.spill_state.set(SpillState::PrepareWal); + } else { + self.spill_state.set(SpillState::SubmitBatch); + } + tracing::debug!("spilling started with {} pages", si.items.len()); Ok(()) } + // Cache spill state machine step, returns the number of pages successfully spilled. 
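+    // State flow: Idle -> (PrepareWal -> PrepareWalSync, WAL only) -> SubmitBatch <-> WaitBatch
+    // until every item has been submitted -> DrainDirtySet -> Idle. The WAL path appends the
+    // spilled pages as frames; the ephemeral (no-WAL) path writes page snapshots straight to the
+    // database file and only clears dirty for pages whose dirty generation did not change while
+    // the write was in flight.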
+ fn spill_step(&self) -> Result> { + loop { + let state = self.spill_state.get(); + tracing::debug!("spill_step(state={:?})", state); + match state { + SpillState::Idle => return Ok(IOResult::Done(0)), + SpillState::PrepareWal => { + // Only reachable if wal.is_some() + let wal = self.wal.as_ref().ok_or_else(|| { + LimboError::InternalError("PrepareWal without WAL".into()) + })?; + let copt = wal + .borrow_mut() + .prepare_wal_start(self.get_page_size_unchecked())?; + if let Some(c) = copt { + self.spill_state.set(SpillState::PrepareWalSync); + if !c.succeeded() { + io_yield_one!(c); + } + } else { + self.spill_state.set(SpillState::SubmitBatch); + } + } + SpillState::PrepareWalSync => { + let wal = self.wal.as_ref().ok_or_else(|| { + LimboError::InternalError("PrepareWalSync without WAL".into()) + })?; + let c = wal.borrow_mut().prepare_wal_finish()?; + self.spill_state.set(SpillState::SubmitBatch); + if !c.succeeded() { + io_yield_one!(c); + } + } + SpillState::SubmitBatch => { + let wal_opt = self.wal.as_ref(); + let (start, pages_chunk): (usize, Vec) = { + let mut si = self.spill_info.write(); + if si.idx >= si.items.len() { + self.spill_state.set(SpillState::DrainDirtySet); + continue; + } + let remaining = si.items.len() - si.idx; + let chunk = si.items[si.idx..si.idx + remaining] + .iter() + .map(|it| it.page.to_page()) + .collect(); + si.just_submitted = remaining; + (si.idx, chunk) + }; + if let Some(wal) = wal_opt { + tracing::debug!("spilling {} pages to WAL", pages_chunk.len()); + let c = wal.borrow_mut().append_frames_vectored( + pages_chunk, + self.get_page_size_unchecked(), + None, + )?; + self.spill_state.set(SpillState::WaitBatch); + if !c.succeeded() { + io_yield_one!(c); + } + } else { + let mut group = CompletionGroup::new(|_| {}); + tracing::debug!("spilling {} pages to ephemeral DB", pages_chunk.len()); + for (i, p) in pages_chunk.iter().enumerate() { + let gen = p.dirty_gen(); + let snap = { + let contents = p.get_contents(); + Arc::new(Buffer::new(contents.buffer.as_slice().to_vec())) + }; + let can_clear = Arc::new(AtomicBool::new(false)); + self.spill_info.write().items[start + i].cleared = + Some(can_clear.clone()); + let c = begin_write_btree_page_with_snapshot( + self, p, snap, can_clear, gen, + )?; + group.add(&c); + } + self.spill_info.write().completion = Some(group.build()); + self.spill_state.set(SpillState::WaitBatch); + } + } + SpillState::WaitBatch => { + if self.wal.is_none() { + if let Some(c) = self.spill_info.read().completion.clone() { + if !c.succeeded() { + io_yield_one!(c); + } + } + } + let done_all = { + let mut si = self.spill_info.write(); + let end = si.idx + si.just_submitted; + si.idx = end; + if self.wal.is_none() { + si.completion = None; + } + si.idx >= si.items.len() + }; + self.spill_state.set(if done_all { + SpillState::DrainDirtySet + } else { + SpillState::SubmitBatch + }); + } + SpillState::DrainDirtySet => { + // Remove IDs from global dirty set and reset + let cleaned = { + let mut si = self.spill_info.write(); + let mut dirty = self.dirty_pages.write(); + if self.wal.is_some() { + for it in &si.items { + dirty.remove(&it.id); + } + } else { + for it in &si.items { + if it.cleared.as_ref().unwrap().load(Ordering::Acquire) { + dirty.remove(&it.id); + it.page.clear_dirty(); + } + } + } + let cleaned = si.idx; + si.clear(); + cleaned + }; + self.spill_state.set(SpillState::Idle); + return Ok(IOResult::Done(cleaned)); + } + } + } + } + // Get a page from the cache, if it exists. 
pub fn cache_get(&self, page_idx: usize) -> Result> { tracing::trace!("read_page(page_idx = {})", page_idx); @@ -2279,7 +2590,7 @@ impl Pager { ))); } - let (page, _c) = match page.take() { + let page = match page.take() { Some(page) => { assert_eq!( page.get().id, @@ -2292,12 +2603,19 @@ impl Pager { let page_contents = page.get_contents(); page_contents.overflow_cells.clear(); } - (page, None) - } - None => { - let (page, c) = self.read_page(page_id as i64)?; - (page, Some(c)) + page } + None => match self.read_page(page_id as i64)? { + IOResult::Done((page, c)) => { + if let Some(c) = c { + if !c.succeeded() { + io_yield_one!(c); + } + } + page + } + IOResult::IO(c) => return Ok(IOResult::IO(c)), + }, }; header.freelist_pages = (header.freelist_pages.get() + 1).into(); @@ -2311,12 +2629,17 @@ impl Pager { } FreePageState::AddToTrunk { page } => { let trunk_page_id = header.freelist_trunk_page.get(); - let (trunk_page, c) = self.read_page(trunk_page_id as i64)?; - if let Some(c) = c { - if !c.succeeded() { - io_yield_one!(c); + let trunk_page = match self.read_page(trunk_page_id as i64)? { + IOResult::Done((page, c)) => { + if let Some(c) = c { + if !c.succeeded() { + io_yield_one!(c); + } + } + page } - } + IOResult::IO(c) => return Ok(IOResult::IO(c)), + }; turso_assert!(trunk_page.is_loaded(), "trunk_page should be loaded"); let trunk_page_contents = trunk_page.get_contents(); @@ -2493,9 +2816,12 @@ impl Pager { new_db_size += 1; let page = allocate_new_page(new_db_size as i64, &self.buffer_pool, 0); self.add_dirty(&page)?; - let page_key = PageCacheKey::new(page.get().id as usize); + let page_key = page.get().id as usize; let mut cache = self.page_cache.write(); - cache.insert(page_key, page.clone())?; + match self.cache_insert(page_key, page.clone(), &mut cache)? 
{
+                    IOResult::IO(c) => return Ok(IOResult::IO(c)),
+                    IOResult::Done(_) => { /* inserted successfully */ }
+                }
                 }
             }
 
@@ -2506,13 +2832,16 @@ impl Pager {
                     };
                     continue;
                 }
-                let (trunk_page, c) = self.read_page(first_freelist_trunk_page_id as i64)?;
+                let (trunk_page, c) =
+                    return_if_io!(self.read_page(first_freelist_trunk_page_id as i64));
                 *state = AllocatePageState::SearchAvailableFreeListLeaf {
-                    trunk_page,
+                    trunk_page: PinGuard::new(trunk_page),
                     current_db_size: new_db_size,
                 };
                 if let Some(c) = c {
-                    io_yield_one!(c);
+                    if !c.succeeded() {
+                        io_yield_one!(c);
+                    }
                 }
             }
             AllocatePageState::SearchAvailableFreeListLeaf {
@@ -2536,17 +2865,16 @@ impl Pager {
                 let page_contents = trunk_page.get_contents();
                 let next_leaf_page_id =
                     page_contents.read_u32_no_offset(FREELIST_TRUNK_OFFSET_FIRST_LEAF);
-                let (leaf_page, c) = self.read_page(next_leaf_page_id as i64)?;
-
+                let (leaf_page, c) =
+                    return_if_io!(self.read_page(next_leaf_page_id as i64));
                 turso_assert!(
                     number_of_freelist_leaves > 0,
                     "Freelist trunk page {} has no leaves",
                     trunk_page.get().id
                 );
-
                 *state = AllocatePageState::ReuseFreelistLeaf {
                     trunk_page: trunk_page.clone(),
-                    leaf_page,
+                    leaf_page: PinGuard::new(leaf_page),
                     number_of_freelist_leaves,
                 };
                 if let Some(c) = c {
@@ -2585,9 +2913,9 @@ impl Pager {
                         trunk_page.get().id
                     );
                 }
-                let trunk_page = trunk_page.clone();
+                let page = trunk_page.to_page();
                 *state = AllocatePageState::Start;
-                return Ok(IOResult::Done(trunk_page));
+                return Ok(IOResult::Done(page));
             }
             AllocatePageState::ReuseFreelistLeaf {
                 trunk_page,
@@ -2639,9 +2967,8 @@ impl Pager {
                     remaining_leaves_count as u32,
                 );
                 self.add_dirty(trunk_page)?;
-
                 header.freelist_pages = (header.freelist_pages.get() - 1).into();
-                let leaf_page = leaf_page.clone();
+                let leaf_page = leaf_page.to_page();
                 *state = AllocatePageState::Start;
                 return Ok(IOResult::Done(leaf_page));
             }
@@ -2674,17 +3001,23 @@ impl Pager {
                 // FIXME: should reserve page cache entry before modifying the database
                 let page = allocate_new_page(new_db_size as i64, &self.buffer_pool, 0);
+                // Set up the page and add it to the cache.
+                self.add_dirty(&page)?;
+                *state = AllocatePageState::FinishAllocPage {
+                    page: PinGuard::new(page),
+                    new_db_size,
+                };
+            }
+            AllocatePageState::FinishAllocPage { page, new_db_size } => {
                 {
-                    // setup page and add to cache
-                    self.add_dirty(&page)?;
-
-                    let page_key = PageCacheKey::new(page.get().id as usize);
-                    {
+                    let page_key = page.get().id;
+                    let page = {
                         // Run in separate block to avoid deadlock on page cache write lock
                         let mut cache = self.page_cache.write();
-                        cache.insert(page_key, page.clone())?;
-                    }
-                    header.database_size = new_db_size.into();
+                        return_if_io!(self.cache_insert(page_key, page.to_page(), &mut cache))
+                    };
+                    header.database_size = (*new_db_size).into();
+                    let page = page.clone();
                     *state = AllocatePageState::Start;
                     return Ok(IOResult::Done(page));
                 }
diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 887ed23e04..5c4c78d217 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -950,6 +950,59 @@ pub fn finish_read_page(page_idx: usize, buffer_ref: Arc, page: PageRef)
     }
 }
 
+/// Write the `snap` buffer (captured at submit time) to `page`'s location in the main DB file.
+/// On completion, clear the dirty flag *only if* the page still has the same dirty generation
+/// that it had at submit time, meaning it hasn't been modified in-memory since we submitted the write.
+/// This is used for cache-spilling to ephemeral DB files, where pages are typically very hot and +/// we have to be sure we are not improperly clearing the dirty flag. +pub fn begin_write_btree_page_with_snapshot( + pager: &Pager, + page: &PageRef, + snap: Arc, + cb: Arc, + gen_at_submit: u64, +) -> Result { + tracing::trace!( + "begin_write_btree_page_with_snapshot(page={})", + page.get().id + ); + let page_source = &pager.db_file; + let page_finish = page.clone(); + + let page_id = page.get().id; + let buf_len = snap.len() as i32; + + let write_complete = { + Box::new(move |res: Result| { + let Ok(bytes_written) = res else { + return; + }; + tracing::trace!( + "finish_write_btree_page_with_snapshot(page={})", + page_finish.get().id + ); + turso_assert!( + bytes_written == buf_len, + "wrote({bytes_written}) != expected({buf_len})" + ); + // Clear dirty only if the page didn't change since submit. + if page_finish.dirty_gen() == gen_at_submit { + cb.store(true, Ordering::Release); + tracing::trace!( + "Page {page_id} spilled, is_pinned={}, is_locked={}, gen={}", + page_finish.is_pinned(), + page_finish.is_locked(), + page_finish.dirty_gen(), + ); + } + }) + }; + + let c = Completion::new_write(write_complete); + let io_ctx = pager.io_ctx.read(); + page_source.write_page(page_id, snap, &io_ctx, c) +} + #[instrument(skip_all, level = Level::DEBUG)] pub fn begin_write_btree_page(pager: &Pager, page: &PageRef) -> Result { tracing::trace!("begin_write_btree_page(page={})", page.get().id); From 93b1e55e01aa15c9fde45ae6a9d4a03fafaaa44f Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Thu, 20 Nov 2025 21:59:34 -0500 Subject: [PATCH 2/3] Fix tests and improve algo to collect spillable pages --- core/storage/btree.rs | 17 +++++------ core/storage/page_cache.rs | 60 +++++++++++++++++++++++++++++--------- 2 files changed, 55 insertions(+), 22 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 5c1fd01fb6..11f3709292 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -8006,8 +8006,7 @@ mod tests { let cursor = BTreeCursor::new_table(pager.clone(), page_idx, num_columns); let page = match cursor.read_page(page_idx).unwrap() { IOResult::IO(IOCompletions::Single(c)) => { - pager.io.wait_for_completion(c); - // TODO + pager.io.wait_for_completion(c).unwrap(); return validate_btree(pager.clone(), page_idx); } IOResult::Done((page, _c)) => { @@ -8034,14 +8033,14 @@ mod tests { left_child_page, .. 
}) => { match cursor.read_page(left_child_page as i64).unwrap() { - IOResult::Done((child_page, c)) => { + IOResult::Done((child_page, _c)) => { while child_page.is_locked() { pager.io.step().unwrap(); } child_pages.push(child_page); } IOResult::IO(IOCompletions::Single(c)) => { - pager.io.wait_for_completion(c); + pager.io.wait_for_completion(c).unwrap(); } } if left_child_page == page.get().id as u32 { @@ -9692,7 +9691,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &PinGuard::new(page), + &PinGuard::new(page.clone()), Some(i as i64), &mut payload, cell_idx, @@ -9774,7 +9773,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &PinGuard::new(page), + &PinGuard::new(page.clone()), Some(i), &mut payload, cell_idx, @@ -10147,7 +10146,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &PinGuard::new(page), + &PinGuard::new(page.clone()), Some(0), &mut payload, 0, @@ -10233,7 +10232,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &PinGuard::new(page), + &PinGuard::new(page.clone()), Some(0), &mut payload, 0, @@ -10639,7 +10638,7 @@ mod tests { run_until_done( || { fill_cell_payload( - &PinGuard::new(page), + &PinGuard::new(page.clone()), Some(cell_idx as i64), &mut payload, cell_idx as usize, diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 4db123eeb4..dc7a453d98 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -18,7 +18,9 @@ pub struct PageCacheKey(usize); const CLEAR: u8 = 0; const REF_MAX: u8 = 3; -const HOT_PAGE_THRESHOLD: u64 = 5; + +/// semi-arbitrary heuristic to determine pages which are frequently touched. +const HOT_PAGE_THRESHOLD: u64 = 6; /// An entry in the page cache. /// @@ -186,6 +188,9 @@ impl PageCache { } #[inline] + /// Returns true if the page qualifies to be spilled to disk. + /// + /// Does not enforce any hot-page policy; callers can layer that separately. fn can_spill_page(p: &PageRef) -> bool { p.is_dirty() && !p.is_pinned() @@ -198,30 +203,51 @@ impl PageCache { .as_ref() .is_some_and(|c| c.overflow_cells.is_empty()) && p.get().id.ne(&DatabaseHeader::PAGE_ID) // never spill page 1 - // dirty_gen is increased every time `mark_dirty` is called on a PageRef, dont spill hot pages - && p.dirty_gen() <= HOT_PAGE_THRESHOLD } - /// Collect candidates for spilling to disk when cache under pressure + /// Collect candidates for spilling to disk when cache under pressure. pub fn compute_spill_candidates(&mut self) -> Vec { const SPILL_BATCH: usize = 128; if self.len() == 0 || self.clock_hand.is_null() { return Vec::new(); } - let mut out = Vec::with_capacity(SPILL_BATCH); + // Collect all spillable pages and sort by a composite key: + // cold first (dirty_gen <= HOT_PAGE_THRESHOLD), then lower ref_bit, + // then lower dirty_gen, then lower page id. This keeps the coldest, + // safest pages at the front while still allowing hot pages to be chosen + // when we are fully saturated. + let mut candidates: Vec<(bool, u8, u64, usize, PinGuard)> = Vec::new(); let start = self.clock_hand; let mut ptr = start; loop { let entry = unsafe { &mut *ptr }; let page = &entry.page; - // On the first pass, only spill if ref_bit is CLEAR, we that we can ensure spilling the coldest pages. - // For the second pass, we can be more lenient and accept pages with ref_bit < REF_MAX. 
- if Self::can_spill_page(page) { - out.push(PinGuard::new(page.clone())); - if out.len() >= SPILL_BATCH { + if !Self::can_spill_page(page) { + let mut cur = unsafe { self.queue.cursor_mut_from_ptr(ptr) }; + cur.move_next(); + if let Some(next) = cur.get() { + ptr = next as *const _ as *mut PageCacheEntry; + } else if let Some(front) = self.queue.front_mut().get() { + ptr = front as *const _ as *mut PageCacheEntry; + } else { + break; + } + if ptr == start { break; } + continue; } + + let gen = page.dirty_gen(); + let is_hot = gen > HOT_PAGE_THRESHOLD; + let meta = ( + is_hot, + entry.ref_bit, + gen, + page.get().id, + PinGuard::new(page.clone()), + ); + candidates.push(meta); let mut cur = unsafe { self.queue.cursor_mut_from_ptr(ptr) }; cur.move_next(); if let Some(next) = cur.get() { @@ -235,7 +261,15 @@ impl PageCache { break; } } - out + candidates.sort_by_key(|(is_hot, ref_bit, dirty_gen, id, _)| { + (*is_hot, *ref_bit, *dirty_gen, *id) + }); + + candidates + .into_iter() + .take(SPILL_BATCH) + .map(|(_, _, _, _, pg)| pg) + .collect() } #[inline] @@ -796,7 +830,7 @@ mod tests { // Inserting same page instance should return KeyExists error let result = cache.insert(key1, page1_v2.clone()); - assert_eq!(result, Err(CacheError::KeyExists)); + assert!(matches!(result, Err(CacheError::KeyExists))); assert_eq!(cache.len(), 1); // Verify the page is still accessible @@ -957,7 +991,7 @@ mod tests { let page3 = page_with_content(3); let result = cache.insert(key3, page3); - assert_eq!(result, Err(CacheError::Full)); + assert!(matches!(result, Err(CacheError::Full))); assert_eq!(cache.len(), 2); cache.verify_cache_integrity(); } From 83aeb5a1800e59cc95094173f483b49821830e0c Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 21 Nov 2025 22:18:04 -0500 Subject: [PATCH 3/3] Make re-entrant in btree now that cache insert returns IO --- core/storage/btree.rs | 115 +++++++++++++++++++++++++++++++++--------- 1 file changed, 92 insertions(+), 23 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 11f3709292..a69b032db3 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -506,6 +506,14 @@ pub enum CursorValidState { RequireAdvance(IterationDirection), } +#[derive(Debug, Clone)] +struct PendingChild { + page_id: i64, + push_backwards: bool, + set_cell_index: Option, + retreat_parent: bool, +} + #[derive(Debug)] /// State used for seeking pub enum CursorSeekState { @@ -674,6 +682,8 @@ pub struct BTreeCursor { /// Advancing is only skipped if the cursor is currently pointing to a valid record /// when next() is called. pub skip_advance: Cell, + /// Pending child page read that must complete before mutating the stack (keeps loops re-entrant on IO). + pending_child: RefCell>, } /// We store the cell index and cell count for each page in the stack. @@ -699,6 +709,40 @@ impl BTreeNodeState { } impl BTreeCursor { + /// Load a pending child page if present, applying deferred stack mutations only after the read completes. + fn load_pending_child(&mut self) -> Result> { + let Some(pending) = self.pending_child.borrow_mut().take() else { + return Ok(IOResult::Done(())); + }; + + match self.read_page(pending.page_id)? 
{ + IOResult::Done((page, c)) => { + if let Some(c) = c { + if !c.succeeded() { + *self.pending_child.borrow_mut() = Some(pending); + return Ok(IOResult::IO(IOCompletions::Single(c))); + } + } + if pending.retreat_parent { + self.stack.retreat(); + } + if let Some(idx) = pending.set_cell_index { + self.stack.set_cell_index(idx); + } + if pending.push_backwards { + self.stack.push_backwards(page); + } else { + self.stack.push(page); + } + Ok(IOResult::Done(())) + } + IOResult::IO(c) => { + *self.pending_child.borrow_mut() = Some(pending); + Ok(IOResult::IO(c)) + } + } + } + pub fn new(pager: Arc, root_page: i64, num_columns: usize) -> Self { let valid_state = if root_page == 1 && !pager.db_state.get().is_initialized() { CursorValidState::Invalid @@ -741,6 +785,7 @@ impl BTreeCursor { seek_end_state: SeekEndState::Start, move_to_state: MoveToState::Start, skip_advance: Cell::new(false), + pending_child: RefCell::new(None), } } @@ -802,6 +847,9 @@ impl BTreeCursor { #[instrument(skip(self), level = Level::DEBUG, name = "prev")] pub fn get_prev_record(&mut self) -> Result> { loop { + if let IOResult::IO(c) = self.load_pending_child()? { + return Ok(IOResult::IO(c)); + } let (old_top_idx, page_type, is_index, is_leaf, cell_count) = { let page = self.stack.top_ref(); let contents = page.get_contents(); @@ -822,12 +870,12 @@ impl BTreeCursor { let rightmost_pointer = self.stack.top_ref().get_contents().rightmost_pointer(); if let Some(rightmost_pointer) = rightmost_pointer { let past_rightmost_pointer = cell_count as i32 + 1; - self.stack.set_cell_index(past_rightmost_pointer); - let (page, c) = return_if_io!(self.read_page(rightmost_pointer as i64)); - self.stack.push_backwards(page); - if let Some(c) = c { - io_yield_one!(c); - } + *self.pending_child.borrow_mut() = Some(PendingChild { + page_id: rightmost_pointer as i64, + push_backwards: true, + set_cell_index: Some(past_rightmost_pointer), + retreat_parent: false, + }); continue; } } @@ -892,7 +940,14 @@ impl BTreeCursor { // this parent: key 666 // left child has: key 663, key 664, key 665 // we need to move to the previous parent (with e.g. key 662) when iterating backwards. - self.stack.retreat(); + // Defer retreat until after the child page load completes to keep this re-entrant. + self.pending_child.borrow_mut().replace(PendingChild { + page_id: left_child_page as i64, + push_backwards: true, + set_cell_index: None, + retreat_parent: true, + }); + continue; } let (mem_page, c) = return_if_io!(self.read_page(left_child_page as i64)); @@ -914,17 +969,23 @@ impl BTreeCursor { ) -> Result> { loop { if self.read_overflow_state.borrow().is_none() { - let (page, c) = return_if_io!(self.read_page(start_next_page as i64)); - *self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow { - payload: payload.to_vec(), - next_page: start_next_page, - remaining_to_read: payload_size as usize - payload.len(), - page, - }); - if let Some(c) = c { - io_yield_one!(c); + match self.read_page(start_next_page as i64)? 
{ + IOResult::Done((page, c)) => { + if let Some(c) = c { + if !c.succeeded() { + return Ok(IOResult::IO(IOCompletions::Single(c))); + } + } + *self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow { + payload: payload.to_vec(), + next_page: start_next_page, + remaining_to_read: payload_size as usize - payload.len(), + page, + }); + continue; + } + IOResult::IO(c) => return Ok(IOResult::IO(c)), } - continue; } let mut read_overflow_state = self.read_overflow_state.borrow_mut(); let ReadPayloadOverflow { @@ -946,13 +1007,21 @@ impl BTreeCursor { *remaining_to_read -= to_read; if *remaining_to_read != 0 && next != 0 { - let (new_page, c) = return_if_io!(self.pager.read_page(next as i64)); - *page = new_page; - *next_page = next; - if let Some(c) = c { - io_yield_one!(c); + match self.pager.read_page(next as i64)? { + IOResult::Done((new_page, c)) => { + *page = new_page; + *next_page = next; + if let Some(c) = c { + io_yield_one!(c); + } + continue; + } + IOResult::IO(c) => { + // Preserve progress and retry after IO completes. + *next_page = next; + return Ok(IOResult::IO(c)); + } } - continue; } turso_assert!( *remaining_to_read == 0 && next == 0,