
Commit

Merge branch 'main' into json_handler_docs
OussamaSaoudi-db authored Jan 14, 2025
2 parents 8b11c35 + 12020d8 commit 34f7a90
Showing 29 changed files with 51 additions and 51 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@
 .idea/
 .vscode/
 .vim
+.zed

 # Rust
 .cargo/
10 changes: 5 additions & 5 deletions README.md
@@ -15,7 +15,7 @@ Delta-kernel-rs is split into a few different crates:
 - kernel: The actual core kernel crate
 - acceptance: Acceptance tests that validate correctness via the [Delta Acceptance Tests][dat]
 - derive-macros: A crate for our [derive-macros] to live in
-- ffi: Functionallity that enables delta-kernel-rs to be used from `C` or `C++` See the [ffi](ffi)
+- ffi: Functionality that enables delta-kernel-rs to be used from `C` or `C++` See the [ffi](ffi)
 directory for more information.

 ## Building
@@ -66,12 +66,12 @@ are still unstable. We therefore may break APIs within minor releases (that is,
 we will not break APIs in patch releases (`0.1.0` -> `0.1.1`).

 ## Arrow versioning
-If you enable the `default-engine` or `sync-engine` features, you get an implemenation of the
+If you enable the `default-engine` or `sync-engine` features, you get an implementation of the
 `Engine` trait that uses [Arrow] as its data format.

 The [`arrow crate`](https://docs.rs/arrow/latest/arrow/) tends to release new major versions rather
 quickly. To enable engines that already integrate arrow to also integrate kernel and not force them
-to track a specific version of arrow that kernel depends on, we take as broad dependecy on arrow
+to track a specific version of arrow that kernel depends on, we take as broad dependency on arrow
 versions as we can.

 This means you can force kernel to rely on the specific arrow version that your engine already uses,
@@ -96,7 +96,7 @@ arrow-schema = "53.0"
 parquet = "53.0"
 ```

-Note that unfortunatly patching in `cargo` requires that _exactly one_ version matches your
+Note that unfortunately patching in `cargo` requires that _exactly one_ version matches your
 specification. If only arrow "53.0.0" had been released the above will work, but if "53.0.1" where
 to be released, the specification will break and you will need to provide a more restrictive
 specification like `"=53.0.0"`.
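
(Aside on the hunk above: following the README's own snippet, a more restrictive specification simply switches each entry to an exact-version requirement. The sketch below assumes the surrounding entries look like the two visible in the hunk; the pinned version is illustrative.)

```toml
# Sketch: the same entries as the README's snippet, pinned to exactly one release.
arrow-schema = "=53.0.0"
parquet = "=53.0.0"
```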
@@ -111,7 +111,7 @@ and then checking what version of `object_store` it depends on.
 ## Documentation

 - [API Docs](https://docs.rs/delta_kernel/latest/delta_kernel/)
-- [arcitecture.md](doc/architecture.md) document describing the kernel architecture (currently wip)
+- [architecture.md](doc/architecture.md) document describing the kernel architecture (currently wip)

 ## Examples

4 changes: 2 additions & 2 deletions ffi/examples/read-table/arrow.c
@@ -97,7 +97,7 @@ static GArrowRecordBatch* add_partition_columns(
 }

 GArrowArray* partition_col = garrow_array_builder_finish((GArrowArrayBuilder*)builder, &error);
-if (report_g_error("Can't build string array for parition column", error)) {
+if (report_g_error("Can't build string array for partition column", error)) {
 printf("Giving up on column %s\n", col);
 g_error_free(error);
 g_object_unref(builder);
@@ -144,7 +144,7 @@ static void add_batch_to_context(
 }
 record_batch = add_partition_columns(record_batch, partition_cols, partition_values);
 if (record_batch == NULL) {
-printf("Failed to add parition columns, not adding batch\n");
+printf("Failed to add partition columns, not adding batch\n");
 return;
 }
 context->batches = g_list_append(context->batches, record_batch);
2 changes: 1 addition & 1 deletion ffi/examples/read-table/read_table.c
@@ -43,7 +43,7 @@ void print_partition_info(struct EngineContext* context, const CStringMap* parti
 }

 // Kernel will call this function for each file that should be scanned. The arguments include enough
-// context to constuct the correct logical data from the physically read parquet
+// context to construct the correct logical data from the physically read parquet
 void scan_row_callback(
 void* engine_context,
 KernelStringSlice path,
2 changes: 1 addition & 1 deletion ffi/src/engine_funcs.rs
@@ -42,7 +42,7 @@ impl Drop for FileReadResultIterator {
 }
 }

-/// Call the engine back with the next `EngingeData` batch read by Parquet/Json handler. The
+/// Call the engine back with the next `EngineData` batch read by Parquet/Json handler. The
 /// _engine_ "owns" the data that is passed into the `engine_visitor`, since it is allocated by the
 /// `Engine` being used for log-replay. If the engine wants the kernel to free this data, it _must_
 /// call [`free_engine_data`] on it.
2 changes: 1 addition & 1 deletion ffi/src/expressions/kernel.rs
@@ -83,7 +83,7 @@ pub struct EngineExpressionVisitor {
 /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`.
 /// The timestamp is microsecond precision with no timezone.
 pub visit_literal_timestamp_ntz: VisitLiteralFn<i64>,
-/// Visit a 32bit intger `date` representing days since UNIX epoch 1970-01-01. The `date` belongs
+/// Visit a 32bit integer `date` representing days since UNIX epoch 1970-01-01. The `date` belongs
 /// to the list identified by `sibling_list_id`.
 pub visit_literal_date: VisitLiteralFn<i32>,
 /// Visit binary data at the `buffer` with length `len` belonging to the list identified by
4 changes: 2 additions & 2 deletions ffi/src/handle.rs
@@ -2,8 +2,8 @@
 //! boundary.
 //!
 //! Creating a [`Handle<T>`] always implies some kind of ownership transfer. A mutable handle takes
-//! ownership of the object itself (analagous to [`Box<T>`]), while a non-mutable (shared) handle
-//! takes ownership of a shared reference to the object (analagous to [`std::sync::Arc<T>`]). Thus, a created
+//! ownership of the object itself (analogous to [`Box<T>`]), while a non-mutable (shared) handle
+//! takes ownership of a shared reference to the object (analogous to [`std::sync::Arc<T>`]). Thus, a created
 //! handle remains [valid][Handle#Validity], and its underlying object remains accessible, until the
 //! handle is explicitly dropped or consumed. Dropping a mutable handle always drops the underlying
 //! object as well; dropping a shared handle only drops the underlying object if the handle was the
2 changes: 1 addition & 1 deletion ffi/src/scan.rs
@@ -383,7 +383,7 @@ struct ContextWrapper {
 /// data which provides the data handle and selection vector as each element in the iterator.
 ///
 /// # Safety
-/// engine is responsbile for passing a valid [`ExclusiveEngineData`] and selection vector.
+/// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector.
 #[no_mangle]
 pub unsafe extern "C" fn visit_scan_data(
 data: Handle<ExclusiveEngineData>,
2 changes: 1 addition & 1 deletion ffi/src/test_ffi.rs
@@ -12,7 +12,7 @@ use delta_kernel::{
 /// output expression can be found in `ffi/tests/test_expression_visitor/expected.txt`.
 ///
 /// # Safety
-/// The caller is responsible for freeing the retured memory, either by calling
+/// The caller is responsible for freeing the returned memory, either by calling
 /// [`free_kernel_predicate`], or [`Handle::drop_handle`]
 #[no_mangle]
 pub unsafe extern "C" fn get_testing_kernel_expression() -> Handle<SharedExpression> {
4 changes: 2 additions & 2 deletions integration-tests/src/main.rs
@@ -15,8 +15,8 @@ fn create_kernel_schema() -> delta_kernel::schema::Schema {
 fn main() {
 let arrow_schema = create_arrow_schema();
 let kernel_schema = create_kernel_schema();
-let convereted: delta_kernel::schema::Schema =
+let converted: delta_kernel::schema::Schema =
 delta_kernel::schema::Schema::try_from(&arrow_schema).expect("couldn't convert");
-assert!(kernel_schema == convereted);
+assert!(kernel_schema == converted);
 println!("Okay, made it");
 }
2 changes: 1 addition & 1 deletion kernel/examples/inspect-table/src/main.rs
@@ -184,7 +184,7 @@ fn print_scan_file(
 fn try_main() -> DeltaResult<()> {
 let cli = Cli::parse();

-// build a table and get the lastest snapshot from it
+// build a table and get the latest snapshot from it
 let table = Table::try_from_uri(&cli.path)?;

 let engine = DefaultEngine::try_new(
6 changes: 3 additions & 3 deletions kernel/examples/read-table-multi-threaded/src/main.rs
@@ -104,7 +104,7 @@ fn truncate_batch(batch: RecordBatch, rows: usize) -> RecordBatch {
 RecordBatch::try_new(batch.schema(), cols).unwrap()
 }

-// This is the callback that will be called fo each valid scan row
+// This is the callback that will be called for each valid scan row
 fn send_scan_file(
 scan_tx: &mut spmc::Sender<ScanFile>,
 path: &str,
@@ -125,7 +125,7 @@ fn send_scan_file(
 fn try_main() -> DeltaResult<()> {
 let cli = Cli::parse();

-// build a table and get the lastest snapshot from it
+// build a table and get the latest snapshot from it
 let table = Table::try_from_uri(&cli.path)?;
 println!("Reading {}", table.location());

@@ -279,7 +279,7 @@

 // this example uses the parquet_handler from the engine, but an engine could
 // choose to use whatever method it might want to read a parquet file. The reader
-// could, for example, fill in the parition columns, or apply deletion vectors. Here
+// could, for example, fill in the partition columns, or apply deletion vectors. Here
 // we assume a more naive parquet reader and fix the data up after the fact.
 // further parallelism would also be possible here as we could read the parquet file
 // in chunks where each thread reads one chunk. The engine would need to ensure
2 changes: 1 addition & 1 deletion kernel/examples/read-table-single-threaded/src/main.rs
@@ -69,7 +69,7 @@ fn main() -> ExitCode {
 fn try_main() -> DeltaResult<()> {
 let cli = Cli::parse();

-// build a table and get the lastest snapshot from it
+// build a table and get the latest snapshot from it
 let table = Table::try_from_uri(&cli.path)?;
 println!("Reading {}", table.location());

2 changes: 1 addition & 1 deletion kernel/src/actions/visitors.rs
@@ -367,7 +367,7 @@ impl RowVisitor for CdcVisitor {

 pub type SetTransactionMap = HashMap<String, SetTransaction>;

-/// Extact application transaction actions from the log into a map
+/// Extract application transaction actions from the log into a map
 ///
 /// This visitor maintains the first entry for each application id it
 /// encounters. When a specific application id is required then
6 changes: 3 additions & 3 deletions kernel/src/engine/arrow_utils.rs
@@ -55,9 +55,9 @@ macro_rules! prim_array_cmp {

 pub(crate) use prim_array_cmp;

-/// Get the indicies in `parquet_schema` of the specified columns in `requested_schema`. This
-/// returns a tuples of (mask_indicies: Vec<parquet_schema_index>, reorder_indicies:
-/// Vec<requested_index>). `mask_indicies` is used for generating the mask for reading from the
+/// Get the indices in `parquet_schema` of the specified columns in `requested_schema`. This
+/// returns a tuples of (mask_indices: Vec<parquet_schema_index>, reorder_indices:
+/// Vec<requested_index>). `mask_indices` is used for generating the mask for reading from the
 pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
 Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s.into())).with_backtrace()
 }
2 changes: 1 addition & 1 deletion kernel/src/engine/default/json.rs
@@ -29,7 +29,7 @@ pub struct DefaultJsonHandler<E: TaskExecutor> {
 store: Arc<DynObjectStore>,
 /// The executor to run async tasks on
 task_executor: Arc<E>,
-/// The maximun number of batches to read ahead
+/// The maximum number of batches to read ahead
 readahead: usize,
 /// The number of rows to read per batch
 batch_size: usize,
8 changes: 4 additions & 4 deletions kernel/src/engine/default/parquet.rs
@@ -258,7 +258,7 @@ impl FileOpener for ParquetOpener {
 let mut reader = ParquetObjectReader::new(store, meta);
 let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
 let parquet_schema = metadata.schema();
-let (indicies, requested_ordering) =
+let (indices, requested_ordering) =
 get_requested_indices(&table_schema, parquet_schema)?;
 let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index);
 let mut builder =
@@ -267,7 +267,7 @@ impl FileOpener for ParquetOpener {
 &table_schema,
 parquet_schema,
 builder.parquet_schema(),
-&indicies,
+&indices,
 ) {
 builder = builder.with_projection(mask)
 }
@@ -330,7 +330,7 @@ impl FileOpener for PresignedUrlOpener {
 let reader = client.get(file_meta.location).send().await?.bytes().await?;
 let metadata = ArrowReaderMetadata::load(&reader, Default::default())?;
 let parquet_schema = metadata.schema();
-let (indicies, requested_ordering) =
+let (indices, requested_ordering) =
 get_requested_indices(&table_schema, parquet_schema)?;

 let options = ArrowReaderOptions::new();
@@ -340,7 +340,7 @@
 &table_schema,
 parquet_schema,
 builder.parquet_schema(),
-&indicies,
+&indices,
 ) {
 builder = builder.with_projection(mask)
 }
5 changes: 2 additions & 3 deletions kernel/src/engine/sync/parquet.rs
@@ -21,9 +21,8 @@ fn try_create_from_parquet(
 let metadata = ArrowReaderMetadata::load(&file, Default::default())?;
 let parquet_schema = metadata.schema();
 let mut builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
-let (indicies, requested_ordering) = get_requested_indices(&schema, parquet_schema)?;
-if let Some(mask) = generate_mask(&schema, parquet_schema, builder.parquet_schema(), &indicies)
-{
+let (indices, requested_ordering) = get_requested_indices(&schema, parquet_schema)?;
+if let Some(mask) = generate_mask(&schema, parquet_schema, builder.parquet_schema(), &indices) {
 builder = builder.with_projection(mask);
 }
 if let Some(predicate) = predicate {
2 changes: 1 addition & 1 deletion kernel/src/engine_data.rs
@@ -199,7 +199,7 @@ pub trait RowVisitor {
 /// "getter" of type [`GetData`] will be present. This can be used to actually get at the data
 /// for each row. You can `use` the `TypedGetData` trait if you want to have a way to extract
 /// typed data that will fail if the "getter" is for an unexpected type. The data in `getters`
-/// does not outlive the call to this funtion (i.e. it should be copied if needed).
+/// does not outlive the call to this function (i.e. it should be copied if needed).
 fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()>;

 /// Visit the rows of an [`EngineData`], selecting the leaf column names given by
6 changes: 3 additions & 3 deletions kernel/src/error.rs
@@ -1,4 +1,4 @@
-//! Defintions of errors that the delta kernel can encounter
+//! Definitions of errors that the delta kernel can encounter

 use std::{
 backtrace::{Backtrace, BacktraceStatus},
@@ -58,7 +58,7 @@ pub enum Error {
 #[error("Internal error {0}. This is a kernel bug, please report.")]
 InternalError(String),

-/// An error enountered while working with parquet data
+/// An error encountered while working with parquet data
 #[cfg(feature = "parquet")]
 #[error("Arrow error: {0}")]
 Parquet(#[from] parquet::errors::ParquetError),
@@ -99,7 +99,7 @@ pub enum Error {
 #[error("No table version found.")]
 MissingVersion,

-/// An error occured while working with deletion vectors
+/// An error occurred while working with deletion vectors
 #[error("Deletion Vector error: {0}")]
 DeletionVector(String),

2 changes: 1 addition & 1 deletion kernel/src/expressions/mod.rs
@@ -737,7 +737,7 @@ mod tests {
 ),
 ]);

-// Similer to ExpressionDepthChecker::check, but also returns call count
+// Similar to ExpressionDepthChecker::check, but also returns call count
 let check_with_call_count =
 |depth_limit| ExpressionDepthChecker::check_with_call_count(&expr, depth_limit);

2 changes: 1 addition & 1 deletion kernel/src/expressions/scalars.rs
@@ -393,7 +393,7 @@ impl PrimitiveType {
 // Timestamps may additionally be encoded as a ISO 8601 formatted string such as
 // `1970-01-01T00:00:00.123456Z`.
 //
-// The difference arrises mostly in how they are to be handled on the engine side - i.e. timestampNTZ
+// The difference arises mostly in how they are to be handled on the engine side - i.e. timestampNTZ
 // is not adjusted to UTC, this is just so we can (de-)serialize it as a date sting.
 // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
 TimestampNtz | Timestamp => {
4 changes: 2 additions & 2 deletions kernel/src/lib.rs
@@ -202,7 +202,7 @@ impl FileMeta {
 /// let b: Arc<Bar> = a.downcast().unwrap();
 /// ```
 ///
-/// In contrast, very similer code that relies only on `Any` would fail to compile:
+/// In contrast, very similar code that relies only on `Any` would fail to compile:
 ///
 /// ```fail_compile
 /// # use std::any::Any;
@@ -416,7 +416,7 @@ pub trait JsonHandler: AsAny {
 ///
 /// - `path` - URL specifying the location to write the JSON file
 /// - `data` - Iterator of EngineData to write to the JSON file. Each row should be written as
-/// a new JSON object appended to the file. (that is, the file is newline-delimeted JSON, and
+/// a new JSON object appended to the file. (that is, the file is newline-delimited JSON, and
 /// each row is a JSON object on a single line)
 /// - `overwrite` - If true, overwrite the file if it exists. If false, the call must fail if
 /// the file exists.
4 changes: 2 additions & 2 deletions kernel/src/predicates/parquet_stats_skipping/tests.rs
@@ -299,7 +299,7 @@ fn test_sql_where() {
 "WHERE {TRUE} < {FALSE}"
 );

-// Constrast normal vs SQL WHERE semantics - comparison
+// Contrast normal vs SQL WHERE semantics - comparison
 expect_eq!(
 AllNullTestFilter.eval_expr(&Expr::lt(col.clone(), VAL), false),
 None,
@@ -321,7 +321,7 @@ fn test_sql_where() {
 "WHERE {VAL} < {col}"
 );

-// Constrast normal vs SQL WHERE semantics - comparison inside AND
+// Contrast normal vs SQL WHERE semantics - comparison inside AND
 expect_eq!(
 AllNullTestFilter.eval_expr(&Expr::and(NULL, Expr::lt(col.clone(), VAL)), false),
 None,
2 changes: 1 addition & 1 deletion kernel/src/predicates/tests.rs
@@ -51,7 +51,7 @@ fn test_default_eval_scalar() {
 }
 }

-// verifies that partial orderings behave as excpected for all Scalar types
+// verifies that partial orderings behave as expected for all Scalar types
 #[test]
 fn test_default_partial_cmp_scalars() {
 use Ordering::*;
2 changes: 1 addition & 1 deletion kernel/src/scan/data_skipping.rs
@@ -24,7 +24,7 @@ mod tests;
 /// Returns `None` if the predicate is not eligible for data skipping.
 ///
 /// We normalize each binary operation to a comparison between a column and a literal value and
-/// rewite that in terms of the min/max values of the column.
+/// rewrite that in terms of the min/max values of the column.
 /// For example, `1 < a` is rewritten as `minValues.a > 1`.
 ///
 /// For Unary `Not`, we push the Not down using De Morgan's Laws to invert everything below the Not.
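
(Aside on the doc comment above: the sketch below illustrates the general idea of stats-based skipping with a deliberately simple case, `a < literal`, which a file can only satisfy if its recorded minimum for `a` is below the literal. It is not the kernel's actual rewrite API; the struct and function names are hypothetical.)

```rust
/// Hypothetical illustration of stats-based data skipping (not delta_kernel's API).
/// If a file's minimum for column `a` is not below the literal, no row in that file
/// can satisfy `a < literal`, so the file can be skipped.
struct ColumnStats {
    min: i64,
    max: i64,
}

/// Returns true if a file with these stats might contain rows where `a < literal`.
fn file_may_match_lt(stats: &ColumnStats, literal: i64) -> bool {
    stats.min < literal
}

fn main() {
    let stats = ColumnStats { min: 10, max: 100 };
    assert!(!file_may_match_lt(&stats, 5)); // every value is >= 10, so skip for `a < 5`
    assert!(file_may_match_lt(&stats, 50)); // some values may be < 50, so keep the file
}
```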
2 changes: 1 addition & 1 deletion kernel/src/schema.rs
@@ -1156,7 +1156,7 @@ mod tests {
 ),
 ]);

-// Similer to SchemaDepthChecker::check, but also returns call count
+// Similar to SchemaDepthChecker::check, but also returns call count
 let check_with_call_count =
 |depth_limit| SchemaDepthChecker::check_with_call_count(&schema, depth_limit);

2 changes: 1 addition & 1 deletion kernel/src/transaction.rs
@@ -241,7 +241,7 @@ impl WriteContext {
 /// Result after committing a transaction. If 'committed', the version is the new version written
 /// to the log. If 'conflict', the transaction is returned so the caller can resolve the conflict
 /// (along with the version which conflicted).
-// TODO(zach): in order to make the returning of a transcation useful, we need to add APIs to
+// TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to
 // update the transaction to a new version etc.
 #[derive(Debug)]
 pub enum CommitResult {