Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37460][state] Make state processor API checkpoint ID configurable #26299

Merged
merged 1 commit into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class KeyedOperatorTransformation<K, T> {
/** The data set containing the data to bootstrap the operator state with. */
private final DataSet<T> dataSet;

/** Checkpoint ID. */
private final long checkpointId;

/** Local max parallelism for the bootstrapped operator. */
private final OptionalInt operatorMaxParallelism;

Expand All @@ -60,11 +63,13 @@ public class KeyedOperatorTransformation<K, T> {

KeyedOperatorTransformation(
DataSet<T> dataSet,
long checkpointId,
OptionalInt operatorMaxParallelism,
@Nullable Timestamper<T> timestamper,
KeySelector<T, K> keySelector,
TypeInformation<K> keyType) {
this.dataSet = dataSet;
this.checkpointId = checkpointId;
this.operatorMaxParallelism = operatorMaxParallelism;
this.timestamper = timestamper;
this.keySelector = keySelector;
Expand All @@ -84,7 +89,8 @@ public class KeyedOperatorTransformation<K, T> {
public BootstrapTransformation<T> transform(KeyedStateBootstrapFunction<K, T> processFunction) {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
new KeyedStateBootstrapOperator<>(timestamp, path, processFunction);
new KeyedStateBootstrapOperator<>(
checkpointId, timestamp, path, processFunction);
return transform(factory);
}

Expand Down Expand Up @@ -116,6 +122,12 @@ public BootstrapTransformation<T> transform(SavepointWriterOperatorFactory facto
public <W extends Window> WindowedOperatorTransformation<T, K, W> window(
WindowAssigner<? super T, W> assigner) {
return new WindowedOperatorTransformation<>(
dataSet, operatorMaxParallelism, timestamper, keySelector, keyType, assigner);
dataSet,
checkpointId,
operatorMaxParallelism,
timestamper,
keySelector,
keyType,
assigner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class KeyedStateTransformation<K, T> {
/** The data stream containing the data to bootstrap the operator state with. */
private final DataStream<T> stream;

/** Checkpoint ID. */
private final long checkpointId;

/** Local max parallelism for the bootstrapped operator. */
private final OptionalInt operatorMaxParallelism;

Expand All @@ -54,10 +57,12 @@ public class KeyedStateTransformation<K, T> {

KeyedStateTransformation(
DataStream<T> stream,
long checkpointId,
OptionalInt operatorMaxParallelism,
KeySelector<T, K> keySelector,
TypeInformation<K> keyType) {
this.stream = stream;
this.checkpointId = checkpointId;
this.operatorMaxParallelism = operatorMaxParallelism;
this.keySelector = keySelector;
this.keyType = keyType;
Expand All @@ -77,7 +82,8 @@ public StateBootstrapTransformation<T> transform(
KeyedStateBootstrapFunction<K, T> processFunction) {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
new KeyedStateBootstrapOperator<>(timestamp, path, processFunction);
new KeyedStateBootstrapOperator<>(
checkpointId, timestamp, path, processFunction);
return transform(factory);
}

Expand Down Expand Up @@ -109,6 +115,6 @@ public StateBootstrapTransformation<T> transform(SavepointWriterOperatorFactory
public <W extends Window> WindowedStateTransformation<T, K, W> window(
WindowAssigner<? super T, W> assigner) {
return new WindowedStateTransformation<>(
stream, operatorMaxParallelism, keySelector, keyType, assigner);
stream, checkpointId, operatorMaxParallelism, keySelector, keyType, assigner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,21 @@ public class OneInputOperatorTransformation<T> {
/** The data set containing the data to bootstrap the operator state with. */
private final DataSet<T> dataSet;

/** Checkpoint ID. */
private final long checkpointId;

/** Local max parallelism for the bootstrapped operator. */
private OptionalInt operatorMaxParallelism = OptionalInt.empty();

@Nullable private Timestamper<T> timestamper;

/**
* Creates a transformation over the given data set, delegating to the two-argument constructor
* with a checkpoint ID of {@code 0L}.
*
* @param dataSet The data set containing the data to bootstrap the operator state with.
*/
OneInputOperatorTransformation(DataSet<T> dataSet) {
this(dataSet, 0L);
}

/**
* Creates a transformation over the given data set with an explicit checkpoint ID.
*
* @param dataSet The data set containing the data to bootstrap the operator state with.
* @param checkpointId The checkpoint ID that this transformation hands to the bootstrap
* operators and keyed transformations it creates.
*/
OneInputOperatorTransformation(DataSet<T> dataSet, long checkpointId) {
this.dataSet = dataSet;
this.checkpointId = checkpointId;
}

/**
Expand Down Expand Up @@ -108,7 +116,9 @@ public OneInputOperatorTransformation<T> assignTimestamps(TimestampAssigner<T> a
*/
public BootstrapTransformation<T> transform(StateBootstrapFunction<T> processFunction) {
SavepointWriterOperatorFactory factory =
(timestamp, path) -> new StateBootstrapOperator<>(timestamp, path, processFunction);
(timestamp, path) ->
new StateBootstrapOperator<>(
checkpointId, timestamp, path, processFunction);

return transform(factory);
}
Expand All @@ -127,7 +137,8 @@ public BootstrapTransformation<T> transform(
BroadcastStateBootstrapFunction<T> processFunction) {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
new BroadcastStateBootstrapOperator<>(timestamp, path, processFunction);
new BroadcastStateBootstrapOperator<>(
checkpointId, timestamp, path, processFunction);

return transform(factory);
}
Expand Down Expand Up @@ -156,7 +167,7 @@ public <K> KeyedOperatorTransformation<K, T> keyBy(KeySelector<T, K> keySelector
TypeInformation<K> keyType =
TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType());
return new KeyedOperatorTransformation<>(
dataSet, operatorMaxParallelism, timestamper, keySelector, keyType);
dataSet, checkpointId, operatorMaxParallelism, timestamper, keySelector, keyType);
}

/**
Expand All @@ -170,7 +181,7 @@ public <K> KeyedOperatorTransformation<K, T> keyBy(KeySelector<T, K> keySelector
public <K> KeyedOperatorTransformation<K, T> keyBy(
KeySelector<T, K> keySelector, TypeInformation<K> keyType) {
return new KeyedOperatorTransformation<>(
dataSet, operatorMaxParallelism, timestamper, keySelector, keyType);
dataSet, checkpointId, operatorMaxParallelism, timestamper, keySelector, keyType);
}

/**
Expand Down Expand Up @@ -211,6 +222,6 @@ private KeyedOperatorTransformation<Tuple, T> keyBy(Keys<T> keys) {
TypeInformation<Tuple> keyType =
TypeExtractor.getKeySelectorTypes(keySelector, dataSet.getType());
return new KeyedOperatorTransformation<>(
dataSet, operatorMaxParallelism, timestamper, keySelector, keyType);
dataSet, checkpointId, operatorMaxParallelism, timestamper, keySelector, keyType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,19 @@ public class OneInputStateTransformation<T> {
/** The data stream containing the data to bootstrap the operator state with. */
private final DataStream<T> stream;

/** Checkpoint ID. */
private final long checkpointId;

/** Local max parallelism for the bootstrapped operator. */
private OptionalInt operatorMaxParallelism = OptionalInt.empty();

/**
* Creates a transformation over the given data stream, delegating to the two-argument
* constructor with a checkpoint ID of {@code 0L}.
*
* @param stream The data stream containing the data to bootstrap the operator state with.
*/
OneInputStateTransformation(DataStream<T> stream) {
this(stream, 0L);
}

/**
* Creates a transformation over the given data stream with an explicit checkpoint ID.
*
* @param stream The data stream containing the data to bootstrap the operator state with.
* @param checkpointId The checkpoint ID that this transformation hands to the bootstrap
* operators and keyed transformations it creates.
*/
OneInputStateTransformation(DataStream<T> stream, long checkpointId) {
this.stream = stream;
this.checkpointId = checkpointId;
}

/**
Expand Down Expand Up @@ -81,7 +89,9 @@ public OneInputStateTransformation<T> setMaxParallelism(int maxParallelism) {
*/
public StateBootstrapTransformation<T> transform(StateBootstrapFunction<T> processFunction) {
SavepointWriterOperatorFactory factory =
(timestamp, path) -> new StateBootstrapOperator<>(timestamp, path, processFunction);
(timestamp, path) ->
new StateBootstrapOperator<>(
checkpointId, timestamp, path, processFunction);

return transform(factory);
}
Expand All @@ -100,7 +110,8 @@ public StateBootstrapTransformation<T> transform(
BroadcastStateBootstrapFunction<T> processFunction) {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
new BroadcastStateBootstrapOperator<>(timestamp, path, processFunction);
new BroadcastStateBootstrapOperator<>(
checkpointId, timestamp, path, processFunction);

return transform(factory);
}
Expand Down Expand Up @@ -128,7 +139,8 @@ public StateBootstrapTransformation<T> transform(SavepointWriterOperatorFactory
public <K> KeyedStateTransformation<K, T> keyBy(KeySelector<T, K> keySelector) {
TypeInformation<K> keyType =
TypeExtractor.getKeySelectorTypes(keySelector, stream.getType());
return new KeyedStateTransformation<>(stream, operatorMaxParallelism, keySelector, keyType);
return new KeyedStateTransformation<>(
stream, checkpointId, operatorMaxParallelism, keySelector, keyType);
}

/**
Expand All @@ -141,7 +153,8 @@ public <K> KeyedStateTransformation<K, T> keyBy(KeySelector<T, K> keySelector) {
*/
public <K> KeyedStateTransformation<K, T> keyBy(
KeySelector<T, K> keySelector, TypeInformation<K> keyType) {
return new KeyedStateTransformation<>(stream, operatorMaxParallelism, keySelector, keyType);
return new KeyedStateTransformation<>(
stream, checkpointId, operatorMaxParallelism, keySelector, keyType);
}

/**
Expand Down Expand Up @@ -181,6 +194,7 @@ private KeyedStateTransformation<Tuple, T> keyBy(Keys<T> keys) {

TypeInformation<Tuple> keyType =
TypeExtractor.getKeySelectorTypes(keySelector, stream.getType());
return new KeyedStateTransformation<>(stream, operatorMaxParallelism, keySelector, keyType);
return new KeyedStateTransformation<>(
stream, checkpointId, operatorMaxParallelism, keySelector, keyType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ public static <T> OneInputOperatorTransformation<T> bootstrapWith(DataSet<T> dat
return new OneInputOperatorTransformation<>(dataSet);
}

/**
* Create a new {@link OneInputOperatorTransformation} from a {@link DataSet}, using the given
* checkpoint ID.
*
* @param dataSet A dataset of elements.
* @param checkpointId The checkpoint ID handed to the returned transformation. The
* single-argument overload uses {@code 0L} instead.
* @param <T> The type of the input.
* @return A {@link OneInputOperatorTransformation}.
* @deprecated use {@link #bootstrapWith(DataStream, long)} to bootstrap a savepoint using the
* data stream api under batch execution.
*/
@Deprecated
public static <T> OneInputOperatorTransformation<T> bootstrapWith(
DataSet<T> dataSet, long checkpointId) {
return new OneInputOperatorTransformation<>(dataSet, checkpointId);
}

/**
* Create a new {@link OneInputStateTransformation} from a {@link DataStream}.
*
Expand All @@ -81,4 +97,17 @@ public static <T> OneInputOperatorTransformation<T> bootstrapWith(DataSet<T> dat
public static <T> OneInputStateTransformation<T> bootstrapWith(DataStream<T> stream) {
return new OneInputStateTransformation<>(stream);
}

/**
* Create a new {@link OneInputStateTransformation} from a {@link DataStream}, using the given
* checkpoint ID.
*
* @param stream A data stream of elements.
* @param checkpointId The checkpoint ID handed to the returned transformation. The
* single-argument overload uses {@code 0L} instead.
* @param <T> The type of the input.
* @return A {@link OneInputStateTransformation}.
*/
public static <T> OneInputStateTransformation<T> bootstrapWith(
DataStream<T> stream, long checkpointId) {
return new OneInputStateTransformation<>(stream, checkpointId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ public static ExistingSavepoint load(ExecutionEnvironment env, String path) thro

SavepointMetadata savepointMetadata =
new SavepointMetadata(
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
metadata.getCheckpointId(),
maxParallelism,
metadata.getMasterStates(),
metadata.getOperatorStates());
return new ExistingSavepoint(env, savepointMetadata, null);
}

Expand Down Expand Up @@ -102,7 +105,10 @@ public static ExistingSavepoint load(

SavepointMetadata savepointMetadata =
new SavepointMetadata(
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
metadata.getCheckpointId(),
maxParallelism,
metadata.getMasterStates(),
metadata.getOperatorStates());
return new ExistingSavepoint(env, savepointMetadata, stateBackend);
}

Expand All @@ -124,7 +130,7 @@ public static NewSavepoint create(int maxParallelism) {

SavepointMetadata metadata =
new SavepointMetadata(
maxParallelism, Collections.emptyList(), Collections.emptyList());
0L, maxParallelism, Collections.emptyList(), Collections.emptyList());
return new NewSavepoint(metadata, null);
}

Expand All @@ -147,7 +153,7 @@ public static NewSavepoint create(StateBackend stateBackend, int maxParallelism)

SavepointMetadata metadata =
new SavepointMetadata(
maxParallelism, Collections.emptyList(), Collections.emptyList());
0L, maxParallelism, Collections.emptyList(), Collections.emptyList());
return new NewSavepoint(metadata, stateBackend);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ public static SavepointReader read(StreamExecutionEnvironment env, String path)

SavepointMetadataV2 savepointMetadata =
new SavepointMetadataV2(
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
metadata.getCheckpointId(),
maxParallelism,
metadata.getMasterStates(),
metadata.getOperatorStates());
return new SavepointReader(env, savepointMetadata, null);
}

Expand Down Expand Up @@ -111,7 +114,10 @@ public static SavepointReader read(

SavepointMetadataV2 savepointMetadata =
new SavepointMetadataV2(
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
metadata.getCheckpointId(),
maxParallelism,
metadata.getMasterStates(),
metadata.getOperatorStates());
return new SavepointReader(env, savepointMetadata, stateBackend);
}

Expand Down
Loading