diff --git a/core/types.rs b/core/types.rs index f4c3434085..544d615643 100644 --- a/core/types.rs +++ b/core/types.rs @@ -1,4 +1,6 @@ use either::Either; +use parking_lot::Mutex; +use rustc_hash::{FxBuildHasher, FxHashMap}; #[cfg(feature = "serde")] use serde::Deserialize; use turso_ext::{AggCtx, FinalizeFunction, StepFunction}; @@ -19,6 +21,7 @@ use crate::vdbe::Register; use crate::vtab::VirtualTableCursor; use crate::{Completion, CompletionError, Result, IO}; use std::borrow::{Borrow, Cow}; +use std::collections::hash_map::Entry; use std::fmt::{Debug, Display}; use std::iter::Peekable; use std::ops::Deref; @@ -53,7 +56,7 @@ impl Display for ValueType { } } -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum TextSubtype { Text, @@ -94,7 +97,7 @@ impl Text { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Hash)] pub struct TextRef<'a> { pub value: &'a str, pub subtype: TextSubtype, @@ -258,6 +261,19 @@ pub enum ValueRef<'a> { Blob(&'a [u8]), } +impl std::hash::Hash for ValueRef<'_> { + fn hash(&self, state: &mut H) { + core::mem::discriminant(self).hash(state); + match self { + ValueRef::Null => {} + ValueRef::Integer(i) => i.hash(state), + ValueRef::Float(f) => (f.to_bits()).hash(state), + ValueRef::Text(text_ref) => text_ref.hash(state), + ValueRef::Blob(items) => items.hash(state), + } + } +} + impl Debug for ValueRef<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -873,11 +889,101 @@ impl<'a> TryFrom> for &'a str { } } +#[derive(Debug)] +struct RecordParserInner<'a> { + values: FxHashMap>, + cursor: RecordCursor, +} + +impl<'a> RecordParserInner<'a> { + fn new() -> Self { + Self { + values: FxHashMap::with_capacity_and_hasher(0, FxBuildHasher), + cursor: RecordCursor::new(), + } + } + + fn with_capacity(len: usize) -> Self { + Self { + values: 
FxHashMap::with_capacity_and_hasher(len, FxBuildHasher), + cursor: RecordCursor::new(), + } + } + + fn clear(&mut self) { + self.values.clear(); + self.cursor.invalidate(); + } + + fn get_value(&mut self, record: &'a ImmutableRecord, idx: usize) -> Result> { + let val = match self.values.entry(idx) { + Entry::Occupied(entry) => *entry.get(), + Entry::Vacant(entry) => { + let val = self.cursor.get_value(record, idx)?; + entry.insert(val); + val + } + }; + Ok(val) + } + + fn get_values( + &mut self, + record: &'a ImmutableRecord, + ) -> Result>> + use<'a, '_>>> { + self.cursor.parse_full_header(record)?; + // Assumes the entire header is parsed + Ok((0..(self.cursor.serial_types.len())) + .map(|idx| self.get_value(record, idx)) + .peekable()) + } +} + +#[derive(Debug)] +struct RecordParser<'a> { + inner: Mutex>, +} + +impl<'a> Clone for RecordParser<'a> { + fn clone(&self) -> Self { + Self::new() + } +} + +impl<'a> RecordParser<'a> { + fn new() -> Self { + Self { + inner: Mutex::new(RecordParserInner::new()), + } + } + + fn with_capacity(len: usize) -> Self { + Self { + inner: Mutex::new(RecordParserInner::with_capacity(len)), + } + } + + fn clear(&mut self) { + self.inner.get_mut().clear(); + } + + fn get_value(&self, record: &'a ImmutableRecord, idx: usize) -> Result> { + self.inner.lock().get_value(record, idx) + } + + fn get_values(&self, record: &'a ImmutableRecord) -> Vec> { + let mut cursor = self.inner.lock(); + cursor.get_values(record).map_or(Vec::new(), |val| { + val.collect::>>().unwrap_or_default() + }) + } +} + /// This struct serves the purpose of not allocating multiple vectors of bytes if not needed. /// A value in a record that has already been serialized can stay serialized and what this struct offsers /// is easy acces to each value which point to the payload. /// The name might be contradictory as it is immutable in the sense that you cannot modify the values without modifying the payload. 
-#[derive(Clone, Eq, Ord, PartialEq, PartialOrd)] +#[derive(Clone)] pub struct ImmutableRecord { // We have to be super careful with this buffer since we make values point to the payload we need to take care reallocations // happen in a controlled manner. If we realocate with values that should be correct, they will now point to undefined data. @@ -885,6 +991,9 @@ pub struct ImmutableRecord { // // payload is the Vec but in order to use Register which holds ImmutableRecord as a Value - we store Vec as Value::Blob payload: Value, + /// This is a self-referential borrow from the values inside the payload. + /// Always be sure to invalidate this when starting a new serialization or invalidating the payload. + record_parser: RecordParser<'static>, } impl std::fmt::Debug for ImmutableRecord { @@ -912,41 +1021,31 @@ impl std::fmt::Debug for ImmutableRecord { } } -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub struct Record { - values: Vec, -} - -impl Record { - // pub fn get<'a, T: FromValue<'a> + 'a>(&'a self, idx: usize) -> Result { - // let value = &self.values[idx]; - // T::from_value(value) - // } - - pub fn count(&self) -> usize { - self.values.len() - } - - pub fn last_value(&self) -> Option<&Value> { - self.values.last() +impl PartialEq for ImmutableRecord { + fn eq(&self, other: &Self) -> bool { + self.payload == other.payload } +} - pub fn get_values(&self) -> &Vec { - &self.values - } +impl Eq for ImmutableRecord {} - pub fn get_value(&self, idx: usize) -> &Value { - &self.values[idx] +impl Ord for ImmutableRecord { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.payload.cmp(&other.payload) } +} - pub fn len(&self) -> usize { - self.values.len() +impl PartialOrd for ImmutableRecord { + fn partial_cmp(&self, other: &Self) -> Option { + self.payload.partial_cmp(&other.payload) } +} - pub fn is_empty(&self) -> bool { - self.values.is_empty() - } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct Record { + values: Vec, } + struct
AppendWriter<'a> { buf: &'a mut Vec, pos: usize, @@ -983,27 +1082,17 @@ impl ImmutableRecord { pub fn new(payload_capacity: usize) -> Self { Self { payload: Value::Blob(Vec::with_capacity(payload_capacity)), + record_parser: RecordParser::new(), } } pub fn from_bin_record(payload: Vec) -> Self { Self { payload: Value::Blob(payload), + record_parser: RecordParser::new(), } } - // TODO: inline the complete record parsing code here. - // Its probably more efficient. - // fixme(pedrocarlo): this function is very inneficient and kind of misleading because - // it always deserializes the columns - pub fn get_values<'a>(&'a self) -> Vec> { - let mut cursor = RecordCursor::new(); - cursor - .get_values(self) - .collect::>>() - .unwrap_or_default() - } - pub fn from_registers<'a, I: Iterator + Clone>( // we need to accept both &[Register] and &[&Register] values - that's why non-trivial signature // @@ -1086,6 +1175,7 @@ impl ImmutableRecord { writer.assert_finish_capacity(); Self { payload: Value::Blob(buf), + record_parser: RecordParser::with_capacity(len), } } @@ -1096,7 +1186,7 @@ impl ImmutableRecord { } } - pub fn as_blob_mut(&mut self) -> &mut Vec { + fn as_blob_mut(&mut self) -> &mut Vec { match &mut self.payload { Value::Blob(b) => b, _ => panic!("payload must be a blob"), @@ -1108,10 +1198,12 @@ impl ImmutableRecord { } pub fn start_serialization(&mut self, payload: &[u8]) { + self.record_parser.clear(); self.as_blob_mut().extend_from_slice(payload); } pub fn invalidate(&mut self) { + self.record_parser.clear(); self.as_blob_mut().clear(); } @@ -1139,9 +1231,19 @@ impl ImmutableRecord { Some(record_cursor.get_value(self, last_idx)) } + // TODO: inline the complete record parsing code here. + // Its probably more efficient. 
+ // fixme(pedrocarlo): this function is very inefficient and kind of misleading because + // it always deserializes the columns + pub fn get_values<'a>(&'a self) -> Vec> { + let record: &'static Self = unsafe { std::mem::transmute(self) }; + let values: Vec> = self.record_parser.get_values(record); + values + } + pub fn get_value<'a>(&'a self, idx: usize) -> Result> { - let mut cursor = RecordCursor::new(); - cursor.get_value(self, idx) + let record: &'static Self = unsafe { std::mem::transmute(self) }; + self.record_parser.get_value(record, idx) } pub fn get_value_opt<'a>(&'a self, idx: usize) -> Option> { @@ -1149,18 +1251,8 @@ impl ImmutableRecord { return None; } - let mut cursor = RecordCursor::new(); - - match cursor.ensure_parsed_upto(self, idx) { - Ok(()) => { - if idx >= cursor.serial_types.len() { - return None; - } - - cursor.deserialize_column(self, idx).ok() - } - Err(_) => None, - } + let record: &'static Self = unsafe { std::mem::transmute(self) }; + self.record_parser.get_value(record, idx).ok() } pub fn column_count(&self) -> usize {