
feat: optimized json parsing #118

Closed
wants to merge 1 commit

Conversation

@roeap (Collaborator) commented Feb 15, 2024

Picking up the discussion where we left off around json parsing.

This PR tries to leverage the optimizations in Arrow's JSON parsing, at the cost of more complex code that is a bit harder to reason about.
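For context, a minimal standalone sketch of the arrow-json streaming Decoder pattern this PR builds on (illustrative only, not the PR's engine code): feed raw JSON bytes to a Decoder, then flush a RecordBatch.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::json::ReaderBuilder;
use arrow::record_batch::RecordBatch;

fn parse_json_buffer(json: &[u8], schema: Arc<Schema>) -> Result<Option<RecordBatch>, ArrowError> {
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
    let mut offset = 0;
    // decode() returns how many bytes it consumed; keep feeding until the
    // buffer is exhausted (a real reader would also flush on full batches).
    while offset < json.len() {
        let read = decoder.decode(&json[offset..])?;
        if read == 0 {
            break;
        }
        offset += read;
    }
    decoder.flush()
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("numRecords", DataType::Int64, true)]));
    let json = b"{\"numRecords\": 1}\n{\"numRecords\": 2}\n";
    let batch = parse_json_buffer(json, schema)?;
    assert_eq!(batch.map(|b| b.num_rows()).unwrap_or(0), 2);
    Ok(())
}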

@roeap force-pushed the engine-json branch 2 times, most recently from 4dfc123 to ea165ed on February 15, 2024 19:35
}
decoder.flush()
};
std::iter::from_fn(move || next().transpose()).collect::<Result<Vec<_>, _>>()
Contributor

Given that this is directly the return value, is it possible that collect could Just Work?

Suggested change
std::iter::from_fn(move || next().transpose()).collect::<Result<Vec<_>, _>>()
std::iter::from_fn(move || next().transpose()).collect()

(it seems to work in the playground, at least)
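For reference, a tiny standalone example (made-up function, not this PR's code) of the inference being relied on: when the enclosing function returns Result<Vec<_>, _>, a bare collect() picks the Result collector and short-circuits on the first error.

fn parse_all(inputs: &[&str]) -> Result<Vec<i64>, std::num::ParseIntError> {
    // Each item yields a Result; collecting into the return type behaves
    // exactly like collect::<Result<Vec<_>, _>>().
    inputs.iter().map(|s| s.parse::<i64>()).collect()
}

fn main() {
    assert_eq!(parse_all(&["1", "2"]), Ok(vec![1, 2]));
    assert!(parse_all(&["1", "x"]).is_err());
}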

mark = it;
}
}
batches.extend(emit_run(json_strings.len() - mark)?);
Contributor

Maybe worth a comment to explain: There is always a run in progress, because the loop only emits the previous run after it detects that a new run has started. Corollary: If the batch only contains one run, this will be the only emit_run call.
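For illustration, a tiny self-contained sketch of the run/emit shape this comment describes (made-up names, not the PR's code): the loop only emits the previous run when a new run starts, so exactly one trailing emit flushes the run still in progress.

fn run_lengths(values: &[Option<&str>]) -> Vec<(bool, usize)> {
    let mut runs = Vec::new();
    if values.is_empty() {
        return runs;
    }
    let mut mark = 0;
    let mut run_is_null = values[0].is_none();
    for index in 1..values.len() {
        let is_null = values[index].is_none();
        if is_null != run_is_null {
            // A new run starts here; emit the previous one.
            runs.push((run_is_null, index - mark));
            mark = index;
            run_is_null = is_null;
        }
    }
    // There is always a run in progress when the loop ends, so emit it now.
    runs.push((run_is_null, values.len() - mark));
    runs
}

fn main() {
    let vals = [Some("{}"), Some("{}"), None, None, Some("{}")];
    assert_eq!(run_lengths(&vals), vec![(false, 2), (true, 2), (false, 1)]);
}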

}
};

for it in 1..json_strings.len() {
Contributor

nit: It's not an iterator...

Suggested change
for it in 1..json_strings.len() {
for index in 1..json_strings.len() {

let result_schema = output_schema.clone();
let mut emit_run = move |run_len: usize| {
if run_is_null {
get_nulls(run_len, output_schema.clone())
Contributor

This is the only call site for get_nulls. Should we delete the function and inline the logic here? Especially because new_null_array calls ArrayData::new_null, which is nearly identical to this code here. So we can probably simplify it down to just:

Suggested change
get_nulls(run_len, output_schema.clone())
let nulls = ArrayData::new_null(schema.into(), run_len);
Ok(vec![StructArray::from(nulls).into()])

(that's a combination of SchemaRef into DataType and StructArray from ArrayData into RecordBatch.)

Contributor

Erm, I totally misread the From trait chains involving Schema and DataType... let me try that again:

Suggested change
get_nulls(run_len, output_schema.clone())
let nulls = ArrayData::new_null(DataType::Struct(schema.fields), run_len);
Ok(vec![StructArray::from(nulls).into()])

https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Struct
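For reference, a hedged standalone sketch of producing an all-null batch for a schema (illustrative names; written as a helper purely for readability, and it uses new_null_array per column plus RecordBatch::try_new rather than the exact ArrayData route suggested above).

use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn null_batch(schema: SchemaRef, run_len: usize) -> Result<RecordBatch, ArrowError> {
    // Build an all-null array for each column, then assemble them into a batch.
    let columns: Vec<ArrayRef> = schema
        .fields()
        .iter()
        .map(|field| new_null_array(field.data_type(), run_len))
        .collect();
    RecordBatch::try_new(schema, columns)
}

fn main() -> Result<(), ArrowError> {
    let schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("numRecords", DataType::Int64, true),
        Field::new("tightBounds", DataType::Boolean, true),
    ]));
    let batch = null_batch(schema, 3)?;
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}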

Comment on lines +134 to +135
let batch = read_from_json(get_reader(slice.value_data()), &mut decoder).unwrap();
Ok(batch)
Contributor

Can we replace that unwrap call with a map_err to produce a compatible error type?

read_from_json(get_reader(slice.value_data()), &mut decoder).map_err(|e| {
  ...
})
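For illustration, a minimal hedged sketch of the map_err shape being suggested; the Error type and its Arrow variant are hypothetical placeholders, not the kernel's actual error enum.

use arrow::error::ArrowError;

#[derive(Debug)]
enum Error {
    Arrow(ArrowError),
}

// Stand-in for a call like read_from_json(...) that returns Result<_, ArrowError>.
fn demo(res: Result<usize, ArrowError>) -> Result<usize, Error> {
    res.map_err(Error::Arrow)
}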

Contributor

Two things we need to figure out before we can trust json parsing in runs like this (as opposed to parsing each row's json record individually):

  1. If the JSON is truly invalid (malformed, type conflict, etc), is that a hard error that fails the query? Or would it be preferable to treat it as NULL (e.g. disabling data skipping for a file after failing to parse its invalid add.stats)?
    • We don't know how often invalid stats occur in the wild, because Spark JSON parsing is "permissive" (= errors become nulls) unless specifically configured with the mode=FAILFAST option (see the Databricks docs, because the Spark docs are woefully incomplete here). Delta Spark currently relies on the defaults when parsing stats.
    • Note: The Delta spec doesn't say anything about how clients should handle invalid stats.
  2. Because stats are a JSON object literal embedded in the add.stats string field, there are multiple ways to encode missing stats:
    • Column just plain missing: { "add": { } }
    • JSON null: { "add": { "stats": null } }
    • JSON object literal for an empty object: { "add": { "stats": "{}" } }
    • JSON literal for a JSON null (scary case): { "add": { "stats": "null" } }
      • A run of these would cause a parsing failure due to an invalid JSON value such as nullnullnull (see the sketch after this comment)
    • Empty string (scary and arguably illegal case): { "add": { "stats": "" } }
      • The parser would find no object at all, and would consume the next object in the buffer instead, producing both wrong offsets and wrong number of stats.
      • I don't know if this "should" count as NULL or as an error, but either way it would mess up the run-based JSON parsing proposed here.

(adding as a file comment because github has really poor handling of top-level PR comments)
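For illustration, a hedged sketch of the two "scary" encodings above at the plain-JSON level (using serde_json purely for demonstration; the arrow streaming Decoder this PR uses may surface these cases differently).

use serde_json::Value;

fn main() {
    // A single stats value of "null" parses fine as the JSON null literal...
    assert!(matches!(serde_json::from_str::<Value>("null"), Ok(Value::Null)));
    // ...but a concatenated run of such values is not a single valid JSON document.
    assert!(serde_json::from_str::<Value>("nullnull").is_err());
    // An empty stats string contains no JSON value at all.
    assert!(serde_json::from_str::<Value>("").is_err());
}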

@nicklan closed this Mar 19, 2024