Skip to content

Commit b6f3d8a

Browse files
committed
NdJsonReadOptions add array json support
1 parent 8d108d1 commit b6f3d8a

File tree

2 files changed

+99
-1
lines changed

2 files changed

+99
-1
lines changed

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ mod tests {
4747
use datafusion_common::stats::Precision;
4848

4949
use datafusion_common::Result;
50+
use datafusion_datasource::file_compression_type::FileCompressionType;
5051
use futures::StreamExt;
5152
use insta::assert_snapshot;
5253
use object_store::local::LocalFileSystem;
@@ -578,4 +579,89 @@ mod tests {
578579

579580
Ok(())
580581
}
582+
583+
#[tokio::test]
584+
async fn test_ndjson_read_options_format_array() -> Result<()> {
585+
let ctx = SessionContext::new();
586+
587+
// Create a temporary file with JSON array format
588+
let tmp_dir = tempfile::TempDir::new()?;
589+
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
590+
std::fs::write(
591+
&path,
592+
r#"[
593+
{"a": 1, "b": "hello"},
594+
{"a": 2, "b": "world"},
595+
{"a": 3, "b": "test"}
596+
]"#,
597+
)?;
598+
599+
// Use NdJsonReadOptions with format_array = true
600+
let options = NdJsonReadOptions::default().format_array(true);
601+
602+
ctx.register_json("json_array_table", &path, options)
603+
.await?;
604+
605+
let result = ctx
606+
.sql("SELECT a, b FROM json_array_table ORDER BY a")
607+
.await?
608+
.collect()
609+
.await?;
610+
611+
assert_snapshot!(batches_to_string(&result), @r"
612+
+---+-------+
613+
| a | b |
614+
+---+-------+
615+
| 1 | hello |
616+
| 2 | world |
617+
| 3 | test |
618+
+---+-------+
619+
");
620+
621+
Ok(())
622+
}
623+
624+
#[tokio::test]
625+
async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> {
626+
use flate2::Compression;
627+
use flate2::write::GzEncoder;
628+
use std::io::Write;
629+
630+
let ctx = SessionContext::new();
631+
632+
// Create a temporary gzip compressed JSON array file
633+
let tmp_dir = tempfile::TempDir::new()?;
634+
let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());
635+
636+
let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#;
637+
let file = std::fs::File::create(&path)?;
638+
let mut encoder = GzEncoder::new(file, Compression::default());
639+
encoder.write_all(json_content.as_bytes())?;
640+
encoder.finish()?;
641+
642+
// Use NdJsonReadOptions with format_array and GZIP compression
643+
let options = NdJsonReadOptions::default()
644+
.format_array(true)
645+
.file_compression_type(FileCompressionType::GZIP)
646+
.file_extension(".json.gz");
647+
648+
ctx.register_json("json_array_gzip", &path, options).await?;
649+
650+
let result = ctx
651+
.sql("SELECT a, b FROM json_array_gzip ORDER BY a")
652+
.await?
653+
.collect()
654+
.await?;
655+
656+
assert_snapshot!(batches_to_string(&result), @r"
657+
+---+-------+
658+
| a | b |
659+
+---+-------+
660+
| 1 | hello |
661+
| 2 | world |
662+
+---+-------+
663+
");
664+
665+
Ok(())
666+
}
581667
}

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ pub struct NdJsonReadOptions<'a> {
465465
pub infinite: bool,
466466
/// Indicates how the file is sorted
467467
pub file_sort_order: Vec<Vec<SortExpr>>,
468+
/// Whether the JSON file is in array format `[{...}, {...}]` instead of
469+
/// line-delimited format. Defaults to `false`.
470+
pub format_array: bool,
468471
}
469472

470473
impl Default for NdJsonReadOptions<'_> {
@@ -477,6 +480,7 @@ impl Default for NdJsonReadOptions<'_> {
477480
file_compression_type: FileCompressionType::UNCOMPRESSED,
478481
infinite: false,
479482
file_sort_order: vec![],
483+
format_array: false,
480484
}
481485
}
482486
}
@@ -529,6 +533,13 @@ impl<'a> NdJsonReadOptions<'a> {
529533
self.schema_infer_max_records = schema_infer_max_records;
530534
self
531535
}
536+
537+
/// Specify whether the JSON file is in array format `[{...}, {...}]`
538+
/// instead of line-delimited format.
539+
pub fn format_array(mut self, format_array: bool) -> Self {
540+
self.format_array = format_array;
541+
self
542+
}
532543
}
533544

534545
#[async_trait]
@@ -663,7 +674,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
663674
let file_format = JsonFormat::default()
664675
.with_options(table_options.json)
665676
.with_schema_infer_max_rec(self.schema_infer_max_records)
666-
.with_file_compression_type(self.file_compression_type.to_owned());
677+
.with_file_compression_type(self.file_compression_type.to_owned())
678+
.with_format_array(self.format_array);
667679

668680
ListingOptions::new(Arc::new(file_format))
669681
.with_file_extension(self.file_extension)

0 commit comments

Comments
 (0)