Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,16 +590,14 @@ impl Database {
AutoVacuumMode::None
};

// Force autovacuum to None if the experimental flag is not enabled
let final_mode = if !self.opts.enable_autovacuum {
if mode != AutoVacuumMode::None {
tracing::warn!(
"Database has autovacuum enabled but --experimental-autovacuum flag is not set. Forcing autovacuum to None."
);
}
AutoVacuumMode::None
} else {
// TODO: remove behind experimental toggle once feature is fully supported.
// Do not disable autovacuum if the database header already has it enabled; otherwise we would
// corrupt pointer-map expectations for existing databases. The flag only controls enabling it
// when creating/rewriting databases, not honoring an already-enabled file.
let final_mode = if mode != AutoVacuumMode::None || self.opts.enable_autovacuum {
mode
} else {
AutoVacuumMode::None
};

pager.set_auto_vacuum_mode(final_mode);
Expand Down
336 changes: 301 additions & 35 deletions core/storage/btree.rs

Large diffs are not rendered by default.

114 changes: 114 additions & 0 deletions core/storage/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use tracing::{instrument, trace, Level};
use turso_macros::AtomicEnum;

use super::btree::btree_init_page;
#[cfg(not(feature = "omit_autovacuum"))]
use super::btree::{integrity_check, IntegrityCheckState, PageCategory};
use super::page_cache::{CacheError, CacheResizeResult, PageCache, PageCacheKey};
use super::sqlite3_ondisk::begin_write_btree_page;
use super::wal::CheckpointMode;
Expand Down Expand Up @@ -436,6 +438,7 @@ impl DbState {
enum PtrMapGetState {
Start,
Deserialize {
target_page_num: u32,
ptrmap_page: PageRef,
offset_in_ptrmap_page: usize,
},
Expand All @@ -446,6 +449,7 @@ enum PtrMapGetState {
enum PtrMapPutState {
Start,
Deserialize {
target_page_num: u32,
ptrmap_page: PageRef,
offset_in_ptrmap_page: usize,
},
Expand Down Expand Up @@ -998,6 +1002,7 @@ impl Pager {

let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as i64)?;
self.vacuum_state.write().ptrmap_get_state = PtrMapGetState::Deserialize {
target_page_num,
ptrmap_page,
offset_in_ptrmap_page,
};
Expand All @@ -1006,9 +1011,14 @@ impl Pager {
}
}
PtrMapGetState::Deserialize {
target_page_num: state_target_page,
ptrmap_page,
offset_in_ptrmap_page,
} => {
if state_target_page != target_page_num {
self.vacuum_state.write().ptrmap_get_state = PtrMapGetState::Start;
continue;
}
turso_assert!(ptrmap_page.is_loaded(), "ptrmap_page should be loaded");
let ptrmap_page_inner = ptrmap_page.get();
let ptrmap_pg_no = ptrmap_page_inner.id;
Expand Down Expand Up @@ -1105,6 +1115,7 @@ impl Pager {

let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as i64)?;
self.vacuum_state.write().ptrmap_put_state = PtrMapPutState::Deserialize {
target_page_num: db_page_no_to_update,
ptrmap_page,
offset_in_ptrmap_page,
};
Expand All @@ -1113,9 +1124,14 @@ impl Pager {
}
}
PtrMapPutState::Deserialize {
target_page_num: state_target_page,
ptrmap_page,
offset_in_ptrmap_page,
} => {
if state_target_page != db_page_no_to_update {
self.vacuum_state.write().ptrmap_put_state = PtrMapPutState::Start;
continue;
}
turso_assert!(ptrmap_page.is_loaded(), "page should be loaded");
let ptrmap_page_inner = ptrmap_page.get();
let ptrmap_pg_no = ptrmap_page_inner.id;
Expand Down Expand Up @@ -1457,6 +1473,87 @@ impl Pager {
Ok(IOResult::Done(wal.borrow_mut().begin_write_tx()?))
}

#[cfg(not(feature = "omit_autovacuum"))]
fn reclaim_orphan_pages(&self, connection: &Connection) -> Result<IOResult<()>> {
if matches!(self.get_auto_vacuum_mode(), AutoVacuumMode::None) {
return Ok(IOResult::Done(()));
}

let pager_arc = connection.pager.load();

let db_size = return_if_io!(self.with_header(|header| header.database_size)).get();
if db_size <= 1 {
return Ok(IOResult::Done(()));
}

let roots = {
let schema = connection.schema.read();
let mut root_pages = Vec::new();
for table in schema.tables.values() {
if let crate::schema::Table::BTree(table) = table.as_ref() {
root_pages.push(table.root_page as i64);
if let Some(indexes) = schema.indexes.get(table.name.as_str()) {
for index in indexes.iter() {
if index.root_page > 0 {
root_pages.push(index.root_page as i64);
}
}
}
}
}
root_pages
};

let mut errors = Vec::new();
let mut state = IntegrityCheckState::new(db_size as usize);
let freelist_trunk =
return_if_io!(self.with_header(|header| header.freelist_trunk_page)).get();
if freelist_trunk > 0 {
let expected_count =
return_if_io!(self.with_header(|header| header.freelist_pages)).get();
state.set_expected_freelist_count(expected_count as usize);
state.start(
freelist_trunk as i64,
PageCategory::FreeListTrunk,
&mut errors,
);
} else if let Some(first_root) = roots.first() {
state.start(*first_root, PageCategory::Normal, &mut errors);
} else {
return Ok(IOResult::Done(()));
}

let mut current_root_idx = if freelist_trunk > 0 { 0 } else { 1 };
loop {
match integrity_check(&mut state, &mut errors, &pager_arc) {
Ok(IOResult::Done(())) => {
if current_root_idx < roots.len() {
state.start(roots[current_root_idx], PageCategory::Normal, &mut errors);
current_root_idx += 1;
continue;
}
break;
}
Ok(IOResult::IO(c)) => return Ok(IOResult::IO(c)),
Err(e) => return Err(e),
}
}

let page_size = self.get_page_size_unchecked().get() as usize;
for page_no in 2..=db_size {
let page_no_u32 = page_no as u32;
if ptrmap::is_ptrmap_page(page_no_u32, page_size) {
continue;
}
if state.page_reference.contains_key(&(page_no as i64)) {
continue;
}
return_if_io!(self.free_page(None, page_no as usize));
}

Ok(IOResult::Done(()))
}

#[instrument(skip_all, level = Level::DEBUG)]
pub fn commit_tx(&self, connection: &Connection) -> Result<IOResult<PagerCommitResult>> {
if connection.is_nested_stmt() {
Expand All @@ -1472,6 +1569,13 @@ impl Pager {
_ => (false, false),
};
tracing::trace!("commit_tx(schema_did_change={})", schema_did_change);
#[cfg(not(feature = "omit_autovacuum"))]
if matches!(
self.get_auto_vacuum_mode(),
AutoVacuumMode::Full | AutoVacuumMode::Incremental
) {
return_if_io!(self.reclaim_orphan_pages(connection));
}
let commit_status = return_if_io!(self.commit_dirty_pages(
connection.is_wal_auto_checkpoint_disabled(),
connection.get_sync_mode(),
Expand Down Expand Up @@ -2279,6 +2383,16 @@ impl Pager {
)));
}

#[cfg(not(feature = "omit_autovacuum"))]
if !matches!(self.get_auto_vacuum_mode(), AutoVacuumMode::None) {
let page_size = self.get_page_size_unchecked().get() as usize;
if page_id as u32 >= ptrmap::FIRST_PTRMAP_PAGE_NO
&& !ptrmap::is_ptrmap_page(page_id as u32, page_size)
{
return_if_io!(self.ptrmap_put(page_id as u32, PtrmapType::FreePage, 0));
}
}

let (page, _c) = match page.take() {
Some(page) => {
assert_eq!(
Expand Down
5 changes: 5 additions & 0 deletions simulator/profiles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub struct Profile {
#[garde(skip)]
/// Experimental MVCC feature
pub experimental_mvcc: bool,
#[garde(skip)]
/// Whether to seed simulator databases with a SQLite-generated auto-vacuum file.
pub seed_sqlite_autovacuum: bool,
#[garde(range(min = 1, max = 64))]
pub max_connections: usize,
#[garde(dive)]
Expand All @@ -40,6 +43,7 @@ impl Default for Profile {
fn default() -> Self {
Self {
experimental_mvcc: false,
seed_sqlite_autovacuum: true,
max_connections: 10,
io: Default::default(),
query: Default::default(),
Expand Down Expand Up @@ -118,6 +122,7 @@ impl Profile {
},
experimental_mvcc: true,
max_connections: 2,
seed_sqlite_autovacuum: true,
};
profile.validate().unwrap();
profile
Expand Down
26 changes: 26 additions & 0 deletions simulator/runner/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ impl SimulatorEnv {
}
self.db = None;

if self.profile.seed_sqlite_autovacuum {
self.seed_sqlite_autovacuum(&db_path);
}

let db = match Database::open_file_with_flags(
io.clone(),
db_path.to_str().unwrap(),
Expand All @@ -255,6 +259,28 @@ impl SimulatorEnv {
self.db = Some(db);
}

fn seed_sqlite_autovacuum(&self, db_path: &Path) {
let conn = rusqlite::Connection::open(db_path).unwrap_or_else(|e| {
panic!(
"failed to open sqlite connection while seeding auto-vacuum database at {}: {e}",
db_path.display()
)
});
conn.execute_batch(
"PRAGMA auto_vacuum=FULL;\
VACUUM;\
CREATE TABLE IF NOT EXISTS __sim_autovacuum_seed__(id INTEGER PRIMARY KEY, value TEXT);\
INSERT OR IGNORE INTO __sim_autovacuum_seed__ VALUES (1,'seed');",
)
.unwrap_or_else(|e| {
panic!(
"failed to seed sqlite auto-vacuum database at {}: {e}",
db_path.display()
)
});
drop(conn);
}

pub(crate) fn get_db_path(&self) -> PathBuf {
self.paths.db(&self.type_, &self.phase)
}
Expand Down
25 changes: 25 additions & 0 deletions tests/integration/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,31 @@ impl TempDatabase {
}
}

pub fn new_with_autovacuum_full(sql: &str) -> Self {
let mut path = TempDir::new().unwrap().keep();
path.push("test.db");
{
let connection = rusqlite::Connection::open(&path).unwrap();
connection.pragma_update(None, "page_size", 4096).unwrap();
connection.pragma_update(None, "auto_vacuum", 1).unwrap();
connection.execute_batch("VACUUM;").unwrap();
connection.execute_batch(sql).unwrap();
}
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let db = Database::open_file_with_flags(
io.clone(),
path.to_str().unwrap(),
turso_core::OpenFlags::default(),
turso_core::DatabaseOpts::new()
.with_indexes(true)
.with_index_method(true),
None,
)
.unwrap();

Self { path, io, db }
}

pub fn new_with_rusqlite(table_sql: &str) -> Self {
let mut path = TempDir::new().unwrap().keep();
path.push("test.db");
Expand Down
29 changes: 29 additions & 0 deletions tests/integration/storage/auto_vacuum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use core_tester::common::{self, maybe_setup_tracing, rusqlite_integrity_check, TempDatabase};
use turso_core::Row;

#[test]

fn test_autovacuum_pointer_map_integrity() -> anyhow::Result<()> {
let _ = env_logger::try_init();
maybe_setup_tracing();

let tmp_db = TempDatabase::new_with_autovacuum_full(
"CREATE TABLE seed_table(id INTEGER PRIMARY KEY, value TEXT);\
INSERT INTO seed_table VALUES (1, 'seed');",
);
let conn = tmp_db.connect_limbo();

common::run_query(&tmp_db, &conn, "CREATE TABLE third(a);")?;
for i in 1..=2000 {
common::run_query(&tmp_db, &conn, &format!("INSERT INTO third VALUES ({i});"))?;
}

common::run_query_on_row(&tmp_db, &conn, "PRAGMA integrity_check;", |row: &Row| {
let res = row.get::<String>(0).unwrap();
assert_eq!(res, "ok");
})?;

rusqlite_integrity_check(tmp_db.path.as_path())?;

Ok(())
}
1 change: 1 addition & 0 deletions tests/integration/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
mod auto_vacuum;
#[cfg(feature = "checksum")]
mod checksum;
Loading