Skip to content

Commit

Permalink
[feat]curvefs/client:support warmup sym link
Browse files Browse the repository at this point in the history
add feat support symlink by fuseOpReadLink

Signed-off-by: Cyber-SiKu <[email protected]>
  • Loading branch information
Cyber-SiKu committed Aug 30, 2023
1 parent 6f5c85b commit f40f641
Show file tree
Hide file tree
Showing 5 changed files with 1,536 additions and 214 deletions.
2 changes: 1 addition & 1 deletion curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,6 @@ CURVEFS_ERROR FuseClient::FuseOpLink(fuse_req_t req,
CURVEFS_ERROR FuseClient::FuseOpReadLink(fuse_req_t req, fuse_ino_t ino,
std::string *linkStr) {
(void)req;
VLOG(1) << "FuseOpReadLink, ino: " << ino << ", linkStr: " << linkStr;
InodeAttr attr;
CURVEFS_ERROR ret = inodeManager_->GetInodeAttr(ino, &attr);
if (ret != CURVEFS_ERROR::OK) {
Expand All @@ -1344,6 +1343,7 @@ CURVEFS_ERROR FuseClient::FuseOpReadLink(fuse_req_t req, fuse_ino_t ino,
return ret;
}
*linkStr = attr.symlink();
VLOG(1) << "FuseOpReadLink, ino: " << ino << ", linkStr: " << *linkStr;
return CURVEFS_ERROR::OK;
}

Expand Down
6 changes: 5 additions & 1 deletion curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ class FuseS3Client : public FuseClient {
char *buffer, size_t *rSize) {
return FuseOpRead(req, ino, size, off, fi, buffer, rSize);
};
auto readLinkFunc = [this](fuse_req_t req, fuse_ino_t ino,
std::string *linkStr) {
return FuseClient::FuseOpReadLink(req, ino, linkStr);
};
warmupManager_ = std::make_shared<warmup::WarmupManagerS3Impl>(
metaClient_, inodeManager_, dentryManager_, fsInfo_, readFunc,
s3Adaptor_, nullptr);
readLinkFunc, nullptr, s3Adaptor_);
}

