1515// specific language governing permissions and limitations
1616// under the License.
1717
18- use std:: collections:: HashSet ;
1918use std:: num:: NonZeroUsize ;
2019use std:: sync:: Arc ;
2120
@@ -25,10 +24,8 @@ use datafusion::physical_plan::expressions::Column;
2524use datafusion:: physical_plan:: repartition:: RepartitionExec ;
2625use datafusion:: physical_plan:: { ExecutionPlan , Partitioning } ;
2726use iceberg:: spec:: { TableMetadata , TableMetadataRef , Transform } ;
28- use tracing;
2927
3028use crate :: physical_plan:: project:: PARTITION_VALUES_COLUMN ;
31-
3229/// Creates an Iceberg-aware repartition execution plan that optimizes data distribution
3330/// for parallel processing while respecting Iceberg table partitioning semantics.
3431///
@@ -37,40 +34,57 @@ use crate::physical_plan::project::PARTITION_VALUES_COLUMN;
3734///
3835/// ## Partitioning Strategies
3936///
40- /// - **Unpartitioned tables** – Uses round-robin distribution to balance load across workers.
41- /// - **Hash partitioning** – Applied for tables with identity or bucket transforms, ensuring
42- /// co-location of related data for efficient file clustering. This is used when the `_partition`
43- /// column is present AND the partition spec has hash-friendly transforms (Identity/Bucket),
44- /// or when source columns with these transforms are available.
45- /// - **Round-robin partitioning** – Used for temporal transforms (Year, Month, Day, Hour),
46- /// Truncate, or other transforms that don't provide good hash distribution.
47- /// - **Mixed transforms** – Combines multiple transform types, using hash partitioning only
48- /// when Identity or Bucket transforms are present, otherwise falling back to round-robin.
37+ /// - **Partitioned tables with Identity/Bucket transforms** – Uses hash partitioning on the
38+ /// `_partition` column for optimal data distribution and file clustering. Ensures that rows
39+ /// with the same partition values are co-located in the same task.
40+ ///
41+ /// - **Partitioned tables with temporal transforms** – Uses round-robin partitioning for
42+ /// temporal transforms (Year, Month, Day, Hour) that don't provide uniform hash distribution.
43+ ///
44+ /// - **Unpartitioned tables** – Uses round-robin distribution to balance load evenly across workers.
45+ ///
46+ /// ## Requirements
47+ ///
48+ /// - **For partitioned tables**: The input MUST include the `_partition` column.
49+ /// Add it by calling [`project_with_partition`](crate::physical_plan::project_with_partition) before [`repartition`].
50+ /// - **For unpartitioned tables**: No special preparation needed.
51+ /// - Returns an error if a partitioned table is missing the `_partition` column.
4952///
5053/// ## Performance Notes
5154///
52- /// - Only repartitions when the input scheme or partition count differs from the target.
55+ /// - Only adds repartitioning when the input partitioning differs from the target.
5356/// - Requires an explicit target partition count for deterministic behavior.
54- /// - Preserves column order (partitions first, then buckets) for consistent layout.
5557///
5658/// # Arguments
5759///
58- /// * `input` – The input execution plan providing data to be repartitioned . For partitioned tables,
59- /// the input should include the `_partition` column (added via `project_with_partition`).
60- /// * `table_metadata` – Iceberg table metadata containing partition spec.
61- /// * `target_partitions` – Target number of partitions for parallel processing (must be > 0).
60+ /// * `input` - The input [`ExecutionPlan`] . For partitioned tables, must include the `_partition`
61+ /// column (added via [ `project_with_partition`](crate::physical_plan::project_with_partition) ).
62+ /// * `table_metadata` - Iceberg table metadata containing partition spec.
63+ /// * `target_partitions` - Target number of partitions for parallel processing (must be > 0).
6264///
6365/// # Returns
6466///
65- /// An execution plan that applies the optimal partitioning strategy, or the original input plan if no repartitioning is needed.
67+ /// An [`ExecutionPlan`] that applies the optimal partitioning strategy, or the original input plan
68+ /// if repartitioning is not needed.
69+ ///
70+ /// # Errors
71+ ///
72+ /// Returns [`DataFusionError::Plan`] if a partitioned table input is missing the `_partition` column.
6673///
67- /// # Example
74+ /// # Examples
75+ ///
76+ /// For partitioned tables, first add the `_partition` column:
6877///
6978/// ```ignore
79+ /// use std::num::NonZeroUsize;
80+ /// use iceberg_datafusion::physical_plan::project_with_partition;
81+ ///
82+ /// let plan_with_partition = project_with_partition(input_plan, &table)?;
83+ ///
7084/// let repartitioned_plan = repartition(
71- /// input_plan ,
85+ /// plan_with_partition ,
7286/// table.metadata_ref(),
73- /// 4, // Explicit partition count
87+ /// NonZeroUsize::new(4).unwrap(),
7488/// )?;
7589/// ```
7690#[ allow( dead_code) ]
@@ -128,39 +142,26 @@ fn same_columns(a_exprs: &[Arc<dyn PhysicalExpr>], b_exprs: &[Arc<dyn PhysicalEx
128142///
129143/// ## Partitioning Strategy
130144///
131- /// - **Hash partitioning using `_partition` column**: Used when the input includes a
132- /// projected `_partition` column AND the partition spec contains Identity or Bucket transforms.
133- /// Ensures data is distributed based on actual partition values with good distribution.
134- ///
135- /// - **Hash partitioning using source columns**: Applied when identity or bucket transforms
136- /// provide good distribution:
137- /// 1. Identity partition columns (e.g., `PARTITIONED BY (user_id, category)`)
138- /// 2. Bucket columns from partition spec (e.g., `bucket(16, user_id)`)
139- ///
140- /// Ensures co-location within partitions and buckets for optimal file clustering.
141- ///
142- /// - **Round-robin partitioning**: Used for unpartitioned tables, or when partition transforms
143- /// don't provide good hash distribution (e.g., Year, Month, Day, Hour, Truncate transforms).
144- /// Ensures even load distribution across partitions.
145+ /// - **Partitioned tables**: Must have the `_partition` column in the input schema (added via
146+ /// `project_with_partition`). Uses hash partitioning if the partition spec contains Identity
147+ /// or Bucket transforms for good data distribution. Falls back to round-robin for temporal
148+ /// transforms (Year, Month, Day, Hour) that don't provide uniform hash distribution.
145149///
146- /// ## Column Priority
150+ /// - **Unpartitioned tables**: Always uses round-robin batch partitioning to ensure even load
151+ /// distribution across workers.
147152///
148- /// Columns are combined in the following order, with duplicates removed:
149- /// 1. `_partition` column (highest priority, if present)
150- /// 2. Identity partition columns from partition spec
151- /// 3. Bucket columns from partition spec
153+ /// ## Requirements
152154///
153- /// ## Fallback
154- ///
155- /// If no suitable hash columns are found, falls back to round-robin batch partitioning
156- /// to ensure even load distribution across partitions .
155+ /// - **For partitioned tables**: The input MUST include the `_partition` column
156+ /// (added via `project_with_partition()`).
157+ /// - **For unpartitioned tables**: No special preparation needed.
158+ /// - Returns an error if a partitioned table is missing the `_partition` column .
157159fn determine_partitioning_strategy (
158160 input : & Arc < dyn ExecutionPlan > ,
159161 table_metadata : & TableMetadata ,
160162 target_partitions : NonZeroUsize ,
161163) -> DFResult < Partitioning > {
162164 let partition_spec = table_metadata. default_partition_spec ( ) ;
163- let table_schema = table_metadata. current_schema ( ) ;
164165 let input_schema = input. schema ( ) ;
165166 let target_partition_count = target_partitions. get ( ) ;
166167
@@ -176,84 +177,28 @@ fn determine_partitioning_strategy(
176177 match ( is_partitioned_table, partition_col_result) {
177178 // Case 1: Partitioned table with _partition column present
178179 ( true , Ok ( partition_col_idx) ) => {
179- let partition_field = input_schema. field ( partition_col_idx) ;
180- if partition_field. name ( ) != PARTITION_VALUES_COLUMN {
181- return Err ( DataFusionError :: Plan ( format ! (
182- "Expected {} column at index {}, but found '{}'" ,
183- PARTITION_VALUES_COLUMN ,
184- partition_col_idx,
185- partition_field. name( )
186- ) ) ) ;
187- }
188-
189180 let partition_expr = Arc :: new ( Column :: new ( PARTITION_VALUES_COLUMN , partition_col_idx) )
190181 as Arc < dyn PhysicalExpr > ;
191182
192- return if has_hash_friendly_transforms {
183+ if has_hash_friendly_transforms {
193184 Ok ( Partitioning :: Hash (
194185 vec ! [ partition_expr] ,
195186 target_partition_count,
196187 ) )
197188 } else {
198189 Ok ( Partitioning :: RoundRobinBatch ( target_partition_count) )
199- } ;
200- }
201-
202- // Case 2: Partitioned table missing _partition column (warning)
203- ( true , Err ( _) ) => {
204- tracing:: warn!(
205- "Partitioned table input missing {} column. \
206- Consider adding partition projection before repartitioning.",
207- PARTITION_VALUES_COLUMN
208- ) ;
209- }
210-
211- // Case 3: Unpartitioned table with _partition column
212- ( false , Ok ( _) ) => {
213- tracing:: warn!(
214- "Input contains {} column but table is unpartitioned. \
215- This may indicate unnecessary projection.",
216- PARTITION_VALUES_COLUMN
217- ) ;
190+ }
218191 }
219192
220- // Case 4: Unpartitioned table without _partition column
221- ( false , Err ( _) ) => {
222- // Nothing to do - fall through to source column analysis
223- }
224- }
193+ // Case 2: Partitioned table missing _partition column (normally this should not happen)
194+ ( true , Err ( _) ) => Err ( DataFusionError :: Plan ( format ! (
195+ "Partitioned table input missing {} column. \
196+ Ensure projection happens before repartitioning.",
197+ PARTITION_VALUES_COLUMN
198+ ) ) ) ,
225199
226- let hash_column_names: Vec < & str > = partition_spec
227- . fields ( )
228- . iter ( )
229- . filter ( |pf| matches ! ( pf. transform, Transform :: Identity | Transform :: Bucket ( _) ) )
230- . filter_map ( |pf| {
231- table_schema
232- . field_by_id ( pf. source_id )
233- . map ( |sf| sf. name . as_str ( ) )
234- } )
235- . collect ( ) ;
236-
237- let mut seen_columns = HashSet :: with_capacity ( hash_column_names. len ( ) ) ;
238- let hash_exprs: Vec < Arc < dyn PhysicalExpr > > = hash_column_names
239- . into_iter ( )
240- . filter ( |name| seen_columns. insert ( * name) )
241- . map ( |column_name| {
242- let column_idx = input_schema. index_of ( column_name) . map_err ( |e| {
243- DataFusionError :: Plan ( format ! (
244- "Column '{}' not found in input schema. \
245- Ensure projection happens before repartitioning. Error: {}",
246- column_name, e
247- ) )
248- } ) ?;
249- Ok ( Arc :: new ( Column :: new ( column_name, column_idx) ) as Arc < dyn PhysicalExpr > )
250- } )
251- . collect :: < DFResult < _ > > ( ) ?;
252-
253- if !hash_exprs. is_empty ( ) {
254- Ok ( Partitioning :: Hash ( hash_exprs, target_partition_count) )
255- } else {
256- Ok ( Partitioning :: RoundRobinBatch ( target_partition_count) )
200+ // Case 3: Unpartitioned table, always use RoundRobinBatch
201+ ( false , _) => Ok ( Partitioning :: RoundRobinBatch ( target_partition_count) ) ,
257202 }
258203}
259204
@@ -561,6 +506,11 @@ mod tests {
561506 ArrowField :: new( "date" , ArrowDataType :: Date32 , false ) ,
562507 ArrowField :: new( "user_id" , ArrowDataType :: Int64 , false ) ,
563508 ArrowField :: new( "amount" , ArrowDataType :: Int64 , false ) ,
509+ ArrowField :: new(
510+ PARTITION_VALUES_COLUMN ,
511+ ArrowDataType :: Struct ( Fields :: empty( ) ) ,
512+ false ,
513+ ) ,
564514 ] ) ) ;
565515 let input = Arc :: new ( EmptyExec :: new ( arrow_schema) ) ;
566516 let repartitioned_plan = repartition (
@@ -573,10 +523,11 @@ mod tests {
573523 let partitioning = repartitioned_plan. properties ( ) . output_partitioning ( ) ;
574524 match partitioning {
575525 Partitioning :: Hash ( exprs, _) => {
576- // With the new logic, we expect at least 1 column
577- assert ! (
578- !exprs. is_empty( ) ,
579- "Should have at least one column for hash partitioning"
526+ // Should use _partition column for hash partitioning
527+ assert_eq ! (
528+ exprs. len( ) ,
529+ 1 ,
530+ "Should have exactly one hash column (_partition)"
580531 ) ;
581532
582533 let column_names: Vec < String > = exprs
@@ -588,19 +539,13 @@ mod tests {
588539 } )
589540 . collect ( ) ;
590541
591- // Should include either user_id (identity transform) or date (partition field)
592- let has_user_id = column_names. contains ( & "user_id" . to_string ( ) ) ;
593- let has_date = column_names. contains ( & "date" . to_string ( ) ) ;
594542 assert ! (
595- has_user_id || has_date ,
596- "Should include either 'user_id' or 'date' column, got: {:?}" ,
543+ column_names . contains ( & PARTITION_VALUES_COLUMN . to_string ( ) ) ,
544+ "Should use _partition column, got: {:?}" ,
597545 column_names
598546 ) ;
599547 }
600- Partitioning :: RoundRobinBatch ( _) => {
601- // This could happen if no suitable hash columns are found
602- }
603- _ => panic ! ( "Unexpected partitioning strategy: {:?}" , partitioning) ,
548+ _ => panic ! ( "Expected Hash partitioning with Identity transform" ) ,
604549 }
605550 }
606551
@@ -717,6 +662,11 @@ mod tests {
717662 let arrow_schema = Arc :: new ( ArrowSchema :: new ( vec ! [
718663 ArrowField :: new( "date" , ArrowDataType :: Date32 , false ) ,
719664 ArrowField :: new( "amount" , ArrowDataType :: Int64 , false ) ,
665+ ArrowField :: new(
666+ PARTITION_VALUES_COLUMN ,
667+ ArrowDataType :: Struct ( Fields :: empty( ) ) ,
668+ false ,
669+ ) ,
720670 ] ) ) ;
721671 let input = Arc :: new ( EmptyExec :: new ( arrow_schema) ) ;
722672 let repartitioned_plan = repartition (
@@ -729,7 +679,7 @@ mod tests {
729679 let partitioning = repartitioned_plan. properties ( ) . output_partitioning ( ) ;
730680 assert ! (
731681 matches!( partitioning, Partitioning :: RoundRobinBatch ( _) ) ,
732- "Should use round-robin for range-only partitions "
682+ "Should use round-robin for temporal transforms (Day) that don't provide good hash distribution "
733683 ) ;
734684 }
735685
@@ -788,6 +738,11 @@ mod tests {
788738 ArrowField :: new( "date" , ArrowDataType :: Date32 , false ) ,
789739 ArrowField :: new( "user_id" , ArrowDataType :: Int64 , false ) ,
790740 ArrowField :: new( "amount" , ArrowDataType :: Int64 , false ) ,
741+ ArrowField :: new(
742+ PARTITION_VALUES_COLUMN ,
743+ ArrowDataType :: Struct ( Fields :: empty( ) ) ,
744+ false ,
745+ ) ,
791746 ] ) ) ;
792747 let input = Arc :: new ( EmptyExec :: new ( arrow_schema) ) ;
793748 let repartitioned_plan = repartition (
@@ -800,11 +755,7 @@ mod tests {
800755 let partitioning = repartitioned_plan. properties ( ) . output_partitioning ( ) ;
801756 match partitioning {
802757 Partitioning :: Hash ( exprs, _) => {
803- assert_eq ! (
804- exprs. len( ) ,
805- 1 ,
806- "Should have one hash column (user_id identity transform)"
807- ) ;
758+ assert_eq ! ( exprs. len( ) , 1 , "Should have one hash column (_partition)" ) ;
808759 let column_names: Vec < String > = exprs
809760 . iter ( )
810761 . filter_map ( |expr| {
@@ -814,8 +765,9 @@ mod tests {
814765 } )
815766 . collect ( ) ;
816767 assert ! (
817- column_names. contains( & "user_id" . to_string( ) ) ,
818- "Should include identity transform column 'user_id'"
768+ column_names. contains( & PARTITION_VALUES_COLUMN . to_string( ) ) ,
769+ "Should use _partition column for mixed transforms with Identity, got: {:?}" ,
770+ column_names
819771 ) ;
820772 }
821773 _ => panic ! ( "Expected Hash partitioning for table with identity transforms" ) ,
0 commit comments