1 change: 1 addition & 0 deletions Cargo.lock

@@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> {
projected,
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store),
false,
);

let scan_config = FileScanConfigBuilder::new(
16 changes: 16 additions & 0 deletions datafusion/common/src/config.rs
@@ -3065,6 +3065,22 @@ config_namespace! {
/// If not specified, the default level for the compression algorithm is used.
pub compression_level: Option<u32>, default = None
pub schema_infer_max_rec: Option<usize>, default = None
/// The format of JSON input files.
///
/// When `false` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `true`, expects JSON array format:
/// ```text
/// [
/// {"key1": 1, "key2": "val"},
/// {"key1": 2, "key2": "vals"}
/// ]
/// ```
pub format_array: bool, default = false
Contributor:

I think format_array will be hard to discover / find and we should call this parameter something more standard.

I looked at what other systems did and there is no consistency.

I reviewed Spark's docs and they seem to use `multiLine = true` for what you have labelled `format_array`:
https://spark.apache.org/docs/latest/sql-data-sources-json.html

DuckDB seems to call it `format = newline_delimited`: https://duckdb.org/docs/stable/data/json/loading_json#parameters

postgres seems to have two separate functions row_to_json and array_to_json
https://www.postgresql.org/docs/9.5/functions-json.html

Of the options, I think I prefer the DuckDB-style `newline_delimited`, though maybe the Spark `multiLine` would be more widely understood.

Member:

IMO it would be better to use an enum here, e.g. `JSON_FORMAT {NDJSON, ARRAY}`.
It would be clearer than true/false and also easier to extend with a third, fourth, ... format later.
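
A minimal sketch of the enum the reviewer is suggesting (hypothetical names and derives; the PR as written uses a plain `bool`):

```rust
/// Hypothetical replacement for the `format_array: bool` flag, per the
/// review suggestion above. Names are illustrative only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JsonFileFormat {
    /// Newline-delimited JSON: one object per line.
    #[default]
    Ndjson,
    /// A single top-level JSON array of objects.
    Array,
}
```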

}
}

273 changes: 273 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -47,6 +47,7 @@ mod tests {
use datafusion_common::stats::Precision;

use datafusion_common::Result;
use datafusion_datasource::file_compression_type::FileCompressionType;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
@@ -391,4 +392,276 @@ mod tests {
assert_eq!(metadata.len(), 0);
Ok(())
}

#[tokio::test]
async fn test_json_array_format() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;

// Create a temporary file with JSON array format
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
std::fs::write(
&path,
r#"[
{"a": 1, "b": 2.0, "c": true},
{"a": 2, "b": 3.5, "c": false},
{"a": 3, "b": 4.0, "c": true}
]"#,
)?;

Comment on lines +398 to +413
Contributor:

I think this standard preamble could be reduced so there would be fewer test lines (and thus it would be easier to verify what is being tested).

For example, it looks like you maybe could make a function like

let file_schema = create_json_with_format("{..}", format);

I bet the tests would be less than half the size
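
A rough sketch of what such a helper might look like (the name `create_json_with_format` and its signature come from the review suggestion above and are hypothetical, not part of the PR):

```rust
// Hypothetical test helper, per the review suggestion; not part of the PR.
// Writes `content` to a temporary file and infers its schema with the given format.
async fn create_json_with_format(
    ctx: &SessionState,
    content: &str,
    format: &JsonFormat,
) -> Result<SchemaRef> {
    let store = Arc::new(LocalFileSystem::new()) as _;
    let tmp_dir = tempfile::TempDir::new()?;
    let path = format!("{}/data.json", tmp_dir.path().to_string_lossy());
    std::fs::write(&path, content)?;
    format
        .infer_schema(ctx, &store, &[local_unpartitioned_file(&path)])
        .await
}
```

With such a helper, each test body would shrink to building a `JsonFormat`, calling the helper, and asserting on the returned schema.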

// Test with format_array = true
let format = JsonFormat::default().with_format_array(true);
let file_schema = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await
.expect("Schema inference");

let fields = file_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect::<Vec<_>>();
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);

Ok(())
}

#[tokio::test]
async fn test_json_array_format_empty() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy());
std::fs::write(&path, "[]")?;

let format = JsonFormat::default().with_format_array(true);
let result = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await;

assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("JSON array is empty")
);

Ok(())
}

#[tokio::test]
async fn test_json_array_format_with_limit() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array_limit.json", tmp_dir.path().to_string_lossy());
std::fs::write(
&path,
r#"[
{"a": 1},
{"a": 2, "b": "extra"}
]"#,
)?;

// Only infer from first record
let format = JsonFormat::default()
.with_format_array(true)
.with_schema_infer_max_rec(1);

let file_schema = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await
.expect("Schema inference");

// Should only have field "a" since we limited to 1 record
let fields = file_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect::<Vec<_>>();
assert_eq!(vec!["a: Int64"], fields);

Ok(())
}

#[tokio::test]
async fn test_json_array_format_read_data() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let task_ctx = ctx.task_ctx();
let store = Arc::new(LocalFileSystem::new()) as _;

// Create a temporary file with JSON array format
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
std::fs::write(
&path,
r#"[
{"a": 1, "b": 2.0, "c": true},
{"a": 2, "b": 3.5, "c": false},
{"a": 3, "b": 4.0, "c": true}
]"#,
)?;

let format = JsonFormat::default().with_format_array(true);

// Infer schema
let file_schema = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await?;

// Scan and read data
let exec = scan_format(
&ctx,
&format,
Some(file_schema),
tmp_dir.path().to_str().unwrap(),
"array.json",
None,
None,
)
.await?;
let batches = collect(exec, task_ctx).await?;

assert_eq!(1, batches.len());
assert_eq!(3, batches[0].num_columns());
assert_eq!(3, batches[0].num_rows());

// Verify data
let array_a = as_int64_array(batches[0].column(0))?;
assert_eq!(
vec![1, 2, 3],
(0..3).map(|i| array_a.value(i)).collect::<Vec<_>>()
);

Ok(())
}

#[tokio::test]
async fn test_json_array_format_with_projection() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let task_ctx = ctx.task_ctx();
let store = Arc::new(LocalFileSystem::new()) as _;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?;

let format = JsonFormat::default().with_format_array(true);
let file_schema = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await?;

// Project only column "a"
let exec = scan_format(
&ctx,
&format,
Some(file_schema),
tmp_dir.path().to_str().unwrap(),
"array.json",
Some(vec![0]),
None,
)
.await?;
let batches = collect(exec, task_ctx).await?;

assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns()); // Only 1 column projected
assert_eq!(2, batches[0].num_rows());

Ok(())
}

#[tokio::test]
async fn test_ndjson_read_options_format_array() -> Result<()> {
let ctx = SessionContext::new();

// Create a temporary file with JSON array format
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
std::fs::write(
&path,
r#"[
{"a": 1, "b": "hello"},
{"a": 2, "b": "world"},
{"a": 3, "b": "test"}
]"#,
)?;

// Use NdJsonReadOptions with format_array = true
let options = NdJsonReadOptions::default().format_array(true);

ctx.register_json("json_array_table", &path, options)
.await?;

let result = ctx
.sql("SELECT a, b FROM json_array_table ORDER BY a")
.await?
.collect()
.await?;

assert_snapshot!(batches_to_string(&result), @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
| 3 | test |
+---+-------+
");

Ok(())
}

#[tokio::test]
async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> {
use flate2::Compression;
use flate2::write::GzEncoder;
use std::io::Write;

let ctx = SessionContext::new();

// Create a temporary gzip compressed JSON array file
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());

let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#;
let file = std::fs::File::create(&path)?;
let mut encoder = GzEncoder::new(file, Compression::default());
encoder.write_all(json_content.as_bytes())?;
encoder.finish()?;

// Use NdJsonReadOptions with format_array and GZIP compression
let options = NdJsonReadOptions::default()
.format_array(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension(".json.gz");

ctx.register_json("json_array_gzip", &path, options).await?;

let result = ctx
.sql("SELECT a, b FROM json_array_gzip ORDER BY a")
.await?
.collect()
.await?;

assert_snapshot!(batches_to_string(&result), @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+
");

Ok(())
}
}
14 changes: 13 additions & 1 deletion datafusion/core/src/datasource/file_format/options.rs
@@ -465,6 +465,9 @@ pub struct NdJsonReadOptions<'a> {
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Whether the JSON file is in array format `[{...}, {...}]` instead of
/// line-delimited format. Defaults to `false`.
pub format_array: bool,
}

impl Default for NdJsonReadOptions<'_> {
@@ -477,6 +480,7 @@ impl Default for NdJsonReadOptions<'_> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
format_array: false,
}
}
}
@@ -529,6 +533,13 @@ impl<'a> NdJsonReadOptions<'a> {
self.schema_infer_max_records = schema_infer_max_records;
self
}

/// Specify whether the JSON file is in array format `[{...}, {...}]`
/// instead of line-delimited format.
pub fn format_array(mut self, format_array: bool) -> Self {
self.format_array = format_array;
self
}
}

#[async_trait]
@@ -663,7 +674,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
let file_format = JsonFormat::default()
.with_options(table_options.json)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());
.with_file_compression_type(self.file_compression_type.to_owned())
.with_format_array(self.format_array);

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
5 changes: 5 additions & 0 deletions datafusion/core/tests/data/json_array.json
@@ -0,0 +1,5 @@
[
{"a": 1, "b": "hello"},
{"a": 2, "b": "world"},
{"a": 3, "b": "test"}
]
1 change: 1 addition & 0 deletions datafusion/core/tests/data/json_empty_array.json
@@ -0,0 +1 @@
[]
1 change: 1 addition & 0 deletions datafusion/datasource-json/Cargo.toml
@@ -44,6 +44,7 @@ datafusion-physical-plan = { workspace = true }
datafusion-session = { workspace = true }
futures = { workspace = true }
object_store = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }

# Note: add additional linter rules in lib.rs.