1 change: 1 addition & 0 deletions datafusion/catalog-listing/src/table.rs
@@ -674,6 +674,7 @@ impl TableProvider for ListingTable {
insert_op,
keep_partition_by_columns,
file_extension: self.options().format.get_ext(),
single_file_output: None, // Use extension heuristic for table inserts
};

// For writes, we only use user-specified ordering (no file groups to derive from)
16 changes: 14 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
@@ -2048,11 +2048,17 @@ impl DataFrame {
.build()?
};

// Build copy options, including single_file_output if explicitly set
let mut copy_options: HashMap<String, String> = HashMap::new();
if options.single_file_output {
copy_options.insert("single_file_output".to_string(), "true".to_string());
}

let plan = LogicalPlanBuilder::copy_to(
plan,
path.into(),
file_type,
HashMap::new(),
copy_options,
options.partition_by,
)?
.build()?;
@@ -2116,11 +2122,17 @@
.build()?
};

// Build copy options, including single_file_output if explicitly set
let mut copy_options: HashMap<String, String> = HashMap::new();
if options.single_file_output {
copy_options.insert("single_file_output".to_string(), "true".to_string());
}

let plan = LogicalPlanBuilder::copy_to(
plan,
path.into(),
file_type,
Default::default(),
copy_options,
options.partition_by,
)?
.build()?;
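Both copy_to call sites changed above forward the flag the same way: the DataFrame-level setting becomes a plain string option on the CopyTo logical plan, and it is only inserted when explicitly requested, so its absence downstream means "fall back to the extension heuristic". A minimal, self-contained sketch of that pattern (everything except the "single_file_output" key is illustrative):

```rust
use std::collections::HashMap;

/// Sketch of the option-building step used in both write paths above:
/// the flag is only serialized when the caller opted in.
fn build_copy_options(single_file_output: bool) -> HashMap<String, String> {
    let mut copy_options = HashMap::new();
    if single_file_output {
        copy_options.insert("single_file_output".to_string(), "true".to_string());
    }
    copy_options
}

fn main() {
    assert!(build_copy_options(false).is_empty());
    assert_eq!(
        build_copy_options(true)
            .get("single_file_output")
            .map(String::as_str),
        Some("true")
    );
}
```

On the user side the entry point is `DataFrameWriteOptions::new().with_single_file_output(true)`, as exercised by the new test in datafusion/core/src/dataframe/parquet.rs below.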
57 changes: 56 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use crate::datasource::file_format::{
@@ -84,11 +85,17 @@ impl DataFrame {
.build()?
};

// Build copy options, including single_file_output if explicitly set
let mut copy_options = HashMap::<String, String>::new();
if options.single_file_output {
copy_options.insert("single_file_output".to_string(), "true".to_string());
}

let plan = LogicalPlanBuilder::copy_to(
plan,
path.into(),
file_type,
Default::default(),
copy_options,
options.partition_by,
)?
.build()?;
@@ -324,4 +331,52 @@ mod tests {

Ok(())
}

/// Test that single_file_output works for paths WITHOUT file extensions.
/// This verifies the fix for the regression where the extension heuristic
/// ignored the explicit with_single_file_output(true) setting.
#[tokio::test]
async fn test_single_file_output_without_extension() -> Result<()> {
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

let ctx = SessionContext::new();
let tmp_dir = TempDir::new()?;

// Path WITHOUT .parquet extension - this is the key scenario
let output_path = tmp_dir.path().join("data_no_ext");
let output_path_str = output_path.to_str().unwrap();

let df = ctx.read_batch(RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?)?;

// Explicitly request single file output
df.write_parquet(
output_path_str,
DataFrameWriteOptions::new().with_single_file_output(true),
None,
)
.await?;

// Verify: output should be a FILE, not a directory
assert!(
output_path.is_file(),
"Expected single file at {:?}, but got is_file={}, is_dir={}",
output_path,
output_path.is_file(),
output_path.is_dir()
);

// Verify the file is readable as parquet
let file = std::fs::File::open(&output_path)?;
let reader = parquet::file::reader::SerializedFileReader::new(file)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
assert_eq!(metadata.file_metadata().num_rows(), 3);

Ok(())
}
}
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1547,6 +1547,7 @@ mod tests {
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
single_file_output: None,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
@@ -1638,6 +1639,7 @@
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
single_file_output: None,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
@@ -1728,6 +1730,7 @@
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
single_file_output: None,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
25 changes: 24 additions & 1 deletion datafusion/core/src/physical_planner.rs
@@ -549,8 +549,30 @@ impl DefaultPhysicalPlanner {
}
};

// Parse single_file_output option if explicitly set
let single_file_output = match source_option_tuples
.get("single_file_output")
.map(|v| v.trim())
{
None => None,
Some("true") => Some(true),
Some("false") => Some(false),
Some(value) => {
return Err(DataFusionError::Configuration(format!(
"provided value for 'single_file_output' was not recognized: \"{value}\""
)));
}
};

// Filter out sink-related options that are not format options
let format_options: HashMap<String, String> = source_option_tuples
.iter()
.filter(|(k, _)| k.as_str() != "single_file_output")
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

let sink_format = file_type_to_format(file_type)?
.create(session_state, source_option_tuples)?;
.create(session_state, &format_options)?;

// Determine extension based on format extension and compression
let file_extension = match sink_format.compression_type() {
@@ -571,6 +593,7 @@
insert_op: InsertOp::Append,
keep_partition_by_columns,
file_extension,
single_file_output,
};

let ordering = input_exec.properties().output_ordering().cloned();
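As a standalone illustration of the parsing rules introduced above — values are trimmed, only "true"/"false" are accepted, and anything else is a configuration error — here is a minimal sketch that uses a plain String error in place of DataFusionError::Configuration:

```rust
use std::collections::HashMap;

/// Sketch of the three-way parse: absent => None (extension heuristic applies),
/// "true"/"false" => explicit override, anything else => error.
fn parse_single_file_output(
    options: &HashMap<String, String>,
) -> Result<Option<bool>, String> {
    match options.get("single_file_output").map(|v| v.trim()) {
        None => Ok(None),
        Some("true") => Ok(Some(true)),
        Some("false") => Ok(Some(false)),
        Some(value) => Err(format!(
            "provided value for 'single_file_output' was not recognized: \"{value}\""
        )),
    }
}

fn main() {
    let mut opts = HashMap::new();
    assert_eq!(parse_single_file_output(&opts), Ok(None));

    opts.insert("single_file_output".to_string(), " true ".to_string());
    assert_eq!(parse_single_file_output(&opts), Ok(Some(true)));

    opts.insert("single_file_output".to_string(), "yes".to_string());
    assert!(parse_single_file_output(&opts).is_err());
}
```

Because this is a sink option rather than a format option, the planner also filters it out of the map before handing the remaining options to the file format, as shown in the diff above.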
5 changes: 5 additions & 0 deletions datafusion/datasource/src/file_sink_config.rs
@@ -112,6 +112,11 @@ pub struct FileSinkConfig {
pub keep_partition_by_columns: bool,
/// File extension without a dot(.)
pub file_extension: String,
/// Override for single file output behavior.
/// - `None`: use extension heuristic (path with extension = single file)
/// - `Some(true)`: force single file output at exact path
/// - `Some(false)`: force directory output with generated filenames
pub single_file_output: Option<bool>,
}

impl FileSinkConfig {
7 changes: 5 additions & 2 deletions datafusion/datasource/src/write/demux.rs
@@ -106,8 +106,11 @@ pub(crate) fn start_demuxer_task(
let file_extension = config.file_extension.clone();
let base_output_path = config.table_paths[0].clone();
let task = if config.table_partition_cols.is_empty() {
let single_file_output = !base_output_path.is_collection()
&& base_output_path.file_extension().is_some();
// Use explicit single_file_output if set, otherwise fall back to extension heuristic
let single_file_output = config.single_file_output.unwrap_or_else(|| {
!base_output_path.is_collection()
&& base_output_path.file_extension().is_some()
});
SpawnedTask::spawn(async move {
row_count_demuxer(
tx,
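Putting the new field and the demuxer change together: an explicit single_file_output wins, and only when it is None does the old path-extension heuristic apply. A self-contained sketch of that decision follows; the path_has_extension helper is a stand-in for the ListingTableUrl::is_collection()/file_extension() checks used in the real code:

```rust
/// Sketch of the resolution order: explicit override first, heuristic second.
fn resolve_single_file_output(explicit: Option<bool>, output_path: &str) -> bool {
    explicit.unwrap_or_else(|| path_has_extension(output_path))
}

/// Stand-in heuristic: a path that does not end in '/' and has an extension
/// is treated as a single output file.
fn path_has_extension(path: &str) -> bool {
    !path.ends_with('/') && std::path::Path::new(path).extension().is_some()
}

fn main() {
    // Heuristic alone: a ".parquet" path is a single file, a trailing slash is a directory.
    assert!(resolve_single_file_output(None, "/tmp/data.parquet"));
    assert!(!resolve_single_file_output(None, "/tmp/out/"));
    // The regression case: an extension-less path is a directory by default,
    // but Some(true) now forces a single file at that exact path.
    assert!(!resolve_single_file_output(None, "/tmp/data_no_ext"));
    assert!(resolve_single_file_output(Some(true), "/tmp/data_no_ext"));
}
```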
2 changes: 2 additions & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -737,6 +737,8 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
insert_op,
keep_partition_by_columns: conf.keep_partition_by_columns,
file_extension: conf.file_extension.clone(),
// For deserialized plans, use extension heuristic (backward compatible)
Member:
For how long will this backward-compatible approach be used? At some point it should start using the new config property.

IMO the proper way would be to add a new optional field to the Protobuf format, then try to read it and use None only if it is missing; if it is present, use its value.

Contributor Author:
Thanks for the feedback. You were right, I made some changes based on the input. Let me know if this looks better.

Member:
Lovely!

single_file_output: None,
})
}
}
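The review thread above appears to have led to a follow-up change that is not part of this commit. As a hedged sketch of the reviewer's suggestion — an optional Protobuf field that deserializes to None only when absent — assuming prost would surface an `optional bool single_file_output` field as Option<bool>; all type names below are illustrative stand-ins:

```rust
/// Stand-in for the prost-generated message; `optional bool` in proto3 maps
/// to Option<bool> on the Rust side.
struct ProtoFileSinkConfig {
    single_file_output: Option<bool>,
}

/// Stand-in for the runtime FileSinkConfig field being populated.
struct FileSinkConfigSketch {
    single_file_output: Option<bool>,
}

fn from_proto(conf: &ProtoFileSinkConfig) -> FileSinkConfigSketch {
    FileSinkConfigSketch {
        // Plans serialized before the field existed decode as None and keep
        // the extension-heuristic behavior; newer plans carry the explicit value.
        single_file_output: conf.single_file_output,
    }
}

fn main() {
    let old_plan = ProtoFileSinkConfig { single_file_output: None };
    assert_eq!(from_proto(&old_plan).single_file_output, None);

    let new_plan = ProtoFileSinkConfig { single_file_output: Some(true) };
    assert_eq!(from_proto(&new_plan).single_file_output, Some(true));
}
```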
3 changes: 3 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1475,6 +1475,7 @@ fn roundtrip_json_sink() -> Result<()> {
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: true,
file_extension: "json".into(),
single_file_output: None,
};
let data_sink = Arc::new(JsonSink::new(
file_sink_config,
@@ -1513,6 +1514,7 @@ fn roundtrip_csv_sink() -> Result<()> {
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: true,
file_extension: "csv".into(),
single_file_output: None,
};
let data_sink = Arc::new(CsvSink::new(
file_sink_config,
@@ -1570,6 +1572,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: true,
file_extension: "parquet".into(),
single_file_output: None,
};
let data_sink = Arc::new(ParquetSink::new(
file_sink_config,