diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index bb9711234bf3..b267e0c086cb 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -31,6 +31,7 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
@@ -664,9 +665,7 @@ public DataStreamSink<RowData> append() {
       // Note that IcebergSink internally consists of multiple operators (like writer, committer,
       // aggregator).
       // The following parallelism will be propagated to all of the above operators.
-      if (sink.flinkWriteConf.writeParallelism() != null) {
-        rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism());
-      }
+      rowDataDataStreamSink.setParallelism(sink.resolveWriterParallelism(rowDataInput));
       return rowDataDataStreamSink;
     }
   }
@@ -818,16 +817,19 @@ private DataStream<RowData> distributeDataStreamByHashDistributionMode(
     }
   }
 
+  private int resolveWriterParallelism(DataStream<RowData> input) {
+    // If writeParallelism is not specified, default to the input parallelism to encourage
+    // operator chaining.
+    return Optional.ofNullable(flinkWriteConf.writeParallelism()).orElseGet(input::getParallelism);
+  }
+
   private DataStream<RowData> distributeDataStreamByRangeDistributionMode(
       DataStream<RowData> input,
       Schema iSchema,
       PartitionSpec partitionSpec,
       SortOrder sortOrderParam) {
-    int writerParallelism =
-        flinkWriteConf.writeParallelism() == null
-            ? input.getParallelism()
-            : flinkWriteConf.writeParallelism();
+    int writerParallelism = resolveWriterParallelism(input);
 
     // needed because of checkStyle not allowing us to change the value of an argument
     SortOrder sortOrder = sortOrderParam;
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
index c7ad594412ad..138831b69eca 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
@@ -388,6 +388,51 @@ void testErrorOnNullForRequiredField() throws Exception {
     assertThatThrownBy(() -> env.execute()).hasRootCauseInstanceOf(NullPointerException.class);
   }
 
+  @TestTemplate
+  void testDefaultWriteParallelism() {
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    var sink =
+        IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+            .distributionMode(DistributionMode.NONE)
+            .append();
+
+    // Since the sink write parallelism was not specified, assert that the default parallelism
+    // used is the input source parallelism.
+    // sink.getTransformation() refers to the SinkV2 writer operator associated with the
+    // IcebergSink.
+    assertThat(sink.getTransformation().getParallelism()).isEqualTo(dataStream.getParallelism());
+  }
+
+  @TestTemplate
+  void testWriteParallelism() {
+    List<Row> rows = createRows("");
+
+    // The parallelism of this input source is always 1, as this is a non-parallel source.
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    var sink =
+        IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+            .distributionMode(DistributionMode.NONE)
+            .writeParallelism(parallelism)
+            .append();
+
+    // The parallelism was explicitly specified when creating the IcebergSink, so assert that
+    // its value matches the parallelism TestTemplate parameter.
+    // sink.getTransformation() refers to the SinkV2 writer operator associated with the
+    // IcebergSink.
+    assertThat(sink.getTransformation().getParallelism()).isEqualTo(parallelism);
+  }
+
   private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
       throws Exception {
     List<Row> rows = createRows("");
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 586f96468d59..7546518cdef1 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -31,6 +31,7 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
@@ -676,9 +677,7 @@ public DataStreamSink<RowData> append() {
       // Note that IcebergSink internally consists of multiple operators (like writer, committer,
       // aggregator).
       // The following parallelism will be propagated to all of the above operators.
-      if (sink.flinkWriteConf.writeParallelism() != null) {
-        rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism());
-      }
+      rowDataDataStreamSink.setParallelism(sink.resolveWriterParallelism(rowDataInput));
       return rowDataDataStreamSink;
     }
   }
@@ -830,16 +829,19 @@ private DataStream<RowData> distributeDataStreamByHashDistributionMode(
     }
   }
 
+  private int resolveWriterParallelism(DataStream<RowData> input) {
+    // If writeParallelism is not specified, default to the input parallelism to encourage
+    // operator chaining.
+    return Optional.ofNullable(flinkWriteConf.writeParallelism()).orElseGet(input::getParallelism);
+  }
+
   private DataStream<RowData> distributeDataStreamByRangeDistributionMode(
       DataStream<RowData> input,
       Schema iSchema,
       PartitionSpec partitionSpec,
       SortOrder sortOrderParam) {
-    int writerParallelism =
-        flinkWriteConf.writeParallelism() == null
-            ? input.getParallelism()
-            : flinkWriteConf.writeParallelism();
+    int writerParallelism = resolveWriterParallelism(input);
 
     // needed because of checkStyle not allowing us to change the value of an argument
     SortOrder sortOrder = sortOrderParam;
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
index c7ad594412ad..138831b69eca 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
@@ -388,6 +388,51 @@ void testErrorOnNullForRequiredField() throws Exception {
     assertThatThrownBy(() -> env.execute()).hasRootCauseInstanceOf(NullPointerException.class);
   }
 
+  @TestTemplate
+  void testDefaultWriteParallelism() {
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    var sink =
+        IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+            .distributionMode(DistributionMode.NONE)
+            .append();
+
+    // Since the sink write parallelism was not specified, assert that the default parallelism
+    // used is the input source parallelism.
+    // sink.getTransformation() refers to the SinkV2 writer operator associated with the
+    // IcebergSink.
+    assertThat(sink.getTransformation().getParallelism()).isEqualTo(dataStream.getParallelism());
+  }
+
+  @TestTemplate
+  void testWriteParallelism() {
+    List<Row> rows = createRows("");
+
+    // The parallelism of this input source is always 1, as this is a non-parallel source.
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    var sink =
+        IcebergSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+            .distributionMode(DistributionMode.NONE)
+            .writeParallelism(parallelism)
+            .append();
+
+    // The parallelism was explicitly specified when creating the IcebergSink, so assert that
+    // its value matches the parallelism TestTemplate parameter.
+    // sink.getTransformation() refers to the SinkV2 writer operator associated with the
+    // IcebergSink.
+    assertThat(sink.getTransformation().getParallelism()).isEqualTo(parallelism);
+  }
+
   private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
       throws Exception {
     List<Row> rows = createRows("");
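
A minimal standalone sketch of the fallback semantics behind the new resolveWriterParallelism helper: an explicitly configured writeParallelism wins, and a null configuration falls back lazily to the input's parallelism. The class and method names below are hypothetical illustrations; only the Optional-based fallback expression comes from the patch itself.

import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-in for IcebergSink#resolveWriterParallelism.
public class ParallelismFallbackSketch {

  static int resolve(Integer configuredWriteParallelism, Supplier<Integer> inputParallelism) {
    // A configured value wins; a null configuration falls back to the supplier, mirroring
    // Optional.ofNullable(flinkWriteConf.writeParallelism()).orElseGet(input::getParallelism).
    return Optional.ofNullable(configuredWriteParallelism).orElseGet(inputParallelism);
  }

  public static void main(String[] args) {
    // Explicitly configured parallelism takes precedence over the input's.
    System.out.println(resolve(4, () -> 2)); // prints 4

    // Unset (null) defaults to the input parallelism, keeping writer and source at the
    // same parallelism so Flink can chain the operators.
    System.out.println(resolve(null, () -> 2)); // prints 2
  }
}

Using orElseGet rather than orElse keeps the fallback lazy: input.getParallelism() is only consulted when no explicit writeParallelism was set.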