From 65b302ff8a117038a0c74fc5183b74b453ce788c Mon Sep 17 00:00:00 2001 From: Duy Dang <55247256+ddwalias@users.noreply.github.com> Date: Sun, 2 Nov 2025 04:03:38 +0700 Subject: [PATCH 1/4] simulator: pre-initialize autovacuum database --- simulator/profiles/mod.rs | 5 +++++ simulator/runner/env.rs | 26 ++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/simulator/profiles/mod.rs b/simulator/profiles/mod.rs index e4ea1dc061..a849907f2a 100644 --- a/simulator/profiles/mod.rs +++ b/simulator/profiles/mod.rs @@ -28,6 +28,9 @@ pub struct Profile { #[garde(skip)] /// Experimental MVCC feature pub experimental_mvcc: bool, + #[garde(skip)] + /// Whether to seed simulator databases with a SQLite-generated auto-vacuum file. + pub seed_sqlite_autovacuum: bool, #[garde(range(min = 1, max = 64))] pub max_connections: usize, #[garde(dive)] @@ -40,6 +43,7 @@ impl Default for Profile { fn default() -> Self { Self { experimental_mvcc: false, + seed_sqlite_autovacuum: true, max_connections: 10, io: Default::default(), query: Default::default(), @@ -118,6 +122,7 @@ impl Profile { }, experimental_mvcc: true, max_connections: 2, + seed_sqlite_autovacuum: true, }; profile.validate().unwrap(); profile diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index 23267519bc..d0d6a528e4 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -235,6 +235,10 @@ impl SimulatorEnv { } self.db = None; + if self.profile.seed_sqlite_autovacuum { + self.seed_sqlite_autovacuum(&db_path); + } + let db = match Database::open_file( io.clone(), db_path.to_str().unwrap(), @@ -251,6 +255,28 @@ impl SimulatorEnv { self.db = Some(db); } + fn seed_sqlite_autovacuum(&self, db_path: &Path) { + let conn = rusqlite::Connection::open(db_path).unwrap_or_else(|e| { + panic!( + "failed to open sqlite connection while seeding auto-vacuum database at {}: {e}", + db_path.display() + ) + }); + conn.execute_batch( + "PRAGMA auto_vacuum=FULL;\ + VACUUM;\ + CREATE TABLE IF NOT EXISTS __sim_autovacuum_seed__(id INTEGER PRIMARY KEY, value TEXT);\ + INSERT OR IGNORE INTO __sim_autovacuum_seed__ VALUES (1,'seed');", + ) + .unwrap_or_else(|e| { + panic!( + "failed to seed sqlite auto-vacuum database at {}: {e}", + db_path.display() + ) + }); + drop(conn); + } + pub(crate) fn get_db_path(&self) -> PathBuf { self.paths.db(&self.type_, &self.phase) } From 9639769b147bf8f7bfbe1489437cace5ac53be72 Mon Sep 17 00:00:00 2001 From: Duy Dang <55247256+ddwalias@users.noreply.github.com> Date: Sun, 2 Nov 2025 04:08:25 +0700 Subject: [PATCH 2/4] Add pre-initialize autovacuum integration test --- tests/integration/common.rs | 25 ++++++++++++++++++++ tests/integration/storage/auto_vacuum.rs | 29 ++++++++++++++++++++++++ tests/integration/storage/mod.rs | 1 + 3 files changed, 55 insertions(+) create mode 100644 tests/integration/storage/auto_vacuum.rs diff --git a/tests/integration/common.rs b/tests/integration/common.rs index a8a37077a9..e95e8489f6 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -87,6 +87,31 @@ impl TempDatabase { } } + pub fn new_with_autovacuum_full(sql: &str) -> Self { + let mut path = TempDir::new().unwrap().keep(); + path.push("test.db"); + { + let connection = rusqlite::Connection::open(&path).unwrap(); + connection.pragma_update(None, "page_size", 4096).unwrap(); + connection.pragma_update(None, "auto_vacuum", 1).unwrap(); + connection.execute_batch("VACUUM;").unwrap(); + connection.execute_batch(sql).unwrap(); + } + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let db = Database::open_file_with_flags( + io.clone(), + path.to_str().unwrap(), + turso_core::OpenFlags::default(), + turso_core::DatabaseOpts::new() + .with_indexes(true) + .with_index_method(true), + None, + ) + .unwrap(); + + Self { path, io, db } + } + pub fn new_with_rusqlite(table_sql: &str) -> Self { let mut path = TempDir::new().unwrap().keep(); path.push("test.db"); diff --git a/tests/integration/storage/auto_vacuum.rs b/tests/integration/storage/auto_vacuum.rs new file mode 100644 index 0000000000..dcec6f54bf --- /dev/null +++ b/tests/integration/storage/auto_vacuum.rs @@ -0,0 +1,29 @@ +use core_tester::common::{self, maybe_setup_tracing, rusqlite_integrity_check, TempDatabase}; +use turso_core::Row; + +#[test] + +fn test_autovacuum_pointer_map_integrity() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + maybe_setup_tracing(); + + let tmp_db = TempDatabase::new_with_autovacuum_full( + "CREATE TABLE seed_table(id INTEGER PRIMARY KEY, value TEXT);\ + INSERT INTO seed_table VALUES (1, 'seed');", + ); + let conn = tmp_db.connect_limbo(); + + common::run_query(&tmp_db, &conn, "CREATE TABLE third(a);")?; + for i in 1..=2000 { + common::run_query(&tmp_db, &conn, &format!("INSERT INTO third VALUES ({i});"))?; + } + + common::run_query_on_row(&tmp_db, &conn, "PRAGMA integrity_check;", |row: &Row| { + let res = row.get::(0).unwrap(); + assert_eq!(res, "ok"); + })?; + + rusqlite_integrity_check(tmp_db.path.as_path())?; + + Ok(()) +} diff --git a/tests/integration/storage/mod.rs b/tests/integration/storage/mod.rs index eb366ee729..c5062241c2 100644 --- a/tests/integration/storage/mod.rs +++ b/tests/integration/storage/mod.rs @@ -1,2 +1,3 @@ +mod auto_vacuum; #[cfg(feature = "checksum")] mod checksum; From 0bbe40eb09cf87f0f7cebb687985a6031047a502 Mon Sep 17 00:00:00 2001 From: Duy Dang <55247256+ddwalias@users.noreply.github.com> Date: Fri, 21 Nov 2025 03:14:01 +0700 Subject: [PATCH 3/4] Honor existing autovacuum mode on open --- core/lib.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 399020ff18..3430441514 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -590,16 +590,14 @@ impl Database { AutoVacuumMode::None }; - // Force autovacuum to None if the experimental flag is not enabled - let final_mode = if !self.opts.enable_autovacuum { - if mode != AutoVacuumMode::None { - tracing::warn!( - "Database has autovacuum enabled but --experimental-autovacuum flag is not set. Forcing autovacuum to None." - ); - } - AutoVacuumMode::None - } else { + // TODO: remove behind experimental toggle once feature is fully supported. + // Do not disable autovacuum if the database header already has it enabled; otherwise we would + // corrupt pointer-map expectations for existing databases. The flag only controls enabling it + // when creating/rewriting databases, not honoring an already-enabled file. + let final_mode = if mode != AutoVacuumMode::None || self.opts.enable_autovacuum { mode + } else { + AutoVacuumMode::None }; pager.set_auto_vacuum_mode(final_mode); From 20c75bea58711a029088e2883e809b84bdd49108 Mon Sep 17 00:00:00 2001 From: Duy Dang <55247256+ddwalias@users.noreply.github.com> Date: Fri, 21 Nov 2025 03:53:45 +0700 Subject: [PATCH 4/4] Keep autovacuum ptrmap entries up to date during mutations --- core/storage/btree.rs | 336 +++++++++++++++++++++++++++++++++++++----- core/storage/pager.rs | 114 ++++++++++++++ 2 files changed, 415 insertions(+), 35 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 659dc22085..b87fc6836d 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1,6 +1,8 @@ use rustc_hash::FxHashMap; use tracing::{instrument, Level}; +#[cfg(not(feature = "omit_autovacuum"))] +use crate::storage::pager::{ptrmap, ptrmap::PtrmapType, AutoVacuumMode}; use crate::{ io::CompletionGroup, io_yield_one, @@ -374,6 +376,9 @@ struct BalanceInfo { sibling_count: usize, /// First divider cell to remove that marks the first sibling first_divider_cell: usize, + /// Parent page id of the balanced siblings (used for ptrmap updates) + #[cfg(not(feature = "omit_autovacuum"))] + parent_page_id: u32, } /// Holds the state machine for the operation that was in flight when the cursor @@ -2613,6 +2618,12 @@ impl BTreeCursor { .get_page_at_level(self.stack.current() - 1) .expect("parent page should be on the stack"); let parent_contents = parent.get_contents(); + #[cfg(not(feature = "omit_autovacuum"))] + return_if_io!(self.update_ptrmap( + new_rightmost_leaf.get().id as u32, + PtrmapType::BTreeNode, + parent.get().id as u32 + )); let rightmost_pointer = parent_contents .rightmost_pointer() .expect("parent should have a rightmost pointer"); @@ -2656,6 +2667,27 @@ impl BTreeCursor { parent_contents.write_rightmost_ptr(new_rightmost_leaf.get().id as u32); self.pager.add_dirty(parent)?; self.pager.add_dirty(&new_rightmost_leaf)?; + #[cfg(not(feature = "omit_autovacuum"))] + { + return_if_io!(BTreeCursor::update_child_ptrmaps_for_page( + &self.pager, + parent, + parent_contents, + usable_space + )); + return_if_io!(BTreeCursor::update_child_ptrmaps_for_page( + &self.pager, + old_rightmost_leaf, + old_rightmost_leaf_contents, + usable_space + )); + return_if_io!(BTreeCursor::update_child_ptrmaps_for_page( + &self.pager, + &new_rightmost_leaf, + new_rightmost_leaf_contents, + usable_space + )); + } // Continue balance from the parent page (inserting the new divider cell may have overflowed the parent) self.stack.pop(); @@ -2901,6 +2933,8 @@ impl BTreeCursor { divider_cell_payloads: [const { None }; MAX_SIBLING_PAGES_TO_BALANCE - 1], sibling_count, first_divider_cell: first_cell_divider, + #[cfg(not(feature = "omit_autovacuum"))] + parent_page_id: parent_page.get().id as u32, }); *sub_state = BalanceSubState::NonRootDoBalancing; let completion = group.build(); @@ -3431,6 +3465,14 @@ impl BTreeCursor { 0, BtreePageAllocMode::Any )); + #[cfg(not(feature = "omit_autovacuum"))] + if !matches!(pager.get_auto_vacuum_mode(), AutoVacuumMode::None) { + return_if_io!(pager.ptrmap_put( + page.get().id as u32, + PtrmapType::BTreeNode, + balance_info.parent_page_id + )); + } pages_to_balance_new[*i].replace(page); // Since this page didn't exist before, we can set it to cells length as it // marks them as empty since it is a prefix sum of cells. @@ -3469,43 +3511,17 @@ impl BTreeCursor { let parent_contents = parent_page.get_contents(); let mut sibling_count_new = *sibling_count_new; let is_table_leaf = matches!(page_type, PageType::TableLeaf); - // Reassign page numbers in increasing order + #[cfg(not(feature = "omit_autovacuum"))] { - let mut page_numbers: [usize; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE] = - [0; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE]; - for (i, page) in pages_to_balance_new - .iter() - .take(sibling_count_new) - .enumerate() - { - page_numbers[i] = page.as_ref().unwrap().get().id; - } - page_numbers.sort(); - for (page, new_id) in pages_to_balance_new - .iter() - .take(sibling_count_new) - .rev() - .zip(page_numbers.iter().rev().take(sibling_count_new)) - { - let page = page.as_ref().unwrap(); - if *new_id != page.get().id { - page.get().id = *new_id; - self.pager - .upsert_page_in_cache(*new_id, page.clone(), true)?; - } - } - - #[cfg(debug_assertions)] - { - tracing::debug!( - "balance_non_root(parent page_id={})", - parent_page.get().id - ); + let parent_id = parent_page.get().id as u32; + if !matches!(self.pager.get_auto_vacuum_mode(), AutoVacuumMode::None) { for page in pages_to_balance_new.iter().take(sibling_count_new) { - tracing::debug!( - "balance_non_root(new_sibling page_id={})", - page.as_ref().unwrap().get().id - ); + let page = page.as_ref().unwrap(); + return_if_io!(self.pager.ptrmap_put( + page.get().id as u32, + PtrmapType::BTreeNode, + parent_id + )); } } } @@ -3684,6 +3700,13 @@ impl BTreeCursor { ); } } + #[cfg(not(feature = "omit_autovacuum"))] + return_if_io!(BTreeCursor::update_child_ptrmaps_for_page( + &self.pager, + parent_page, + parent_contents, + usable_space, + )); /* 7. Start real movement of cells. Next comment is borrowed from SQLite: */ /* Now update the actual sibling pages. The order in which they are updated ** is important, as this code needs to avoid disrupting any page from which @@ -3772,11 +3795,34 @@ impl BTreeCursor { page_contents.cell_count() ); page_contents.overflow_cells.clear(); + #[cfg(not(feature = "omit_autovacuum"))] + return_if_io!(BTreeCursor::update_child_ptrmaps_for_page( + &self.pager, + page, + page_contents, + usable_space + )); done[page_idx] = true; } } + // Ensure ptrmaps for all balanced pages reflect their final contents, + // including any overflow chains that may have moved between pages. + #[cfg(not(feature = "omit_autovacuum"))] + if !matches!(self.pager.get_auto_vacuum_mode(), AutoVacuumMode::None) { + for page in pages_to_balance_new.iter().take(sibling_count_new) { + let page = page.as_ref().unwrap(); + let contents = page.get_contents(); + return_if_io!(BTreeCursor::update_child_ptrmaps_for_page( + &self.pager, + page, + contents, + usable_space, + )); + } + } + // TODO: vacuum support let first_child_page = pages_to_balance_new[0].as_ref().unwrap(); let first_child_contents = first_child_page.get_contents(); @@ -3826,6 +3872,14 @@ impl BTreeCursor { ..first_child_contents.offset + header_and_pointer_size], ); + #[cfg(not(feature = "omit_autovacuum"))] + return_if_io!(BTreeCursor::update_child_ptrmaps_for_page( + &self.pager, + parent_page, + parent_contents, + usable_space + )); + sibling_count_new -= 1; // decrease sibling count for debugging and free at the end assert!(sibling_count_new < balance_info.sibling_count); } @@ -4348,6 +4402,12 @@ impl BTreeCursor { let is_page_1 = root.get().id == 1; let offset = if is_page_1 { DatabaseHeader::SIZE } else { 0 }; + #[cfg(not(feature = "omit_autovacuum"))] + return_if_io!(self.update_ptrmap( + child.get().id as u32, + PtrmapType::BTreeNode, + root.get().id as u32 + )); tracing::debug!( "balance_root(root={}, rightmost={}, page_type={:?})", @@ -4386,6 +4446,13 @@ impl BTreeCursor { &mut root_contents.overflow_cells, ); root_contents.overflow_cells.clear(); + #[cfg(not(feature = "omit_autovacuum"))] + return_if_io!(BTreeCursor::update_child_ptrmaps_for_page( + &self.pager, + &child, + child_contents, + self.usable_space() + )); // 2. Modify root let new_root_page_type = match root_contents.page_type() { @@ -4402,6 +4469,13 @@ impl BTreeCursor { root_contents.write_fragmented_bytes_count(0); root_contents.overflow_cells.clear(); + #[cfg(not(feature = "omit_autovacuum"))] + return_if_io!(BTreeCursor::update_child_ptrmaps_for_page( + &self.pager, + &root, + root_contents, + self.usable_space() + )); self.root_page = root.get().id as i64; self.stack.clear(); self.stack.push(root); @@ -4877,6 +4951,89 @@ impl BTreeCursor { self.pager .do_allocate_page(page_type, offset, BtreePageAllocMode::Any) } + + #[cfg(not(feature = "omit_autovacuum"))] + fn update_ptrmap( + &self, + page_no: u32, + entry_type: PtrmapType, + parent_page_no: u32, + ) -> Result> { + if matches!(self.pager.get_auto_vacuum_mode(), AutoVacuumMode::None) { + return Ok(IOResult::Done(())); + } + self.pager.ptrmap_put(page_no, entry_type, parent_page_no) + } + + #[cfg(not(feature = "omit_autovacuum"))] + fn update_child_ptrmaps_for_page( + pager: &Arc, + parent_page: &PageRef, + contents: &PageContent, + usable_space: usize, + ) -> Result> { + if matches!(pager.get_auto_vacuum_mode(), AutoVacuumMode::None) { + return Ok(IOResult::Done(())); + } + let parent_id = parent_page.get().id as u32; + + let update_overflow_chain = |first: Option, owner: u32| -> Result> { + if let Some(mut page_no) = first { + return_if_io!(pager.ptrmap_put(page_no, PtrmapType::Overflow1, owner)); + loop { + let (overflow_page, c) = pager.read_page(page_no as i64)?; + let next = overflow_page.get_contents().read_u32_no_offset(0); + if let Some(c) = c { + io_yield_one!(c); + } + if next == 0 { + break; + } + return_if_io!(pager.ptrmap_put(next, PtrmapType::Overflow2, page_no)); + page_no = next; + } + } + Ok(IOResult::Done(())) + }; + + for cell_idx in 0..contents.cell_count() { + let cell = contents.cell_get(cell_idx, usable_space)?; + match cell { + BTreeCell::TableInteriorCell(cell) => { + return_if_io!(pager.ptrmap_put( + cell.left_child_page, + PtrmapType::BTreeNode, + parent_id + )); + } + BTreeCell::IndexInteriorCell(cell) => { + return_if_io!(pager.ptrmap_put( + cell.left_child_page, + PtrmapType::BTreeNode, + parent_id + )); + return_if_io!(update_overflow_chain(cell.first_overflow_page, parent_id)); + } + BTreeCell::TableLeafCell(cell) => { + return_if_io!(update_overflow_chain(cell.first_overflow_page, parent_id)); + } + BTreeCell::IndexLeafCell(cell) => { + return_if_io!(update_overflow_chain(cell.first_overflow_page, parent_id)); + } + } + } + + if matches!( + contents.page_type(), + PageType::TableInterior | PageType::IndexInterior + ) { + if let Some(rightmost_child) = contents.rightmost_pointer() { + return_if_io!(pager.ptrmap_put(rightmost_child, PtrmapType::BTreeNode, parent_id)); + } + } + + Ok(IOResult::Done(())) + } } impl CursorTrait for BTreeCursor { @@ -5780,6 +5937,20 @@ pub enum IntegrityCheckError { }, #[error("Page {page_id}: never used")] PageNeverUsed { page_id: i64 }, + #[cfg(not(feature = "omit_autovacuum"))] + #[error("Ptrmap entry missing for page {page_id}")] + PtrMapMissing { page_id: i64 }, + #[cfg(not(feature = "omit_autovacuum"))] + #[error( + "Ptrmap entry for page {page_id} mismatch: expected=({expected:?}, parent={expected_parent}), got=({actual:?}, parent={actual_parent})" + )] + PtrMapMismatch { + page_id: i64, + expected: PtrmapType, + expected_parent: u32, + actual: PtrmapType, + actual_parent: u32, + }, } #[derive(Debug, Clone, Copy, PartialEq)] @@ -5810,6 +5981,7 @@ pub struct IntegrityCheckState { pub db_size: usize, first_leaf_level: Option, pub page_reference: FxHashMap, + pub page_categories: FxHashMap, page: Option, pub freelist_count: CheckFreelist, } @@ -5820,6 +5992,7 @@ impl IntegrityCheckState { page_stack: Vec::new(), db_size, page_reference: FxHashMap::default(), + page_categories: FxHashMap::default(), first_leaf_level: None, page: None, freelist_count: CheckFreelist { @@ -5865,6 +6038,7 @@ impl IntegrityCheckState { errors: &mut Vec, ) { let page_id = entry.page_idx; + self.page_categories.insert(page_id, entry.page_category); let Some(previous) = self.page_reference.insert(page_id, referenced_by) else { self.page_stack.push(entry); return; @@ -5875,6 +6049,17 @@ impl IntegrityCheckState { references: vec![previous, referenced_by], }); } + + #[cfg(not(feature = "omit_autovacuum"))] + fn parent_category(&self, page_id: i64) -> Option { + let Some(parent_id) = self.page_reference.get(&page_id) else { + return None; + }; + if *parent_id == 0 { + return None; + } + self.page_categories.get(parent_id).copied() + } } impl std::fmt::Debug for IntegrityCheckState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -5921,9 +6106,76 @@ pub fn integrity_check( } }; turso_assert!(page.is_loaded(), "page should be loaded"); + + #[cfg(not(feature = "omit_autovacuum"))] + if !matches!(pager.get_auto_vacuum_mode(), AutoVacuumMode::None) { + state.page = Some(page.clone()); + let page_id = page.get().id as u32; + let page_size = pager.get_page_size_unchecked().get() as usize; + if page_id > 1 && !ptrmap::is_ptrmap_page(page_id, page_size) { + let expected = match page_category { + PageCategory::PointerMap => None, + PageCategory::Normal => { + let referenced_by = *state.page_reference.get(&page_idx).unwrap_or(&0); + // Page 1/root has referenced_by == 0 + let entry_type = if referenced_by == 0 { + PtrmapType::RootPage + } else { + PtrmapType::BTreeNode + }; + Some((entry_type, referenced_by as u32)) + } + PageCategory::Overflow => { + let referenced_by = *state.page_reference.get(&page_idx).unwrap_or(&0); + // Determine if parent was another overflow page to pick the right ptrmap type. + let parent_is_overflow = matches!( + state.parent_category(page_idx), + Some(PageCategory::Overflow) + ); + // If we don't know the parent, fall back to Overflow1. + let entry_type = if parent_is_overflow { + PtrmapType::Overflow2 + } else { + PtrmapType::Overflow1 + }; + Some((entry_type, referenced_by as u32)) + } + PageCategory::FreeListTrunk | PageCategory::FreePage => { + Some((PtrmapType::FreePage, 0)) + } + }; + + if let Some((expected_type, expected_parent)) = expected { + match return_if_io!(pager.ptrmap_get(page_id)) { + Some(entry) => { + if entry.entry_type != expected_type + || entry.parent_page_no != expected_parent + { + errors.push(IntegrityCheckError::PtrMapMismatch { + page_id: page_idx, + expected: expected_type, + expected_parent, + actual: entry.entry_type, + actual_parent: entry.parent_page_no, + }); + } + } + None => { + errors.push(IntegrityCheckError::PtrMapMissing { page_id: page_idx }) + } + } + } + } + state.page = None; + } + state.page_stack.pop(); let contents = page.get_contents(); + #[cfg(not(feature = "omit_autovacuum"))] + if matches!(page_category, PageCategory::PointerMap) { + continue; + } if page_category == PageCategory::FreeListTrunk { state.freelist_count.actual_count += 1; let next_freelist_trunk_page = contents.read_u32_no_offset(0); @@ -7679,6 +7931,20 @@ fn fill_cell_payload( "new overflow page is not loaded" ); let new_overflow_page_id = new_overflow_page.get().id as u32; + #[cfg(not(feature = "omit_autovacuum"))] + if !matches!(pager.get_auto_vacuum_mode(), AutoVacuumMode::None) { + let (entry_type, parent_page_no) = + if let Some(prev_page) = current_overflow_page.as_ref() { + (PtrmapType::Overflow2, prev_page.get().id as u32) + } else { + (PtrmapType::Overflow1, page.get().id as u32) + }; + return_if_io!(pager.ptrmap_put( + new_overflow_page_id, + entry_type, + parent_page_no + )); + } if let Some(prev_page) = current_overflow_page { // Update the previous overflow page's "next overflow page" pointer to point to the new overflow page. diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 0f8f3d7833..8d5752358d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -29,6 +29,8 @@ use tracing::{instrument, trace, Level}; use turso_macros::AtomicEnum; use super::btree::btree_init_page; +#[cfg(not(feature = "omit_autovacuum"))] +use super::btree::{integrity_check, IntegrityCheckState, PageCategory}; use super::page_cache::{CacheError, CacheResizeResult, PageCache, PageCacheKey}; use super::sqlite3_ondisk::begin_write_btree_page; use super::wal::CheckpointMode; @@ -436,6 +438,7 @@ impl DbState { enum PtrMapGetState { Start, Deserialize { + target_page_num: u32, ptrmap_page: PageRef, offset_in_ptrmap_page: usize, }, @@ -446,6 +449,7 @@ enum PtrMapGetState { enum PtrMapPutState { Start, Deserialize { + target_page_num: u32, ptrmap_page: PageRef, offset_in_ptrmap_page: usize, }, @@ -998,6 +1002,7 @@ impl Pager { let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as i64)?; self.vacuum_state.write().ptrmap_get_state = PtrMapGetState::Deserialize { + target_page_num, ptrmap_page, offset_in_ptrmap_page, }; @@ -1006,9 +1011,14 @@ impl Pager { } } PtrMapGetState::Deserialize { + target_page_num: state_target_page, ptrmap_page, offset_in_ptrmap_page, } => { + if state_target_page != target_page_num { + self.vacuum_state.write().ptrmap_get_state = PtrMapGetState::Start; + continue; + } turso_assert!(ptrmap_page.is_loaded(), "ptrmap_page should be loaded"); let ptrmap_page_inner = ptrmap_page.get(); let ptrmap_pg_no = ptrmap_page_inner.id; @@ -1105,6 +1115,7 @@ impl Pager { let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as i64)?; self.vacuum_state.write().ptrmap_put_state = PtrMapPutState::Deserialize { + target_page_num: db_page_no_to_update, ptrmap_page, offset_in_ptrmap_page, }; @@ -1113,9 +1124,14 @@ impl Pager { } } PtrMapPutState::Deserialize { + target_page_num: state_target_page, ptrmap_page, offset_in_ptrmap_page, } => { + if state_target_page != db_page_no_to_update { + self.vacuum_state.write().ptrmap_put_state = PtrMapPutState::Start; + continue; + } turso_assert!(ptrmap_page.is_loaded(), "page should be loaded"); let ptrmap_page_inner = ptrmap_page.get(); let ptrmap_pg_no = ptrmap_page_inner.id; @@ -1457,6 +1473,87 @@ impl Pager { Ok(IOResult::Done(wal.borrow_mut().begin_write_tx()?)) } + #[cfg(not(feature = "omit_autovacuum"))] + fn reclaim_orphan_pages(&self, connection: &Connection) -> Result> { + if matches!(self.get_auto_vacuum_mode(), AutoVacuumMode::None) { + return Ok(IOResult::Done(())); + } + + let pager_arc = connection.pager.load(); + + let db_size = return_if_io!(self.with_header(|header| header.database_size)).get(); + if db_size <= 1 { + return Ok(IOResult::Done(())); + } + + let roots = { + let schema = connection.schema.read(); + let mut root_pages = Vec::new(); + for table in schema.tables.values() { + if let crate::schema::Table::BTree(table) = table.as_ref() { + root_pages.push(table.root_page as i64); + if let Some(indexes) = schema.indexes.get(table.name.as_str()) { + for index in indexes.iter() { + if index.root_page > 0 { + root_pages.push(index.root_page as i64); + } + } + } + } + } + root_pages + }; + + let mut errors = Vec::new(); + let mut state = IntegrityCheckState::new(db_size as usize); + let freelist_trunk = + return_if_io!(self.with_header(|header| header.freelist_trunk_page)).get(); + if freelist_trunk > 0 { + let expected_count = + return_if_io!(self.with_header(|header| header.freelist_pages)).get(); + state.set_expected_freelist_count(expected_count as usize); + state.start( + freelist_trunk as i64, + PageCategory::FreeListTrunk, + &mut errors, + ); + } else if let Some(first_root) = roots.first() { + state.start(*first_root, PageCategory::Normal, &mut errors); + } else { + return Ok(IOResult::Done(())); + } + + let mut current_root_idx = if freelist_trunk > 0 { 0 } else { 1 }; + loop { + match integrity_check(&mut state, &mut errors, &pager_arc) { + Ok(IOResult::Done(())) => { + if current_root_idx < roots.len() { + state.start(roots[current_root_idx], PageCategory::Normal, &mut errors); + current_root_idx += 1; + continue; + } + break; + } + Ok(IOResult::IO(c)) => return Ok(IOResult::IO(c)), + Err(e) => return Err(e), + } + } + + let page_size = self.get_page_size_unchecked().get() as usize; + for page_no in 2..=db_size { + let page_no_u32 = page_no as u32; + if ptrmap::is_ptrmap_page(page_no_u32, page_size) { + continue; + } + if state.page_reference.contains_key(&(page_no as i64)) { + continue; + } + return_if_io!(self.free_page(None, page_no as usize)); + } + + Ok(IOResult::Done(())) + } + #[instrument(skip_all, level = Level::DEBUG)] pub fn commit_tx(&self, connection: &Connection) -> Result> { if connection.is_nested_stmt() { @@ -1472,6 +1569,13 @@ impl Pager { _ => (false, false), }; tracing::trace!("commit_tx(schema_did_change={})", schema_did_change); + #[cfg(not(feature = "omit_autovacuum"))] + if matches!( + self.get_auto_vacuum_mode(), + AutoVacuumMode::Full | AutoVacuumMode::Incremental + ) { + return_if_io!(self.reclaim_orphan_pages(connection)); + } let commit_status = return_if_io!(self.commit_dirty_pages( connection.is_wal_auto_checkpoint_disabled(), connection.get_sync_mode(), @@ -2279,6 +2383,16 @@ impl Pager { ))); } + #[cfg(not(feature = "omit_autovacuum"))] + if !matches!(self.get_auto_vacuum_mode(), AutoVacuumMode::None) { + let page_size = self.get_page_size_unchecked().get() as usize; + if page_id as u32 >= ptrmap::FIRST_PTRMAP_PAGE_NO + && !ptrmap::is_ptrmap_page(page_id as u32, page_size) + { + return_if_io!(self.ptrmap_put(page_id as u32, PtrmapType::FreePage, 0)); + } + } + let (page, _c) = match page.take() { Some(page) => { assert_eq!(