FuseS3Client(const std::shared_ptr<MdsClient> &mdsClient,
Expand Down
191 changes: 136 additions & 55 deletions curvefs/src/client/warmup/warmup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

#include <glog/logging.h>
#include <unistd.h>
#include <fmt/format.h>

#include <algorithm>
#include <atomic>
#include <deque>
#include <list>
#include <memory>
#include <string>
#include <utility>

#include "curvefs/src/client/common/common.h"
Expand Down Expand Up @@ -57,7 +59,7 @@ bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key,
}
// add warmup Progress
if (AddWarmupProcess(key, type)) {
VLOG(9) << "add warmup list task:" << key;
LOG(INFO) << "add warmup list task:" << key;
WriteLockGuard lock(warmupFilelistDequeMutex_);
auto iter = FindWarmupFilelistByKeyLocked(key);
if (iter == warmupFilelistDeque_.end()) {
Expand All @@ -83,7 +85,7 @@ bool WarmupManagerS3Impl::AddWarmupFile(fuse_ino_t key, const std::string &path,
}
// add warmup Progress
if (AddWarmupProcess(key, type)) {
VLOG(9) << "add warmup single task:" << key;
LOG(INFO) << "add warmup single task:" << key;
FetchDentryEnqueue(key, path);
}
return true;
Expand Down Expand Up @@ -164,7 +166,6 @@ void WarmupManagerS3Impl::FetchDentryEnqueue(fuse_ino_t key,

void WarmupManagerS3Impl::LookPath(fuse_ino_t key, std::string file) {
VLOG(9) << "LookPath start key: " << key << " file: " << file;
std::vector<std::string> splitPath;
// remove enter, newline, blank
std::string blanks("\r\n ");
file.erase(0, file.find_first_not_of(blanks));
Expand All @@ -173,59 +174,35 @@ void WarmupManagerS3Impl::LookPath(fuse_ino_t key, std::string file) {
VLOG(9) << "empty path";
return;
}
bool isRoot = false;
if (file == "/") {
splitPath.push_back(file);
isRoot = true;
} else {
curve::common::AddSplitStringToResult(file, "/", &splitPath);

if (!curve::common::StringStartWith(file, "/")) {
LOG(ERROR) << fmt::format("{} isn't absolute path", file);
file.erase(0, 1);
}
VLOG(6) << "splitPath size is: " << splitPath.size();
if (splitPath.size() == 1 && isRoot) {
std::vector<std::string> splitPath;
curve::common::AddSplitStringToResult(file, "/", &splitPath);

VLOG(6) << fmt::format("splitPath: {}", fmt::join(splitPath, ","));
if (splitPath.empty()) {
VLOG(9) << "i am root";
auto task = [this, key]() {
FetchChildDentry(key, fsInfo_->rootinodeid());
};
AddFetchDentryTask(key, task);
return;
} else if (splitPath.size() == 1) {
VLOG(9) << "parent is root: " << fsInfo_->rootinodeid()
<< ", path is: " << splitPath[0];
auto task = [this, key, splitPath]() {
FetchDentry(key, fsInfo_->rootinodeid(), splitPath[0]);
};
AddFetchDentryTask(key, task);
return;
} else if (splitPath.size() > 1) { // travel path
VLOG(9) << "traverse path start: " << splitPath.size();
std::string lastName = splitPath.back();
splitPath.pop_back();
fuse_ino_t ino = fsInfo_->rootinodeid();
for (auto iter : splitPath) {
VLOG(9) << "traverse path: " << iter << "ino is: " << ino;
Dentry dentry;
std::string pathName = iter;
CURVEFS_ERROR ret =
dentryManager_->GetDentry(ino, pathName, &dentry);
if (ret != CURVEFS_ERROR::OK) {
if (ret != CURVEFS_ERROR::NOTEXIST) {
LOG(WARNING)
<< "dentryManager_ get dentry fail, ret = " << ret
<< ", parent inodeid = " << ino << ", name = " << file;
}
VLOG(9) << "FetchDentry error: " << ret;
return;
}
ino = dentry.inodeid();
} else {
fuse_ino_t parent;
bool result =
GetInodeSubPathParent(fsInfo_->rootinodeid(), splitPath, &parent);
if (!result) {
LOG(ERROR) << "GetInodeSubPathParent fail, path: " << file;
return;
}
auto task = [this, key, ino, lastName]() {
FetchDentry(key, ino, lastName);
std::string lastName = splitPath.back();
auto task = [this, key, parent, lastName]() {
FetchDentry(key, parent, lastName);
};
AddFetchDentryTask(key, task);
VLOG(9) << "ino is: " << ino << " lastname is: " << lastName;
return;
} else {
VLOG(3) << "unknown path";
}
VLOG(9) << "LookPath start end: " << key << " file: " << file;
}
Expand Down Expand Up @@ -265,7 +242,25 @@ void WarmupManagerS3Impl::FetchDentry(fuse_ino_t key, fuse_ino_t ino,
return;

} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
// skip links
std::string symLink;
CURVEFS_ERROR statusCode =
fuseOpReadLink_(0, dentry.inodeid(), &symLink);
if (statusCode != CURVEFS_ERROR::OK) {
LOG(ERROR) << "readlink fail, inodeid = " << dentry.inodeid()
<< ", path = " << file;
return;
}
std::vector<std::string> splitSymLink;
curve::common::AddSplitStringToResult(symLink, "/", &splitSymLink);
fuse_ino_t parent;
if (!GetInodeSubPathParent(dentry.inodeid(), splitSymLink, &parent)) {
LOG(ERROR) << "GetInodeSubPathParent fail, path: " << symLink;
return;
}
std::string lastName = splitSymLink.back();
auto task = [this, key, parent, lastName]() {
FetchDentry(key, parent, lastName);
};
} else {
VLOG(3) << "unkown, file: " << file << ", ino: " << ino;
return;
Expand Down Expand Up @@ -302,7 +297,26 @@ void WarmupManagerS3Impl::FetchChildDentry(fuse_ino_t key, fuse_ino_t ino) {
};
AddFetchDentryTask(key, task);
VLOG(9) << "FetchChildDentry: " << dentry.inodeid();
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) { // need todo
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
std::string symLink;
CURVEFS_ERROR statusCode =
fuseOpReadLink_(0, dentry.inodeid(), &symLink);
if (statusCode != CURVEFS_ERROR::OK) {
LOG(ERROR) << "readlink fail, inodeid = " << dentry.inodeid();
continue;
}
std::vector<std::string> splitSymLink;
curve::common::AddSplitStringToResult(symLink, "/", &splitSymLink);
fuse_ino_t parent;
if (!GetInodeSubPathParent(dentry.inodeid(), splitSymLink,
&parent)) {
LOG(ERROR) << "GetInodeSubPathParent fail, path: " << symLink;
continue;
}
std::string lastName = splitSymLink.back();
auto task = [this, key, parent, lastName]() {
FetchDentry(key, parent, lastName);
};
} else {
VLOG(9) << "unknown type";
}
Expand Down Expand Up @@ -411,8 +425,8 @@ void WarmupManagerS3Impl::TravelChunk(fuse_ino_t ino,
if (!firstBlockFull) {
travelStartIndex = blockIndexBegin + 1;
auto objectName = curvefs::common::s3util::GenObjName(
chunkid, blockIndexBegin, compaction,
fsId, ino, objectPrefix);
chunkid, blockIndexBegin, compaction, fsId, ino,
objectPrefix);
prefetchObjs->push_back(
std::make_pair(objectName, firstBlockSize));
} else {
Expand All @@ -424,8 +438,8 @@ void WarmupManagerS3Impl::TravelChunk(fuse_ino_t ino,
? blockIndexEnd
: blockIndexEnd - 1;
auto objectName = curvefs::common::s3util::GenObjName(
chunkid, blockIndexEnd, compaction,
fsId, ino, objectPrefix);
chunkid, blockIndexEnd, compaction, fsId, ino,
objectPrefix);
// there is no need to care about the order
// in which objects are downloaded
prefetchObjs->push_back(
Expand Down Expand Up @@ -539,8 +553,7 @@ bool WarmupManagerS3Impl::ProgressDone(fuse_ino_t key) {
bool ret;
{
ReadLockGuard lockList(warmupFilelistDequeMutex_);
ret =
FindWarmupFilelistByKeyLocked(key) == warmupFilelistDeque_.end();
ret = FindWarmupFilelistByKeyLocked(key) == warmupFilelistDeque_.end();
}

{
Expand Down Expand Up @@ -600,7 +613,7 @@ void WarmupManagerS3Impl::ScanCleanWarmupProgress() {
ReadLockGuard lock(inode2ProgressMutex_);
for (auto iter = inode2Progress_.begin(); iter != inode2Progress_.end();) {
if (ProgressDone(iter->first)) {
VLOG(9) << "warmup key: " << iter->first << " done!";
LOG(INFO) << "warmup task: " << iter->first << " done!";
iter = inode2Progress_.erase(iter);
} else {
++iter;
Expand Down Expand Up @@ -719,6 +732,74 @@ void WarmupManager::CollectMetrics(InterfaceMetric *interface, int count,
interface->latency << (butil::cpuwide_time_us() - start);
}

bool WarmupManagerS3Impl::GetInodeSubPathParent(
fuse_ino_t inode, std::vector<std::string> subPath, fuse_ino_t *ret) {
if (subPath.empty()) {
return false;
}

if (subPath.size() == 1) {
*ret = inode;
return true;
}

fuse_ino_t nextInode;
std::string currentPath = subPath.front();
if (currentPath == "..") {
std::shared_ptr<InodeWrapper> inodeWrapper;
CURVEFS_ERROR statusCode = inodeManager_->GetInode(inode, inodeWrapper);
if (statusCode != CURVEFS_ERROR::OK) {
LOG(ERROR) << "get inode fail, inodeid = " << inode;
return false;
}
curve::common::UniqueLock lck = inodeWrapper->GetUniqueLock();
auto parents = inodeWrapper->GetParentLocked();
if (parents.empty()) {
// out of curvefs
// For example directory /A is symlinked to ../B
LOG(ERROR) << fmt::format(
"out of curvefs, inodeid = {}, subpath = {}", inode,
fmt::join(subPath, "/"));
return false;
}
// only support one parent
nextInode = parents[0];
} else if (currentPath == ".") {
nextInode = inode;
} else {
Dentry dentry;
CURVEFS_ERROR statusCode =
dentryManager_->GetDentry(inode, currentPath, &dentry);
if (statusCode != CURVEFS_ERROR::OK) {
LOG(ERROR) << "get dentry fail, inodeid = " << inode
<< ", path = " << currentPath;
return false;
}
if (dentry.type() == FsFileType::TYPE_SYM_LINK) {
// sym link
std::string symLink;
CURVEFS_ERROR statusCode =
fuseOpReadLink_(0, dentry.inodeid(), &symLink);
if (statusCode != CURVEFS_ERROR::OK) {
LOG(ERROR) << "readlink fail, inodeid = " << inode
<< ", path = " << currentPath;
return false;
}
std::vector<std::string> splitSymLink;
curve::common::AddSplitStringToResult(symLink, "/", &splitSymLink);
subPath.insert(subPath.begin() + 1, splitSymLink.begin(),
splitSymLink.end());
nextInode = inode;
} else {
nextInode = dentry.inodeid();
}
}

return GetInodeSubPathParent(
nextInode, std::vector<std::string>(subPath.begin() + 1, subPath.end()),
ret);
}

} // namespace warmup
} // namespace client
} // namespace curvefs
Loading

0 comments on commit f40f641

Please sign in to comment.