Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 149 additions & 57 deletions core/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use either::Either;
use parking_lot::Mutex;
use rustc_hash::{FxBuildHasher, FxHashMap};
#[cfg(feature = "serde")]
use serde::Deserialize;
use turso_ext::{AggCtx, FinalizeFunction, StepFunction};
Expand All @@ -19,6 +21,7 @@ use crate::vdbe::Register;
use crate::vtab::VirtualTableCursor;
use crate::{Completion, CompletionError, Result, IO};
use std::borrow::{Borrow, Cow};
use std::collections::hash_map::Entry;
use std::fmt::{Debug, Display};
use std::iter::Peekable;
use std::ops::Deref;
Expand Down Expand Up @@ -53,7 +56,7 @@ impl Display for ValueType {
}
}

#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TextSubtype {
Text,
Expand Down Expand Up @@ -94,7 +97,7 @@ impl Text {
}
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Hash)]
pub struct TextRef<'a> {
pub value: &'a str,
pub subtype: TextSubtype,
Expand Down Expand Up @@ -258,6 +261,19 @@ pub enum ValueRef<'a> {
Blob(&'a [u8]),
}

impl std::hash::Hash for ValueRef<'_> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
core::mem::discriminant(self).hash(state);
match self {
ValueRef::Null => {}
ValueRef::Integer(i) => i.hash(state),
ValueRef::Float(f) => (f.to_bits()).hash(state),
ValueRef::Text(text_ref) => text_ref.hash(state),
ValueRef::Blob(items) => items.hash(state),
}
}
}

impl Debug for ValueRef<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -873,18 +889,111 @@ impl<'a> TryFrom<ValueRef<'a>> for &'a str {
}
}

#[derive(Debug)]
struct RecordParserInner<'a> {
values: FxHashMap<usize, ValueRef<'a>>,
cursor: RecordCursor,
}

impl<'a> RecordParserInner<'a> {
fn new() -> Self {
Self {
values: FxHashMap::with_capacity_and_hasher(0, FxBuildHasher),
cursor: RecordCursor::new(),
}
}

fn with_capacity(len: usize) -> Self {
Self {
values: FxHashMap::with_capacity_and_hasher(len, FxBuildHasher),
cursor: RecordCursor::new(),
}
}

fn clear(&mut self) {
self.values.clear();
self.cursor.invalidate();
}

fn get_value(&mut self, record: &'a ImmutableRecord, idx: usize) -> Result<ValueRef<'a>> {
let val = match self.values.entry(idx) {
Entry::Occupied(entry) => *entry.get(),
Entry::Vacant(entry) => {
let val = self.cursor.get_value(record, idx)?;
entry.insert(val);
val
}
};
Ok(val)
}

fn get_values(
&mut self,
record: &'a ImmutableRecord,
) -> Result<Peekable<impl ExactSizeIterator<Item = Result<ValueRef<'a>>> + use<'a, '_>>> {
self.cursor.parse_full_header(record)?;
// Assumes the entire header is parsed
Ok((0..(self.cursor.serial_types.len()))
.map(|idx| self.get_value(record, idx))
.peekable())
}
}

#[derive(Debug)]
struct RecordParser<'a> {
inner: Mutex<RecordParserInner<'a>>,
}

impl<'a> Clone for RecordParser<'a> {
fn clone(&self) -> Self {
Self::new()
}
}

