Skip to content

Commit 45fcd56

Browse files
authored
[FLINK-37367][state/forst] Make ForSt inherit uploaded SST files after restorations (#26202)
1 parent ff70dd6 commit 45fcd56

File tree

3 files changed

+11
-6
lines changed

3 files changed

+11
-6
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,16 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
263263
defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
264264
nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
265265

266-
// TODO: init materializedSstFiles and lastCompletedCheckpointId when implement restore
267266
SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>
268267
materializedSstFiles = new TreeMap<>();
269268
long lastCompletedCheckpointId = -1L;
269+
if (restoreOperation instanceof ForStIncrementalRestoreOperation) {
270+
backendUID = restoreResult.getBackendUID();
271+
lastCompletedCheckpointId = restoreResult.getLastCompletedCheckpointId();
272+
if (recoveryClaimMode != RecoveryClaimMode.NO_CLAIM) {
273+
materializedSstFiles = restoreResult.getRestoredSstFiles();
274+
}
275+
}
270276

271277
snapshotStrategy =
272278
initializeSnapshotStrategy(

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java

-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.flink.runtime.state.StreamStateHandle;
3232
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
3333
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
34-
import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
3534
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
3635
import org.apache.flink.util.IOUtils;
3736
import org.apache.flink.util.Preconditions;
@@ -115,10 +114,6 @@ private HandleAndLocalPath copyFileToCheckpoint(
115114
((ForStFlinkFileSystem) dbFileSystem).getMappingEntry(dbFilePath);
116115
Preconditions.checkNotNull(mappingEntry, "dbFile not found: " + dbFilePath);
117116
sourceStateHandle = mappingEntry.getSource().toStateHandle();
118-
if (mappingEntry.getFileOwnership() == FileOwnership.NOT_OWNED) {
119-
// The file is already owned by JM, simply return the state handle
120-
return HandleAndLocalPath.of(sourceStateHandle, dbFilePath.getName());
121-
}
122117
} else {
123118
// Construct a FileStateHandle base on the DB file
124119
FileSystem sourceFileSystem = dbFilePath.getFileSystem();

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java

+4
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ public List<String> listByPrefix(String path) {
165165
* @return always return true except for IOException
166166
*/
167167
public boolean renameFile(String src, String dst) throws IOException {
168+
if (src.equals(dst)) {
169+
return true;
170+
}
171+
168172
MappingEntry srcEntry = mappingTable.get(src);
169173
if (srcEntry != null) { // rename file
170174
if (mappingTable.containsKey(dst)) {

0 commit comments

Comments
 (0)