Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit b3b13c5

Browse files
committedMar 17, 2025
[FLINK-37460][state] Make state processor API checkpoint ID configurable
1 parent 69d998f commit b3b13c5

23 files changed

+285
-68
lines changed
 

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedOperatorTransformation.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public class KeyedOperatorTransformation<K, T> {
4747
/** The data set containing the data to bootstrap the operator state with. */
4848
private final DataSet<T> dataSet;
4949

50+
/** Checkpoint ID. */
51+
private final long checkpointId;
52+
5053
/** Local max parallelism for the bootstrapped operator. */
5154
private final OptionalInt operatorMaxParallelism;
5255

@@ -60,11 +63,13 @@ public class KeyedOperatorTransformation<K, T> {
6063

6164
KeyedOperatorTransformation(
6265
DataSet<T> dataSet,
66+
long checkpointId,
6367
OptionalInt operatorMaxParallelism,
6468
@Nullable Timestamper<T> timestamper,
6569
KeySelector<T, K> keySelector,
6670
TypeInformation<K> keyType) {
6771
this.dataSet = dataSet;
72+
this.checkpointId = checkpointId;
6873
this.operatorMaxParallelism = operatorMaxParallelism;
6974
this.timestamper = timestamper;
7075
this.keySelector = keySelector;
@@ -84,7 +89,8 @@ public class KeyedOperatorTransformation<K, T> {
8489
public BootstrapTransformation<T> transform(KeyedStateBootstrapFunction<K, T> processFunction) {
8590
SavepointWriterOperatorFactory factory =
8691
(timestamp, path) ->
87-
new KeyedStateBootstrapOperator<>(timestamp, path, processFunction);
92+
new KeyedStateBootstrapOperator<>(
93+
checkpointId, timestamp, path, processFunction);
8894
return transform(factory);
8995
}
9096

@@ -116,6 +122,12 @@ public BootstrapTransformation<T> transform(SavepointWriterOperatorFactory facto
116122
public <W extends Window> WindowedOperatorTransformation<T, K, W> window(
117123
WindowAssigner<? super T, W> assigner) {
118124
return new WindowedOperatorTransformation<>(
119-
dataSet, operatorMaxParallelism, timestamper, keySelector, keyType, assigner);
125+
dataSet,
126+
checkpointId,
127+
operatorMaxParallelism,
128+
timestamper,
129+
keySelector,
130+
keyType,
131+
assigner);
120132
}
121133
}

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public class KeyedStateTransformation<K, T> {
4343
/** The data set containing the data to bootstrap the operator state with. */
4444
private final DataStream<T> stream;
4545

46+
/** Checkpoint ID. */
47+
private final long checkpointId;
48+
4649
/** Local max parallelism for the bootstrapped operator. */
4750
private final OptionalInt operatorMaxParallelism;
4851

@@ -54,10 +57,12 @@ public class KeyedStateTransformation<K, T> {
5457

5558
KeyedStateTransformation(
5659
DataStream<T> stream,
60+
long checkpointId,
5761
OptionalInt operatorMaxParallelism,
5862
KeySelector<T, K> keySelector,
5963
TypeInformation<K> keyType) {
6064
this.stream = stream;
65+
this.checkpointId = checkpointId;
6166
this.operatorMaxParallelism = operatorMaxParallelism;
6267
this.keySelector = keySelector;
6368
this.keyType = keyType;
@@ -77,7 +82,8 @@ public StateBootstrapTransformation<T> transform(
7782
KeyedStateBootstrapFunction<K, T> processFunction) {
7883
SavepointWriterOperatorFactory factory =
7984
(timestamp, path) ->
80-
new KeyedStateBootstrapOperator<>(timestamp, path, processFunction);
85+
new KeyedStateBootstrapOperator<>(
86+
checkpointId, timestamp, path, processFunction);
8187
return transform(factory);
8288
}
8389

@@ -109,6 +115,6 @@ public StateBootstrapTransformation<T> transform(SavepointWriterOperatorFactory
109115
public <W extends Window> WindowedStateTransformation<T, K, W> window(
110116
WindowAssigner<? super T, W> assigner) {
111117
return new WindowedStateTransformation<>(
112-
stream, operatorMaxParallelism, keySelector, keyType, assigner);
118+
stream, checkpointId, operatorMaxParallelism, keySelector, keyType, assigner);
113119
}
114120
}

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputOperatorTransformation.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,21 @@ public class OneInputOperatorTransformation<T> {
5555
/** The data set containing the data to bootstrap the operator state with. */
5656
private final DataSet<T> dataSet;
5757

58+
/** Checkpoint ID. */
59+
private final long checkpointId;
60+
5861
/** Local max parallelism for the bootstrapped operator. */
5962
private OptionalInt operatorMaxParallelism = OptionalInt.empty();
6063

6164
@Nullable private Timestamper<T> timestamper;
6265

6366
OneInputOperatorTransformation(DataSet<T> dataSet) {
67+
this(dataSet, 0L);
68+
}
69+
70+
OneInputOperatorTransformation(DataSet<T> dataSet, long checkpointId) {
6471
this.dataSet = dataSet;
72+
this.checkpointId = checkpointId;
6573
}
6674

6775
/**
@@ -108,7 +116,9 @@ public OneInputOperatorTransformation<T> assignTimestamps(TimestampAssigner<T> a
108116
*/
109117
public BootstrapTransformation<T> transform(StateBootstrapFunction<T> processFunction) {
110118
SavepointWriterOperatorFactory factory =
111-
(timestamp, path) -> new StateBootstrapOperator<>(timestamp, path, processFunction);
119+
(timestamp, path) ->
120+
new StateBootstrapOperator<>(
121+
checkpointId, timestamp, path, processFunction);
112122

113123
return transform(factory);
114124
}
@@ -127,7 +137,8 @@ public BootstrapTransformation<T> transform(
127137
BroadcastStateBootstrapFunction<T> processFunction) {
128138
SavepointWriterOperatorFactory factory =
129139
(timestamp, path) ->
130-
new BroadcastStateBootstrapOperator<>(timestamp, path, processFunction);
140+
new BroadcastStateBootstrapOperator<>(
141+
checkpointId, timestamp, path, processFunction);
131142

132143
return transform(factory);
133144
}
@@ -156,7 +167,7 @@ public <K> KeyedOperatorTransformation<K, T> keyBy(KeySelector<T, K> keySelector
156167
TypeInformation<K> keyType =
157168
TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType());
158169
return new KeyedOperatorTransformation<>(
159-
dataSet, operatorMaxParallelism, timestamper, keySelector, keyType);
170+
dataSet, checkpointId, operatorMaxParallelism, timestamper, keySelector, keyType);
160171
}
161172

162173
/**
@@ -170,7 +181,7 @@ public <K> KeyedOperatorTransformation<K, T> keyBy(KeySelector<T, K> keySelector
170181
public <K> KeyedOperatorTransformation<K, T> keyBy(
171182
KeySelector<T, K> keySelector, TypeInformation<K> keyType) {
172183
return new KeyedOperatorTransformation<>(
173-
dataSet, operatorMaxParallelism, timestamper, keySelector, keyType);
184+
dataSet, checkpointId, operatorMaxParallelism, timestamper, keySelector, keyType);
174185
}
175186

176187
/**
@@ -211,6 +222,6 @@ private KeyedOperatorTransformation<Tuple, T> keyBy(Keys<T> keys) {
211222
TypeInformation<Tuple> keyType =
212223
TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType());
213224
return new KeyedOperatorTransformation<>(
214-
dataSet, operatorMaxParallelism, timestamper, keySelector, keyType);
225+
dataSet, checkpointId, operatorMaxParallelism, timestamper, keySelector, keyType);
215226
}
216227
}

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java

+19-5
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,19 @@ public class OneInputStateTransformation<T> {
4848
/** The data stream containing the data to bootstrap the operator state with. */
4949
private final DataStream<T> stream;
5050

51+
/** Checkpoint ID. */
52+
private final long checkpointId;
53+
5154
/** Local max parallelism for the bootstrapped operator. */
5255
private OptionalInt operatorMaxParallelism = OptionalInt.empty();
5356

5457
OneInputStateTransformation(DataStream<T> stream) {
58+
this(stream, 0L);
59+
}
60+
61+
OneInputStateTransformation(DataStream<T> stream, long checkpointId) {
5562
this.stream = stream;
63+
this.checkpointId = checkpointId;
5664
}
5765

5866
/**
@@ -81,7 +89,9 @@ public OneInputStateTransformation<T> setMaxParallelism(int maxParallelism) {
8189
*/
8290
public StateBootstrapTransformation<T> transform(StateBootstrapFunction<T> processFunction) {
8391
SavepointWriterOperatorFactory factory =
84-
(timestamp, path) -> new StateBootstrapOperator<>(timestamp, path, processFunction);
92+
(timestamp, path) ->
93+
new StateBootstrapOperator<>(
94+
checkpointId, timestamp, path, processFunction);
8595

8696
return transform(factory);
8797
}
@@ -100,7 +110,8 @@ public StateBootstrapTransformation<T> transform(
100110
BroadcastStateBootstrapFunction<T> processFunction) {
101111
SavepointWriterOperatorFactory factory =
102112
(timestamp, path) ->
103-
new BroadcastStateBootstrapOperator<>(timestamp, path, processFunction);
113+
new BroadcastStateBootstrapOperator<>(
114+
checkpointId, timestamp, path, processFunction);
104115

105116
return transform(factory);
106117
}
@@ -128,7 +139,8 @@ public StateBootstrapTransformation<T> transform(SavepointWriterOperatorFactory
128139
public <K> KeyedStateTransformation<K, T> keyBy(KeySelector<T, K> keySelector) {
129140
TypeInformation<K> keyType =
130141
TypeExtractor.getKeySelectorTypes(keySelector, stream.getType());
131-
return new KeyedStateTransformation<>(stream, operatorMaxParallelism, keySelector, keyType);
142+
return new KeyedStateTransformation<>(
143+
stream, checkpointId, operatorMaxParallelism, keySelector, keyType);
132144
}
133145

134146
/**
@@ -141,7 +153,8 @@ public <K> KeyedStateTransformation<K, T> keyBy(KeySelector<T, K> keySelector) {
141153
*/
142154
public <K> KeyedStateTransformation<K, T> keyBy(
143155
KeySelector<T, K> keySelector, TypeInformation<K> keyType) {
144-
return new KeyedStateTransformation<>(stream, operatorMaxParallelism, keySelector, keyType);
156+
return new KeyedStateTransformation<>(
157+
stream, checkpointId, operatorMaxParallelism, keySelector, keyType);
145158
}
146159

147160
/**
@@ -181,6 +194,7 @@ private KeyedStateTransformation<Tuple, T> keyBy(Keys<T> keys) {
181194

182195
TypeInformation<Tuple> keyType =
183196
TypeExtractor.getKeySelectorTypes(keySelector, stream.getType());
184-
return new KeyedStateTransformation<>(stream, operatorMaxParallelism, keySelector, keyType);
197+
return new KeyedStateTransformation<>(
198+
stream, checkpointId, operatorMaxParallelism, keySelector, keyType);
185199
}
186200
}

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java

+29
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,22 @@ public static <T> OneInputOperatorTransformation<T> bootstrapWith(DataSet<T> dat
7171
return new OneInputOperatorTransformation<>(dataSet);
7272
}
7373

74+
/**
75+
* Create a new {@link OperatorTransformation} from a {@link DataSet}.
76+
*
77+
* @param dataSet A dataset of elements.
78+
* @param checkpointId checkpoint ID.
79+
* @param <T> The type of the input.
80+
* @return A {@link OneInputOperatorTransformation}.
81+
* @deprecated use {@link #bootstrapWith(DataStream)} to bootstrap a savepoint using the data
82+
* stream api under batch execution.
83+
*/
84+
@Deprecated
85+
public static <T> OneInputOperatorTransformation<T> bootstrapWith(
86+
DataSet<T> dataSet, long checkpointId) {
87+
return new OneInputOperatorTransformation<>(dataSet, checkpointId);
88+
}
89+
7490
/**
7591
* Create a new {@link OneInputStateTransformation} from a {@link DataStream}.
7692
*
@@ -81,4 +97,17 @@ public static <T> OneInputOperatorTransformation<T> bootstrapWith(DataSet<T> dat
8197
public static <T> OneInputStateTransformation<T> bootstrapWith(DataStream<T> stream) {
8298
return new OneInputStateTransformation<>(stream);
8399
}
100+
101+
/**
102+
* Create a new {@link OneInputStateTransformation} from a {@link DataStream}.
103+
*
104+
* @param stream A data stream of elements.
105+
* @param checkpointId checkpoint ID.
106+
* @param <T> The type of the input.
107+
* @return A {@link OneInputStateTransformation}.
108+
*/
109+
public static <T> OneInputStateTransformation<T> bootstrapWith(
110+
DataStream<T> stream, long checkpointId) {
111+
return new OneInputStateTransformation<>(stream, checkpointId);
112+
}
84113
}

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ public static ExistingSavepoint load(ExecutionEnvironment env, String path) thro
7373

7474
SavepointMetadata savepointMetadata =
7575
new SavepointMetadata(
76-
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
76+
metadata.getCheckpointId(),
77+
maxParallelism,
78+
metadata.getMasterStates(),
79+
metadata.getOperatorStates());
7780
return new ExistingSavepoint(env, savepointMetadata, null);
7881
}
7982

@@ -102,7 +105,10 @@ public static ExistingSavepoint load(
102105

103106
SavepointMetadata savepointMetadata =
104107
new SavepointMetadata(
105-
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
108+
metadata.getCheckpointId(),
109+
maxParallelism,
110+
metadata.getMasterStates(),
111+
metadata.getOperatorStates());
106112
return new ExistingSavepoint(env, savepointMetadata, stateBackend);
107113
}
108114

@@ -124,7 +130,7 @@ public static NewSavepoint create(int maxParallelism) {
124130

125131
SavepointMetadata metadata =
126132
new SavepointMetadata(
127-
maxParallelism, Collections.emptyList(), Collections.emptyList());
133+
0L, maxParallelism, Collections.emptyList(), Collections.emptyList());
128134
return new NewSavepoint(metadata, null);
129135
}
130136

@@ -147,7 +153,7 @@ public static NewSavepoint create(StateBackend stateBackend, int maxParallelism)
147153

148154
SavepointMetadata metadata =
149155
new SavepointMetadata(
150-
maxParallelism, Collections.emptyList(), Collections.emptyList());
156+
0L, maxParallelism, Collections.emptyList(), Collections.emptyList());
151157
return new NewSavepoint(metadata, stateBackend);
152158
}
153159
}

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ public static SavepointReader read(StreamExecutionEnvironment env, String path)
8282

