 import java.util.Map;
 import java.util.Properties;

+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.base.Optional;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;

+import static org.apache.gobblin.configuration.ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR;
+import static org.apache.gobblin.data.management.copy.CopySource.SERIALIZED_COPYABLE_DATASET;
+
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.gobblin.commit.CommitStep;
+
 /**
  * Unified Iceberg source that supports partition-based data copying from Iceberg tables.
  *
@@ -107,7 +117,6 @@ public class IcebergSource extends FileBasedSource<String, FileAwareInputStream>
   public static final String ICEBERG_RECORD_PROCESSING_ENABLED = "iceberg.record.processing.enabled";
   public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false;
   public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit";
-  public static final int DEFAULT_FILES_PER_WORKUNIT = 10;
   public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled";
   public static final String ICEBERG_FILTER_DATE = "iceberg.filter.date"; // Date value (e.g., 2025-04-01 or CURRENT_DATE)
   public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days";
@@ -123,6 +132,10 @@ public class IcebergSource extends FileBasedSource<String, FileAwareInputStream>
   private static final String HOURLY_PARTITION_SUFFIX = "-00";
   private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight";

+  // Delete configuration - similar to RecursiveCopyableDataset
+  public static final String DELETE_FILES_NOT_IN_SOURCE = "iceberg.copy.delete";
+  public static final boolean DEFAULT_DELETE_FILES_NOT_IN_SOURCE = true;
+
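+  // Illustrative usage of the key above as a job-level property (values are hypothetical; only the
+  // key itself is defined by this class):
+  //   iceberg.copy.delete=true   -> impacted target partition directories are deleted before publish
+  //   iceberg.copy.delete=false  -> existing files in the target are left in place
+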
   private Optional<LineageInfo> lineageInfo;
   private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);

@@ -262,7 +275,7 @@ private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(Sour
     }

     String datePartitionColumn = state.getProp(ICEBERG_PARTITION_COLUMN, DEFAULT_DATE_PARTITION_COLUMN);
-
+
     String dateValue = state.getProp(ICEBERG_FILTER_DATE);
     Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
         "iceberg.filter.date is required when iceberg.filter.enabled=true");
@@ -281,10 +294,10 @@ private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(Sour

     if (lookbackDays >= 1) {
       log.info("Applying lookback period of {} days for date partition column '{}': {}", lookbackDays, datePartitionColumn, dateValue);
-
+
       // Check if hourly partitioning is enabled
       boolean isHourlyPartition = state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED, DEFAULT_HOURLY_PARTITION_ENABLED);
-
+
       // Parse the date in yyyy-MM-dd format
       LocalDate start;
       try {
@@ -296,7 +309,7 @@ private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(Sour
         log.error(errorMsg);
         throw new IllegalArgumentException(errorMsg, e);
       }
-
+
       for (int i = 0; i < lookbackDays; i++) {
         String dateOnly = start.minusDays(i).toString();
         // Append hour suffix if hourly partitioning is enabled
@@ -340,7 +353,8 @@ private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(Sour
    * @param table the Iceberg table being copied
    * @return list of work units ready for parallel execution
    */
-  private List<WorkUnit> createWorkUnitsFromFiles(List<IcebergTable.FilePathWithPartition> filesWithPartitions, SourceState state, IcebergTable table) {
+  private List<WorkUnit> createWorkUnitsFromFiles(
+      List<IcebergTable.FilePathWithPartition> filesWithPartitions, SourceState state, IcebergTable table) throws IOException {
     List<WorkUnit> workUnits = Lists.newArrayList();

     if (filesWithPartitions.isEmpty()) {
@@ -353,40 +367,32 @@ private List<WorkUnit> createWorkUnitsFromFiles(List<IcebergTable.FilePathWithPa
     String tableName = table.getTableId().name();
     Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, nameSpace, tableName);

-    int filesPerWorkUnit = state.getPropAsInt(ICEBERG_FILES_PER_WORKUNIT, DEFAULT_FILES_PER_WORKUNIT);
-    List<List<IcebergTable.FilePathWithPartition>> groups = Lists.partition(filesWithPartitions, Math.max(1, filesPerWorkUnit));
-    log.info("Grouping {} files into {} work units ({} files per work unit)",
-        filesWithPartitions.size(), groups.size(), filesPerWorkUnit);
+    String datasetUrn = table.getTableId().toString();
+    long totalSize = 0L;

-    for (int i = 0; i < groups.size(); i++) {
-      List<IcebergTable.FilePathWithPartition> group = groups.get(i);
+    for (IcebergTable.FilePathWithPartition fileWithPartition : filesWithPartitions) {
       WorkUnit workUnit = new WorkUnit(extract);
+      String filePath = fileWithPartition.getFilePath();
+      totalSize += fileWithPartition.getFileSize();

-      // Store data file paths and their partition metadata separately
-      // Note: Only data files (parquet/orc/avro) are included, no Iceberg metadata files
-      List<String> filePaths = Lists.newArrayList();
+      // Store partition path for each file
       Map<String, String> fileToPartitionPath = Maps.newHashMap();
-      long totalSize = 0L;
-
-      for (IcebergTable.FilePathWithPartition fileWithPartition : group) {
-        String filePath = fileWithPartition.getFilePath();
-        filePaths.add(filePath);
-        // Store partition path for each file
-        fileToPartitionPath.put(filePath, fileWithPartition.getPartitionPath());
-        // Accumulate file sizes for work unit weight
-        totalSize += fileWithPartition.getFileSize();
-      }
+      fileToPartitionPath.put(filePath, fileWithPartition.getPartitionPath());
+      workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, datasetUrn);

-      workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, String.join(",", filePaths));
+      workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, filePath);
+      // The serialized copyable dataset/file is not required at work unit generation time;
+      // the copyable file is created when the work unit is processed (IcebergFileStreamExtractor).
+      workUnit.setProp(SERIALIZED_COPYABLE_DATASET, "{}");

       // Store partition path mapping as JSON for extractor to use
       workUnit.setProp(ICEBERG_FILE_PARTITION_PATH, new com.google.gson.Gson().toJson(fileToPartitionPath));

       // Set work unit size for dynamic scaling (instead of just file count)
-      workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, totalSize);
+      workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, fileWithPartition.getFileSize());

       // Set work unit weight for bin packing
-      setWorkUnitWeight(workUnit, totalSize);
+      setWorkUnitWeight(workUnit, fileWithPartition.getFileSize());

       // Carry partition info to extractor for destination path mapping
       if (state.contains(ICEBERG_PARTITION_KEY)) {
@@ -399,13 +405,140 @@ private List<WorkUnit> createWorkUnitsFromFiles(List<IcebergTable.FilePathWithPa
       // Add lineage information for data governance and tracking
       addLineageSourceInfo(state, workUnit, table);
       workUnits.add(workUnit);
-
-      log.info("Created work unit {} with {} files, total size: {} bytes", i, group.size(), totalSize);
     }
+    log.info("Created {} work unit(s), total size: {} bytes", workUnits.size(), totalSize);
+
+    // Add delete step to overwrite partitions
+    addDeleteStepIfNeeded(state, workUnits, extract, datasetUrn);

     return workUnits;
   }

+  /**
+   * Creates a PrePublishStep with a DeleteFileCommitStep that deletes ALL files in the impacted directories.
+   * This enables complete partition rewrites: all existing files in the target partitions are deleted
+   * before new files are copied from the source.
+   *
+   * Execution order:
+   * 1. Source phase (this method): identifies directories to delete and creates the PrePublishStep
+   * 2. Task execution: files are copied from the source to the staging directory
+   * 3. Publisher phase - PrePublishStep: deletes ALL files from the target directories (BEFORE rename)
+   * 4. Publisher phase - rename: moves files from staging to the final target location
+   *
+   * Behavior:
+   * 1. If the filter is enabled (iceberg.filter.enabled=true): deletes ALL files in the specific partition
+   *    directories derived from the source partition values. For example, if the source has partitions
+   *    2025-10-11, 2025-10-10 and 2025-10-09, ALL files in those partition directories are deleted in the target.
+   * 2. If the filter is disabled (iceberg.filter.enabled=false): deletes ALL files in the entire root directory.
+   * 3. No file comparison is performed - this is a complete rewrite of the impacted directories.
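+   *
+   * Illustrative job configuration (a sketch only: property values are hypothetical, "datepartition" is a
+   * hypothetical partition column, and the last key is assumed to be the value of
+   * ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR):
+   * <pre>{@code
+   *   iceberg.filter.enabled=true
+   *   iceberg.filter.date=2025-10-11
+   *   iceberg.lookback.days=3
+   *   iceberg.copy.delete=true
+   *   data.publisher.final.dir=/data/target/my_table
+   * }</pre>
+   * With these settings the PrePublishStep would delete /data/target/my_table/datepartition=2025-10-11,
+   * .../datepartition=2025-10-10 and .../datepartition=2025-10-09 (each with the "-00" suffix when hourly
+   * partitioning is enabled) before the copied files are published.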
+   */
+  private void addDeleteStepIfNeeded(SourceState state, List<WorkUnit> workUnits, Extract extract, String datasetUrn) throws IOException {
+    boolean deleteEnabled = state.getPropAsBoolean(DELETE_FILES_NOT_IN_SOURCE, DEFAULT_DELETE_FILES_NOT_IN_SOURCE);
+    if (!deleteEnabled || workUnits.isEmpty()) {
+      log.info("Delete not enabled or no work units created, skipping delete step");
+      return;
+    }
+
+    // Get target filesystem and directory
+    String targetRootDir = state.getProp(DATA_PUBLISHER_FINAL_DIR);
+    if (targetRootDir == null) {
+      log.warn("DATA_PUBLISHER_FINAL_DIR not configured, cannot determine directories to delete");
+      return;
+    }
+
+    try {
+      FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+      Path targetRootPath = new Path(targetRootDir);
+
+      if (!targetFs.exists(targetRootPath)) {
+        log.info("Target directory {} does not exist, no directories to delete", targetRootPath);
+        return;
+      }
+
+      // Determine which directories to delete based on the filter configuration
+      List<Path> directoriesToDelete = Lists.newArrayList();
+      boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED, true);
+
+      if (!filterEnabled) {
+        // No filter: delete the entire root directory to rewrite all data
+        log.info("Filter disabled - will delete entire root directory: {}", targetRootPath);
+        directoriesToDelete.add(targetRootPath);
+      } else {
+        // Filter enabled: delete only specific partition directories
+        String partitionColumn = state.getProp(ICEBERG_PARTITION_KEY);
+        String partitionValuesStr = state.getProp(ICEBERG_PARTITION_VALUES);
+
+        if (partitionColumn == null || partitionValuesStr == null) {
+          log.warn("Partition key or values not found in state, cannot determine partition directories to delete");
+          return;
+        }
+
+        // Parse partition values (comma-separated list from the lookback calculation).
+        // These values already include the hourly suffix if applicable (e.g., "2025-10-11-00,2025-10-10-00,2025-10-09-00")
+        String[] values = partitionValuesStr.split(",");
+        log.info("Filter enabled - will delete {} partition directories for {}={}",
+            values.length, partitionColumn, partitionValuesStr);
+
+        // Collect partition directories to delete
+        for (String value : values) {
+          String trimmedValue = value.trim();
+          // Construct partition directory path: targetRoot/partitionColumn=value/
+          // Example: /root/datepartition=2025-10-11-00/
+          Path partitionDir = new Path(targetRootPath, partitionColumn + "=" + trimmedValue);
+
+          if (targetFs.exists(partitionDir)) {
+            log.info("Found partition directory to delete: {}", partitionDir);
+            directoriesToDelete.add(partitionDir);
+          } else {
+            log.info("Partition directory does not exist in target: {}", partitionDir);
+          }
+        }
+      }
+
+      if (directoriesToDelete.isEmpty()) {
+        log.info("No directories to delete in target directory {}", targetRootPath);
+        return;
+      }
+
+      // Delete directories (and all their contents) for a complete overwrite.
+      // DeleteFileCommitStep will recursively delete all files within these directories.
+      log.info("Will delete {} directories for complete overwrite", directoriesToDelete.size());
+
+      // Log directories to be deleted
+      for (Path dir : directoriesToDelete) {
+        log.info("Will delete directory: {}", dir);
+      }
+
+      // Create DeleteFileCommitStep to delete directories recursively.
+      // Note: deleteEmptyDirs is not needed since we are deleting entire directories.
+      CommitStep deleteStep = DeleteFileCommitStep.fromPaths(targetFs, directoriesToDelete, state.getProperties());
+
+      // Create a dedicated work unit for the delete step
+      WorkUnit deleteWorkUnit = new WorkUnit(extract);
+      deleteWorkUnit.addAll(state);
+
+      // Set properties so the extractor knows this is a delete-only work unit (no files to copy)
+      deleteWorkUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, "");
+      deleteWorkUnit.setProp(SERIALIZED_COPYABLE_DATASET, "{}");
+      deleteWorkUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, datasetUrn);
+      deleteWorkUnit.setProp(ICEBERG_FILE_PARTITION_PATH, "{}");
+      setWorkUnitWeight(deleteWorkUnit, 0);
+
+      // Use PrePublishStep to delete BEFORE copying new files
+      PrePublishStep prePublishStep = new PrePublishStep(datasetUrn, Maps.newHashMap(), deleteStep, 0);
+
+      // Serialize the PrePublishStep as a CopyEntity
+      CopySource.serializeCopyEntity(deleteWorkUnit, prePublishStep);
+      workUnits.add(deleteWorkUnit);
+
+      log.info("Added PrePublishStep with DeleteFileCommitStep to work units");
+
+    } catch (Exception e) {
+      log.error("Failed to create delete step", e);
+      throw new IOException("Failed to create delete step", e);
+    }
+  }
+
   /**
    * Create catalog using existing IcebergDatasetFinder logic
    */