1 change: 1 addition & 0 deletions docs/docs/flink-writes.md
@@ -536,6 +536,7 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are
| `immediateTableUpdate(boolean enabled)` | Controls whether table metadata (schema/partition spec) updates immediately (default: false) |
| `set(String property, String value)` | Set any Iceberg write property (e.g., `"write.format"`, `"write.upsert.enabled"`). Check out all the options here: [write-options](flink-configuration.md#write-options) |
| `setAll(Map<String, String> properties)` | Set multiple properties at once |
| `tableCreator(TableCreator creator)` | Overrides how the `DynamicIcebergSink` creates new Iceberg tables, e.g. to set custom table properties or a custom location based on the table name (see the sketch below). |
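
A minimal sketch of a custom `TableCreator` is shown below. It is illustrative only: the location prefix and the property key/value are assumptions, and it relies on the `Catalog.createTable` overload that accepts an explicit location and property map.

```java
// Hypothetical creator: derive the table location from the table name and
// attach custom properties whenever the sink has to create a missing table.
TableCreator tableCreator =
    (catalog, identifier, schema, spec) ->
        catalog.createTable(
            identifier,
            schema,
            spec,
            "s3://my-bucket/warehouse/" + identifier.name(), // assumed location prefix
            Map.of("commit.retry.num-retries", "10"));       // assumed property

// Registered on the sink builder together with the other options above:
// builder.tableCreator(tableCreator)
```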

### Notes

@@ -185,6 +185,7 @@ public static class Builder<T> {
private final Map<String, String> writeOptions = Maps.newHashMap();
private final Map<String, String> snapshotSummary = Maps.newHashMap();
private ReadableConfig readableConfig = new Configuration();
private TableCreator tableCreator = TableCreator.DEFAULT;
private boolean immediateUpdate = false;
private int cacheMaximumSize = 100;
private long cacheRefreshMs = 1_000;
@@ -243,6 +244,15 @@ public Builder<T> flinkConf(ReadableConfig config) {
return this;
}

/**
* Sets the logic used to create a table when it does not exist yet. Allows setting custom
* table properties and a custom location on a per-table basis.
*/
public Builder<T> tableCreator(TableCreator tableCreationFunction) {
this.tableCreator = tableCreationFunction;
return this;
}

/**
* Configures the write parallelism for the Iceberg stream writer.
*
@@ -374,7 +384,8 @@ public DataStreamSink<DynamicRecordInternal> append() {
immediateUpdate,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize))
inputSchemasPerTableCacheMaximumSize,
tableCreator))
.uid(prefixIfNotNull(uidPrefix, "-generator"))
.name(operatorName("generator"))
.returns(type);
@@ -391,7 +402,8 @@ public DataStreamSink<DynamicRecordInternal> append() {
catalogLoader,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize))
inputSchemasPerTableCacheMaximumSize,
tableCreator))
.uid(prefixIfNotNull(uidPrefix, "-updater"))
.name(operatorName("Updater"))
.returns(type)
@@ -43,6 +43,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
private final int cacheMaximumSize;
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
private final TableCreator tableCreator;

private transient TableMetadataCache tableCache;
private transient HashKeyGenerator hashKeyGenerator;
@@ -57,13 +58,15 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
boolean immediateUpdate,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize) {
int inputSchemasPerTableCacheMaximumSize,
TableCreator tableCreator) {
this.generator = generator;
this.catalogLoader = catalogLoader;
this.immediateUpdate = immediateUpdate;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
this.tableCreator = tableCreator;
}

@Override
@@ -114,7 +117,8 @@ public void collect(DynamicRecord data) {
|| foundSchema.compareResult() == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
if (immediateUpdate) {
Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> newData =
updater.update(data.tableIdentifier(), data.branch(), data.schema(), data.spec());
updater.update(
data.tableIdentifier(), data.branch(), data.schema(), data.spec(), tableCreator);
emit(
collector,
data,
@@ -41,18 +41,21 @@ class DynamicTableUpdateOperator
private final int cacheMaximumSize;
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
private final TableCreator tableCreator;

private transient TableUpdater updater;

DynamicTableUpdateOperator(
CatalogLoader catalogLoader,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize) {
int inputSchemasPerTableCacheMaximumSize,
TableCreator tableCreator) {
this.catalogLoader = catalogLoader;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
this.tableCreator = tableCreator;
}

@Override
@@ -70,7 +73,11 @@ public void open(OpenContext openContext) throws Exception {
public DynamicRecordInternal map(DynamicRecordInternal data) throws Exception {
Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> newData =
updater.update(
TableIdentifier.parse(data.tableName()), data.branch(), data.schema(), data.spec());
TableIdentifier.parse(data.tableName()),
data.branch(),
data.schema(),
data.spec(),
tableCreator);
TableMetadataCache.ResolvedSchemaInfo compareInfo = newData.f0;

data.setSchema(compareInfo.resolvedTableSchema());
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.io.Serializable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

@FunctionalInterface
public interface TableCreator extends Serializable {

TableCreator DEFAULT = Catalog::createTable;

Table createTable(Catalog catalog, TableIdentifier identifier, Schema schema, PartitionSpec spec);
}
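
Since `TableCreator` extends `Serializable`, implementations travel with the Flink job graph. As a usage sketch (not part of this file), a named implementation avoids accidentally capturing non-serializable state from the enclosing scope; the class name and property values below are illustrative assumptions, and the `Catalog.createTable` overload taking a property map is used.

```java
// Illustrative named implementation that attaches fixed properties to every
// table the sink creates (assumes java.util.Map is imported).
class PropertySettingTableCreator implements TableCreator {
  @Override
  public Table createTable(
      Catalog catalog, TableIdentifier identifier, Schema schema, PartitionSpec spec) {
    return catalog.createTable(
        identifier, schema, spec, Map.of("history.expire.min-snapshots-to-keep", "5"));
  }
}
```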
@@ -56,16 +56,21 @@ class TableUpdater {
* requested one, and the new {@link PartitionSpec#specId()}.
*/
Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> update(
TableIdentifier tableIdentifier, String branch, Schema schema, PartitionSpec spec) {
findOrCreateTable(tableIdentifier, schema, spec);
TableIdentifier tableIdentifier,
String branch,
Schema schema,
PartitionSpec spec,
TableCreator tableCreator) {
findOrCreateTable(tableIdentifier, schema, spec, tableCreator);
findOrCreateBranch(tableIdentifier, branch);
TableMetadataCache.ResolvedSchemaInfo newSchemaInfo =
findOrCreateSchema(tableIdentifier, schema);
PartitionSpec newSpec = findOrCreateSpec(tableIdentifier, spec);
return Tuple2.of(newSchemaInfo, newSpec);
}

private void findOrCreateTable(TableIdentifier identifier, Schema schema, PartitionSpec spec) {
private void findOrCreateTable(
TableIdentifier identifier, Schema schema, PartitionSpec spec, TableCreator tableCreator) {
Tuple2<Boolean, Exception> exists = cache.exists(identifier);
if (Boolean.FALSE.equals(exists.f0)) {
if (exists.f1 instanceof NoSuchNamespaceException) {
@@ -80,12 +85,12 @@ private void findOrCreateTable(TableIdentifier identifier, Schema schema, Partit

LOG.info("Table {} not found during table search. Creating table.", identifier);
try {
Table table = catalog.createTable(identifier, schema, spec);
Table table = tableCreator.createTable(catalog, identifier, schema, spec);
cache.update(identifier, table);
} catch (AlreadyExistsException e) {
LOG.debug("Table {} created concurrently. Skipping creation.", identifier, e);
cache.invalidate(identifier);
findOrCreateTable(identifier, schema, spec);
findOrCreateTable(identifier, schema, spec, tableCreator);
}
}
}
@@ -61,7 +61,8 @@ void testDynamicTableUpdateOperatorNewTable() throws Exception {
CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize,
cacheRefreshMs,
inputSchemaCacheMaximumSize);
inputSchemaCacheMaximumSize,
TableCreator.DEFAULT);
operator.open(null);

DynamicRecordInternal input =
@@ -93,7 +94,8 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception {
CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize,
cacheRefreshMs,
inputSchemaCacheMaximumSize);
inputSchemaCacheMaximumSize,
TableCreator.DEFAULT);
operator.open(null);

catalog.createTable(table, SCHEMA1);
@@ -76,7 +76,8 @@ void testCacheInvalidationAfterSchemaChange() {

catalog.dropTable(tableIdentifier);
catalog.createTable(tableIdentifier, SCHEMA2);
tableUpdater.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned());
tableUpdater.update(
tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned(), TableCreator.DEFAULT);

Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema();
assertThat(schema2.sameSchema(SCHEMA2)).isTrue();
@@ -20,16 +20,20 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.file.Path;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestTableUpdater extends TestFlinkIcebergSinkBase {

@@ -45,15 +49,26 @@ public class TestTableUpdater extends TestFlinkIcebergSinkBase {
Types.NestedField.optional(3, "extra", Types.StringType.get()));

@Test
void testTableCreation() {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
void testTableCreation(@TempDir Path tempDir) {
// Table location is not configurable for Hadoop catalogs, so use an in-memory catalog
InMemoryCatalog catalog = new InMemoryCatalog();
catalog.initialize("catalog", Map.of());
catalog.createNamespace(Namespace.of("myNamespace"));
TableIdentifier tableIdentifier = TableIdentifier.parse("myNamespace.myTable");
TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
TableUpdater tableUpdater = new TableUpdater(cache, catalog);

tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned());
assertThat(catalog.tableExists(tableIdentifier)).isTrue();
String locationOverride = tempDir.toString() + "/custom-path";
Map<String, String> tableProperties = Map.of("key", "value");
TableCreator tableCreator =
(catalog1, identifier, schema, spec) ->
catalog1.createTable(identifier, schema, spec, locationOverride, tableProperties);

tableUpdater.update(
tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), tableCreator);
assertThat(catalog.tableExists(tableIdentifier)).isTrue();
assertThat(catalog.loadTable(tableIdentifier).properties().get("key")).isEqualTo("value");
assertThat(catalog.loadTable(tableIdentifier).location()).isEqualTo(locationOverride);
TableMetadataCache.ResolvedSchemaInfo cachedSchema = cache.schema(tableIdentifier, SCHEMA);
assertThat(cachedSchema.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
}
@@ -71,7 +86,8 @@ void testTableAlreadyExists() {
catalog.createTable(tableIdentifier, SCHEMA);
// Make sure that the cache is invalidated and the table refreshed without an error
Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> result =
tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned());
tableUpdater.update(
tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT);
assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
assertThat(result.f1).isEqualTo(PartitionSpec.unpartitioned());
@@ -85,11 +101,13 @@ void testBranchCreationAndCaching() {
TableUpdater tableUpdater = new TableUpdater(cache, catalog);

catalog.createTable(tableIdentifier, SCHEMA);
tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
tableUpdater.update(
tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT);
TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
assertThat(cacheItem).isNotNull();

tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
tableUpdater.update(
tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT);
assertThat(cache.getInternalCache()).contains(Map.entry(tableIdentifier, cacheItem));
}

@@ -101,7 +119,7 @@ void testSpecCreation() {
TableUpdater tableUpdater = new TableUpdater(cache, catalog);

PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 10).build();
tableUpdater.update(tableIdentifier, "main", SCHEMA, spec);
tableUpdater.update(tableIdentifier, "main", SCHEMA, spec, TableCreator.DEFAULT);

Table table = catalog.loadTable(tableIdentifier);
assertThat(table).isNotNull();
@@ -119,7 +137,12 @@ void testInvalidateOldCacheEntryOnUpdate() {

Schema updated =
tableUpdater
.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned())
.update(
tableIdentifier,
"main",
SCHEMA2,
PartitionSpec.unpartitioned(),
TableCreator.DEFAULT)
.f0
.resolvedTableSchema();
assertThat(updated.sameSchema(SCHEMA2)).isTrue();
@@ -136,7 +159,8 @@ void testLastResultInvalidation() {
TableUpdater tableUpdater = new TableUpdater(cache, catalog);

// Initialize cache
tableUpdater.update(tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned());
tableUpdater.update(
tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT);

// Update table behind the scenes
catalog.dropTable(tableIdentifier);
@@ -148,7 +172,12 @@

assertThat(
tableUpdater
.update(tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned())
.update(
tableIdentifier,
"main",
SCHEMA2,
PartitionSpec.unpartitioned(),
TableCreator.DEFAULT)
.f0
.compareResult())
.isEqualTo(CompareSchemasVisitor.Result.SAME);