Skip to content

Commit 65a4d56

Browse files
committed
Feature: Add Connection::snapshot() for point-in-time database copy
This addresses feedback from tursodatabase/agentfs#119 suggesting that snapshot functionality should be implemented in turso.git using direct fily copying rather than SQL-based row by row copying. Comment: https:://github.com/tursodatabase/agentfs/pull/119#issuecomment-3681336678 - extract core logic from Pager::checkpoint function to a new Pager::checkpoing_internal function and add a flag to keep_lock during the Finalize phase - create wrapper functions Pager::checkpoint, Pager::checkpoint_with_lock and Pager::block_checkpoint_keep_lock - add Connection::snapshot API that checkpoints while keeping the lock and copies the database file The lock is held to avoid a race condition after finishing checkpointing and before copying the file when concurrent writers can write to db. Signed-off-by: Prateek Singh Rathore <[email protected]>
1 parent 9582043 commit 65a4d56

File tree

2 files changed

+98
-3
lines changed

2 files changed

+98
-3
lines changed

core/lib.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2017,6 +2017,67 @@ impl Connection {
20172017
}
20182018
}
20192019

2020+
/// Create a point-in-time snapshot of the database by copying the database file.
2021+
#[cfg(feature = "fs")]
2022+
pub fn snapshot(self: &Arc<Self>, output_path: &str) -> Result<()> {
2023+
self._snapshot_copy_file(output_path)
2024+
}
2025+
2026+
/// Create a point-in-time snapshot of the database by copying the database file.
2027+
///
2028+
/// It checkpoints and keeps the lock after checkpointing to avoid race window between
2029+
/// checkpoint completion and file copy operation where concurrent writers could modify
2030+
/// the database.
2031+
#[cfg(feature = "fs")]
2032+
pub fn _snapshot_copy_file(self: &Arc<Self>, output_path: &str) -> Result<()> {
2033+
use crate::util::MEMORY_PATH;
2034+
use std::path::Path;
2035+
2036+
if self.is_closed() {
2037+
return Err(LimboError::InternalError("Connection closed".to_string()));
2038+
}
2039+
2040+
// FIXME: enable mvcc
2041+
if self.mvcc_enabled() {
2042+
return Err(LimboError::InternalError("Snapshot not yet supported with MVCC mode".to_string()))
2043+
}
2044+
2045+
// Cannot snapshot in-memory databases
2046+
if self.db.path == MEMORY_PATH {
2047+
return Err(LimboError::InvalidArgument(
2048+
"Cannot snapshot in-memory database".to_string(),
2049+
));
2050+
}
2051+
2052+
let path = Path::new(output_path);
2053+
if path.exists() {
2054+
return Err(LimboError::InvalidArgument(format!(
2055+
"Output file already exists: {}",
2056+
output_path
2057+
)));
2058+
}
2059+
if let Some(parent) = path.parent() {
2060+
std::fs::create_dir_all(parent).map_err(|e| {
2061+
LimboError::InternalError(format!("Failed to create parent directory: {}", e))
2062+
})?;
2063+
}
2064+
2065+
let pager = self.pager.load();
2066+
let result = (|| -> Result<()> {
2067+
// Checkpoint and keep the lock
2068+
let _res = pager.blocking_checkpoint_keep_lock(CheckpointMode::Truncate { upper_bound_inclusive: None }, self.get_sync_mode())?;
2069+
2070+
let source_path = Path::new(&self.db.path);
2071+
std::fs::copy(source_path, path).map_err(|e| {
2072+
LimboError::InternalError(format!("Failed to copy database file: {}", e))
2073+
})?;
2074+
2075+
Ok(()) // lock is dropped here when _res is dropped here
2076+
})();
2077+
2078+
result
2079+
}
2080+
20202081
/// Close a connection and checkpoint.
20212082
pub fn close(&self) -> Result<()> {
20222083
if self.is_closed() {

core/storage/pager.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2856,17 +2856,40 @@ impl Pager {
28562856
}
28572857
}
28582858

2859+
#[instrument(skip_all, level = Level::DEBUG)]
2860+
/// checkpoint_keep_lock checkpoints the WAL to the database file (if needed) and keeps the lock
2861+
pub fn checkpoint_keep_lock(
2862+
&self,
2863+
mode: CheckpointMode,
2864+
sync_mode: crate::SyncMode,
2865+
clear_page_cache: bool,
2866+
) -> Result<IOResult<CheckpointResult>> {
2867+
self.checkpoint_inner(mode, sync_mode, clear_page_cache, true)
2868+
}
2869+
2870+
#[instrument(skip_all, level = Level::DEBUG)]
2871+
/// checkpoint checkpoints the WAL to the database file (if needed) and drops the lock
2872+
pub fn checkpoint(
2873+
&self,
2874+
mode: CheckpointMode,
2875+
sync_mode: crate::SyncMode,
2876+
clear_page_cache: bool,
2877+
) -> Result<IOResult<CheckpointResult>> {
2878+
self.checkpoint_inner(mode, sync_mode, clear_page_cache, false)
2879+
}
2880+
28592881
#[instrument(skip_all, level = Level::DEBUG, name = "pager_checkpoint",)]
28602882
/// Checkpoint the WAL to the database file (if needed).
28612883
/// Args:
28622884
/// - mode: The checkpoint mode to use (PASSIVE, FULL, RESTART, TRUNCATE)
28632885
/// - sync_mode: The fsync mode to use (OFF, NORMAL, FULL)
28642886
/// - clear_page_cache: Whether to clear the page cache after checkpointing
2865-
pub fn checkpoint(
2887+
fn checkpoint_inner(
28662888
&self,
28672889
mode: CheckpointMode,
28682890
sync_mode: crate::SyncMode,
28692891
clear_page_cache: bool,
2892+
keep_lock: bool,
28702893
) -> Result<IOResult<CheckpointResult>> {
28712894
let Some(wal) = self.wal.as_ref() else {
28722895
return Err(LimboError::InternalError(
@@ -2994,8 +3017,10 @@ impl Pager {
29943017
let mut state = self.checkpoint_state.write();
29953018
let mut res = state.result.take().expect("result should be set");
29963019

2997-
// Release checkpoint guard
2998-
res.release_guard();
3020+
// Release checkpoint guard if lock is not to be kept
3021+
if !keep_lock {
3022+
res.release_guard();
3023+
}
29993024

30003025
// Clear page cache only if requested (explicit checkpoints do this, auto-checkpoint does not)
30013026
if clear_page_cache {
@@ -3092,6 +3117,15 @@ impl Pager {
30923117
self.io.block(|| self.checkpoint(mode, sync_mode, true))
30933118
}
30943119

3120+
#[instrument(skip_all, level = Level::DEBUG)]
3121+
pub fn blocking_checkpoint_keep_lock(
3122+
&self,
3123+
mode: CheckpointMode,
3124+
sync_mode: crate::SyncMode,
3125+
) -> Result<CheckpointResult> {
3126+
self.io.block(|| self.checkpoint_keep_lock(mode, sync_mode, true))
3127+
}
3128+
30953129
pub fn freepage_list(&self) -> u32 {
30963130
self.io
30973131
.block(|| HeaderRefMut::from_pager(self))

0 commit comments

Comments
 (0)