Flink: Add TableCreator interface to set table properties/location on DynamicIcebergSink table creation #14578
Conversation
I’ve added a few comments on the PR, but the bigger question is deciding what functionality we actually want.
We need to draw a clear line: what should the Dynamic Sink support, and what should be handled by external operators outside the Dynamic Iceberg Sink? To make this decision, I'd like input from actual users: @jordepic, @mxm, @Guosmilesmile, or anyone else interested. Maybe someone could even start a discussion on the dev list to reach a wider audience.

If some changes are handled by external operators, we may need a mechanism for those operators to notify the Dynamic Iceberg Sink to refresh its caches. One option could be something like the Flink Kubernetes Operator restart nonce (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#application-restarts-without-spec-change): if the cache detects a new nonce (greater than the current one), it could invalidate the current table cache and reload the data.
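A minimal sketch of that nonce idea, just to make the discussion concrete. The class and method names below (`NonceAwareTableCache`, `maybeInvalidate`) are hypothetical and not part of the Dynamic Sink code in this PR:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical sketch only: not part of the Dynamic Sink code in this PR.
class NonceAwareTableCache {
  private final Map<TableIdentifier, Table> cache = new HashMap<>();
  private long lastSeenNonce = 0L;

  // Called with a nonce propagated from an external operator (e.g. via a
  // control stream or job configuration). A larger value means table
  // metadata may have changed out-of-band, so drop cached entries and reload.
  void maybeInvalidate(long observedNonce) {
    if (observedNonce > lastSeenNonce) {
      cache.clear();
      lastSeenNonce = observedNonce;
    }
  }
}
```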
I think our requirements, for now, are twofold:
This change is just addressing number 1! I'm happy to expand it to cover number 2 as well, though I'm less sure of the business logic that everybody needs there, which is why I excluded it for now.
Let's start with a narrow scope for this feature. We could always extend it to DynamicRecord, but I'm not convinced the feature should live in DynamicRecord, because table properties aren't connected directly to the table data (that's why I kept it outside in #13883). The majority of use cases will only require setting table properties on table creation. Concerning (2), we've had users request setting the table location as well, so I would suggest adding this feature alongside the table properties on table creation.
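To make the proposal concrete, here is a rough sketch of what a creation-time hook covering properties and location could look like. The interface name matches the TableCreator file added in this PR, but the method signature and the `withDefaults` helper are assumptions for illustration, not the actual API:

```java
import java.io.Serializable;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

/** Sketch of a creation-time hook; the real TableCreator in this PR may differ. */
public interface TableCreator extends Serializable {
  Table createTable(Catalog catalog, TableIdentifier identifier, Schema schema, PartitionSpec spec);

  /** Example: apply default properties and a per-table location on creation only. */
  static TableCreator withDefaults(Map<String, String> properties, String locationPrefix) {
    return (catalog, identifier, schema, spec) ->
        catalog
            .buildTable(identifier, schema)
            .withPartitionSpec(spec)
            .withProperties(properties)
            .withLocation(locationPrefix + "/" + identifier)
            .create();
  }
}
```

Keeping the hook at the sink level rather than on DynamicRecord matches the point above that table properties aren't tied to individual records.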
Do we need different properties for different tables, or can we use the same properties for every table?
I think users will need the table identifier.
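For example (illustrative only; the namespace check and property values below are made up), an identifier-aware hook would allow per-table decisions like:

```java
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.catalog.TableIdentifier;

class PerTableProperties {
  // Illustrative only: choose creation-time properties per table identifier.
  static final Function<TableIdentifier, Map<String, String>> PROPERTIES_FOR =
      identifier ->
          "logs".equals(identifier.namespace().toString())
              ? Map.of("write.delete.mode", "merge-on-read")
              : Map.of("write.delete.mode", "copy-on-write");
}
```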
Please rename the PR to match the new feature added |
Done.
The DynamicIcebergSink does not currently allow providing a set of reasonable default table properties at table creation. This change applies these properties when a table is created (not updated), which helps with specifying things like the Iceberg format version, merge-on-read vs. copy-on-write write modes, the table location, and others.
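For reference, a minimal sketch of the kind of creation-time defaults described above. The keys are standard Iceberg table properties; how the map is handed to the sink builder depends on the final TableCreator API:

```java
import java.util.Map;

class CreationDefaults {
  // Standard Iceberg table property keys; the values here are just examples.
  static final Map<String, String> PROPERTIES =
      Map.of(
          "format-version", "2",                 // Iceberg table format version
          "write.delete.mode", "merge-on-read",  // merge-on-read instead of copy-on-write
          "write.update.mode", "merge-on-read",
          "write.merge.mode", "merge-on-read");
}
```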