8383
SavepointMetadataV2 savepointMetadata =
8484
new SavepointMetadataV2(
85-
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
85+
metadata.getCheckpointId(),
86+
maxParallelism,
87+
metadata.getMasterStates(),
88+
metadata.getOperatorStates());
8689
return new SavepointReader(env, savepointMetadata, null);
8790
}
8891

@@ -111,7 +114,10 @@ public static SavepointReader read(
111114

112115
SavepointMetadataV2 savepointMetadata =
113116
new SavepointMetadataV2(
114-
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
117+
metadata.getCheckpointId(),
118+
maxParallelism,
119+
metadata.getMasterStates(),
120+
metadata.getOperatorStates());
115121
return new SavepointReader(env, savepointMetadata, stateBackend);
116122
}
117123

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java

+51-8
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,16 @@ private static SavepointMetadataV2 readSavepointMetadata(String path) throws IOE
125125
"Savepoint must contain at least one operator state."));
126126

127127
return new SavepointMetadataV2(
128-
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
128+
metadata.getCheckpointId(),
129+
maxParallelism,
130+
metadata.getMasterStates(),
131+
metadata.getOperatorStates());
129132
}
130133

131134
/** @deprecated use {@link #newSavepoint(StreamExecutionEnvironment, int)} */
132135
@Deprecated
133136
public static SavepointWriter newSavepoint(int maxParallelism) {
134-
return new SavepointWriter(createSavepointMetadata(maxParallelism), null, null);
137+
return new SavepointWriter(createSavepointMetadata(0L, maxParallelism), null, null);
135138
}
136139

