From b3b13c5e2d3c3b012ea9908f4c719cbcec16c572 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 14 Mar 2025 10:31:52 +0100 Subject: [PATCH] [FLINK-37460][state] Make state processor API checkpoint ID configurable --- .../api/KeyedOperatorTransformation.java | 16 ++++- .../state/api/KeyedStateTransformation.java | 10 +++- .../api/OneInputOperatorTransformation.java | 21 +++++-- .../api/OneInputStateTransformation.java | 24 ++++++-- .../state/api/OperatorTransformation.java | 29 +++++++++ .../org/apache/flink/state/api/Savepoint.java | 14 +++-- .../flink/state/api/SavepointReader.java | 10 +++- .../flink/state/api/SavepointWriter.java | 59 ++++++++++++++++--- .../api/WindowedOperatorTransformation.java | 32 +++++++--- .../api/WindowedStateTransformation.java | 32 +++++++--- .../flink/state/api/WritableSavepoint.java | 4 +- .../state/api/output/MergeOperatorStates.java | 7 ++- .../flink/state/api/output/SnapshotUtils.java | 11 ++-- .../BroadcastStateBootstrapOperator.java | 10 +++- .../KeyedStateBootstrapOperator.java | 10 +++- .../operators/StateBootstrapOperator.java | 10 +++- .../StateBootstrapWrapperOperator.java | 8 ++- .../runtime/metadata/SavepointMetadata.java | 8 +++ .../runtime/metadata/SavepointMetadataV2.java | 8 +++ .../apache/flink/state/api/SavepointTest.java | 6 +- .../state/api/SavepointWriterITCase.java | 18 ++++-- .../KeyedStateBootstrapOperatorTest.java | 4 +- .../state/api/output/SnapshotUtilsTest.java | 2 +- 23 files changed, 285 insertions(+), 68 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java index 5af77d6711745..777b03259b246 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java @@ -47,6 +47,9 @@ public class KeyedOperatorTransformation { /** The data set containing the data to bootstrap the operator state with. */ private final DataSet dataSet; + /** Checkpoint ID. */ + private final long checkpointId; + /** Local max parallelism for the bootstrapped operator. */ private final OptionalInt operatorMaxParallelism; @@ -60,11 +63,13 @@ public class KeyedOperatorTransformation { KeyedOperatorTransformation( DataSet dataSet, + long checkpointId, OptionalInt operatorMaxParallelism, @Nullable Timestamper timestamper, KeySelector keySelector, TypeInformation keyType) { this.dataSet = dataSet; + this.checkpointId = checkpointId; this.operatorMaxParallelism = operatorMaxParallelism; this.timestamper = timestamper; this.keySelector = keySelector; @@ -84,7 +89,8 @@ public class KeyedOperatorTransformation { public BootstrapTransformation transform(KeyedStateBootstrapFunction processFunction) { SavepointWriterOperatorFactory factory = (timestamp, path) -> - new KeyedStateBootstrapOperator<>(timestamp, path, processFunction); + new KeyedStateBootstrapOperator<>( + checkpointId, timestamp, path, processFunction); return transform(factory); } @@ -116,6 +122,12 @@ public BootstrapTransformation transform(SavepointWriterOperatorFactory facto public WindowedOperatorTransformation window( WindowAssigner assigner) { return new WindowedOperatorTransformation<>( - dataSet, operatorMaxParallelism, timestamper, keySelector, keyType, assigner); + dataSet, + checkpointId, + operatorMaxParallelism, + timestamper, + keySelector, + keyType, + assigner); } } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java index 8bcef815b0c4a..c8ece7bf59307 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java @@ -43,6 +43,9 @@ public class KeyedStateTransformation { /** The data set containing the data to bootstrap the operator state with. */ private final DataStream stream; + /** Checkpoint ID. */ + private final long checkpointId; + /** Local max parallelism for the bootstrapped operator. */ private final OptionalInt operatorMaxParallelism; @@ -54,10 +57,12 @@ public class KeyedStateTransformation { KeyedStateTransformation( DataStream stream, + long checkpointId, OptionalInt operatorMaxParallelism, KeySelector keySelector, TypeInformation keyType) { this.stream = stream; + this.checkpointId = checkpointId; this.operatorMaxParallelism = operatorMaxParallelism; this.keySelector = keySelector; this.keyType = keyType; @@ -77,7 +82,8 @@ public StateBootstrapTransformation transform( KeyedStateBootstrapFunction processFunction) { SavepointWriterOperatorFactory factory = (timestamp, path) -> - new KeyedStateBootstrapOperator<>(timestamp, path, processFunction); + new KeyedStateBootstrapOperator<>( + checkpointId, timestamp, path, processFunction); return transform(factory); } @@ -109,6 +115,6 @@ public StateBootstrapTransformation transform(SavepointWriterOperatorFactory public WindowedStateTransformation window( WindowAssigner assigner) { return new WindowedStateTransformation<>( - stream, operatorMaxParallelism, keySelector, keyType, assigner); + stream, checkpointId, operatorMaxParallelism, keySelector, keyType, assigner); } } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java index 81d99c7a7234c..26b72cf05ea82 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java @@ -55,13 +55,21 @@ public class OneInputOperatorTransformation { /** The data set containing the data to bootstrap the operator state with. */ private final DataSet dataSet; + /** Checkpoint ID. */ + private final long checkpointId; + /** Local max parallelism for the bootstrapped operator. */ private OptionalInt operatorMaxParallelism = OptionalInt.empty(); @Nullable private Timestamper timestamper; OneInputOperatorTransformation(DataSet dataSet) { + this(dataSet, 0L); + } + + OneInputOperatorTransformation(DataSet dataSet, long checkpointId) { this.dataSet = dataSet; + this.checkpointId = checkpointId; } /** @@ -108,7 +116,9 @@ public OneInputOperatorTransformation assignTimestamps(TimestampAssigner a */ public BootstrapTransformation transform(StateBootstrapFunction processFunction) { SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapOperator<>(timestamp, path, processFunction); + (timestamp, path) -> + new StateBootstrapOperator<>( + checkpointId, timestamp, path, processFunction); return transform(factory); } @@ -127,7 +137,8 @@ public BootstrapTransformation transform( BroadcastStateBootstrapFunction processFunction) { SavepointWriterOperatorFactory factory = (timestamp, path) -> - new BroadcastStateBootstrapOperator<>(timestamp, path, processFunction); + new BroadcastStateBootstrapOperator<>( + checkpointId, timestamp, path, processFunction); return transform(factory); } @@ -156,7 +167,7 @@ public KeyedOperatorTransformation keyBy(KeySelector keySelector TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType()); return new KeyedOperatorTransformation<>( - dataSet, operatorMaxParallelism, timestamper, keySelector, keyType); + dataSet, checkpointId, operatorMaxParallelism, timestamper, keySelector, keyType); } /** @@ -170,7 +181,7 @@ public KeyedOperatorTransformation keyBy(KeySelector keySelector public KeyedOperatorTransformation keyBy( KeySelector keySelector, TypeInformation keyType) { return new KeyedOperatorTransformation<>( - dataSet, operatorMaxParallelism, timestamper, keySelector, keyType); + dataSet, checkpointId, operatorMaxParallelism, timestamper, keySelector, keyType); } /** @@ -211,6 +222,6 @@ private KeyedOperatorTransformation keyBy(Keys keys) { TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType()); return new KeyedOperatorTransformation<>( - dataSet, operatorMaxParallelism, timestamper, keySelector, keyType); + dataSet, checkpointId, operatorMaxParallelism, timestamper, keySelector, keyType); } } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java index ce9d3977005a9..ad5dc4c89da4f 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java @@ -48,11 +48,19 @@ public class OneInputStateTransformation { /** The data stream containing the data to bootstrap the operator state with. */ private final DataStream stream; + /** Checkpoint ID. */ + private final long checkpointId; + /** Local max parallelism for the bootstrapped operator. */ private OptionalInt operatorMaxParallelism = OptionalInt.empty(); OneInputStateTransformation(DataStream stream) { + this(stream, 0L); + } + + OneInputStateTransformation(DataStream stream, long checkpointId) { this.stream = stream; + this.checkpointId = checkpointId; } /** @@ -81,7 +89,9 @@ public OneInputStateTransformation setMaxParallelism(int maxParallelism) { */ public StateBootstrapTransformation transform(StateBootstrapFunction processFunction) { SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapOperator<>(timestamp, path, processFunction); + (timestamp, path) -> + new StateBootstrapOperator<>( + checkpointId, timestamp, path, processFunction); return transform(factory); } @@ -100,7 +110,8 @@ public StateBootstrapTransformation transform( BroadcastStateBootstrapFunction processFunction) { SavepointWriterOperatorFactory factory = (timestamp, path) -> - new BroadcastStateBootstrapOperator<>(timestamp, path, processFunction); + new BroadcastStateBootstrapOperator<>( + checkpointId, timestamp, path, processFunction); return transform(factory); } @@ -128,7 +139,8 @@ public StateBootstrapTransformation transform(SavepointWriterOperatorFactory public KeyedStateTransformation keyBy(KeySelector keySelector) { TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, stream.getType()); - return new KeyedStateTransformation<>(stream, operatorMaxParallelism, keySelector, keyType); + return new KeyedStateTransformation<>( + stream, checkpointId, operatorMaxParallelism, keySelector, keyType); } /** @@ -141,7 +153,8 @@ public KeyedStateTransformation keyBy(KeySelector keySelector) { */ public KeyedStateTransformation keyBy( KeySelector keySelector, TypeInformation keyType) { - return new KeyedStateTransformation<>(stream, operatorMaxParallelism, keySelector, keyType); + return new KeyedStateTransformation<>( + stream, checkpointId, operatorMaxParallelism, keySelector, keyType); } /** @@ -181,6 +194,7 @@ private KeyedStateTransformation keyBy(Keys keys) { TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, stream.getType()); - return new KeyedStateTransformation<>(stream, operatorMaxParallelism, keySelector, keyType); + return new KeyedStateTransformation<>( + stream, checkpointId, operatorMaxParallelism, keySelector, keyType); } } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java index af32098148393..fc52f3965a4be 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java @@ -71,6 +71,22 @@ public static OneInputOperatorTransformation bootstrapWith(DataSet dat return new OneInputOperatorTransformation<>(dataSet); } + /** + * Create a new {@link OperatorTransformation} from a {@link DataSet}. + * + * @param dataSet A dataset of elements. + * @param checkpointId checkpoint ID. + * @param The type of the input. + * @return A {@link OneInputOperatorTransformation}. + * @deprecated use {@link #bootstrapWith(DataStream)} to bootstrap a savepoint using the data + * stream api under batch execution. + */ + @Deprecated + public static OneInputOperatorTransformation bootstrapWith( + DataSet dataSet, long checkpointId) { + return new OneInputOperatorTransformation<>(dataSet, checkpointId); + } + /** * Create a new {@link OneInputStateTransformation} from a {@link DataStream}. * @@ -81,4 +97,17 @@ public static OneInputOperatorTransformation bootstrapWith(DataSet dat public static OneInputStateTransformation bootstrapWith(DataStream stream) { return new OneInputStateTransformation<>(stream); } + + /** + * Create a new {@link OneInputStateTransformation} from a {@link DataStream}. + * + * @param stream A data stream of elements. + * @param checkpointId checkpoint ID. + * @param The type of the input. + * @return A {@link OneInputStateTransformation}. + */ + public static OneInputStateTransformation bootstrapWith( + DataStream stream, long checkpointId) { + return new OneInputStateTransformation<>(stream, checkpointId); + } } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java index 033e4859dcc11..1934f98a45e7d 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java @@ -73,7 +73,10 @@ public static ExistingSavepoint load(ExecutionEnvironment env, String path) thro SavepointMetadata savepointMetadata = new SavepointMetadata( - maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates()); + metadata.getCheckpointId(), + maxParallelism, + metadata.getMasterStates(), + metadata.getOperatorStates()); return new ExistingSavepoint(env, savepointMetadata, null); } @@ -102,7 +105,10 @@ public static ExistingSavepoint load( SavepointMetadata savepointMetadata = new SavepointMetadata( - maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates()); + metadata.getCheckpointId(), + maxParallelism, + metadata.getMasterStates(), + metadata.getOperatorStates()); return new ExistingSavepoint(env, savepointMetadata, stateBackend); } @@ -124,7 +130,7 @@ public static NewSavepoint create(int maxParallelism) { SavepointMetadata metadata = new SavepointMetadata( - maxParallelism, Collections.emptyList(), Collections.emptyList()); + 0L, maxParallelism, Collections.emptyList(), Collections.emptyList()); return new NewSavepoint(metadata, null); } @@ -147,7 +153,7 @@ public static NewSavepoint create(StateBackend stateBackend, int maxParallelism) SavepointMetadata metadata = new SavepointMetadata( - maxParallelism, Collections.emptyList(), Collections.emptyList()); + 0L, maxParallelism, Collections.emptyList(), Collections.emptyList()); return new NewSavepoint(metadata, stateBackend); } } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java index f7ba27816547c..ae1b00474ebd1 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java @@ -82,7 +82,10 @@ public static SavepointReader read(StreamExecutionEnvironment env, String path) SavepointMetadataV2 savepointMetadata = new SavepointMetadataV2( - maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates()); + metadata.getCheckpointId(), + maxParallelism, + metadata.getMasterStates(), + metadata.getOperatorStates()); return new SavepointReader(env, savepointMetadata, null); } @@ -111,7 +114,10 @@ public static SavepointReader read( SavepointMetadataV2 savepointMetadata = new SavepointMetadataV2( - maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates()); + metadata.getCheckpointId(), + maxParallelism, + metadata.getMasterStates(), + metadata.getOperatorStates()); return new SavepointReader(env, savepointMetadata, stateBackend); } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java index 01f51ca997a09..c272abe0edb80 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java @@ -125,13 +125,16 @@ private static SavepointMetadataV2 readSavepointMetadata(String path) throws IOE "Savepoint must contain at least one operator state.")); return new SavepointMetadataV2( - maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates()); + metadata.getCheckpointId(), + maxParallelism, + metadata.getMasterStates(), + metadata.getOperatorStates()); } /** @deprecated use {@link #newSavepoint(StreamExecutionEnvironment, int)} */ @Deprecated public static SavepointWriter newSavepoint(int maxParallelism) { - return new SavepointWriter(createSavepointMetadata(maxParallelism), null, null); + return new SavepointWriter(createSavepointMetadata(0L, maxParallelism), null, null); } /** @@ -146,19 +149,54 @@ public static SavepointWriter newSavepoint(int maxParallelism) { public static SavepointWriter newSavepoint( StreamExecutionEnvironment executionEnvironment, int maxParallelism) { return new SavepointWriter( - createSavepointMetadata(maxParallelism), null, executionEnvironment); + createSavepointMetadata(0L, maxParallelism), null, executionEnvironment); + } + + /** + * Creates a new savepoint. The savepoint will be written using the state backend defined via + * the clusters configuration. + * + * @param maxParallelism The max parallelism of the savepoint. + * @param checkpointId checkpoint ID. + * @return A {@link SavepointWriter}. + * @see #newSavepoint(StreamExecutionEnvironment, StateBackend, int) + * @see #withConfiguration(ConfigOption, Object) + */ + public static SavepointWriter newSavepoint( + StreamExecutionEnvironment executionEnvironment, + long checkpointId, + int maxParallelism) { + return new SavepointWriter( + createSavepointMetadata(checkpointId, maxParallelism), null, executionEnvironment); } /** @deprecated use {@link #newSavepoint(StreamExecutionEnvironment, StateBackend, int)} */ @Deprecated public static SavepointWriter newSavepoint(StateBackend stateBackend, int maxParallelism) { - return new SavepointWriter(createSavepointMetadata(maxParallelism), stateBackend, null); + return new SavepointWriter(createSavepointMetadata(0L, maxParallelism), stateBackend, null); + } + + /** + * Creates a new savepoint. + * + * @param stateBackend The state backend of the savepoint used for keyed state. + * @param maxParallelism The max parallelism of the savepoint. + * @return A {@link SavepointWriter}. + * @see #newSavepoint(StreamExecutionEnvironment, int) + */ + public static SavepointWriter newSavepoint( + StreamExecutionEnvironment executionEnvironment, + StateBackend stateBackend, + int maxParallelism) { + return new SavepointWriter( + createSavepointMetadata(0L, maxParallelism), stateBackend, executionEnvironment); } /** * Creates a new savepoint. * * @param stateBackend The state backend of the savepoint used for keyed state. + * @param checkpointId checkpoint ID. * @param maxParallelism The max parallelism of the savepoint. * @return A {@link SavepointWriter}. * @see #newSavepoint(StreamExecutionEnvironment, int) @@ -166,12 +204,16 @@ public static SavepointWriter newSavepoint(StateBackend stateBackend, int maxPar public static SavepointWriter newSavepoint( StreamExecutionEnvironment executionEnvironment, StateBackend stateBackend, + long checkpointId, int maxParallelism) { return new SavepointWriter( - createSavepointMetadata(maxParallelism), stateBackend, executionEnvironment); + createSavepointMetadata(checkpointId, maxParallelism), + stateBackend, + executionEnvironment); } - private static SavepointMetadataV2 createSavepointMetadata(int maxParallelism) { + private static SavepointMetadataV2 createSavepointMetadata( + long checkpointId, int maxParallelism) { Preconditions.checkArgument( maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM, "Maximum parallelism must be between 1 and " @@ -180,7 +222,7 @@ private static SavepointMetadataV2 createSavepointMetadata(int maxParallelism) { + maxParallelism); return new SavepointMetadataV2( - maxParallelism, Collections.emptyList(), Collections.emptyList()); + checkpointId, maxParallelism, Collections.emptyList(), Collections.emptyList()); } /** @@ -336,7 +378,8 @@ public final void write(String path) { "reduce(OperatorState)", TypeInformation.of(CheckpointMetadata.class), new GroupReduceOperator<>( - new MergeOperatorStates(metadata.getMasterStates()))) + new MergeOperatorStates( + metadata.getCheckpointId(), metadata.getMasterStates()))) .forceNonParallel() .map(new CheckpointMetadataCheckpointMetadataMapFunction(this.uidTransformationMap)) .setParallelism(1) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java index ed72d7eaf19de..76770c5387cb0 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java @@ -60,6 +60,8 @@ public class WindowedOperatorTransformation { private final WindowOperatorBuilder builder; + private final long checkpointId; + private final OptionalInt operatorMaxParallelism; @Nullable private final Timestamper timestamper; @@ -70,12 +72,14 @@ public class WindowedOperatorTransformation { WindowedOperatorTransformation( DataSet input, + long checkpointId, OptionalInt operatorMaxParallelism, @Nullable Timestamper timestamper, KeySelector keySelector, TypeInformation keyType, WindowAssigner windowAssigner) { this.input = input; + this.checkpointId = checkpointId; this.operatorMaxParallelism = operatorMaxParallelism; this.timestamper = timestamper; this.keySelector = keySelector; @@ -163,7 +167,9 @@ public BootstrapTransformation reduce( WindowOperator operator = builder.reduce(reduceFunction, function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new BootstrapTransformation<>( input, operatorMaxParallelism, timestamper, factory, keySelector, keyType); } @@ -189,7 +195,9 @@ public BootstrapTransformation reduce( WindowOperator operator = builder.reduce(reduceFunction, function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new BootstrapTransformation<>( input, operatorMaxParallelism, timestamper, factory, keySelector, keyType); } @@ -323,7 +331,9 @@ public BootstrapTransformation aggregate( builder.aggregate(aggregateFunction, windowFunction, accumulatorType); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new BootstrapTransformation<>( input, operatorMaxParallelism, timestamper, factory, keySelector, keyType); } @@ -400,7 +410,9 @@ public BootstrapTransformation aggregate( builder.aggregate(aggregateFunction, windowFunction, accumulatorType); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new BootstrapTransformation<>( input, operatorMaxParallelism, timestamper, factory, keySelector, keyType); } @@ -424,7 +436,9 @@ public BootstrapTransformation apply(WindowFunction function) WindowOperator operator = builder.apply(function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new BootstrapTransformation<>( input, operatorMaxParallelism, timestamper, factory, keySelector, keyType); } @@ -448,7 +462,9 @@ public BootstrapTransformation apply( WindowOperator operator = builder.apply(function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new BootstrapTransformation<>( input, operatorMaxParallelism, timestamper, factory, keySelector, keyType); } @@ -469,7 +485,9 @@ public BootstrapTransformation process(ProcessWindowFunction WindowOperator operator = builder.process(function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new BootstrapTransformation<>( input, operatorMaxParallelism, timestamper, factory, keySelector, keyType); } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java index 5d42ba7811c0c..49c433fe96821 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java @@ -57,6 +57,8 @@ public class WindowedStateTransformation { private final WindowOperatorBuilder builder; + private final long checkpointId; + private final OptionalInt operatorMaxParallelism; private final KeySelector keySelector; @@ -65,11 +67,13 @@ public class WindowedStateTransformation { WindowedStateTransformation( DataStream input, + long checkpointId, OptionalInt operatorMaxParallelism, KeySelector keySelector, TypeInformation keyType, WindowAssigner windowAssigner) { this.input = input; + this.checkpointId = checkpointId; this.operatorMaxParallelism = operatorMaxParallelism; this.keySelector = keySelector; this.keyType = keyType; @@ -156,7 +160,9 @@ public StateBootstrapTransformation reduce( WindowOperator operator = builder.reduce(reduceFunction, function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new StateBootstrapTransformation<>( input, operatorMaxParallelism, factory, keySelector, keyType); } @@ -182,7 +188,9 @@ public StateBootstrapTransformation reduce( WindowOperator operator = builder.reduce(reduceFunction, function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new StateBootstrapTransformation<>( input, operatorMaxParallelism, factory, keySelector, keyType); } @@ -317,7 +325,9 @@ public StateBootstrapTransformation aggregate( builder.aggregate(aggregateFunction, windowFunction, accumulatorType); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new StateBootstrapTransformation<>( input, operatorMaxParallelism, factory, keySelector, keyType); } @@ -394,7 +404,9 @@ public StateBootstrapTransformation aggregate( builder.aggregate(aggregateFunction, windowFunction, accumulatorType); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new StateBootstrapTransformation<>( input, operatorMaxParallelism, factory, keySelector, keyType); } @@ -418,7 +430,9 @@ public StateBootstrapTransformation apply(WindowFunction func WindowOperator operator = builder.apply(function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new StateBootstrapTransformation<>( input, operatorMaxParallelism, factory, keySelector, keyType); } @@ -442,7 +456,9 @@ public StateBootstrapTransformation apply( WindowOperator operator = builder.apply(function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new StateBootstrapTransformation<>( input, operatorMaxParallelism, factory, keySelector, keyType); } @@ -463,7 +479,9 @@ public StateBootstrapTransformation process(ProcessWindowFunction operator = builder.process(function); SavepointWriterOperatorFactory factory = - (timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator); + (timestamp, path) -> + new StateBootstrapWrapperOperator<>( + checkpointId, timestamp, path, operator); return new StateBootstrapTransformation<>( input, operatorMaxParallelism, factory, keySelector, keyType); } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java index e137ccd1ae659..8bf3c84dbe6b4 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java @@ -140,7 +140,9 @@ public final void write(String path) { finalOperatorStates = newOperatorStates.union(existingOperatorStates); } finalOperatorStates - .reduceGroup(new MergeOperatorStates(metadata.getMasterStates())) + .reduceGroup( + new MergeOperatorStates( + metadata.getCheckpointId(), metadata.getMasterStates())) .name("reduce(OperatorState)") .output(new SavepointOutputFormat(savepointPath)) .name(path); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java index f00bfa34943bf..584ae131fb7d3 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java @@ -39,11 +39,14 @@ public class MergeOperatorStates implements GroupReduceFunction masterStates; - public MergeOperatorStates(Collection masterStates) { + public MergeOperatorStates(long checkpointId, Collection masterStates) { Preconditions.checkNotNull(masterStates, "Master state metadata must not be null"); + this.checkpointId = checkpointId; this.masterStates = masterStates; } @@ -51,7 +54,7 @@ public MergeOperatorStates(Collection masterStates) { public void reduce(Iterable values, Collector out) { CheckpointMetadata metadata = new CheckpointMetadata( - SnapshotUtils.CHECKPOINT_ID, + checkpointId, StreamSupport.stream(values.spliterator(), false) .collect(Collectors.toList()), masterStates); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java index b4b6e5dbd6ac6..2c7664234ab7b 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java @@ -41,11 +41,10 @@ /** Takes a final snapshot of the state of an operator subtask. */ @Internal public final class SnapshotUtils { - static final long CHECKPOINT_ID = 0L; - private SnapshotUtils() {} public static > TaggedOperatorSubtaskState snapshot( + long checkpointId, OP operator, int index, long timestamp, @@ -64,21 +63,22 @@ public static > TaggedOperatorSubtaskState s isUnalignedCheckpoint, CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT); - operator.prepareSnapshotPreBarrier(CHECKPOINT_ID); + operator.prepareSnapshotPreBarrier(checkpointId); CheckpointStreamFactory storage = createStreamFactory(configuration, options); OperatorSnapshotFutures snapshotInProgress = - operator.snapshotState(CHECKPOINT_ID, timestamp, options, storage); + operator.snapshotState(checkpointId, timestamp, options, storage); OperatorSubtaskState state = new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState(); - operator.notifyCheckpointComplete(CHECKPOINT_ID); + operator.notifyCheckpointComplete(checkpointId); return new TaggedOperatorSubtaskState(index, state); } public static > TaggedOperatorSubtaskState snapshot( + long checkpointId, OP operator, int index, long timestamp, @@ -89,6 +89,7 @@ public static > TaggedOperatorSubtaskState s throws Exception { return snapshot( + checkpointId, operator, index, timestamp, diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java index 04aee7899c651..786bd61d39d15 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java @@ -44,6 +44,8 @@ public class BroadcastStateBootstrapOperator private static final long serialVersionUID = 1L; + private final long checkpointId; + private final long timestamp; private final Path savepointPath; @@ -51,10 +53,13 @@ public class BroadcastStateBootstrapOperator private transient ContextImpl context; public BroadcastStateBootstrapOperator( - long timestamp, Path savepointPath, BroadcastStateBootstrapFunction function) { + long checkpointId, + long timestamp, + Path savepointPath, + BroadcastStateBootstrapFunction function) { super(function); + this.checkpointId = checkpointId; this.timestamp = timestamp; - this.savepointPath = savepointPath; } @@ -73,6 +78,7 @@ public void processElement(StreamRecord element) throws Exception { public void endInput() throws Exception { TaggedOperatorSubtaskState state = SnapshotUtils.snapshot( + checkpointId, this, getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), timestamp, diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java index b3be5b988d620..0648342d6d014 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java @@ -48,6 +48,8 @@ public class KeyedStateBootstrapOperator private static final long serialVersionUID = 1L; + private final long checkpointId; + private final long timestamp; private final Path savepointPath; @@ -55,9 +57,12 @@ public class KeyedStateBootstrapOperator private transient KeyedStateBootstrapOperator.ContextImpl context; public KeyedStateBootstrapOperator( - long timestamp, Path savepointPath, KeyedStateBootstrapFunction function) { + long checkpointId, + long timestamp, + Path savepointPath, + KeyedStateBootstrapFunction function) { super(function); - + this.checkpointId = checkpointId; this.timestamp = timestamp; this.savepointPath = savepointPath; } @@ -88,6 +93,7 @@ public void processElement(StreamRecord element) throws Exception { public void endInput() throws Exception { TaggedOperatorSubtaskState state = SnapshotUtils.snapshot( + checkpointId, this, getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), timestamp, diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java index 4ae00d8e2d4e8..3c6ee3209aefa 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java @@ -40,6 +40,8 @@ public class StateBootstrapOperator private static final long serialVersionUID = 1L; + private final long checkpointId; + private final long timestamp; private final Path savepointPath; @@ -47,9 +49,12 @@ public class StateBootstrapOperator private transient ContextImpl context; public StateBootstrapOperator( - long timestamp, Path savepointPath, StateBootstrapFunction function) { + long checkpointId, + long timestamp, + Path savepointPath, + StateBootstrapFunction function) { super(function); - + this.checkpointId = checkpointId; this.timestamp = timestamp; this.savepointPath = savepointPath; } @@ -69,6 +74,7 @@ public void processElement(StreamRecord element) throws Exception { public void endInput() throws Exception { TaggedOperatorSubtaskState state = SnapshotUtils.snapshot( + checkpointId, this, getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), timestamp, diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java index ccbf588b81c08..669628d6ef1d8 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java @@ -55,6 +55,8 @@ public final class StateBootstrapWrapperOperator< private static final long serialVersionUID = 1L; + private final long checkpointId; + private final long timestamp; private final Path savepointPath; @@ -63,8 +65,9 @@ public final class StateBootstrapWrapperOperator< private final OP operator; - public StateBootstrapWrapperOperator(long timestamp, Path savepointPath, OP operator) { - + public StateBootstrapWrapperOperator( + long checkpointId, long timestamp, Path savepointPath, OP operator) { + this.checkpointId = checkpointId; this.timestamp = timestamp; this.savepointPath = savepointPath; this.operator = operator; @@ -185,6 +188,7 @@ public Object getCurrentKey() { public void endInput() throws Exception { TaggedOperatorSubtaskState state = SnapshotUtils.snapshot( + checkpointId, this, operator.getContainingTask() .getEnvironment() diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java index 3f801a34b01f6..9e12b394a2565 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java @@ -41,6 +41,8 @@ @Deprecated public class SavepointMetadata { + private final long checkpointId; + private final int maxParallelism; private final Collection masterStates; @@ -48,6 +50,7 @@ public class SavepointMetadata { private final Map operatorStateIndex; public SavepointMetadata( + long checkpointId, int maxParallelism, Collection masterStates, Collection initialStates) { @@ -57,6 +60,7 @@ public SavepointMetadata( + UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism); + this.checkpointId = checkpointId; this.maxParallelism = maxParallelism; this.masterStates = Preconditions.checkNotNull(masterStates); @@ -69,6 +73,10 @@ public SavepointMetadata( OperatorStateSpec.existing(existingState))); } + public long getCheckpointId() { + return checkpointId; + } + public int getMaxParallelism() { return maxParallelism; } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java index 72cfa27781f8d..dc1b260d3c795 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java @@ -41,6 +41,8 @@ @Internal public class SavepointMetadataV2 { + private final long checkpointId; + private final int maxParallelism; private final Collection masterStates; @@ -48,6 +50,7 @@ public class SavepointMetadataV2 { private final Map operatorStateIndex; public SavepointMetadataV2( + long checkpointId, int maxParallelism, Collection masterStates, Collection initialStates) { @@ -59,6 +62,7 @@ public SavepointMetadataV2( + maxParallelism); Preconditions.checkNotNull(masterStates); + this.checkpointId = checkpointId; this.maxParallelism = maxParallelism; this.masterStates = new ArrayList<>(masterStates); this.operatorStateIndex = CollectionUtil.newHashMapWithExpectedSize(initialStates.size()); @@ -70,6 +74,10 @@ public SavepointMetadataV2( OperatorStateSpecV2.existing(existingState))); } + public long getCheckpointId() { + return checkpointId; + } + public int getMaxParallelism() { return maxParallelism; } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java index ef63bd4829916..ff28ec1fa32e5 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java @@ -51,7 +51,7 @@ public void testNewSavepointEnforceUniqueUIDs() { .transform(new ExampleStateBootstrapFunction()); SavepointMetadata metadata = - new SavepointMetadata(1, Collections.emptyList(), Collections.emptyList()); + new SavepointMetadata(0L, 1, Collections.emptyList(), Collections.emptyList()); new NewSavepoint(metadata, new MemoryStateBackend()) .withOperator(UID, transformation) @@ -74,7 +74,7 @@ public void testExistingSavepointEnforceUniqueUIDs() throws IOException { new OperatorState(OperatorIDGenerator.fromUid(UID), 1, 4)); SavepointMetadata metadata = - new SavepointMetadata(4, Collections.emptyList(), operatorStates); + new SavepointMetadata(0L, 4, Collections.emptyList(), operatorStates); new ExistingSavepoint(env, metadata, new MemoryStateBackend()) .withOperator(UID, transformation) @@ -97,7 +97,7 @@ public void testExistingSavepointEnforceUniqueUIDsWithOldSavepoint() throws IOEx new OperatorState(OperatorIDGenerator.fromUid(UID), 1, 4)); SavepointMetadata metadata = - new SavepointMetadata(4, Collections.emptyList(), operatorStates); + new SavepointMetadata(0L, 4, Collections.emptyList(), operatorStates); new ExistingSavepoint(env, metadata, new MemoryStateBackend()) .withOperator(UID, transformation) diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java index f10c99faabba5..a2380caa1da54 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -37,6 +38,7 @@ import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction; import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction; import org.apache.flink.state.api.functions.StateBootstrapFunction; +import org.apache.flink.state.api.runtime.SavepointLoader; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -64,6 +66,8 @@ /** IT test for writing savepoints. */ public class SavepointWriterITCase extends AbstractTestBaseJUnit4 { + private static final long CHECKPOINT_ID = 42; + private static final String ACCOUNT_UID = "accounts"; private static final String CURRENCY_UID = "currency"; @@ -113,18 +117,18 @@ private void bootstrapState(StateBackend backend, String savepointPath) throws E env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); StateBootstrapTransformation transformation = - OperatorTransformation.bootstrapWith(env.fromData(accounts)) + OperatorTransformation.bootstrapWith(env.fromData(accounts), CHECKPOINT_ID) .keyBy(acc -> acc.id) .transform(new AccountBootstrapper()); StateBootstrapTransformation broadcastTransformation = - OperatorTransformation.bootstrapWith(env.fromData(currencyRates)) + OperatorTransformation.bootstrapWith(env.fromData(currencyRates), CHECKPOINT_ID) .transform(new CurrencyBootstrapFunction()); SavepointWriter writer = backend == null - ? SavepointWriter.newSavepoint(env, 128) - : SavepointWriter.newSavepoint(env, backend, 128); + ? SavepointWriter.newSavepoint(env, CHECKPOINT_ID, 128) + : SavepointWriter.newSavepoint(env, backend, CHECKPOINT_ID, 128); writer.withOperator(OperatorIdentifier.forUid(ACCOUNT_UID), transformation) .withOperator(getUidHashFromUid(CURRENCY_UID), broadcastTransformation) @@ -134,6 +138,9 @@ private void bootstrapState(StateBackend backend, String savepointPath) throws E } private void validateBootstrap(StateBackend backend, String savepointPath) throws Exception { + CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(savepointPath); + assertThat(metadata.getCheckpointId()).isEqualTo(CHECKPOINT_ID); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); if (backend != null) { @@ -186,6 +193,9 @@ private void modifySavepoint(StateBackend backend, String savepointPath, String } private void validateModification(StateBackend backend, String savepointPath) throws Exception { + CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(savepointPath); + assertThat(metadata.getCheckpointId()).isEqualTo(CHECKPOINT_ID); + StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); if (backend != null) { sEnv.setStateBackend(backend); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java index aeab132053b4d..b680718ea1fcf 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java @@ -61,7 +61,7 @@ public void testTimerStateRestorable() throws Exception { OperatorSubtaskState state; KeyedStateBootstrapOperator bootstrapOperator = - new KeyedStateBootstrapOperator<>(0L, path, new TimerBootstrapFunction()); + new KeyedStateBootstrapOperator<>(0L, 0L, path, new TimerBootstrapFunction()); try (KeyedOneInputStreamOperatorTestHarness harness = getHarness(bootstrapOperator)) { processElements(harness, 1L, 2L, 3L); @@ -92,7 +92,7 @@ public void testNonTimerStatesRestorableByNonProcessesOperator() throws Exceptio OperatorSubtaskState state; KeyedStateBootstrapOperator bootstrapOperator = - new KeyedStateBootstrapOperator<>(0L, path, new SimpleBootstrapFunction()); + new KeyedStateBootstrapOperator<>(0L, 0L, path, new SimpleBootstrapFunction()); try (KeyedOneInputStreamOperatorTestHarness harness = getHarness(bootstrapOperator)) { processElements(harness, 1L, 2L, 3L); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java index 0582fcc9d9acd..3cedf154c0d6d 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java @@ -77,7 +77,7 @@ private void testSnapshotUtilsLifecycleWithSavepointFormatType( Path path = new Path(folder.newFolder().getAbsolutePath()); SnapshotUtils.snapshot( - operator, 0, 0L, true, false, new Configuration(), path, savepointFormatType); + 0L, operator, 0, 0L, true, false, new Configuration(), path, savepointFormatType); Assert.assertEquals(SavepointType.savepoint(savepointFormatType), actualSnapshotType); Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);