Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ rustc-hash = "2.0"
either = { workspace = true }
tracing-subscriber.workspace = true
rapidhash = "4.1.1"
branches = { version = "0.4.3", default-features = false }

# Use pure-rust for Android to avoid C cross-compilation issues
[target.'cfg(target_os = "android")'.dependencies]
Expand Down
14 changes: 10 additions & 4 deletions core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,30 +140,36 @@ pub enum CompletionError {
ChecksumNotEnabled,
}

/// Identity pass-through marked `#[cold]`.
///
/// Wrapping an error result in a call to this function hints to the
/// optimizer that the branch constructing it is unlikely to be taken,
/// improving code layout of the hot (non-error) path.
#[cold]
pub(crate) const fn cold_return<T>(value: T) -> T {
    value
}

/// Returns early from the enclosing function with a
/// `LimboError::ParseError` built from the given format arguments.
/// The error value is routed through `cold_return` so the error branch
/// is laid out as unlikely.
#[macro_export]
macro_rules! bail_parse_error {
    ($($arg:tt)*) => {
        return $crate::error::cold_return(Err($crate::error::LimboError::ParseError(format!($($arg)*))))
    };
}

/// Returns early from the enclosing function with a
/// `LimboError::Corrupt` built from the given format arguments.
/// The error value is routed through `cold_return` so the error branch
/// is laid out as unlikely.
#[macro_export]
macro_rules! bail_corrupt_error {
    ($($arg:tt)*) => {
        return $crate::error::cold_return(Err($crate::error::LimboError::Corrupt(format!($($arg)*))))
    };
}

/// Returns early from the enclosing function with a
/// `LimboError::Constraint` built from the given format arguments.
/// The error value is routed through `cold_return` so the error branch
/// is laid out as unlikely.
#[macro_export]
macro_rules! bail_constraint_error {
    ($($arg:tt)*) => {
        return $crate::error::cold_return(Err($crate::error::LimboError::Constraint(format!($($arg)*))))
    };
}

impl From<turso_ext::ResultCode> for LimboError {
    /// Converts an extension result code into a `LimboError`, routing the
    /// constructed error through `cold_return` so conversion sites are
    /// treated as unlikely (error-path) branches.
    fn from(err: turso_ext::ResultCode) -> Self {
        cold_return(LimboError::ExtensionError(err.to_string()))
    }
}

Expand Down
38 changes: 1 addition & 37 deletions core/storage/sqlite3_ondisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1580,44 +1580,8 @@ pub fn read_integer(buf: &[u8], serial_type: u8) -> Result<i64> {
}
}

/// Fast varint reader optimized for the common cases of 1-byte and 2-byte
/// varints, using the same encoding as SQLite (cf. `sqlite3GetVarint32`).
///
/// The single-byte case (values 0x00-0x7F) and the two-byte case
/// (values 0x80-0x3FFF) are decoded inline; anything longer is delegated to
/// the general [`read_varint`] implementation.
///
/// Returns the decoded value together with the number of bytes consumed, or
/// a corruption error when the buffer ends mid-varint.
#[inline(always)]
pub fn read_varint_fast(buf: &[u8]) -> Result<(u64, usize)> {
    let b0 = match buf.first() {
        Some(&b) => b,
        None => crate::bail_corrupt_error!("Invalid varint"),
    };
    // One byte: a clear high bit means the value fits in 7 bits.
    if b0 & 0x80 == 0 {
        return Ok((b0 as u64, 1));
    }
    let b1 = match buf.get(1) {
        Some(&b) => b,
        None => crate::bail_corrupt_error!("Invalid varint"),
    };
    // Two bytes: 7 payload bits per byte, the first byte holds the high bits.
    if b1 & 0x80 == 0 {
        return Ok(((((b0 & 0x7f) as u64) << 7) + b1 as u64, 2));
    }
    // Three or more bytes: defer to the general-purpose decoder.
    read_varint(buf)
}

#[inline(always)]
pub fn read_varint(buf: &[u8]) -> Result<(u64, usize)> {
let mut v: u64 = 0;
Expand Down
115 changes: 52 additions & 63 deletions core/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use branches::unlikely;
use either::Either;
#[cfg(feature = "serde")]
use serde::Deserialize;
Expand Down Expand Up @@ -1181,7 +1182,7 @@ impl ImmutableRecord {
}
let mut cursor = self.cursor.lock();
cursor.parse_full_header(self).unwrap();
let last_idx = cursor.serial_types.len().checked_sub(1)?;
let last_idx = cursor.serials_offsets.len().checked_sub(1)?;
Some(cursor.deserialize_column(self, last_idx))
}