137140
/**
@@ -146,32 +149,71 @@ public static SavepointWriter newSavepoint(int maxParallelism) {
146149
public static SavepointWriter newSavepoint(
147150
StreamExecutionEnvironment executionEnvironment, int maxParallelism) {
148151
return new SavepointWriter(
149-
createSavepointMetadata(maxParallelism), null, executionEnvironment);
152+
createSavepointMetadata(0L, maxParallelism), null, executionEnvironment);
153+
}
154+
155+
/**
156+
* Creates a new savepoint. The savepoint will be written using the state backend defined via
157+
* the clusters configuration.
158+
*
159+
* @param maxParallelism The max parallelism of the savepoint.
160+
* @param checkpointId checkpoint ID.
161+
* @return A {@link SavepointWriter}.
162+
* @see #newSavepoint(StreamExecutionEnvironment, StateBackend, int)
163+
* @see #withConfiguration(ConfigOption, Object)
164+
*/
165+
public static SavepointWriter newSavepoint(
166+
StreamExecutionEnvironment executionEnvironment,
167+
long checkpointId,
168+
int maxParallelism) {
169+
return new SavepointWriter(
170+
createSavepointMetadata(checkpointId, maxParallelism), null, executionEnvironment);
150171
}
151172

152173
/** @deprecated use {@link #newSavepoint(StreamExecutionEnvironment, StateBackend, int)} */
153174
@Deprecated
154175
public static SavepointWriter newSavepoint(StateBackend stateBackend, int maxParallelism) {
155-
return new SavepointWriter(createSavepointMetadata(maxParallelism), stateBackend, null);
176+
return new SavepointWriter(createSavepointMetadata(0L, maxParallelism), stateBackend, null);
177+
}
178+
179+
/**
180+
* Creates a new savepoint.
181+
*
182+
* @param stateBackend The state backend of the savepoint used for keyed state.
183+
* @param maxParallelism The max parallelism of the savepoint.
184+
* @return A {@link SavepointWriter}.
185+
* @see #newSavepoint(StreamExecutionEnvironment, int)
186+
*/
187+
public static SavepointWriter newSavepoint(
188+
StreamExecutionEnvironment executionEnvironment,
189+
StateBackend stateBackend,
190+
int maxParallelism) {
191+
return new SavepointWriter(
192+
createSavepointMetadata(0L, maxParallelism), stateBackend, executionEnvironment);
156193
}
157194

