Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
Expand Down Expand Up @@ -664,9 +665,7 @@ public DataStreamSink<RowData> append() {
// Note that IcebergSink internally consists of multiple operators (like writer, committer,
// aggregator).
// The following parallelism will be propagated to all of the above operators.
if (sink.flinkWriteConf.writeParallelism() != null) {
rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism());
}
rowDataDataStreamSink.setParallelism(sink.resolveWriterParallelism(rowDataInput));
return rowDataDataStreamSink;
}
}
Expand Down Expand Up @@ -818,16 +817,19 @@ private DataStream<RowData> distributeDataStreamByHashDistributionMode(
}
}

/**
 * Returns the parallelism to use for the writer operators.
 *
 * <p>When no explicit write parallelism was configured, the input stream's own parallelism is
 * used so the writer can chain directly with its upstream operator.
 */
private int resolveWriterParallelism(DataStream<RowData> input) {
  Integer configured = flinkWriteConf.writeParallelism();
  return configured != null ? configured : input.getParallelism();
}

private DataStream<RowData> distributeDataStreamByRangeDistributionMode(
DataStream<RowData> input,
Schema iSchema,
PartitionSpec partitionSpec,
SortOrder sortOrderParam) {

int writerParallelism =
flinkWriteConf.writeParallelism() == null
? input.getParallelism()
: flinkWriteConf.writeParallelism();
int writerParallelism = resolveWriterParallelism(input);

// needed because of checkStyle not allowing us to change the value of an argument
SortOrder sortOrder = sortOrderParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,51 @@ void testErrorOnNullForRequiredField() throws Exception {
assertThatThrownBy(() -> env.execute()).hasRootCauseInstanceOf(NullPointerException.class);
}

@TestTemplate
void testDefaultWriteParallelism() {
  List<Row> rows = createRows("");
  DataStream<Row> source =
      env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");

  var sink =
      IcebergSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
          .table(table)
          .tableLoader(tableLoader)
          .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
          .distributionMode(DistributionMode.NONE)
          .append();

  // No write parallelism was configured on the sink builder, so the SinkV2 writer operator
  // backing the IcebergSink (reachable via sink.getTransformation) is expected to default to
  // the parallelism of its input source.
  assertThat(sink.getTransformation().getParallelism()).isEqualTo(source.getParallelism());
}

@TestTemplate
void testWriteParallelism() {
  List<Row> rows = createRows("");

  // Note: this bounded source is non-parallel, so its parallelism is always 1.
  DataStream<Row> source =
      env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");

  var sink =
      IcebergSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
          .table(table)
          .tableLoader(tableLoader)
          .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
          .distributionMode(DistributionMode.NONE)
          .writeParallelism(parallelism)
          .append();

  // An explicit write parallelism was set on the builder; the SinkV2 writer operator backing
  // the IcebergSink (reachable via sink.getTransformation) must carry exactly that value,
  // i.e. the parallelism TestTemplate parameter.
  assertThat(sink.getTransformation().getParallelism()).isEqualTo(parallelism);
}

private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
throws Exception {
List<Row> rows = createRows("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
Expand Down Expand Up @@ -676,9 +677,7 @@ public DataStreamSink<RowData> append() {
// Note that IcebergSink internally consists of multiple operators (like writer, committer,
// aggregator).
// The following parallelism will be propagated to all of the above operators.
if (sink.flinkWriteConf.writeParallelism() != null) {
rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism());
}
rowDataDataStreamSink.setParallelism(sink.resolveWriterParallelism(rowDataInput));
return rowDataDataStreamSink;
}
}
Expand Down Expand Up @@ -830,16 +829,19 @@ private DataStream<RowData> distributeDataStreamByHashDistributionMode(
}
}

/**
 * Returns the parallelism to use for the writer operators.
 *
 * <p>When no explicit write parallelism was configured, the input stream's own parallelism is
 * used so the writer can chain directly with its upstream operator.
 */
private int resolveWriterParallelism(DataStream<RowData> input) {
  Integer configured = flinkWriteConf.writeParallelism();
  return configured != null ? configured : input.getParallelism();
}

private DataStream<RowData> distributeDataStreamByRangeDistributionMode(
DataStream<RowData> input,
Schema iSchema,
PartitionSpec partitionSpec,
SortOrder sortOrderParam) {

int writerParallelism =
flinkWriteConf.writeParallelism() == null
? input.getParallelism()
: flinkWriteConf.writeParallelism();
int writerParallelism = resolveWriterParallelism(input);

// needed because of checkStyle not allowing us to change the value of an argument
SortOrder sortOrder = sortOrderParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,51 @@ void testErrorOnNullForRequiredField() throws Exception {
assertThatThrownBy(() -> env.execute()).hasRootCauseInstanceOf(NullPointerException.class);
}

@TestTemplate
void testDefaultWriteParallelism() {
  List<Row> rows = createRows("");
  DataStream<Row> source =
      env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");

  var sink =
      IcebergSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
          .table(table)
          .tableLoader(tableLoader)
          .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
          .distributionMode(DistributionMode.NONE)
          .append();

  // No write parallelism was configured on the sink builder, so the SinkV2 writer operator
  // backing the IcebergSink (reachable via sink.getTransformation) is expected to default to
  // the parallelism of its input source.
  assertThat(sink.getTransformation().getParallelism()).isEqualTo(source.getParallelism());
}

@TestTemplate
void testWriteParallelism() {
  List<Row> rows = createRows("");

  // Note: this bounded source is non-parallel, so its parallelism is always 1.
  DataStream<Row> source =
      env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");

  var sink =
      IcebergSink.forRow(source, SimpleDataUtil.FLINK_SCHEMA)
          .table(table)
          .tableLoader(tableLoader)
          .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
          .distributionMode(DistributionMode.NONE)
          .writeParallelism(parallelism)
          .append();

  // An explicit write parallelism was set on the builder; the SinkV2 writer operator backing
  // the IcebergSink (reachable via sink.getTransformation) must carry exactly that value,
  // i.e. the parallelism TestTemplate parameter.
  assertThat(sink.getTransformation().getParallelism()).isEqualTo(parallelism);
}

private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
throws Exception {
List<Row> rows = createRows("");
Expand Down