Skip to content

Commit

Permalink
[FLINK-36935][state/forst] Implement fast link for ForStFileSystem (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia authored Jan 6, 2025
1 parent c3d56df commit 9878e56
Show file tree
Hide file tree
Showing 7 changed files with 674 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,9 @@ public void clearDirectories() throws Exception {
}
}

private static void clearDirectories(Path basePath) throws IOException {
FileSystem fileSystem = basePath.getFileSystem();
private void clearDirectories(Path basePath) throws IOException {
FileSystem fileSystem =
forStFileSystem != null ? forStFileSystem : basePath.getFileSystem();
if (fileSystem.exists(basePath)) {
fileSystem.delete(basePath, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.state.forst.fs;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
Expand All @@ -33,14 +32,16 @@
import org.apache.flink.state.forst.fs.cache.FileBasedCache;
import org.apache.flink.state.forst.fs.cache.SizeBasedCacheLimitPolicy;
import org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy;
import org.apache.flink.state.forst.fs.filemapping.FileMappingManager;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.function.Function;
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -58,14 +59,10 @@ public class ForStFlinkFileSystem extends FileSystem {

private static final long SST_FILE_SIZE = 1024 * 1024 * 64;

private static final Function<String, Boolean> miscFileFilter = s -> !s.endsWith(".sst");

private final FileSystem localFS;
private final FileSystem delegateFS;
private final String remoteBase;
private final Function<String, Boolean> localFileFilter;
private final String localBase;
@Nullable private final FileBasedCache fileBasedCache;
private final FileMappingManager fileMappingManager;

public ForStFlinkFileSystem(
FileSystem delegateFS,
Expand All @@ -74,10 +71,9 @@ public ForStFlinkFileSystem(
@Nullable FileBasedCache fileBasedCache) {
this.localFS = FileSystem.getLocalFileSystem();
this.delegateFS = delegateFS;
this.localFileFilter = miscFileFilter;
this.remoteBase = remoteBase;
this.localBase = localBase;
this.fileBasedCache = fileBasedCache;
this.fileMappingManager =
new FileMappingManager(delegateFS, localFS, remoteBase, localBase);
}

/**
Expand Down Expand Up @@ -142,10 +138,10 @@ public ByteBufferWritableFSDataOutputStream create(Path path) throws IOException
@Override
public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwriteMode)
throws IOException {
Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
if (localPathTuple.f0) {
FileMappingManager.RealPath realPath = fileMappingManager.createFile(path);
if (realPath.isLocal) {
return new ByteBufferWritableFSDataOutputStream(
localFS.create(localPathTuple.f1, overwriteMode));
localFS.create(realPath.path, overwriteMode));
}

FSDataOutputStream originalOutputStream = delegateFS.create(path, overwriteMode);
Expand All @@ -157,19 +153,22 @@ public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwrit

@Override
public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throws IOException {
Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
if (localPathTuple.f0) {
FileMappingManager.RealPath realPath = fileMappingManager.realPath(path);
Preconditions.checkNotNull(realPath);
if (realPath.isLocal) {
return new ByteBufferReadableFSDataInputStream(
() -> localFS.open(localPathTuple.f1, bufferSize),
() -> localFS.open(realPath.path, bufferSize),
DEFAULT_INPUT_STREAM_CAPACITY,
localFS.getFileStatus(localPathTuple.f1).getLen());
localFS.getFileStatus(realPath.path).getLen());
}
FileStatus fileStatus = checkNotNull(getFileStatus(path));
FileStatus fileStatus = checkNotNull(getFileStatus(realPath.path));
return new ByteBufferReadableFSDataInputStream(
() -> {
FSDataInputStream inputStream = delegateFS.open(path, bufferSize);
FSDataInputStream inputStream = delegateFS.open(realPath.path, bufferSize);
CachedDataInputStream cachedDataInputStream =
fileBasedCache == null ? null : fileBasedCache.open(path, inputStream);
fileBasedCache == null
? null
: fileBasedCache.open(realPath.path, inputStream);
return cachedDataInputStream == null ? inputStream : cachedDataInputStream;
},
DEFAULT_INPUT_STREAM_CAPACITY,
Expand All @@ -178,19 +177,22 @@ public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throw

@Override
public ByteBufferReadableFSDataInputStream open(Path path) throws IOException {
Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
if (localPathTuple.f0) {
FileMappingManager.RealPath realPath = fileMappingManager.realPath(path);
Preconditions.checkNotNull(realPath);
if (realPath.isLocal) {
return new ByteBufferReadableFSDataInputStream(
() -> localFS.open(localPathTuple.f1),
() -> localFS.open(realPath.path),
DEFAULT_INPUT_STREAM_CAPACITY,
localFS.getFileStatus(localPathTuple.f1).getLen());
localFS.getFileStatus(realPath.path).getLen());
}
FileStatus fileStatus = checkNotNull(getFileStatus(path));
FileStatus fileStatus = checkNotNull(getFileStatus(realPath.path));
return new ByteBufferReadableFSDataInputStream(
() -> {
FSDataInputStream inputStream = delegateFS.open(path);
FSDataInputStream inputStream = delegateFS.open(realPath.path);
CachedDataInputStream cachedDataInputStream =
fileBasedCache == null ? null : fileBasedCache.open(path, inputStream);
fileBasedCache == null
? null
: fileBasedCache.open(realPath.path, inputStream);
return cachedDataInputStream == null ? inputStream : cachedDataInputStream;
},
DEFAULT_INPUT_STREAM_CAPACITY,
Expand All @@ -199,28 +201,7 @@ public ByteBufferReadableFSDataInputStream open(Path path) throws IOException {

@Override
public boolean rename(Path src, Path dst) throws IOException {
// The rename is not atomic for ForSt. Some file systems (e.g. HDFS, OSS) do not allow
// renaming if the target already exists, so we delete the target before attempting the
// rename.

Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(src);
if (localPathTuple.f0) {
Path localSrc = localPathTuple.f1;
Path localDst = tryBuildLocalPath(dst).f1;
FileStatus fileStatus = localFS.getFileStatus(localSrc);
boolean success = localFS.rename(localSrc, localDst);
if (!fileStatus.isDir()) {
return success;
}
}

if (delegateFS.exists(dst)) {
boolean deleted = delegateFS.delete(dst, false);
if (!deleted) {
throw new IOException("Fail to delete dst path: " + dst);
}
}
return delegateFS.rename(src, dst);
return fileMappingManager.renameFile(src.toString(), dst.toString());
}

@Override
Expand All @@ -240,143 +221,86 @@ public URI getUri() {

@Override
public boolean exists(final Path f) throws IOException {
Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f);
if (localPathTuple.f0) {
return localFS.exists(localPathTuple.f1);
FileMappingManager.RealPath realPath = fileMappingManager.realPath(f);
if (realPath == null) {
return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir();
}
return delegateFS.exists(f);

boolean status = false;
if (realPath.isLocal) {
status |= localFS.exists(realPath.path);
if (!status) {
status = delegateFS.exists(f);
}
} else {
status = delegateFS.exists(realPath.path);
}
return status;
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
if (localPathTuple.f0) {
return localFS.getFileStatus(localPathTuple.f1);
FileMappingManager.RealPath realPath = fileMappingManager.realPath(path);
Preconditions.checkNotNull(realPath);
if (realPath.isLocal) {
return localFS.getFileStatus(realPath.path);
}
return delegateFS.getFileStatus(path);
return delegateFS.getFileStatus(realPath.path);
}

@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
throws IOException {
Path path = file.getPath();
Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
if (localPathTuple.f0) {
FileStatus localFile = localFS.getFileStatus(localPathTuple.f1);
FileMappingManager.RealPath realPath = fileMappingManager.realPath(path);
Preconditions.checkNotNull(realPath);
if (realPath.isLocal) {
FileStatus localFile = localFS.getFileStatus(realPath.path);
return localFS.getFileBlockLocations(localFile, start, len);
}
return delegateFS.getFileBlockLocations(file, start, len);
}

@Override
public FileStatus[] listStatus(Path path) throws IOException {
FileStatus[] localFiles = new FileStatus[0];
Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
if (localPathTuple.f0) {
localFiles = localFS.listStatus(localPathTuple.f1);
// Collect the files recorded in the file mapping that are direct children of this path.
List<FileStatus> fileStatuses = new ArrayList<>();
String pathStr = path.toString();
if (!pathStr.endsWith("/")) {
pathStr += "/";
}
int localFileNum = localFiles == null ? 0 : localFiles.length;
FileStatus[] remoteFiles = delegateFS.listStatus(path);
if (localFileNum == 0) {
return remoteFiles;
}
int remoteFileNum = remoteFiles == null ? 0 : remoteFiles.length;
FileStatus[] fileStatuses = new FileStatus[localFileNum + remoteFileNum];
for (int index = 0; index < localFileNum; index++) {
final FileStatus localFile = localFiles[index];
fileStatuses[index] =
new FileStatus() {
@Override
public long getLen() {
return localFile.getLen();
}

@Override
public long getBlockSize() {
return localFile.getBlockSize();
}

@Override
public short getReplication() {
return localFile.getReplication();
}

@Override
public long getModificationTime() {
return localFile.getModificationTime();
}

@Override
public long getAccessTime() {
return localFile.getAccessTime();
}

@Override
public boolean isDir() {
return localFile.isDir();
}

@Override
public Path getPath() {
if (localFile.getPath().toString().length() == localBase.length()) {
return new Path(remoteBase);
}
return new Path(
remoteBase,
localFile.getPath().toString().substring(localBase.length()));
}
};
}
if (remoteFileNum != 0) {
System.arraycopy(remoteFiles, 0, fileStatuses, localFileNum, remoteFileNum);
List<String> mappingFiles = fileMappingManager.listByPrefix(pathStr);
for (String mappingFile : mappingFiles) {
String relativePath = mappingFile.substring(pathStr.length());
int slashIndex = relativePath.indexOf('/');
if (slashIndex == -1) { // direct child
fileStatuses.add(getFileStatus(new Path(mappingFile)));
}
}
return fileStatuses;
return fileStatuses.toArray(new FileStatus[0]);
}

@Override
public boolean delete(Path path, boolean recursive) throws IOException {
boolean success = false;
Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
if (localPathTuple.f0) {
success = localFS.delete(localPathTuple.f1, recursive); // delete from local
}
success |= delegateFS.delete(path, recursive); // and delete from remote
boolean success = fileMappingManager.deleteFile(path, recursive);
if (fileBasedCache != null) {
// Only newly generated files are put into the cache, so there is no need to consider the file mapping.
fileBasedCache.delete(path);
}
return success;
}

@Override
public boolean mkdirs(Path path) throws IOException {
boolean success = true;
Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
if (localPathTuple.f0) {
success &= localFS.mkdirs(localPathTuple.f1);
}
success &= delegateFS.mkdirs(path);
return success;
return delegateFS.mkdirs(path);
}

/**
 * Reports whether this file system is distributed, as answered by the delegate file system.
 *
 * @return the value returned by {@code delegateFS.isDistributedFS()}
 */
@Override
public boolean isDistributedFS() {
    final boolean distributed = delegateFS.isDistributedFS();
    return distributed;
}

/**
 * Translates a path under the remote base into the corresponding path under the local base.
 *
 * <p>A path is translated only when {@code localFileFilter} accepts its file name and its
 * string form starts with {@code remoteBase}.
 *
 * @param path the path to translate
 * @return a tuple whose first field is {@code true} together with the local path when the
 *     path is translated, or {@code false} together with {@code null} otherwise
 */
private Tuple2<Boolean, Path> tryBuildLocalPath(Path path) {
    final String remote = path.toString();
    final boolean eligible =
            localFileFilter.apply(path.getName()) && remote.startsWith(remoteBase);
    if (!eligible) {
        return Tuple2.of(false, null);
    }
    // The path is exactly the remote base, so there is no relative suffix to append.
    if (remote.length() == remoteBase.length()) {
        return Tuple2.of(true, new Path(localBase));
    }
    final String suffix = remote.substring(remoteBase.length());
    return Tuple2.of(true, new Path(localBase, suffix));
}

public int link(Path src, Path dst) throws IOException {
// let forstdb copy the file
return -1;
return fileMappingManager.link(src.toString(), dst.toString());
}
}
Loading

0 comments on commit 9878e56

Please sign in to comment.