From 274d1182bc0e7777dc602d04188cc3f8b014b34d Mon Sep 17 00:00:00 2001 From: Jordan Epstein Date: Wed, 12 Nov 2025 16:23:22 -0600 Subject: [PATCH] Set table properties/location on DynamicIcebergSink table creation The DynamicIcebergSink does not currently allow providing a set of reasonable default table properties at creation time. This change will apply these properties when a table is created (not updated). This helps for specifying things like iceberg version, merge-on-read vs. copy-on-write, table location, and others. --- docs/docs/flink-writes.md | 1 + .../sink/dynamic/DynamicIcebergSink.java | 16 +++++- .../sink/dynamic/DynamicRecordProcessor.java | 8 ++- .../dynamic/DynamicTableUpdateOperator.java | 11 +++- .../flink/sink/dynamic/TableCreator.java | 34 ++++++++++++ .../flink/sink/dynamic/TableUpdater.java | 15 ++++-- .../TestDynamicTableUpdateOperator.java | 6 ++- .../sink/dynamic/TestTableMetadataCache.java | 3 +- .../flink/sink/dynamic/TestTableUpdater.java | 53 ++++++++++++++----- 9 files changed, 121 insertions(+), 26 deletions(-) create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableCreator.java diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index 34988a0ae7a2..723d25fe6a69 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -536,6 +536,7 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are | `immediateTableUpdate(boolean enabled)` | Controls whether table metadata (schema/partition spec) updates immediately (default: false) | | `set(String property, String value)` | Set any Iceberg write property (e.g., `"write.format"`, `"write.upsert.enabled"`).Check out all the options here: [write-options](flink-configuration.md#write-options) | | `setAll(Map properties)` | Set multiple properties at once | +| `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. | ### Notes diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index db48be7977f0..cbc50a5839e8 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -185,6 +185,7 @@ public static class Builder { private final Map writeOptions = Maps.newHashMap(); private final Map snapshotSummary = Maps.newHashMap(); private ReadableConfig readableConfig = new Configuration(); + private TableCreator tableCreator = TableCreator.DEFAULT; private boolean immediateUpdate = false; private int cacheMaximumSize = 100; private long cacheRefreshMs = 1_000; @@ -243,6 +244,15 @@ public Builder flinkConf(ReadableConfig config) { return this; } + /** + * Logic to create a table. Allows setting custom table properties/location on a per-table + * basis. + */ + public Builder tableCreator(TableCreator tableCreationFunction) { + this.tableCreator = tableCreationFunction; + return this; + } + /** * Configuring the write parallel number for iceberg stream writer. * @@ -374,7 +384,8 @@ public DataStreamSink append() { immediateUpdate, cacheMaximumSize, cacheRefreshMs, - inputSchemasPerTableCacheMaximumSize)) + inputSchemasPerTableCacheMaximumSize, + tableCreator)) .uid(prefixIfNotNull(uidPrefix, "-generator")) .name(operatorName("generator")) .returns(type); @@ -391,7 +402,8 @@ public DataStreamSink append() { catalogLoader, cacheMaximumSize, cacheRefreshMs, - inputSchemasPerTableCacheMaximumSize)) + inputSchemasPerTableCacheMaximumSize, + tableCreator)) .uid(prefixIfNotNull(uidPrefix, "-updater")) .name(operatorName("Updater")) .returns(type) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index 166217a0140e..bc3a25468d84 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -43,6 +43,7 @@ class DynamicRecordProcessor extends ProcessFunction extends ProcessFunction newData = - updater.update(data.tableIdentifier(), data.branch(), data.schema(), data.spec()); + updater.update( + data.tableIdentifier(), data.branch(), data.schema(), data.spec(), tableCreator); emit( collector, data, diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java index 6057d773c3f0..586239b54bca 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -41,6 +41,7 @@ class DynamicTableUpdateOperator private final int cacheMaximumSize; private final long cacheRefreshMs; private final int inputSchemasPerTableCacheMaximumSize; + private final TableCreator tableCreator; private transient TableUpdater updater; @@ -48,11 +49,13 @@ class DynamicTableUpdateOperator CatalogLoader catalogLoader, int cacheMaximumSize, long cacheRefreshMs, - int inputSchemasPerTableCacheMaximumSize) { + int inputSchemasPerTableCacheMaximumSize, + TableCreator tableCreator) { this.catalogLoader = catalogLoader; this.cacheMaximumSize = cacheMaximumSize; this.cacheRefreshMs = cacheRefreshMs; this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize; + this.tableCreator = tableCreator; } @Override @@ -70,7 +73,11 @@ public void open(OpenContext openContext) throws Exception { public DynamicRecordInternal map(DynamicRecordInternal data) throws Exception { Tuple2 newData = updater.update( - TableIdentifier.parse(data.tableName()), data.branch(), data.schema(), data.spec()); + TableIdentifier.parse(data.tableName()), + data.branch(), + data.schema(), + data.spec(), + tableCreator); TableMetadataCache.ResolvedSchemaInfo compareInfo = newData.f0; data.setSchema(compareInfo.resolvedTableSchema()); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableCreator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableCreator.java new file mode 100644 index 000000000000..d1df2c0038a3 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableCreator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.Serializable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; + +@FunctionalInterface +public interface TableCreator extends Serializable { + + TableCreator DEFAULT = Catalog::createTable; + + Table createTable(Catalog catalog, TableIdentifier identifier, Schema schema, PartitionSpec spec); +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index fdd182830b2c..cadfe345980c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -56,8 +56,12 @@ class TableUpdater { * requested one, and the new {@link PartitionSpec#specId()}. */ Tuple2 update( - TableIdentifier tableIdentifier, String branch, Schema schema, PartitionSpec spec) { - findOrCreateTable(tableIdentifier, schema, spec); + TableIdentifier tableIdentifier, + String branch, + Schema schema, + PartitionSpec spec, + TableCreator tableCreator) { + findOrCreateTable(tableIdentifier, schema, spec, tableCreator); findOrCreateBranch(tableIdentifier, branch); TableMetadataCache.ResolvedSchemaInfo newSchemaInfo = findOrCreateSchema(tableIdentifier, schema); @@ -65,7 +69,8 @@ Tuple2 update( return Tuple2.of(newSchemaInfo, newSpec); } - private void findOrCreateTable(TableIdentifier identifier, Schema schema, PartitionSpec spec) { + private void findOrCreateTable( + TableIdentifier identifier, Schema schema, PartitionSpec spec, TableCreator tableCreator) { Tuple2 exists = cache.exists(identifier); if (Boolean.FALSE.equals(exists.f0)) { if (exists.f1 instanceof NoSuchNamespaceException) { @@ -80,12 +85,12 @@ private void findOrCreateTable(TableIdentifier identifier, Schema schema, Partit LOG.info("Table {} not found during table search. Creating table.", identifier); try { - Table table = catalog.createTable(identifier, schema, spec); + Table table = tableCreator.createTable(catalog, identifier, schema, spec); cache.update(identifier, table); } catch (AlreadyExistsException e) { LOG.debug("Table {} created concurrently. Skipping creation.", identifier, e); cache.invalidate(identifier); - findOrCreateTable(identifier, schema, spec); + findOrCreateTable(identifier, schema, spec, tableCreator); } } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java index 618074f412f9..22655ff99f86 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -61,7 +61,8 @@ void testDynamicTableUpdateOperatorNewTable() throws Exception { CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs, - inputSchemaCacheMaximumSize); + inputSchemaCacheMaximumSize, + TableCreator.DEFAULT); operator.open(null); DynamicRecordInternal input = @@ -93,7 +94,8 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize, cacheRefreshMs, - inputSchemaCacheMaximumSize); + inputSchemaCacheMaximumSize, + TableCreator.DEFAULT); operator.open(null); catalog.createTable(table, SCHEMA1); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java index 2264cc3a8db0..42c93c13c264 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java @@ -76,7 +76,8 @@ void testCacheInvalidationAfterSchemaChange() { catalog.dropTable(tableIdentifier); catalog.createTable(tableIdentifier, SCHEMA2); - tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()); + tableUpdater.update( + tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned(), TableCreator.DEFAULT); Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema(); assertThat(schema2.sameSchema(SCHEMA2)).isTrue(); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index ad35d929728d..1d4461698746 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -20,16 +20,20 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; import java.util.Map; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; +import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; public class TestTableUpdater extends TestFlinkIcebergSinkBase { @@ -45,15 +49,26 @@ public class TestTableUpdater extends TestFlinkIcebergSinkBase { Types.NestedField.optional(3, "extra", Types.StringType.get())); @Test - void testTableCreation() { - Catalog catalog = CATALOG_EXTENSION.catalog(); - TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + void testTableCreation(@TempDir Path tempDir) { + // Location for tables is not configurable for hadoop catalogs + InMemoryCatalog catalog = new InMemoryCatalog(); + catalog.initialize("catalog", Map.of()); + catalog.createNamespace(Namespace.of("myNamespace")); + TableIdentifier tableIdentifier = TableIdentifier.parse("myNamespace.myTable"); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); TableUpdater tableUpdater = new TableUpdater(cache, catalog); - tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); - assertThat(catalog.tableExists(tableIdentifier)).isTrue(); + String locationOverride = tempDir.toString() + "/custom-path"; + Map tableProperties = Map.of("key", "value"); + TableCreator tableCreator = + (catalog1, identifier, schema, spec) -> + catalog1.createTable(identifier, schema, spec, locationOverride, tableProperties); + tableUpdater.update( + tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), tableCreator); + assertThat(catalog.tableExists(tableIdentifier)).isTrue(); + assertThat(catalog.loadTable(tableIdentifier).properties().get("key")).isEqualTo("value"); + assertThat(catalog.loadTable(tableIdentifier).location()).isEqualTo(locationOverride); TableMetadataCache.ResolvedSchemaInfo cachedSchema = cache.schema(tableIdentifier, SCHEMA); assertThat(cachedSchema.resolvedTableSchema().sameSchema(SCHEMA)).isTrue(); } @@ -71,7 +86,8 @@ void testTableAlreadyExists() { catalog.createTable(tableIdentifier, SCHEMA); // Make sure that the cache is invalidated and the table refreshed without an error Tuple2 result = - tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); + tableUpdater.update( + tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT); assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue(); assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME); assertThat(result.f1).isEqualTo(PartitionSpec.unpartitioned()); @@ -85,11 +101,13 @@ void testBranchCreationAndCaching() { TableUpdater tableUpdater = new TableUpdater(cache, catalog); catalog.createTable(tableIdentifier, SCHEMA); - tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned()); + tableUpdater.update( + tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT); TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier); assertThat(cacheItem).isNotNull(); - tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned()); + tableUpdater.update( + tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT); assertThat(cache.getInternalCache()).contains(Map.entry(tableIdentifier, cacheItem)); } @@ -101,7 +119,7 @@ void testSpecCreation() { TableUpdater tableUpdater = new TableUpdater(cache, catalog); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 10).build(); - tableUpdater.update(tableIdentifier, "main", SCHEMA, spec); + tableUpdater.update(tableIdentifier, "main", SCHEMA, spec, TableCreator.DEFAULT); Table table = catalog.loadTable(tableIdentifier); assertThat(table).isNotNull(); @@ -119,7 +137,12 @@ void testInvalidateOldCacheEntryOnUpdate() { Schema updated = tableUpdater - .update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()) + .update( + tableIdentifier, + "main", + SCHEMA2, + PartitionSpec.unpartitioned(), + TableCreator.DEFAULT) .f0 .resolvedTableSchema(); assertThat(updated.sameSchema(SCHEMA2)).isTrue(); @@ -136,7 +159,8 @@ void testLastResultInvalidation() { TableUpdater tableUpdater = new TableUpdater(cache, catalog); // Initialize cache - tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned()); + tableUpdater.update( + tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT); // Update table behind the scenes catalog.dropTable(tableIdentifier); @@ -148,7 +172,12 @@ void testLastResultInvalidation() { assertThat( tableUpdater - .update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned()) + .update( + tableIdentifier, + "main", + SCHEMA2, + PartitionSpec.unpartitioned(), + TableCreator.DEFAULT) .f0 .compareResult()) .isEqualTo(CompareSchemasVisitor.Result.SAME);