Skip to content

Commit 3dff527

Browse files
committed
Ignore partition fields that are dropped from the current schema
1 parent 8ce1b32 commit 3dff527

File tree

5 files changed

+61
-5
lines changed

5 files changed

+61
-5
lines changed

api/src/main/java/org/apache/iceberg/PartitionSpec.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ public StructType partitionType() {
131131
for (PartitionField field : fields) {
132132
Type sourceType = schema.findType(field.sourceId());
133133
Type resultType = field.transform().getResultType(sourceType);
134+
135+
// When the source field has been dropped we cannot determine the type
136+
if (resultType == null) {
137+
resultType = Types.UnknownType.get();
138+
}
139+
134140
structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType));
135141
}
136142

@@ -633,8 +639,6 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) {
633639
// We don't care about the source type since a VoidTransform is always compatible and skip the
634640
// checks
635641
if (!transform.equals(Transforms.alwaysNull())) {
636-
ValidationException.check(
637-
sourceType != null, "Cannot find source column for partition field: %s", field);
638642
ValidationException.check(
639643
sourceType.isPrimitiveType(),
640644
"Cannot partition by non-primitive source field: %s",

core/src/main/java/org/apache/iceberg/Partitioning.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,8 @@ public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec
239239
*/
240240
public static StructType partitionType(Table table) {
241241
Collection<PartitionSpec> specs = table.specs().values();
242-
return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
242+
return buildPartitionProjectionType(
243+
"table partition", specs, allActiveFieldIds(table.schema(), specs));
243244
}
244245

245246
/**
@@ -346,10 +347,11 @@ private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?>
346347
|| t2.equals(Transforms.alwaysNull());
347348
}
348349

349-
// collects IDs of all partition fields used across specs
350-
private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
350+
// collects IDs of all partition fields used across specs that are in the current schema
351+
private static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
351352
return FluentIterable.from(specs)
352353
.transformAndConcat(PartitionSpec::fields)
354+
.filter(field -> schema.findField(field.sourceId()) != null)
353355
.transform(PartitionField::fieldId)
354356
.toSet();
355357
}

core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java

+2
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ static Schema toOption(Schema schema) {
188188
Preconditions.checkArgument(
189189
isOptionSchema(schema), "Union schemas are not supported: %s", schema);
190190
return schema;
191+
} else if (schema.getType() == Schema.Type.NULL) {
192+
return schema;
191193
} else {
192194
return Schema.createUnion(NULL, schema);
193195
}

core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java

+4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
4949
private static final Schema UUID_SCHEMA =
5050
LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
5151
private static final Schema BINARY_SCHEMA = Schema.create(Schema.Type.BYTES);
52+
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
5253

5354
static {
5455
TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
@@ -243,6 +244,9 @@ public Schema primitive(Type.PrimitiveType primitive) {
243244
null,
244245
TypeUtil.decimalRequiredBytes(decimal.precision())));
245246
break;
247+
case UNKNOWN:
248+
primitiveSchema = NULL_SCHEMA;
249+
break;
246250
default:
247251
throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId());
248252
}

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java

+44
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,17 @@
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
2222

23+
import java.time.LocalDateTime;
24+
import java.time.ZoneOffset;
25+
import java.util.List;
2326
import org.apache.iceberg.Parameter;
2427
import org.apache.iceberg.ParameterizedTestExtension;
2528
import org.apache.iceberg.Parameters;
2629
import org.apache.iceberg.PartitionSpec;
2730
import org.apache.iceberg.Table;
2831
import org.apache.iceberg.TableProperties;
2932
import org.apache.iceberg.TestHelpers;
33+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3034
import org.apache.iceberg.spark.SparkCatalogConfig;
3135
import org.apache.iceberg.spark.source.SparkTable;
3236
import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -582,4 +586,44 @@ private void createTable(String schema, String spec) {
582586
tableName, schema, spec, TableProperties.FORMAT_VERSION, formatVersion);
583587
}
584588
}
589+
590+
private void runCreateAndDropPartitionField(
591+
String column, String partitionType, List<Object[]> expected, String predicate) {
592+
sql("DROP TABLE IF EXISTS %s", tableName);
593+
sql(
594+
"CREATE TABLE %s (col_int INTEGER, col_ts TIMESTAMP_NTZ, col_long BIGINT) USING ICEBERG TBLPROPERTIES ('format-version' = %d)",
595+
tableName, formatVersion);
596+
sql("INSERT INTO %s VALUES (1000, CAST('2024-03-01 19:25:00' as TIMESTAMP), 2100)", tableName);
597+
sql("ALTER TABLE %s ADD PARTITION FIELD %s AS col2_partition", tableName, partitionType);
598+
sql("INSERT INTO %s VALUES (2000, CAST('2024-04-01 19:25:00' as TIMESTAMP), 2200)", tableName);
599+
sql("ALTER TABLE %s DROP PARTITION FIELD col2_partition", tableName);
600+
sql("INSERT INTO %s VALUES (3000, CAST('2024-05-01 19:25:00' as TIMESTAMP), 2300)", tableName);
601+
sql("ALTER TABLE %s DROP COLUMN %s", tableName, column);
602+
603+
assertEquals(
604+
"Should return correct data",
605+
expected,
606+
sql("SELECT * FROM %s WHERE %s ORDER BY col_int", tableName, predicate));
607+
}
608+
609+
@TestTemplate
610+
public void testDropPartitionAndUnderlyingField() {
611+
String predicateLong = "col_ts >= '2024-04-01 19:25:00'";
612+
List<Object[]> expectedLong =
613+
Lists.newArrayList(
614+
new Object[] {2000, LocalDateTime.ofEpochSecond(1711999500, 0, ZoneOffset.UTC)},
615+
new Object[] {3000, LocalDateTime.ofEpochSecond(1714591500, 0, ZoneOffset.UTC)});
616+
runCreateAndDropPartitionField("col_long", "col_long", expectedLong, predicateLong);
617+
runCreateAndDropPartitionField(
618+
"col_long", "truncate(2, col_long)", expectedLong, predicateLong);
619+
runCreateAndDropPartitionField("col_long", "bucket(16, col_long)", expectedLong, predicateLong);
620+
621+
String predicateTs = "col_long >= 2200";
622+
List<Object[]> expectedTs =
623+
Lists.newArrayList(new Object[] {2000, 2200L}, new Object[] {3000, 2300L});
624+
runCreateAndDropPartitionField("col_ts", "col_ts", expectedTs, predicateTs);
625+
runCreateAndDropPartitionField("col_ts", "year(col_ts)", expectedTs, predicateTs);
626+
runCreateAndDropPartitionField("col_ts", "month(col_ts)", expectedTs, predicateTs);
627+
runCreateAndDropPartitionField("col_ts", "day(col_ts)", expectedTs, predicateTs);
628+
}
585629
}

0 commit comments

Comments
 (0)