Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 34 additions & 28 deletions ffi/examples/read-table/read_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
#include <sys/time.h>

#include "arrow.h"
#include "kernel_utils.h"
#include "read_table.h"
#include "schema.h"
#include "kernel_utils.h"

// Print the content of a selection vector if `VERBOSE` is defined in read_table.h
void print_selection_vector(const char* indent, const KernelBoolSlice* selection_vec)
Expand Down Expand Up @@ -55,7 +55,11 @@ void scan_row_callback(
{
(void)size; // not using this at the moment
struct EngineContext* context = engine_context;
print_diag("Called back to read file: %.*s. (size: %" PRIu64 ", num records: ", (int)path.len, path.ptr, size);
print_diag(
"Called back to read file: %.*s. (size: %" PRIu64 ", num records: ",
(int)path.len,
path.ptr,
size);
if (stats) {
print_diag("%" PRId64 ")\n", stats->num_records);
} else {
Expand Down Expand Up @@ -85,20 +89,27 @@ void scan_row_callback(

// For each chunk of scan data (which may contain multiple files to scan), kernel will call this
// function (named do_visit_scan_data to avoid conflict with visit_scan_data exported by kernel)
void do_visit_scan_data(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec,
const CTransforms* transforms)
void do_visit_scan_data(void* engine_context, HandleCScanData scan_data)
{
print_diag("\nScan iterator found some data to read\n Of this data, here is "
"a selection vector\n");
print_selection_vector(" ", &selection_vec);
struct EngineContext* context = engine_context;

ExternResultKernelBoolSlice selection_vector_res =
selection_vector_from_scan_data(scan_data, context->engine);
if (selection_vector_res.tag != OkKernelBoolSlice) {
printf("Could not get selection vector from kernel\n");
exit(-1);
}
KernelBoolSlice selection_vector = selection_vector_res.ok;
print_selection_vector(" ", &selection_vector);

// TODO! print selection vector
// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
visit_scan_data(scan_data, engine_context, scan_row_callback);
free_bool_slice(selection_vector);
free_scan_data(scan_data);
}

// Called for each element of the partition StringSliceIterator. We just turn the slice into a
Expand Down Expand Up @@ -145,29 +156,29 @@ PartitionList* get_partition_list(SharedSnapshot* snapshot)
return list;
}

void free_partition_list(PartitionList* list) {
void free_partition_list(PartitionList* list)
{
for (uintptr_t i = 0; i < list->len; i++) {
free(list->cols[i]);
}
free(list->cols);
free(list);
}

static const char *LEVEL_STRING[] = {
"ERROR", "WARN", "INFO", "DEBUG", "TRACE"
};
static const char* LEVEL_STRING[] = { "ERROR", "WARN", "INFO", "DEBUG", "TRACE" };

// define some ansi color escapes so we can have nice colored output in our logs
#define RED "\x1b[31m"
#define BLUE "\x1b[34m"
#define DIM "\x1b[2m"
#define RED "\x1b[31m"
#define BLUE "\x1b[34m"
#define DIM "\x1b[2m"
#define RESET "\x1b[0m"

void tracing_callback(struct Event event) {
void tracing_callback(struct Event event)
{
struct timeval tv;
char buffer[32];
gettimeofday(&tv, NULL);
struct tm *tm_info = gmtime(&tv.tv_sec);
struct tm* tm_info = gmtime(&tv.tv_sec);
strftime(buffer, 26, "%Y-%m-%dT%H:%M:%S", tm_info);
char* level_color = event.level < 3 ? RED : BLUE;
printf(
Expand All @@ -186,17 +197,12 @@ void tracing_callback(struct Event event) {
(int)event.message.len,
event.message.ptr);
if (event.file.ptr) {
printf(
" %sat%s %.*s:%i\n",
DIM,
RESET,
(int)event.file.len,
event.file.ptr,
event.line);
printf(" %sat%s %.*s:%i\n", DIM, RESET, (int)event.file.len, event.file.ptr, event.line);
}
}

void log_line_callback(KernelStringSlice line) {
void log_line_callback(KernelStringSlice line)
{
printf("%.*s", (int)line.len, line.ptr);
}

Expand Down
85 changes: 48 additions & 37 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
use delta_kernel::scan::state::{DvInfo, GlobalScanState};
use delta_kernel::scan::{Scan, ScanData};
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Error, Expression, ExpressionRef};
Expand All @@ -16,9 +16,9 @@ use crate::expressions::engine::{
};
use crate::expressions::SharedExpression;
use crate::{
kernel_string_slice, AllocateStringFn, ExclusiveEngineData, ExternEngine, ExternResult,
IntoExternResult, KernelBoolSlice, KernelRowIndexArray, KernelStringSlice, NullableCvoid,
SharedExternEngine, SharedSchema, SharedSnapshot, TryFromStringSlice,
kernel_string_slice, AllocateStringFn, ExternEngine, ExternResult, IntoExternResult,
KernelBoolSlice, KernelRowIndexArray, KernelStringSlice, NullableCvoid, SharedExternEngine,
SharedSchema, SharedSnapshot, TryFromStringSlice,
};

use super::handle::Handle;
Expand All @@ -29,6 +29,36 @@ use super::handle::Handle;
#[handle_descriptor(target=Scan, mutable=false, sized=true)]
pub struct SharedScan;

#[handle_descriptor(target=ScanData, mutable=false, sized=true)]
pub struct CScanData;

/// Drop an `CScanData`.
///
/// # Safety
///
/// Caller is responsible for passing a valid scan data handle.
#[no_mangle]
pub unsafe extern "C" fn free_scan_data(scan_data: Handle<CScanData>) {
scan_data.drop_handle();
}

/// Get a selection vector out of a [`CScanData`] struct
///
/// # Safety
/// Engine is responsible for providing valid pointers for each argument
#[no_mangle]
pub unsafe extern "C" fn selection_vector_from_scan_data(
scan_data: Handle<CScanData>,
engine: Handle<SharedExternEngine>,
) -> ExternResult<KernelBoolSlice> {
let scan_data = unsafe { scan_data.as_ref() };
selection_vector_from_scan_data_impl(scan_data).into_extern_result(&engine.as_ref())
}

fn selection_vector_from_scan_data_impl(scan_data: &ScanData) -> DeltaResult<KernelBoolSlice> {
Ok(scan_data.selection_vector().clone().into())
}

/// Drops a scan.
///
/// # Safety
Expand Down Expand Up @@ -176,8 +206,8 @@ fn kernel_scan_data_init_impl(
}

/// Call the provided `engine_visitor` on the next scan data item. The visitor will be provided with
/// a selection vector and engine data. It is the responsibility of the _engine_ to free these when
/// it is finished by calling [`free_bool_slice`] and [`free_engine_data`] respectively.
/// a [`CScanData`]. It is the responsibility of the _engine_ to free these when it is finished
/// by calling [`free_bool_slice`] and [`free_engine_data`] respectively.
///
/// # Safety
///
Expand All @@ -190,12 +220,7 @@ fn kernel_scan_data_init_impl(
pub unsafe extern "C" fn kernel_scan_data_next(
data: Handle<SharedScanDataIterator>,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
engine_visitor: extern "C" fn(engine_context: NullableCvoid, scan_data: Handle<CScanData>),
) -> ExternResult<bool> {
let data = unsafe { data.as_ref() };
kernel_scan_data_next_impl(data, engine_context, engine_visitor)
Expand All @@ -204,21 +229,14 @@ pub unsafe extern "C" fn kernel_scan_data_next(
fn kernel_scan_data_next_impl(
data: &KernelScanDataIterator,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
engine_visitor: extern "C" fn(engine_context: NullableCvoid, scan_data: Handle<CScanData>),
) -> DeltaResult<bool> {
let mut data = data
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
if let Some((data, sel_vec, transforms)) = data.next().transpose()? {
let bool_slice = KernelBoolSlice::from(sel_vec);
let transform_map = CTransforms { transforms };
(engine_visitor)(engine_context, data.into(), bool_slice, &transform_map);
if let Some(scan_data) = data.next().transpose()? {
(engine_visitor)(engine_context, Arc::new(scan_data).into());
Ok(true)
} else {
Ok(false)
Expand Down Expand Up @@ -421,31 +439,24 @@ struct ContextWrapper {
}

/// Shim for ffi to call visit_scan_data. This will generally be called when iterating through scan
/// data which provides the data handle and selection vector as each element in the iterator.
/// data which provides the [`CScanData`] as each element in the iterator.
///
/// # Safety
/// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector.
/// engine is responsible for passing a valid [`CScanData`].
#[no_mangle]
pub unsafe extern "C" fn visit_scan_data(
data: Handle<ExclusiveEngineData>,
selection_vec: KernelBoolSlice,
transforms: &CTransforms,
scan_data: Handle<CScanData>,
engine_context: NullableCvoid,
callback: CScanCallback,
) {
let selection_vec = unsafe { selection_vec.as_ref() };
let data = unsafe { data.as_ref() };
let scan_data = unsafe { scan_data.as_ref() };
let context_wrapper = ContextWrapper {
engine_context,
callback,
};

// TODO: return ExternResult to caller instead of panicking?
visit_scan_files(
data,
selection_vec,
&transforms.transforms,
context_wrapper,
rust_callback,
)
.unwrap();
scan_data
.visit_scan_files(context_wrapper, rust_callback)
.unwrap();
}
10 changes: 2 additions & 8 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,8 @@ fn try_main() -> DeltaResult<()> {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let (data, vector, transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
(),
print_scan_file,
)?;
let scan_data = res?;
scan_data.visit_scan_files((), print_scan_file)?;
}
}
Commands::Actions { oldest_first } => {
Expand Down
10 changes: 2 additions & 8 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,8 @@ fn try_main() -> DeltaResult<()> {
drop(record_batch_tx);

for res in scan_data {
let (data, vector, transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
scan_file_tx,
send_scan_file,
)?;
let scan_data = res?;
scan_file_tx = scan_data.visit_scan_files(scan_file_tx, send_scan_file)?;
}

// have sent all scan files, drop this so threads will exit when there's no more work
Expand Down
8 changes: 8 additions & 0 deletions kernel/src/engine_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ use tracing::debug;

use std::collections::HashMap;

/// Engine data paired with a selection vector that indicates which rows to process.
pub struct FilteredEngineData {
// The underlying engine data
pub data: Box<dyn EngineData>,
// The selection vector where `true` marks rows to include in results
pub selection_vector: Vec<bool>,
}

/// a trait that an engine exposes to give access to a list
pub trait EngineList {
/// Return the length of the list at the specified row_index in the raw data
Expand Down
19 changes: 14 additions & 5 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,11 @@ impl LogReplayScanner {
// TODO: Teach expression eval to respect the selection vector we just computed so carefully!
let selection_vector = visitor.selection_vector;
let result = add_transform.evaluate(actions)?;
Ok((result, selection_vector, visitor.row_transform_exprs))
Ok(ScanData::new(
result,
selection_vector,
visitor.row_transform_exprs,
))
}
}

Expand Down Expand Up @@ -382,7 +386,11 @@ pub(crate) fn scan_action_iter(
is_log_batch,
)
})
.filter(|res| res.as_ref().map_or(true, |(_, sv, _)| sv.contains(&true)))
.filter(|res| {
res.as_ref().map_or(true, |scan_data| {
scan_data.selection_vector().contains(&true)
})
})
}

#[cfg(test)]
Expand Down Expand Up @@ -464,8 +472,8 @@ mod tests {
None,
);
for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
assert!(transforms.is_empty(), "Should have no transforms");
let scan_data = res.unwrap();
assert!(scan_data.transforms.is_empty(), "Should have no transforms");
}
}

Expand Down Expand Up @@ -510,7 +518,8 @@ mod tests {
}

for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
let scan_data = res.unwrap();
let transforms = scan_data.transforms;
// in this case we have a metadata action first and protocol 3rd, so we expect 4 items,
// the first and 3rd being a `None`
assert_eq!(transforms.len(), 4, "Should have 4 transforms");
Expand Down
Loading
Loading