From 9878e561af30a90dd5491e9df9c99c8c17734a82 Mon Sep 17 00:00:00 2001 From: Yanfei Lei Date: Mon, 6 Jan 2025 21:00:25 +0800 Subject: [PATCH] [FLINK-36935][state/forst] Implement fast link for ForStFileSystem (#25818) --- .../state/forst/ForStResourceContainer.java | 5 +- .../state/forst/fs/ForStFlinkFileSystem.java | 216 +++++---------- .../fs/filemapping/FileMappingManager.java | 251 ++++++++++++++++++ .../forst/fs/filemapping/MappingEntry.java | 87 ++++++ .../ForStIncrementalRestoreOperation.java | 22 +- .../forst/fs/FileMappingManagerTest.java | 238 +++++++++++++++++ .../forst/fs/ForStFlinkFileSystemTest.java | 15 +- 7 files changed, 674 insertions(+), 160 deletions(-) create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java create mode 100644 flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index 53139ca07faee..08ee1a7f7faba 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -396,8 +396,9 @@ public void clearDirectories() throws Exception { } } - private static void clearDirectories(Path basePath) throws IOException { - FileSystem fileSystem = basePath.getFileSystem(); + private void clearDirectories(Path basePath) throws IOException { + FileSystem fileSystem = + forStFileSystem != null ? 
forStFileSystem : basePath.getFileSystem(); if (fileSystem.exists(basePath)) { fileSystem.delete(basePath, true); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index e760d07515eee..433f5f15c27c7 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -19,7 +19,6 @@ package org.apache.flink.state.forst.fs; import org.apache.flink.annotation.Experimental; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.BlockLocation; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; @@ -33,6 +32,7 @@ import org.apache.flink.state.forst.fs.cache.FileBasedCache; import org.apache.flink.state.forst.fs.cache.SizeBasedCacheLimitPolicy; import org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy; +import org.apache.flink.state.forst.fs.filemapping.FileMappingManager; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; @@ -40,7 +40,8 @@ import java.io.File; import java.io.IOException; import java.net.URI; -import java.util.function.Function; +import java.util.ArrayList; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -58,14 +59,10 @@ public class ForStFlinkFileSystem extends FileSystem { private static final long SST_FILE_SIZE = 1024 * 1024 * 64; - private static final Function miscFileFilter = s -> !s.endsWith(".sst"); - private final FileSystem localFS; private final FileSystem delegateFS; - private final String remoteBase; - private final Function localFileFilter; - private final String localBase; @Nullable private final 
FileBasedCache fileBasedCache; + private final FileMappingManager fileMappingManager; public ForStFlinkFileSystem( FileSystem delegateFS, @@ -74,10 +71,9 @@ public ForStFlinkFileSystem( @Nullable FileBasedCache fileBasedCache) { this.localFS = FileSystem.getLocalFileSystem(); this.delegateFS = delegateFS; - this.localFileFilter = miscFileFilter; - this.remoteBase = remoteBase; - this.localBase = localBase; this.fileBasedCache = fileBasedCache; + this.fileMappingManager = + new FileMappingManager(delegateFS, localFS, remoteBase, localBase); } /** @@ -142,10 +138,10 @@ public ByteBufferWritableFSDataOutputStream create(Path path) throws IOException @Override public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOException { - Tuple2 localPathTuple = tryBuildLocalPath(path); - if (localPathTuple.f0) { + FileMappingManager.RealPath realPath = fileMappingManager.createFile(path); + if (realPath.isLocal) { return new ByteBufferWritableFSDataOutputStream( - localFS.create(localPathTuple.f1, overwriteMode)); + localFS.create(realPath.path, overwriteMode)); } FSDataOutputStream originalOutputStream = delegateFS.create(path, overwriteMode); @@ -157,19 +153,22 @@ public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwrit @Override public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throws IOException { - Tuple2 localPathTuple = tryBuildLocalPath(path); - if (localPathTuple.f0) { + FileMappingManager.RealPath realPath = fileMappingManager.realPath(path); + Preconditions.checkNotNull(realPath); + if (realPath.isLocal) { return new ByteBufferReadableFSDataInputStream( - () -> localFS.open(localPathTuple.f1, bufferSize), + () -> localFS.open(realPath.path, bufferSize), DEFAULT_INPUT_STREAM_CAPACITY, - localFS.getFileStatus(localPathTuple.f1).getLen()); + localFS.getFileStatus(realPath.path).getLen()); } - FileStatus fileStatus = checkNotNull(getFileStatus(path)); + FileStatus fileStatus = 
checkNotNull(getFileStatus(realPath.path)); return new ByteBufferReadableFSDataInputStream( () -> { - FSDataInputStream inputStream = delegateFS.open(path, bufferSize); + FSDataInputStream inputStream = delegateFS.open(realPath.path, bufferSize); CachedDataInputStream cachedDataInputStream = - fileBasedCache == null ? null : fileBasedCache.open(path, inputStream); + fileBasedCache == null + ? null + : fileBasedCache.open(realPath.path, inputStream); return cachedDataInputStream == null ? inputStream : cachedDataInputStream; }, DEFAULT_INPUT_STREAM_CAPACITY, @@ -178,19 +177,22 @@ public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throw @Override public ByteBufferReadableFSDataInputStream open(Path path) throws IOException { - Tuple2 localPathTuple = tryBuildLocalPath(path); - if (localPathTuple.f0) { + FileMappingManager.RealPath realPath = fileMappingManager.realPath(path); + Preconditions.checkNotNull(realPath); + if (realPath.isLocal) { return new ByteBufferReadableFSDataInputStream( - () -> localFS.open(localPathTuple.f1), + () -> localFS.open(realPath.path), DEFAULT_INPUT_STREAM_CAPACITY, - localFS.getFileStatus(localPathTuple.f1).getLen()); + localFS.getFileStatus(realPath.path).getLen()); } - FileStatus fileStatus = checkNotNull(getFileStatus(path)); + FileStatus fileStatus = checkNotNull(getFileStatus(realPath.path)); return new ByteBufferReadableFSDataInputStream( () -> { - FSDataInputStream inputStream = delegateFS.open(path); + FSDataInputStream inputStream = delegateFS.open(realPath.path); CachedDataInputStream cachedDataInputStream = - fileBasedCache == null ? null : fileBasedCache.open(path, inputStream); + fileBasedCache == null + ? null + : fileBasedCache.open(realPath.path, inputStream); return cachedDataInputStream == null ? 
inputStream : cachedDataInputStream; }, DEFAULT_INPUT_STREAM_CAPACITY, @@ -199,28 +201,7 @@ public ByteBufferReadableFSDataInputStream open(Path path) throws IOException { @Override public boolean rename(Path src, Path dst) throws IOException { - // The rename is not atomic for ForSt. Some FileSystems e.g. HDFS, OSS does not allow a - // renaming if the target already exists. So, we delete the target before attempting the - // rename. - - Tuple2 localPathTuple = tryBuildLocalPath(src); - if (localPathTuple.f0) { - Path localSrc = localPathTuple.f1; - Path localDst = tryBuildLocalPath(dst).f1; - FileStatus fileStatus = localFS.getFileStatus(localSrc); - boolean success = localFS.rename(localSrc, localDst); - if (!fileStatus.isDir()) { - return success; - } - } - - if (delegateFS.exists(dst)) { - boolean deleted = delegateFS.delete(dst, false); - if (!deleted) { - throw new IOException("Fail to delete dst path: " + dst); - } - } - return delegateFS.rename(src, dst); + return fileMappingManager.renameFile(src.toString(), dst.toString()); } @Override @@ -240,29 +221,41 @@ public URI getUri() { @Override public boolean exists(final Path f) throws IOException { - Tuple2 localPathTuple = tryBuildLocalPath(f); - if (localPathTuple.f0) { - return localFS.exists(localPathTuple.f1); + FileMappingManager.RealPath realPath = fileMappingManager.realPath(f); + if (realPath == null) { + return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir(); } - return delegateFS.exists(f); + + boolean status = false; + if (realPath.isLocal) { + status |= localFS.exists(realPath.path); + if (!status) { + status = delegateFS.exists(f); + } + } else { + status = delegateFS.exists(realPath.path); + } + return status; } @Override public FileStatus getFileStatus(Path path) throws IOException { - Tuple2 localPathTuple = tryBuildLocalPath(path); - if (localPathTuple.f0) { - return localFS.getFileStatus(localPathTuple.f1); + FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path); + Preconditions.checkNotNull(realPath); + if (realPath.isLocal) { + return localFS.getFileStatus(realPath.path); } - return delegateFS.getFileStatus(path); + return delegateFS.getFileStatus(realPath.path); } @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { Path path = file.getPath(); - Tuple2 localPathTuple = tryBuildLocalPath(path); - if (localPathTuple.f0) { - FileStatus localFile = localFS.getFileStatus(localPathTuple.f1); + FileMappingManager.RealPath realPath = fileMappingManager.realPath(path); + Preconditions.checkNotNull(realPath); + if (realPath.isLocal) { + FileStatus localFile = localFS.getFileStatus(realPath.path); return localFS.getFileBlockLocations(localFile, start, len); } return delegateFS.getFileBlockLocations(file, start, len); @@ -270,78 +263,28 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l @Override public FileStatus[] listStatus(Path path) throws IOException { - FileStatus[] localFiles = new FileStatus[0]; - Tuple2 localPathTuple = tryBuildLocalPath(path); - if (localPathTuple.f0) { - localFiles = localFS.listStatus(localPathTuple.f1); + // mapping files + List fileStatuses = new ArrayList<>(); + String pathStr = path.toString(); + if (!pathStr.endsWith("/")) { + pathStr += "/"; } - int localFileNum = localFiles == null ? 0 : localFiles.length; - FileStatus[] remoteFiles = delegateFS.listStatus(path); - if (localFileNum == 0) { - return remoteFiles; - } - int remoteFileNum = remoteFiles == null ? 
0 : remoteFiles.length; - FileStatus[] fileStatuses = new FileStatus[localFileNum + remoteFileNum]; - for (int index = 0; index < localFileNum; index++) { - final FileStatus localFile = localFiles[index]; - fileStatuses[index] = - new FileStatus() { - @Override - public long getLen() { - return localFile.getLen(); - } - - @Override - public long getBlockSize() { - return localFile.getBlockSize(); - } - - @Override - public short getReplication() { - return localFile.getReplication(); - } - - @Override - public long getModificationTime() { - return localFile.getModificationTime(); - } - - @Override - public long getAccessTime() { - return localFile.getAccessTime(); - } - - @Override - public boolean isDir() { - return localFile.isDir(); - } - - @Override - public Path getPath() { - if (localFile.getPath().toString().length() == localBase.length()) { - return new Path(remoteBase); - } - return new Path( - remoteBase, - localFile.getPath().toString().substring(localBase.length())); - } - }; - } - if (remoteFileNum != 0) { - System.arraycopy(remoteFiles, 0, fileStatuses, localFileNum, remoteFileNum); + List mappingFiles = fileMappingManager.listByPrefix(pathStr); + for (String mappingFile : mappingFiles) { + String relativePath = mappingFile.substring(pathStr.length()); + int slashIndex = relativePath.indexOf('/'); + if (slashIndex == -1) { // direct child + fileStatuses.add(getFileStatus(new Path(mappingFile))); + } } - return fileStatuses; + return fileStatuses.toArray(new FileStatus[0]); } @Override public boolean delete(Path path, boolean recursive) throws IOException { - boolean success = false; - Tuple2 localPathTuple = tryBuildLocalPath(path); - if (localPathTuple.f0) { - success = localFS.delete(localPathTuple.f1, recursive); // delete from local - } - success |= delegateFS.delete(path, recursive); // and delete from remote + boolean success = fileMappingManager.deleteFile(path, recursive); if (fileBasedCache != null) { + // only new generated file will put 
into cache, no need to consider file mapping fileBasedCache.delete(path); } return success; @@ -349,13 +292,7 @@ public boolean delete(Path path, boolean recursive) throws IOException { @Override public boolean mkdirs(Path path) throws IOException { - boolean success = true; - Tuple2 localPathTuple = tryBuildLocalPath(path); - if (localPathTuple.f0) { - success &= localFS.mkdirs(localPathTuple.f1); - } - success &= delegateFS.mkdirs(path); - return success; + return delegateFS.mkdirs(path); } @Override @@ -363,20 +300,7 @@ public boolean isDistributedFS() { return delegateFS.isDistributedFS(); } - private Tuple2 tryBuildLocalPath(Path path) { - String remotePathStr = path.toString(); - if (localFileFilter.apply(path.getName()) && remotePathStr.startsWith(remoteBase)) { - return Tuple2.of( - true, - remotePathStr.length() == remoteBase.length() - ? new Path(localBase) - : new Path(localBase, remotePathStr.substring(remoteBase.length()))); - } - return Tuple2.of(false, null); - } - public int link(Path src, Path dst) throws IOException { - // let forstdb copy the file - return -1; + return fileMappingManager.link(src.toString(), dst.toString()); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java new file mode 100644 index 0000000000000..d935a5e4d08bc --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs.filemapping; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A manager to manage file mapping of forst file system, including misc file mapping (remote file + * -> local file) and linked mapping (remote file -> remote file). Note, the key/value of mapping + * table must be a file path, directories are maintained by file system itself, directories wouldn't + * be the key/value of mapping table. 
+ */
+public class FileMappingManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileMappingManager.class);
+
+    public static final String SST_SUFFIX = ".sst";
+
+    private final FileSystem fileSystem; // used for entries that are not local
+
+    private final FileSystem localFileSystem; // used for entries mapped under localBase
+
+    private final HashMap<String, MappingEntry> mappingTable; // path string -> real file entry
+
+    private final String remoteBase;
+
+    private final String localBase;
+
+    public FileMappingManager(
+            FileSystem fileSystem,
+            FileSystem localFileSystem,
+            String remoteBase,
+            String localBase) {
+        this.fileSystem = fileSystem;
+        this.localFileSystem = localFileSystem;
+        this.mappingTable = new HashMap<>();
+        this.remoteBase = remoteBase;
+        this.localBase = localBase;
+    }
+
+    /** Create a mapping entry: non-sst files under remoteBase go to localBase, others stay put. */
+    public RealPath createFile(Path file) {
+        String fileName = file.toString();
+        Preconditions.checkState(!mappingTable.containsKey(fileName));
+        if (!fileName.endsWith(SST_SUFFIX) && fileName.startsWith(remoteBase)) {
+            Path localFile = new Path(localBase, file.getName());
+            mappingTable.put(
+                    fileName,
+                    new MappingEntry(1, localFileSystem, localFile.toString(), true, false));
+            return new RealPath(localFile, true);
+        } else {
+            mappingTable.put(fileName, new MappingEntry(1, fileSystem, fileName, false, false));
+            return new RealPath(file, false);
+        }
+    }
+
+    /** Called by link/copy. Directory link is not supported now. */
+    public int link(String src, String dst) {
+        if (src.equals(dst)) {
+            return -1;
+        }
+        // linking onto an already mapped dst is not allowed
+        if (mappingTable.containsKey(dst)) {
+            return -1;
+        }
+        MappingEntry sourceEntry = mappingTable.get(src);
+        if (sourceEntry != null) {
+            sourceEntry.retain();
+            mappingTable.putIfAbsent(dst, sourceEntry);
+        } else {
+            sourceEntry = new MappingEntry(0, fileSystem, src, false, false);
+            sourceEntry.retain(); // reference held by the src key
+            mappingTable.put(src, sourceEntry);
+            sourceEntry.retain(); // reference held by the dst key
+            mappingTable.put(dst, sourceEntry);
+        }
+        LOG.trace("link: {} -> {}", dst, src);
+        return 0;
+    }
+
+    /**
+     * Get the real path of a file, which may be a local file or a remote file/dir; returns null if
+     * the path is unmapped. Due to lazy directory deletion, exists checks may be false positives.
+     */
+    public RealPath realPath(Path path) {
+        String fileName = path.toString();
+        MappingEntry entry = mappingTable.getOrDefault(fileName, null);
+        if (entry != null) {
+            return new RealPath(new Path(entry.sourcePath), entry.isLocal);
+        }
+        return null;
+    }
+
+    public List<String> listByPrefix(String path) { // mapped paths located under dir "path"
+        List<String> linkedPaths = new ArrayList<>();
+        for (Map.Entry<String, MappingEntry> entry : mappingTable.entrySet()) {
+            if (isParentDir(entry.getKey(), path)) {
+                linkedPaths.add(entry.getKey());
+            }
+        }
+        return linkedPaths;
+    }
+
+    /**
+     * 1. If src is a key, it is only re-keyed to dst (releasing any other entry mapped at dst); no
+     * physical file is renamed. 2. Otherwise src is treated as a directory: all mapped files under
+     * src are linked under dst, dst is created if it is missing, and src is then deleted.
+     *
+     * @param src the source path
+     * @param dst the destination path
+     * @return always true; failures surface as IOException
+     */
+    public boolean renameFile(String src, String dst) throws IOException {
+        MappingEntry srcEntry = mappingTable.get(src);
+        if (srcEntry != null) { // rename file
+            if (!src.equals(dst) && mappingTable.containsKey(dst)) {
+                MappingEntry dstEntry = mappingTable.remove(dst);
+                dstEntry.release();
+            }
+            mappingTable.remove(src);
+            mappingTable.put(dst, srcEntry);
+        } else { // rename directory = link to dst dir + delete src dir
+
+            // step 1: link all files under src to dst
+            List<String> toRename = listByPrefix(src);
+            for (String key : toRename) {
+                MappingEntry sourceEntry = mappingTable.get(key);
+                sourceEntry.retain();
+                String renamedDst = dst + key.substring(src.length()); // swap the leading src only
+                LOG.trace("rename: {} -> {}", key, renamedDst);
+                mappingTable.put(renamedDst, sourceEntry);
+            }
+
+            Path dstPath = new Path(dst);
+            if (!fileSystem.exists(dstPath)) {
+                fileSystem.mkdirs(dstPath);
+            }
+            // step 2: delete src dir
+            deleteFile(new Path(src), true);
+        }
+        return true;
+    }
+
+    /**
+     * Delete a file or directory from mapping table and file system, the directory deletion may be
+     * deferred.
+     *
+     * @param file to be deleted
+     * @param recursive whether to delete recursively
+     * @return true if the file or directory is deleted successfully, false otherwise.
+     * @throws IOException if path is not a mapped file and recursive is false, or deletion fails
+     */
+    public boolean deleteFile(Path file, boolean recursive) throws IOException {
+        String fileStr = file.toString();
+        MappingEntry entry = mappingTable.getOrDefault(fileStr, null);
+        LOG.trace("delete: {}, source:{}", file, entry == null ? "null" : entry.sourcePath);
+        // case 1: delete file
+        if (entry != null) {
+            mappingTable.remove(fileStr);
+            entry.release();
+            return true;
+        }
+
+        // case 2: delete directory
+        if (!recursive) {
+            throw new IOException(fileStr + " is a directory, delete failed.");
+        }
+        MappingEntry parentEntry = new MappingEntry(0, fileSystem, fileStr, false, recursive);
+
+        // step 2.1: let every entry whose source file is under this dir hold a reference to it
+        for (Map.Entry<String, MappingEntry> currentEntry : mappingTable.entrySet()) {
+            if (!isParentDir(currentEntry.getValue().sourcePath, fileStr)) {
+                continue;
+            }
+            MappingEntry oldParentDir = currentEntry.getValue().parentDir;
+            if (oldParentDir == null
+                    || (isParentDir(oldParentDir.sourcePath, fileStr)
+                            && !oldParentDir.equals(parentEntry))) {
+                parentEntry.retain();
+                currentEntry.getValue().parentDir = parentEntry;
+            }
+        }
+
+        boolean status = true;
+        // step 2.2: release file under directory
+        if (parentEntry.getReferenceCount() == 0) {
+            // no mapped source file is under the directory, delete it right away
+            status = fileSystem.delete(file, recursive);
+        }
+        List<String> toRelease = listByPrefix(fileStr);
+        for (String key : toRelease) {
+            mappingTable.remove(key).release();
+        }
+        return status;
+    }
+
+    @VisibleForTesting
+    public MappingEntry mappingEntry(String path) {
+        return mappingTable.getOrDefault(path, null);
+    }
+
+    private boolean isParentDir(String path, String dir) { // true if path is located under dir
+        if (dir.isEmpty()) {
+            return false;
+        }
+        if (dir.charAt(dir.length() - 1) == '/') {
+            return path.startsWith(dir);
+        } else {
+            return path.startsWith(dir + "/");
+        }
+    }
+
+    /** A resolved physical path plus a flag telling whether it is on the local file system. */
+    public static class RealPath {
+        public final Path path;
+        public final boolean isLocal;
+
+        public RealPath(Path path, boolean isLocal) {
+            this.path = path;
+            this.isLocal = isLocal;
+        }
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java
new file mode 100644
index 0000000000000..73498e06c51fd
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs.filemapping;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.asyncprocessing.ReferenceCounted;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A file mapping entry that encapsulates source and destination path. Source Path : dest Path = 1 :
+ * N.
+ */
+public class MappingEntry extends ReferenceCounted {
+    private static final Logger LOG = LoggerFactory.getLogger(MappingEntry.class);
+
+    /** The file system used to delete the source path when the reference count reaches zero. */
+    private final FileSystem fileSystem;
+
+    /** The original path of file. */
+    String sourcePath;
+
+    /** Whether the file is local. */
+    boolean isLocal;
+
+    boolean recursive; // forwarded to FileSystem#delete when the source path is deleted
+
+    /** Tracks a directory under deletion that is the parent of this source file, if there is one. */
+    @Nullable MappingEntry parentDir;
+
+    public MappingEntry(
+            int initReference,
+            FileSystem fileSystem,
+            String sourcePath,
+            boolean isLocal,
+            boolean recursive) {
+        super(initReference);
+        this.fileSystem = fileSystem;
+        this.sourcePath = sourcePath;
+        this.parentDir = null;
+        this.isLocal = isLocal;
+        this.recursive = recursive;
+    }
+
+    @Override
+    protected void referenceCountReachedZero(@Nullable Object o) {
+        try {
+            if (parentDir != null) {
+                parentDir.release(); // drop this entry's reference on the tracked directory
+            }
+            fileSystem.delete(new Path(sourcePath), recursive);
+        } catch (Exception e) { // best effort: a failed delete is only logged
+            LOG.warn("Failed to delete file {}.", sourcePath, e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) { // NOTE(review): no matching hashCode() override
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        return sourcePath.equals(((MappingEntry) o).sourcePath); // equality is by source path only
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
index 4bfee85de65cb..fc9a1051df1a6 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
@@ -243,7 +243,7 @@ public ForStRestoreResult restore() throws Exception
{ .forEach( dir -> { try { - FileSystem fs = dir.getFileSystem(); + FileSystem fs = getFileSystem(dir); fs.delete(dir, true); } catch (IOException ignored) { logger.warn("Failed to delete transfer destination {}", dir); @@ -253,11 +253,7 @@ public ForStRestoreResult restore() throws Exception { } private void transferAllStateHandles(List specs) throws Exception { - FileSystem forStFs = optionsContainer.getFileSystem(); - if (forStFs == null) { - forStFs = FileSystem.getLocalFileSystem(); - } - + FileSystem forStFs = getFileSystem(optionsContainer.getBasePath()); try (ForStStateDataTransfer transfer = new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs)) { transfer.transferAllStateDataToDirectory(specs, cancelStreamRegistry); @@ -558,7 +554,7 @@ private void copyTempDbIntoBaseDb( private void cleanUpPathQuietly(@Nonnull Path path) { try { - path.getFileSystem().delete(path, true); + getFileSystem(forstBasePath).delete(path, true); } catch (IOException ex) { logger.warn("Failed to clean up path " + path, ex); } @@ -664,7 +660,7 @@ public void mergeStateHandlesWithClipAndIngest( throws Exception { Path exportCfBasePath = new Path(forstBasePath, "export-cfs"); - forstBasePath.getFileSystem().mkdirs(exportCfBasePath); + getFileSystem(forstBasePath).mkdirs(exportCfBasePath); final Map> exportedColumnFamilyMetaData = new HashMap<>(keyedStateHandles.size()); @@ -750,7 +746,7 @@ public void exportColumnFamilies( checkpoint.exportColumnFamily(columnFamilyHandles.get(i), subPathStr); FileStatus[] exportedSstFiles = - exportBasePath.getFileSystem().listStatus(new Path(exportBasePath, uuid)); + getFileSystem(exportBasePath).listStatus(new Path(exportBasePath, uuid)); if (exportedSstFiles != null) { int sstFileCount = 0; @@ -967,6 +963,14 @@ private RestoredDBInstance restoreTempDBInstance(StateHandleTransferSpec stateHa stateHandleSpec.getTransferDestination().toString()); } + private FileSystem getFileSystem(Path path) throws IOException { + if 
(optionsContainer.getFileSystem() != null) { + return optionsContainer.getFileSystem(); + } else { + return path.getFileSystem(); + } + } + /** Entity to hold the temporary RocksDB instance created for restore. */ public static class RestoredDBInstance implements AutoCloseable { diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java new file mode 100644 index 0000000000000..081e12d690c1d --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.forst.fs; + +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.state.forst.fs.filemapping.FileMappingManager; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit test for {@link FileMappingManager}. */ +public class FileMappingManagerTest { + @TempDir static java.nio.file.Path tempDir; + + @Test + void testFileLink() throws IOException { + FileSystem localFS = FileSystem.getLocalFileSystem(); + FileMappingManager fileMappingManager = + new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString()); + String src = tempDir + "/source"; + FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); + os.write(233); + os.close(); + String dst = tempDir.toString() + "/dst"; + fileMappingManager.link(src, dst); + assertThat(fileMappingManager.realPath(new Path(dst)).path.toString()).isEqualTo(src); + } + + @Test + void testNestLink() throws IOException { + // link b->a + // link c->b + // link d->c + FileSystem localFS = FileSystem.getLocalFileSystem(); + FileMappingManager fileMappingManager = + new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString()); + String src = tempDir + "/a"; + FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); + os.write(233); + os.close(); + String dstB = tempDir.toString() + "/b"; + fileMappingManager.link(src, dstB); + assertThat(fileMappingManager.realPath(new Path(dstB)).path.toString()).isEqualTo(src); + assertThat(fileMappingManager.mappingEntry(dstB).getReferenceCount()).isEqualTo(2); + + String dstC = tempDir.toString() + "/c"; + fileMappingManager.link(dstB, dstC); + assertThat(fileMappingManager.realPath(new Path(dstC)).path.toString()).isEqualTo(src); + 
assertThat(fileMappingManager.mappingEntry(dstC).getReferenceCount()).isEqualTo(3); + + String dstD = tempDir.toString() + "/d"; + fileMappingManager.link(dstC, dstD); + assertThat(fileMappingManager.realPath(new Path(dstD)).path.toString()).isEqualTo(src); + assertThat(fileMappingManager.mappingEntry(dstC).getReferenceCount()).isEqualTo(4); + + assertThat(fileMappingManager.link(dstD, dstC)).isEqualTo(-1); + } + + @Test + void testFileDelete() throws IOException { + FileSystem localFS = FileSystem.getLocalFileSystem(); + FileMappingManager fileMappingManager = + new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString()); + String src = tempDir + "/source"; + FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); + os.write(233); + os.close(); + String dst = tempDir.toString() + "/dst"; + fileMappingManager.link(src, dst); + // delete src + fileMappingManager.deleteFile(new Path(src), false); + assertThat(localFS.exists(new Path(src))).isTrue(); + + // delete dst + fileMappingManager.deleteFile(new Path(dst), false); + assertThat(localFS.exists(new Path(src))).isFalse(); + } + + @Test + void testDirectoryDelete() throws IOException { + FileSystem localFS = FileSystem.getLocalFileSystem(); + FileMappingManager fileMappingManager = + new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString()); + String testDir = tempDir + "/testDir"; + localFS.mkdirs(new Path(testDir)); + String src = testDir + "/source"; + FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); + os.write(233); + os.close(); + String dst = tempDir.toString() + "/dst"; + fileMappingManager.link(src, dst); + + // delete testDir + fileMappingManager.deleteFile(new Path(testDir), true); + assertThat(localFS.exists(new Path(src))).isTrue(); + assertThat(localFS.exists(new Path(testDir))).isTrue(); + + // delete dst + fileMappingManager.deleteFile(new Path(dst), false); + assertThat(localFS.exists(new 
Path(src))).isFalse(); + assertThat(localFS.exists(new Path(testDir))).isFalse(); + } + + @Test + void testDirectoryRename() throws IOException { + FileSystem localFS = FileSystem.getLocalFileSystem(); + FileMappingManager fileMappingManager = + new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString()); + String testDir = tempDir + "/testDir"; + localFS.mkdirs(new Path(testDir)); + String src = testDir + "/source"; + FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); + os.write(233); + os.close(); + + String linkedDirTmp = tempDir.toString() + "/linkedDir.tmp"; + localFS.mkdirs(new Path(linkedDirTmp)); + String linkedSrc = linkedDirTmp + "/source"; + fileMappingManager.link(src, linkedSrc); + + String linkedDir = tempDir.toString() + "/linkedDir"; + // rename linkedDir.tmp to linkedDir + assertThat(fileMappingManager.renameFile(linkedDirTmp, linkedDir)).isEqualTo(true); + linkedSrc = linkedDir + "/source"; + + // delete src + assertThat(fileMappingManager.deleteFile(new Path(src), false)).isEqualTo(true); + assertThat(localFS.exists(new Path(testDir))).isTrue(); + assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse(); + assertThat(localFS.exists(new Path(linkedDir))).isTrue(); + assertThat(localFS.exists(new Path(src))).isTrue(); + + // delete testDir + fileMappingManager.deleteFile(new Path(testDir), true); + assertThat(localFS.exists(new Path(testDir))).isTrue(); + assertThat(localFS.exists(new Path(linkedDir))).isTrue(); + assertThat(localFS.exists(new Path(src))).isTrue(); + + // delete linkedSrc + assertThat(fileMappingManager.deleteFile(new Path(linkedSrc), false)).isEqualTo(true); + assertThat(localFS.exists(new Path(src))).isFalse(); + assertThat(localFS.exists(new Path(testDir))).isFalse(); + + // delete linkedDir + assertThat(fileMappingManager.deleteFile(new Path(linkedDir), true)).isEqualTo(true); + assertThat(localFS.exists(new Path(testDir))).isFalse(); + assertThat(localFS.exists(new
Path(linkedDirTmp))).isFalse(); + assertThat(localFS.exists(new Path(linkedDir))).isFalse(); + assertThat(localFS.exists(new Path(src))).isFalse(); + } + + @Test + void testCreateFileBeforeRename() throws IOException { + FileSystem localFS = FileSystem.getLocalFileSystem(); + FileMappingManager fileMappingManager = + new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString()); + String testDir = tempDir + "/testDir"; + localFS.mkdirs(new Path(testDir)); + String src = testDir + "/source"; + FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); + os.write(233); + os.close(); + + String linkedDirTmp = tempDir.toString() + "/linkedDir.tmp"; + localFS.mkdirs(new Path(linkedDirTmp)); + String linkedSrc = linkedDirTmp + "/source"; + + // link src to linkedDirTmp + fileMappingManager.link(src, linkedSrc); + + // create file in linkedDirTmp + String create = linkedDirTmp + "/create.sst"; + FileMappingManager.RealPath realPath = fileMappingManager.createFile(new Path(create)); + FSDataOutputStream os1 = localFS.create(realPath.path, FileSystem.WriteMode.OVERWRITE); + os1.write(233); + os1.close(); + + String linkedDir = tempDir.toString() + "/linkedDir"; + // rename linkedDir.tmp to linkedDir + assertThat(fileMappingManager.renameFile(linkedDirTmp, linkedDir)).isEqualTo(true); + linkedSrc = linkedDir + "/source"; + + // delete src + assertThat(fileMappingManager.deleteFile(new Path(src), false)).isEqualTo(true); + assertThat(localFS.exists(new Path(testDir))).isTrue(); + assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue(); + assertThat(localFS.exists(new Path(linkedDir))).isTrue(); + assertThat(localFS.exists(new Path(src))).isTrue(); + + // delete testDir + fileMappingManager.deleteFile(new Path(testDir), true); + assertThat(localFS.exists(new Path(testDir))).isTrue(); + assertThat(localFS.exists(new Path(linkedDir))).isTrue(); + assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue(); +
assertThat(localFS.exists(new Path(src))).isTrue(); + + // delete linkedSrc + assertThat(fileMappingManager.deleteFile(new Path(linkedSrc), false)).isEqualTo(true); + assertThat(localFS.exists(new Path(src))).isFalse(); + assertThat(localFS.exists(new Path(testDir))).isFalse(); + assertThat(localFS.exists(new Path(linkedDir))).isTrue(); + assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue(); + + // delete create file + String renamedCreated = linkedDir + "/create.sst"; + assertThat(fileMappingManager.deleteFile(new Path(renamedCreated), false)).isEqualTo(true); + assertThat(localFS.exists(new Path(renamedCreated))).isFalse(); + assertThat(localFS.exists(new Path(linkedDir))).isTrue(); + assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse(); + assertThat(localFS.exists(new Path(testDir))).isFalse(); + + // delete linkedDir + assertThat(fileMappingManager.deleteFile(new Path(linkedDir), true)).isEqualTo(true); + assertThat(localFS.exists(new Path(testDir))).isFalse(); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java index 678fd4db491a2..c6a937a512f2f 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java @@ -205,7 +205,10 @@ void testMiscFileInLocal() throws IOException { os.write(233); os.sync(); os.close(); - assertThat(fileSystem.exists(new org.apache.flink.core.fs.Path(localPath, "CURRENT"))) + assertThat( + localPath + .getFileSystem() + .exists(new org.apache.flink.core.fs.Path(localPath, "CURRENT"))) .isTrue(); ByteBufferReadableFSDataInputStream is = fileSystem.open(new org.apache.flink.core.fs.Path(remotePath, "CURRENT")); @@ 
-245,7 +248,10 @@ void testSstFileInCache() throws IOException { os.close(); assertThat(fileSystem.exists(new org.apache.flink.core.fs.Path(remotePath, "1.sst"))) .isTrue(); - assertThat(fileSystem.exists(new org.apache.flink.core.fs.Path(cachePath, "1.sst"))) + assertThat( + cachePath + .getFileSystem() + .exists(new org.apache.flink.core.fs.Path(cachePath, "1.sst"))) .isTrue(); FileCacheEntry cacheEntry = cache.get(cachePath.getPath() + "/1.sst"); assertThat(cacheEntry).isNotNull(); @@ -268,7 +274,10 @@ void testSstFileInCache() throws IOException { .isTrue(); assertThat(fileSystem.exists(new org.apache.flink.core.fs.Path(cachePath, "1.sst"))) .isFalse(); - assertThat(fileSystem.exists(new org.apache.flink.core.fs.Path(cachePath, "2.sst"))) + assertThat( + cachePath + .getFileSystem() + .exists(new org.apache.flink.core.fs.Path(cachePath, "2.sst"))) .isTrue(); assertThat(cacheEntry.getReferenceCount()).isEqualTo(0); // read after evict