impl<'a> RecordParser<'a> {
fn new() -> Self {
Self {
inner: Mutex::new(RecordParserInner::new()),
}
}

fn with_capacity(len: usize) -> Self {
Self {
inner: Mutex::new(RecordParserInner::with_capacity(len)),
}
}

fn clear(&mut self) {
self.inner.get_mut().clear();
}

fn get_value(&self, record: &'a ImmutableRecord, idx: usize) -> Result<ValueRef<'a>> {
self.inner.lock().get_value(record, idx)
}

fn get_values(&self, record: &'a ImmutableRecord) -> Vec<ValueRef<'a>> {
let mut cursor = self.inner.lock();
cursor.get_values(record).map_or(Vec::new(), |val| {
val.collect::<Result<Vec<_>>>().unwrap_or_default()
})
}
}

/// This struct serves the purpose of not allocating multiple vectors of bytes if not needed.
/// A value in a record that has already been serialized can stay serialized and what this struct offsers
/// is easy acces to each value which point to the payload.
/// The name might be contradictory as it is immutable in the sense that you cannot modify the values without modifying the payload.
#[derive(Clone, Eq, Ord, PartialEq, PartialOrd)]
#[derive(Clone)]
pub struct ImmutableRecord {
// We have to be super careful with this buffer since we make values point to the payload we need to take care reallocations
// happen in a controlled manner. If we realocate with values that should be correct, they will now point to undefined data.
// We don't use pin here because it would make it imposible to reuse the buffer if we need to push a new record in the same struct.
//
// payload is the Vec<u8> but in order to use Register which holds ImmutableRecord as a Value - we store Vec<u8> as Value::Blob
payload: Value,
/// This is a self referential borrow from the values inside the Payload.
/// Always be sure to invalidate this when starting a new serialization or invalidating
record_parser: RecordParser<'static>,
}

impl std::fmt::Debug for ImmutableRecord {
Expand Down Expand Up @@ -912,41 +1021,31 @@ impl std::fmt::Debug for ImmutableRecord {
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Record {
values: Vec<Value>,
}

impl Record {
// pub fn get<'a, T: FromValue<'a> + 'a>(&'a self, idx: usize) -> Result<T> {
// let value = &self.values[idx];
// T::from_value(value)
// }

pub fn count(&self) -> usize {
self.values.len()
}

pub fn last_value(&self) -> Option<&Value> {
self.values.last()
impl PartialEq for ImmutableRecord {
fn eq(&self, other: &Self) -> bool {
self.payload == other.payload
}
}

pub fn get_values(&self) -> &Vec<Value> {
&self.values
}
impl Eq for ImmutableRecord {}

pub fn get_value(&self, idx: usize) -> &Value {
&self.values[idx]
impl Ord for ImmutableRecord {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.payload.cmp(&other.payload)
}
}

pub fn len(&self) -> usize {
self.values.len()
impl PartialOrd for ImmutableRecord {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.payload.partial_cmp(&other.payload)
}
}

pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Record {
values: Vec<Value>,
}

struct AppendWriter<'a> {
buf: &'a mut Vec<u8>,
pos: usize,
Expand Down Expand Up @@ -983,27 +1082,17 @@ impl ImmutableRecord {
pub fn new(payload_capacity: usize) -> Self {
Self {
payload: Value::Blob(Vec::with_capacity(payload_capacity)),
record_parser: RecordParser::new(),
}
}

pub fn from_bin_record(payload: Vec<u8>) -> Self {
Self {
payload: Value::Blob(payload),
record_parser: RecordParser::new(),
}
}

// TODO: inline the complete record parsing code here.
// Its probably more efficient.
// fixme(pedrocarlo): this function is very inneficient and kind of misleading because
// it always deserializes the columns
pub fn get_values<'a>(&'a self) -> Vec<ValueRef<'a>> {
let mut cursor = RecordCursor::new();
cursor
.get_values(self)
.collect::<Result<Vec<_>>>()
.unwrap_or_default()
}

pub fn from_registers<'a, I: Iterator<Item = &'a Register> + Clone>(
// we need to accept both &[Register] and &[&Register] values - that's why non-trivial signature
//
Expand Down Expand Up @@ -1086,6 +1175,7 @@ impl ImmutableRecord {
writer.assert_finish_capacity();
Self {
payload: Value::Blob(buf),
record_parser: RecordParser::with_capacity(len),
}
}

Expand All @@ -1096,7 +1186,7 @@ impl ImmutableRecord {
}
}

pub fn as_blob_mut(&mut self) -> &mut Vec<u8> {
fn as_blob_mut(&mut self) -> &mut Vec<u8> {
match &mut self.payload {
Value::Blob(b) => b,
_ => panic!("payload must be a blob"),
Expand All @@ -1108,10 +1198,12 @@ impl ImmutableRecord {
}

pub fn start_serialization(&mut self, payload: &[u8]) {
self.record_parser.clear();
self.as_blob_mut().extend_from_slice(payload);
}

pub fn invalidate(&mut self) {
self.record_parser.clear();
self.as_blob_mut().clear();
}

Expand Down Expand Up @@ -1139,28 +1231,28 @@ impl ImmutableRecord {
Some(record_cursor.get_value(self, last_idx))
}

// TODO: inline the complete record parsing code here.
// Its probably more efficient.
// fixme(pedrocarlo): this function is very inneficient and kind of misleading because
// it always deserializes the columns
pub fn get_values<'a>(&'a self) -> Vec<ValueRef<'a>> {
let record: &'static Self = unsafe { std::mem::transmute(self) };
let values: Vec<ValueRef<'a>> = self.record_parser.get_values(record);
values
}

pub fn get_value<'a>(&'a self, idx: usize) -> Result<ValueRef<'a>> {
let mut cursor = RecordCursor::new();
cursor.get_value(self, idx)
let record: &'static Self = unsafe { std::mem::transmute(self) };
self.record_parser.get_value(record, idx)
}

pub fn get_value_opt<'a>(&'a self, idx: usize) -> Option<ValueRef<'a>> {
if self.is_invalidated() {
return None;
}

let mut cursor = RecordCursor::new();

match cursor.ensure_parsed_upto(self, idx) {
Ok(()) => {
if idx >= cursor.serial_types.len() {
return None;
}

cursor.deserialize_column(self, idx).ok()
}
Err(_) => None,
}
let record: &'static Self = unsafe { std::mem::transmute(self) };
self.record_parser.get_value(record, idx).ok()
}

pub fn column_count(&self) -> usize {
Expand Down
Loading