158195
/**
159196
* Creates a new savepoint.
160197
*
161198
* @param stateBackend The state backend of the savepoint used for keyed state.
199+
* @param checkpointId checkpoint ID.
162200
* @param maxParallelism The max parallelism of the savepoint.
163201
* @return A {@link SavepointWriter}.
164202
* @see #newSavepoint(StreamExecutionEnvironment, int)
165203
*/
166204
public static SavepointWriter newSavepoint(
167205
StreamExecutionEnvironment executionEnvironment,
168206
StateBackend stateBackend,
207+
long checkpointId,
169208
int maxParallelism) {
170209
return new SavepointWriter(
171-
createSavepointMetadata(maxParallelism), stateBackend, executionEnvironment);
210+
createSavepointMetadata(checkpointId, maxParallelism),
211+
stateBackend,
212+
executionEnvironment);
172213
}
173214

174-
private static SavepointMetadataV2 createSavepointMetadata(int maxParallelism) {
215+
private static SavepointMetadataV2 createSavepointMetadata(
216+
long checkpointId, int maxParallelism) {
175217
Preconditions.checkArgument(
176218
maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
177219
"Maximum parallelism must be between 1 and "
@@ -180,7 +222,7 @@ private static SavepointMetadataV2 createSavepointMetadata(int maxParallelism) {
180222
+ maxParallelism);
181223

182224
return new SavepointMetadataV2(
183-
maxParallelism, Collections.emptyList(), Collections.emptyList());
225+
checkpointId, maxParallelism, Collections.emptyList(), Collections.emptyList());
184226
}
185227

186228
/**
@@ -336,7 +378,8 @@ public final void write(String path) {
336378
"reduce(OperatorState)",
337379
TypeInformation.of(CheckpointMetadata.class),
338380
new GroupReduceOperator<>(
339-
new MergeOperatorStates(metadata.getMasterStates())))
381+
new MergeOperatorStates(
382+
metadata.getCheckpointId(), metadata.getMasterStates())))
340383
.forceNonParallel()
341384
.map(new CheckpointMetadataCheckpointMetadataMapFunction(this.uidTransformationMap))
342385
.setParallelism(1)

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedOperatorTransformation.java

+25-7
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public class WindowedOperatorTransformation<T, K, W extends Window> {
6060

6161
private final WindowOperatorBuilder<T, K, W> builder;
6262

63+
private final long checkpointId;
64+
6365
private final OptionalInt operatorMaxParallelism;
6466

6567
@Nullable private final Timestamper<T> timestamper;
@@ -70,12 +72,14 @@ public class WindowedOperatorTransformation<T, K, W extends Window> {
7072

7173
WindowedOperatorTransformation(
7274
DataSet<T> input,
75+
long checkpointId,
7376
OptionalInt operatorMaxParallelism,
7477
@Nullable Timestamper<T> timestamper,
7578
KeySelector<T, K> keySelector,
7679
TypeInformation<K> keyType,
7780
WindowAssigner<? super T, W> windowAssigner) {
7881
this.input = input;
82+
this.checkpointId = checkpointId;
7983
this.operatorMaxParallelism = operatorMaxParallelism;
8084
this.timestamper = timestamper;
8185
this.keySelector = keySelector;
@@ -163,7 +167,9 @@ public <R> BootstrapTransformation<T> reduce(
163167
WindowOperator<K, T, ?, R, W> operator = builder.reduce(reduceFunction, function);
164168

165169
SavepointWriterOperatorFactory factory =
166-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
170+
(timestamp, path) ->
171+
new StateBootstrapWrapperOperator<>(
172+
checkpointId, timestamp, path, operator);
167173
return new BootstrapTransformation<>(
168174
input, operatorMaxParallelism, timestamper, factory, keySelector, keyType);
169175
}
@@ -189,7 +195,9 @@ public <R> BootstrapTransformation<T> reduce(
189195
WindowOperator<K, T, ?, R, W> operator = builder.reduce(reduceFunction, function);
190196

191197
SavepointWriterOperatorFactory factory =
192-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
198+
(timestamp, path) ->
199+
new StateBootstrapWrapperOperator<>(
200+
checkpointId, timestamp, path, operator);
193201
return new BootstrapTransformation<>(
194202
input, operatorMaxParallelism, timestamper, factory, keySelector, keyType);
195203
}
@@ -323,7 +331,9 @@ public <ACC, V, R> BootstrapTransformation<T> aggregate(
323331
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
324332

325333
SavepointWriterOperatorFactory factory =
326-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
334+
(timestamp, path) ->
335+
new StateBootstrapWrapperOperator<>(
336+
checkpointId, timestamp, path, operator);
327337
return new BootstrapTransformation<>(
328338
input, operatorMaxParallelism, timestamper, factory, keySelector, keyType);
329339
}
@@ -400,7 +410,9 @@ public <ACC, V, R> BootstrapTransformation<T> aggregate(
400410
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
401411

402412
SavepointWriterOperatorFactory factory =
403-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
413+
(timestamp, path) ->
414+
new StateBootstrapWrapperOperator<>(
415+
checkpointId, timestamp, path, operator);
404416
return new BootstrapTransformation<>(
405417
input, operatorMaxParallelism, timestamper, factory, keySelector, keyType);
406418
}
@@ -424,7 +436,9 @@ public <R> BootstrapTransformation<T> apply(WindowFunction<T, R, K, W> function)
424436
WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
425437

426438
SavepointWriterOperatorFactory factory =
427-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
439+
(timestamp, path) ->
440+
new StateBootstrapWrapperOperator<>(
441+
checkpointId, timestamp, path, operator);
428442
return new BootstrapTransformation<>(
429443
input, operatorMaxParallelism, timestamper, factory, keySelector, keyType);
430444
}
@@ -448,7 +462,9 @@ public <R> BootstrapTransformation<T> apply(
448462
WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
449463

450464
SavepointWriterOperatorFactory factory =
451-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
465+
(timestamp, path) ->
466+
new StateBootstrapWrapperOperator<>(
467+
checkpointId, timestamp, path, operator);
452468
return new BootstrapTransformation<>(
453469
input, operatorMaxParallelism, timestamper, factory, keySelector, keyType);
454470
}
@@ -469,7 +485,9 @@ public <R> BootstrapTransformation<T> process(ProcessWindowFunction<T, R, K, W>
469485
WindowOperator<K, T, ?, R, W> operator = builder.process(function);
470486

471487
SavepointWriterOperatorFactory factory =
472-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
488+
(timestamp, path) ->
489+
new StateBootstrapWrapperOperator<>(
490+
checkpointId, timestamp, path, operator);
473491
return new BootstrapTransformation<>(
474492
input, operatorMaxParallelism, timestamper, factory, keySelector, keyType);
475493
}

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java

+25-7
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public class WindowedStateTransformation<T, K, W extends Window> {
5757

5858
private final WindowOperatorBuilder<T, K, W> builder;
5959

60+
private final long checkpointId;
61+
6062
private final OptionalInt operatorMaxParallelism;
6163

6264
private final KeySelector<T, K> keySelector;
@@ -65,11 +67,13 @@ public class WindowedStateTransformation<T, K, W extends Window> {
6567

6668
WindowedStateTransformation(
6769
DataStream<T> input,
70+
long checkpointId,
6871
OptionalInt operatorMaxParallelism,
6972
KeySelector<T, K> keySelector,
7073
TypeInformation<K> keyType,
7174
WindowAssigner<? super T, W> windowAssigner) {
7275
this.input = input;
76+
this.checkpointId = checkpointId;
7377
this.operatorMaxParallelism = operatorMaxParallelism;
7478
this.keySelector = keySelector;
7579
this.keyType = keyType;
@@ -156,7 +160,9 @@ public <R> StateBootstrapTransformation<T> reduce(
156160
WindowOperator<K, T, ?, R, W> operator = builder.reduce(reduceFunction, function);
157161

158162
SavepointWriterOperatorFactory factory =
159-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
163+
(timestamp, path) ->
164+
new StateBootstrapWrapperOperator<>(
165+
checkpointId, timestamp, path, operator);
160166
return new StateBootstrapTransformation<>(
161167
input, operatorMaxParallelism, factory, keySelector, keyType);
162168
}
@@ -182,7 +188,9 @@ public <R> StateBootstrapTransformation<T> reduce(
182188
WindowOperator<K, T, ?, R, W> operator = builder.reduce(reduceFunction, function);
183189

184190
SavepointWriterOperatorFactory factory =
185-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
191+
(timestamp, path) ->
192+
new StateBootstrapWrapperOperator<>(
193+
checkpointId, timestamp, path, operator);
186194
return new StateBootstrapTransformation<>(
187195
input, operatorMaxParallelism, factory, keySelector, keyType);
188196
}
@@ -317,7 +325,9 @@ public <ACC, V, R> StateBootstrapTransformation<T> aggregate(
317325
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
318326

319327
SavepointWriterOperatorFactory factory =
320-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
328+
(timestamp, path) ->
329+
new StateBootstrapWrapperOperator<>(
330+
checkpointId, timestamp, path, operator);
321331
return new StateBootstrapTransformation<>(
322332
input, operatorMaxParallelism, factory, keySelector, keyType);
323333
}
@@ -394,7 +404,9 @@ public <ACC, V, R> StateBootstrapTransformation<T> aggregate(
394404
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
395405

396406
SavepointWriterOperatorFactory factory =
397-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
407+
(timestamp, path) ->
408+
new StateBootstrapWrapperOperator<>(
409+
checkpointId, timestamp, path, operator);
398410
return new StateBootstrapTransformation<>(
399411
input, operatorMaxParallelism, factory, keySelector, keyType);
400412
}
@@ -418,7 +430,9 @@ public <R> StateBootstrapTransformation<T> apply(WindowFunction<T, R, K, W> func
418430
WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
419431

420432
SavepointWriterOperatorFactory factory =
421-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
433+
(timestamp, path) ->
434+
new StateBootstrapWrapperOperator<>(
435+
checkpointId, timestamp, path, operator);
422436
return new StateBootstrapTransformation<>(
423437
input, operatorMaxParallelism, factory, keySelector, keyType);
424438
}
@@ -442,7 +456,9 @@ public <R> StateBootstrapTransformation<T> apply(
442456
WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
443457

444458
SavepointWriterOperatorFactory factory =
445-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
459+
(timestamp, path) ->
460+
new StateBootstrapWrapperOperator<>(
461+
checkpointId, timestamp, path, operator);
446462
return new StateBootstrapTransformation<>(
447463
input, operatorMaxParallelism, factory, keySelector, keyType);
448464
}
@@ -463,7 +479,9 @@ public <R> StateBootstrapTransformation<T> process(ProcessWindowFunction<T, R, K
463479
WindowOperator<K, T, ?, R, W> operator = builder.process(function);
464480

465481
SavepointWriterOperatorFactory factory =
466-
(timestamp, path) -> new StateBootstrapWrapperOperator<>(timestamp, path, operator);
482+
(timestamp, path) ->
483+
new StateBootstrapWrapperOperator<>(
484+
checkpointId, timestamp, path, operator);
467485
return new StateBootstrapTransformation<>(
468486
input, operatorMaxParallelism, factory, keySelector, keyType);
469487
}

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ public final void write(String path) {
140140
finalOperatorStates = newOperatorStates.union(existingOperatorStates);
141141
}
142142
finalOperatorStates
143-
.reduceGroup(new MergeOperatorStates(metadata.getMasterStates()))
143+
.reduceGroup(
144+
new MergeOperatorStates(
145+
metadata.getCheckpointId(), metadata.getMasterStates()))
144146
.name("reduce(OperatorState)")
145147
.output(new SavepointOutputFormat(savepointPath))
146148
.name(path);

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,22 @@ public class MergeOperatorStates implements GroupReduceFunction<OperatorState, C
3939

4040
private static final long serialVersionUID = 1L;
4141

42+
private final long checkpointId;
43+
4244
private final Collection<MasterState> masterStates;
4345

44-
public MergeOperatorStates(Collection<MasterState> masterStates) {
46+
public MergeOperatorStates(long checkpointId, Collection<MasterState> masterStates) {
4547
Preconditions.checkNotNull(masterStates, "Master state metadata must not be null");
4648

49+
this.checkpointId = checkpointId;
4750
this.masterStates = masterStates;
4851
}
4952

5053
@Override
5154
public void reduce(Iterable<OperatorState> values, Collector<CheckpointMetadata> out) {
5255
CheckpointMetadata metadata =
5356
new CheckpointMetadata(
54-
SnapshotUtils.CHECKPOINT_ID,
57+
checkpointId,
5558
StreamSupport.stream(values.spliterator(), false)
5659
.collect(Collectors.toList()),
5760
masterStates);

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,10 @@
4141
/** Takes a final snapshot of the state of an operator subtask. */
4242
@Internal
4343
public final class SnapshotUtils {
44-
static final long CHECKPOINT_ID = 0L;
45-
4644
private SnapshotUtils() {}
4745

4846
public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState snapshot(
47+
long checkpointId,
4948
OP operator,
5049
int index,
5150
long timestamp,
@@ -64,21 +63,22 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s
6463
isUnalignedCheckpoint,
6564
CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT);
6665

67-
operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);
66+
operator.prepareSnapshotPreBarrier(checkpointId);
6867

6968
CheckpointStreamFactory storage = createStreamFactory(configuration, options);
7069

7170
OperatorSnapshotFutures snapshotInProgress =
72-
operator.snapshotState(CHECKPOINT_ID, timestamp, options, storage);
71+
operator.snapshotState(checkpointId, timestamp, options, storage);
7372

7473
OperatorSubtaskState state =
7574
new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState();
7675

77-
operator.notifyCheckpointComplete(CHECKPOINT_ID);
76+
operator.notifyCheckpointComplete(checkpointId);
7877
return new TaggedOperatorSubtaskState(index, state);
7978
}
8079