Expand All @@ -1199,7 +1200,7 @@ impl ImmutableRecord {

match cursor.ensure_parsed_upto(self, idx) {
Ok(()) => {
if idx >= cursor.serial_types.len() {
if idx >= cursor.serials_offsets.len() {
return None;
}

Expand All @@ -1212,7 +1213,7 @@ impl ImmutableRecord {
pub fn column_count(&self) -> usize {
let mut cursor = self.cursor.lock();
cursor.parse_full_header(self).unwrap();
cursor.serial_types.len()
cursor.serials_offsets.len()
}

/// Get direct access to the embedded cursor. Use with caution.
Expand All @@ -1236,52 +1237,64 @@ impl ImmutableRecord {
/// - **Data section**: The actual field data in the same order as serial types
#[derive(Debug, Default)]
pub struct RecordCursor {
    /// Total size of the record header in bytes.
    pub header_size: usize,
    /// One `(serial_type, end_offset)` pair per parsed column.
    /// The serial type encodes both the data type and size information.
    /// `end_offset` is the byte offset in the record payload at which the
    /// column's data *ends*; a column's data starts at the previous entry's
    /// end offset (or at `header_size` for the first column).
    pub serials_offsets: Vec<(u64, usize)>,
    /// Current parsing position within the header section.
    pub header_offset: usize,
}

impl RecordCursor {
/// Creates an empty cursor with no parsed header state.
pub fn new() -> Self {
    Self {
        serials_offsets: Vec::new(),
        header_size: 0,
        header_offset: 0,
    }
}

/// Creates an empty cursor preallocated for `num_columns` columns, so
/// header parsing does not have to grow the entry vector.
pub fn with_capacity(num_columns: usize) -> Self {
    Self {
        serials_offsets: Vec::with_capacity(num_columns),
        header_size: 0,
        header_offset: 0,
    }
}

/// Resets all parsed state so the cursor can be reused for another record.
/// `header_size` returning to 0 is the "uninitialized" sentinel.
pub fn invalidate(&mut self) {
    self.serials_offsets.clear();
    self.header_size = 0;
    self.header_offset = 0;
}

pub fn is_invalidated(&self) -> bool {
self.serial_types.is_empty() && self.offsets.is_empty()
pub fn is_uninitialized(&self) -> bool {
self.header_size == 0
}

/// Parses the full record header by requesting the highest supported column
/// index (`MAX_COLUMN`); parsing stops as soon as the header is exhausted,
/// so this makes every column's serial type and offset available.
pub fn parse_full_header(&mut self, record: &ImmutableRecord) -> Result<()> {
    self.ensure_parsed_upto(record, MAX_COLUMN)
}

/// End offset of the most recently parsed column, i.e. the payload position
/// at which the next column's data begins. Before any column has been
/// parsed this is `header_size`, the start of the data section.
#[inline(always)]
pub fn last_offset(&self) -> usize {
    match self.serials_offsets.last() {
        Some(&(_, end)) => end,
        None => self.header_size,
    }
}

/// Reads the header-size varint at the start of `payload` and primes the
/// cursor: `header_size` becomes the total header length in bytes and
/// `header_offset` points just past the size varint. Returns the header
/// size, which is also where the first column's data begins.
#[inline]
pub fn init_header(&mut self, payload: &[u8]) -> Result<usize> {
    let (size, consumed) = read_varint(payload)?;
    self.header_size = size as usize;
    self.header_offset = consumed;
    Ok(self.header_size)
}

/// Ensures the header is parsed up to (and including) the target column index.
///
/// This is the core lazy parsing method. It only parses as much of the header
Expand Down Expand Up @@ -1316,27 +1329,23 @@ impl RecordCursor {
return Ok(());
}

// Parse header size and initialize parsing
if self.serial_types.is_empty() && self.offsets.is_empty() {
let (header_size, bytes_read) = read_varint(payload)?;
self.header_size = header_size as usize;
self.header_offset = bytes_read;
self.offsets.push(self.header_size); // First column starts after header
}

let mut prev_offset = if unlikely(self.is_uninitialized()) {
self.init_header(payload)?
} else {
self.last_offset()
};
// Parse serial types incrementally
while self.serial_types.len() <= target_idx
while self.serials_offsets.len() <= target_idx
&& self.header_offset < self.header_size
&& self.header_offset < payload.len()
{
let (serial_type, read_bytes) = read_varint(&payload[self.header_offset..])?;
self.serial_types.push(serial_type);
self.header_offset += read_bytes;

let serial_type_obj = SerialType::try_from(serial_type)?;
let data_size = serial_type_obj.size();
let prev_offset = *self.offsets.last().unwrap();
self.offsets.push(prev_offset + data_size);
prev_offset += data_size;
self.serials_offsets.push((serial_type, prev_offset));
}

Ok(())
Expand Down Expand Up @@ -1366,11 +1375,11 @@ impl RecordCursor {
record: &'a ImmutableRecord,
idx: usize,
) -> Result<ValueRef<'a>> {
if idx >= self.serial_types.len() {
if idx >= self.serials_offsets.len() {
return Ok(ValueRef::Null);
}

let serial_type = self.serial_types[idx];
let (serial_type, end) = self.serials_offsets[idx];
let serial_type_obj = SerialType::try_from(serial_type)?;

match serial_type_obj.kind() {
Expand All @@ -1380,13 +1389,12 @@ impl RecordCursor {
_ => {} // continue
}

if idx + 1 >= self.offsets.len() {
return Ok(ValueRef::Null);
}

let start = self.offsets[idx];
let end = self.offsets[idx + 1];
let payload = record.get_payload();
let start = if unlikely(idx == 0) {
self.header_size
} else {
self.serials_offsets[idx - 1].1
};

let slice = &payload[start..end];
let (value, _) = crate::storage::sqlite3_ondisk::read_value(slice, serial_type_obj)?;
Expand Down Expand Up @@ -1468,7 +1476,7 @@ impl RecordCursor {
}

let _ = self.parse_full_header(record);
self.serial_types.len()
self.serials_offsets.len()
}

/// Alias for `count()`. Returns the number of columns in the record.
Expand Down Expand Up @@ -1518,7 +1526,7 @@ impl RecordCursor {
return Some(Err(err));
}
}
if !self.record.is_invalidated() && self.idx < self.cursor.serial_types.len() {
if !self.record.is_invalidated() && self.idx < self.cursor.serials_offsets.len() {
let res = self.cursor.deserialize_column(self.record, self.idx);
self.idx += 1;
Some(res)
Expand All @@ -1530,7 +1538,7 @@ impl RecordCursor {

impl<'a, 'b> ExactSizeIterator for GetValues<'a, 'b> {
    /// Remaining number of columns the iterator will yield: total parsed
    /// entries minus those already consumed. Relies on `idx` never
    /// exceeding the entry count (the subtraction would underflow).
    fn len(&self) -> usize {
        self.cursor.serials_offsets.len() - self.idx
    }
}
}

Expand Down Expand Up @@ -3382,36 +3390,18 @@ mod tests {
.parse_full_header(&record)
.expect("Failed to parse full header");

assert_eq!(
cursor1.offsets.len(),
cursor1.serial_types.len() + 1,
"offsets should be one longer than serial_types"
);

for i in 0..values.len() {
cursor1
.deserialize_column(&record, i)
.expect("Failed to deserialize column");
}

// Incremental Parsing
let mut cursor2 = RecordCursor::new();
cursor2
.ensure_parsed_upto(&record, 2)
.expect("Failed to parse up to column 2");

assert_eq!(
cursor2.offsets.len(),
cursor2.serial_types.len() + 1,
"offsets should be one longer than serial_types"
);

cursor2.get_value(&record, 2).expect("Column 2 failed");

// Access column 0 (already parsed)
let before = cursor2.serial_types.len();
let before = cursor2.serials_offsets.len();
cursor2.get_value(&record, 0).expect("Column 0 failed");
let after = cursor2.serial_types.len();
let after = cursor2.serials_offsets.len();
assert_eq!(before, after, "Should not parse more");

// Access column 5 (forces full parse)
Expand All @@ -3428,10 +3418,9 @@ mod tests {
}

assert_eq!(
cursor1.serial_types, cursor2.serial_types,
"serial_types must match"
cursor1.serials_offsets, cursor2.serials_offsets,
"entries must match"
);
assert_eq!(cursor1.offsets, cursor2.offsets, "offsets must match");
}

#[test]
Expand Down
Loading
Loading