Skip to content

Commit

Permalink
Avoid creating a new page on every insert
Browse files Browse the repository at this point in the history
Currently, `insert_storage` in the access method creates a new Tape
to add the vector to, on every insert. This can cause a lot of empty
space in pages. This commit allows tapes to be restarted at the most
recent block (of the right tape), instead of always starting at a new
block. This allows `insert_storage` to continue writing into an existing
page instead of creating a new one.
  • Loading branch information
syvb committed Nov 5, 2024
1 parent 8596d56 commit 0574e5e
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pgvectorscale/src/access_method/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ unsafe fn insert_storage<S: Storage>(
meta_page: &mut MetaPage,
stats: &mut InsertStats,
) {
let mut tape = Tape::new(&index_relation, S::page_type());
let mut tape = Tape::restart(&index_relation, S::page_type());
let index_pointer = storage.create_node(
vector.to_index_slice(),
heap_pointer,
Expand Down
113 changes: 112 additions & 1 deletion pgvectorscale/src/util/tape.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Tape provides a simple infinite-tape-writing abstraction over postgres pages.

use super::page::{PageType, WritablePage};
use super::page::{PageType, ReadablePage, WritablePage};
use pgrx::{
pg_sys::{BlockNumber, BLCKSZ},
*,
Expand All @@ -13,6 +13,7 @@ pub struct Tape<'a> {
}

impl<'a> Tape<'a> {
/// Create a Tape that starts writing on a new page.
pub unsafe fn new(index: &'a PgRelation, page_type: PageType) -> Self {
let page = WritablePage::new(index, page_type);
let block_number = page.get_block_number();
Expand All @@ -24,6 +25,35 @@ impl<'a> Tape<'a> {
}
}

/// Create a Tape that resumes writing on the newest page of the given type, if possible.
pub unsafe fn restart(index: &'a PgRelation, page_type: PageType) -> Self {
let nblocks = pg_sys::RelationGetNumberOfBlocksInFork(
index.as_ptr(),
pg_sys::ForkNumber::MAIN_FORKNUM,
);
println!("nblocks={:?}", nblocks);
let mut current_block = None;
for block in (0..nblocks).rev() {
println!("checking block {}", block);
// println!("checking block {}/{:?}", block, ReadablePage::read(index, block).get_type());
if ReadablePage::read(index, block).get_type() == page_type {
println!("b");
current_block = Some(block);
break;
}
println!("c");
}
println!("a");
match current_block {
Some(current) => Tape {
index,
page_type,
current,
},
None => Tape::new(index, page_type),
}
}

pub unsafe fn write(&mut self, data: &[u8]) -> super::ItemPointer {
let size = data.len();
assert!(size < BLCKSZ as usize);
Expand Down Expand Up @@ -52,3 +82,84 @@ impl<'a> Tape<'a> {
std::mem::drop(self)
}
}

#[cfg(any(test, feature = "pg_test"))]
#[pgrx::pg_schema]
mod tests {
use super::*;

fn make_test_relation() -> PgRelation {
Spi::run(&format!(
"CREATE TABLE test(encoding vector(3));
CREATE INDEX idxtest
ON test
USING diskann(encoding)
WITH (num_neighbors=30);",
))
.unwrap();

let index_oid = Spi::get_one::<pg_sys::Oid>("SELECT 'idxtest'::regclass::oid")
.unwrap()
.expect("oid was null");
unsafe { PgRelation::from_pg(pg_sys::RelationIdGetRelation(index_oid)) }
}

#[pg_test]
fn tape_restart() {
let indexrel = make_test_relation();
unsafe {
let node_page = {
let mut tape = Tape::new(&indexrel, PageType::Node);
let node_page = tape.current;
tape.write(&[1, 2, 3]);
tape.write(&[4, 5, 6]);
assert_eq!(
tape.current, node_page,
"Data should be written to page with enough room"
);
node_page
};

{
let mut tape = Tape::restart(&indexrel, PageType::SbqMeans);
tape.write(&[99]);
assert_ne!(
tape.current, node_page,
"Data can only be written to page of its type"
);
}

{
let mut tape = Tape::restart(&indexrel, PageType::PqQuantizerVector);
tape.write(&[99]);
assert_eq!(
tape.current,
node_page + 1,
"An unseen page type must create a new page"
);
}

{
let mut tape = Tape::restart(&indexrel, PageType::Node);
tape.write(&[7, 8, 9]);
tape.write(&[10, 11, 12]);
assert_eq!(
tape.current, node_page,
"Data should be written to existing page when there is room"
);

let page = WritablePage::modify(tape.index, tape.current);
assert_eq!(page.get_free_space(), 8108);
}

{
let mut tape = Tape::restart(&indexrel, PageType::Node);
tape.write(&[42; 8109]);
assert_ne!(
tape.current, node_page,
"Writing more than available forces a new page"
);
}
}
}
}

0 comments on commit 0574e5e

Please sign in to comment.