8180
public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState snapshot(
81+
long checkpointId,
8282
OP operator,
8383
int index,
8484
long timestamp,
@@ -89,6 +89,7 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s
8989
throws Exception {
9090

9191
return snapshot(
92+
checkpointId,
9293
operator,
9394
index,
9495
timestamp,

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,22 @@ public class BroadcastStateBootstrapOperator<IN>
4444

4545
private static final long serialVersionUID = 1L;
4646

47+
private final long checkpointId;
48+
4749
private final long timestamp;
4850

4951
private final Path savepointPath;
5052

5153
private transient ContextImpl context;
5254

5355
public BroadcastStateBootstrapOperator(
54-
long timestamp, Path savepointPath, BroadcastStateBootstrapFunction<IN> function) {
56+
long checkpointId,
57+
long timestamp,
58+
Path savepointPath,
59+
BroadcastStateBootstrapFunction<IN> function) {
5560
super(function);
61+
this.checkpointId = checkpointId;
5662
this.timestamp = timestamp;
57-
5863
this.savepointPath = savepointPath;
5964
}
6065

@@ -73,6 +78,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
7378
public void endInput() throws Exception {
7479
TaggedOperatorSubtaskState state =
7580
SnapshotUtils.snapshot(
81+
checkpointId,
7682
this,
7783
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
7884
timestamp,

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,21 @@ public class KeyedStateBootstrapOperator<K, IN>
4848

4949
private static final long serialVersionUID = 1L;
5050

51+
private final long checkpointId;
52+
5153
private final long timestamp;
5254

5355
private final Path savepointPath;
5456

5557
private transient KeyedStateBootstrapOperator<K, IN>.ContextImpl context;
5658

5759
public KeyedStateBootstrapOperator(
58-
long timestamp, Path savepointPath, KeyedStateBootstrapFunction<K, IN> function) {
60+
long checkpointId,
61+
long timestamp,
62+
Path savepointPath,
63+
KeyedStateBootstrapFunction<K, IN> function) {
5964
super(function);
60-
65+
this.checkpointId = checkpointId;
6166
this.timestamp = timestamp;
6267
this.savepointPath = savepointPath;
6368
}
@@ -88,6 +93,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
8893
public void endInput() throws Exception {
8994
TaggedOperatorSubtaskState state =
9095
SnapshotUtils.snapshot(
96+
checkpointId,
9197
this,
9298
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
9399
timestamp,

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,21 @@ public class StateBootstrapOperator<IN>
4040

4141
private static final long serialVersionUID = 1L;
4242

43+
private final long checkpointId;
44+
4345
private final long timestamp;
4446

4547
private final Path savepointPath;
4648

4749
private transient ContextImpl context;
4850

4951
public StateBootstrapOperator(
50-
long timestamp, Path savepointPath, StateBootstrapFunction<IN> function) {
52+
long checkpointId,
53+
long timestamp,
54+
Path savepointPath,
55+
StateBootstrapFunction<IN> function) {
5156
super(function);
52-
57+
this.checkpointId = checkpointId;
5358
this.timestamp = timestamp;
5459
this.savepointPath = savepointPath;
5560
}
@@ -69,6 +74,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
6974
public void endInput() throws Exception {
7075
TaggedOperatorSubtaskState state =
7176
SnapshotUtils.snapshot(
77+
checkpointId,
7278
this,
7379
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
7480
timestamp,

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public final class StateBootstrapWrapperOperator<
5555

5656
private static final long serialVersionUID = 1L;
5757

58+
private final long checkpointId;
59+
5860
private final long timestamp;
5961

6062
private final Path savepointPath;
@@ -63,8 +65,9 @@ public final class StateBootstrapWrapperOperator<
6365

6466
private final OP operator;
6567

66-
public StateBootstrapWrapperOperator(long timestamp, Path savepointPath, OP operator) {
67-
68+
public StateBootstrapWrapperOperator(
69+
long checkpointId, long timestamp, Path savepointPath, OP operator) {
70+
this.checkpointId = checkpointId;
6871
this.timestamp = timestamp;
6972
this.savepointPath = savepointPath;
7073
this.operator = operator;
@@ -185,6 +188,7 @@ public Object getCurrentKey() {
185188
public void endInput() throws Exception {
186189
TaggedOperatorSubtaskState state =
187190
SnapshotUtils.snapshot(
191+
checkpointId,
188192
this,
189193
operator.getContainingTask()
190194
.getEnvironment()

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java

+8
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,16 @@
4141
@Deprecated
4242
public class SavepointMetadata {
4343

44+
private final long checkpointId;
45+
4446
private final int maxParallelism;
4547

4648
private final Collection<MasterState> masterStates;
4749

4850
private final Map<OperatorID, OperatorStateSpec> operatorStateIndex;
4951

5052
public SavepointMetadata(
53+
long checkpointId,
5154
int maxParallelism,
5255
Collection<MasterState> masterStates,
5356
Collection<OperatorState> initialStates) {
@@ -57,6 +60,7 @@ public SavepointMetadata(
5760
+ UPPER_BOUND_MAX_PARALLELISM
5861
+ ". Found: "
5962
+ maxParallelism);
63+
this.checkpointId = checkpointId;
6064
this.maxParallelism = maxParallelism;
6165

6266
this.masterStates = Preconditions.checkNotNull(masterStates);
@@ -69,6 +73,10 @@ public SavepointMetadata(
6973
OperatorStateSpec.existing(existingState)));
7074
}
7175

76+
public long getCheckpointId() {
77+
return checkpointId;
78+
}
79+
7280
public int getMaxParallelism() {
7381
return maxParallelism;
7482
}

‎flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java

+8
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,16 @@
4141
@Internal
4242
public class SavepointMetadataV2 {
4343

44+
private final long checkpointId;
45+
4446
private final int maxParallelism;
4547

4648
private final Collection<MasterState> masterStates;
4749

4850
private final Map<OperatorID, OperatorStateSpecV2> operatorStateIndex;
4951

5052
public SavepointMetadataV2(
53+
long checkpointId,
5154
int maxParallelism,
5255
Collection<MasterState> masterStates,
5356
Collection<OperatorState> initialStates) {
@@ -59,6 +62,7 @@ public SavepointMetadataV2(
5962
+ maxParallelism);
6063
Preconditions.checkNotNull(masterStates);
6164

65+
this.checkpointId = checkpointId;
6266
this.maxParallelism = maxParallelism;
6367
this.masterStates = new ArrayList<>(masterStates);
6468
this.operatorStateIndex = CollectionUtil.newHashMapWithExpectedSize(initialStates.size());
@@ -70,6 +74,10 @@ public SavepointMetadataV2(
7074
OperatorStateSpecV2.existing(existingState)));
7175
}
7276

77+
public long getCheckpointId() {
78+
return checkpointId;
79+
}
80+
7381
public int getMaxParallelism() {
7482
return maxParallelism;
7583
}

‎flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void testNewSavepointEnforceUniqueUIDs() {
5151
.transform(new ExampleStateBootstrapFunction());
5252

5353
SavepointMetadata metadata =
54-
new SavepointMetadata(1, Collections.emptyList(), Collections.emptyList());
54+
new SavepointMetadata(0L, 1, Collections.emptyList(), Collections.emptyList());
5555

5656
new NewSavepoint(metadata, new MemoryStateBackend())
5757
.withOperator(UID, transformation)
@@ -74,7 +74,7 @@ public void testExistingSavepointEnforceUniqueUIDs() throws IOException {
7474
new OperatorState(OperatorIDGenerator.fromUid(UID), 1, 4));
7575

7676
SavepointMetadata metadata =
77-
new SavepointMetadata(4, Collections.emptyList(), operatorStates);
77+
new SavepointMetadata(0L, 4, Collections.emptyList(), operatorStates);
7878

7979
new ExistingSavepoint(env, metadata, new MemoryStateBackend())
8080
.withOperator(UID, transformation)
@@ -97,7 +97,7 @@ public void testExistingSavepointEnforceUniqueUIDsWithOldSavepoint() throws IOEx
9797
new OperatorState(OperatorIDGenerator.fromUid(UID), 1, 4));
9898

9999
SavepointMetadata metadata =
100-
new SavepointMetadata(4, Collections.emptyList(), operatorStates);
100+
new SavepointMetadata(0L, 4, Collections.emptyList(), operatorStates);
101101

102102
new ExistingSavepoint(env, metadata, new MemoryStateBackend())
103103
.withOperator(UID, transformation)

‎flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.api.common.state.ValueStateDescriptor;
3030
import org.apache.flink.api.common.typeinfo.Types;
3131
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
32+
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
3233
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
3334
import org.apache.flink.runtime.state.FunctionInitializationContext;
3435
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -37,6 +38,7 @@
3738
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
3839
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
3940
import org.apache.flink.state.api.functions.StateBootstrapFunction;
41+
import org.apache.flink.state.api.runtime.SavepointLoader;
4042
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
4143
import org.apache.flink.streaming.api.datastream.DataStream;
4244
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -64,6 +66,8 @@
6466
/** IT test for writing savepoints. */
6567
public class SavepointWriterITCase extends AbstractTestBaseJUnit4 {
6668

69+
private static final long CHECKPOINT_ID = 42;
70+
6771
private static final String ACCOUNT_UID = "accounts";
6872

6973
private static final String CURRENCY_UID = "currency";
@@ -113,18 +117,18 @@ private void bootstrapState(StateBackend backend, String savepointPath) throws E
113117
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
114118

115119
StateBootstrapTransformation<Account> transformation =
116-
OperatorTransformation.bootstrapWith(env.fromData(accounts))
120+
OperatorTransformation.bootstrapWith(env.fromData(accounts), CHECKPOINT_ID)
117121
.keyBy(acc -> acc.id)
118122
.transform(new AccountBootstrapper());
119123

120124
StateBootstrapTransformation<CurrencyRate> broadcastTransformation =
121-
OperatorTransformation.bootstrapWith(env.fromData(currencyRates))
125+
OperatorTransformation.bootstrapWith(env.fromData(currencyRates), CHECKPOINT_ID)
122126
.transform(new CurrencyBootstrapFunction());
123127

124128
SavepointWriter writer =
125129
backend == null
126-
? SavepointWriter.newSavepoint(env, 128)
127-
: SavepointWriter.newSavepoint(env, backend, 128);
130+
? SavepointWriter.newSavepoint(env, CHECKPOINT_ID, 128)
131+
: SavepointWriter.newSavepoint(env, backend, CHECKPOINT_ID, 128);
128132

129133
writer.withOperator(OperatorIdentifier.forUid(ACCOUNT_UID), transformation)
130134
.withOperator(getUidHashFromUid(CURRENCY_UID), broadcastTransformation)
@@ -134,6 +138,9 @@ private void bootstrapState(StateBackend backend, String savepointPath) throws E
134138
}
135139

136140
private void validateBootstrap(StateBackend backend, String savepointPath) throws Exception {
141+
CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(savepointPath);
142+
assertThat(metadata.getCheckpointId()).isEqualTo(CHECKPOINT_ID);
143+
137144
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
138145

139146
if (backend != null) {
@@ -186,6 +193,9 @@ private void modifySavepoint(StateBackend backend, String savepointPath, String
186193
}
187194

188195
private void validateModification(StateBackend backend, String savepointPath) throws Exception {
196+
CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(savepointPath);
197+
assertThat(metadata.getCheckpointId()).isEqualTo(CHECKPOINT_ID);
198+
189199
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
190200
if (backend != null) {
191201
sEnv.setStateBackend(backend);

‎flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testTimerStateRestorable() throws Exception {
6161

6262
OperatorSubtaskState state;
6363
KeyedStateBootstrapOperator<Long, Long> bootstrapOperator =
64-
new KeyedStateBootstrapOperator<>(0L, path, new TimerBootstrapFunction());
64+
new KeyedStateBootstrapOperator<>(0L, 0L, path, new TimerBootstrapFunction());
6565
try (KeyedOneInputStreamOperatorTestHarness<Long, Long, TaggedOperatorSubtaskState>
6666
harness = getHarness(bootstrapOperator)) {
6767
processElements(harness, 1L, 2L, 3L);
@@ -92,7 +92,7 @@ public void testNonTimerStatesRestorableByNonProcessesOperator() throws Exceptio
9292

9393
OperatorSubtaskState state;
9494
KeyedStateBootstrapOperator<Long, Long> bootstrapOperator =
95-
new KeyedStateBootstrapOperator<>(0L, path, new SimpleBootstrapFunction());
95+
new KeyedStateBootstrapOperator<>(0L, 0L, path, new SimpleBootstrapFunction());
9696
try (KeyedOneInputStreamOperatorTestHarness<Long, Long, TaggedOperatorSubtaskState>
9797
harness = getHarness(bootstrapOperator)) {
9898
processElements(harness, 1L, 2L, 3L);

‎flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private void testSnapshotUtilsLifecycleWithSavepointFormatType(
7777
Path path = new Path(folder.newFolder().getAbsolutePath());
7878

7979
SnapshotUtils.snapshot(
80-
operator, 0, 0L, true, false, new Configuration(), path, savepointFormatType);
80+
0L, operator, 0, 0L, true, false, new Configuration(), path, savepointFormatType);
8181

8282
Assert.assertEquals(SavepointType.savepoint(savepointFormatType), actualSnapshotType);
8383
Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);

0 commit comments

Comments
 (0)
Please sign in to comment.