From 03f368e536c55d5fc2b145fc9303089668cdd642 Mon Sep 17 00:00:00 2001 From: Ken Han Date: Fri, 19 Jan 2024 09:54:26 +0000 Subject: [PATCH] [feat] curvefs: change s3 info Signed-off-by: Ken Han --- curvefs/proto/mds.proto | 13 +- curvefs/src/client/BUILD | 1 + curvefs/src/client/curve_fuse_op.cpp | 75 +++++++ curvefs/src/client/curve_fuse_op.h | 1 + curvefs/src/client/filesystem/error.h | 1 + curvefs/src/client/filesystem/xattr.h | 5 + curvefs/src/client/fuse_s3_client.cpp | 29 +++ curvefs/src/client/fuse_s3_client.h | 4 + curvefs/src/client/metric/client_metric.h | 2 + curvefs/src/client/rpcclient/base_client.cpp | 12 ++ curvefs/src/client/rpcclient/base_client.h | 8 + curvefs/src/client/rpcclient/mds_client.cpp | 33 +++ curvefs/src/client/rpcclient/mds_client.h | 7 + curvefs/src/client/s3/client_s3.cpp | 4 + curvefs/src/client/s3/client_s3.h | 2 + .../src/client/s3/client_s3_cache_manager.cpp | 4 + curvefs/src/mds/fs_info_wrapper.h | 6 + curvefs/src/mds/fs_manager.cpp | 30 +++ curvefs/src/mds/fs_manager.h | 17 +- curvefs/src/mds/mds_service.cpp | 31 +++ curvefs/src/mds/mds_service.h | 5 + curvefs/test/client/mock_client_s3.h | 1 + .../test/client/rpcclient/mock_mds_client.h | 4 + src/common/s3_adapter.h | 7 +- .../command/curvefs/update/s3info/s3info.go | 196 ++++++++++++++++++ .../pkg/cli/command/curvefs/update/update.go | 2 + tools-v2/pkg/config/fs.go | 17 ++ 27 files changed, 513 insertions(+), 4 deletions(-) create mode 100644 tools-v2/pkg/cli/command/curvefs/update/s3info/s3info.go diff --git a/curvefs/proto/mds.proto b/curvefs/proto/mds.proto index f89c13e733..37bfdfddc5 100644 --- a/curvefs/proto/mds.proto +++ b/curvefs/proto/mds.proto @@ -77,7 +77,7 @@ enum FsStatus { message FsDetail { optional common.Volume volume = 1; - optional common.S3Info s3Info = 2; + optional common.S3Info s3info = 2; } message Mountpoint { @@ -114,6 +114,16 @@ message GetFsInfoResponse { optional FsInfo fsInfo = 2; } +message UpdateS3InfoRequest { + required string fsName = 1; + required common.S3Info s3Info = 2; +} + +message UpdateS3InfoResponse { + required FSStatusCode statusCode = 1; + optional FsInfo fsInfo = 2; +} + message CreateFsRequest { required string fsName = 1; required uint64 blockSize = 2; @@ -255,6 +265,7 @@ message TsoResponse { service MdsService { // fs interface rpc CreateFs(CreateFsRequest) returns (CreateFsResponse); + rpc UpdateS3Info(UpdateS3InfoRequest) returns (UpdateS3InfoResponse); rpc MountFs(MountFsRequest) returns (MountFsResponse); rpc UmountFs(UmountFsRequest) returns (UmountFsResponse); // TODO(chengyi01): move to GetFssInfo diff --git a/curvefs/src/client/BUILD b/curvefs/src/client/BUILD index 33a3b45089..f6eefffbc6 100644 --- a/curvefs/src/client/BUILD +++ b/curvefs/src/client/BUILD @@ -92,6 +92,7 @@ cc_library( "//external:brpc", "//external:gflags", "//external:glog", + "//external:json", "//src/client:curve_client", "//src/common:curve_common", "//src/common:curve_s3_adapter", diff --git a/curvefs/src/client/curve_fuse_op.cpp b/curvefs/src/client/curve_fuse_op.cpp index cd2128d805..e59376b3b1 100644 --- a/curvefs/src/client/curve_fuse_op.cpp +++ b/curvefs/src/client/curve_fuse_op.cpp @@ -68,6 +68,7 @@ using ::curvefs::client::filesystem::EntryOut; using ::curvefs::client::filesystem::FileOut; using ::curvefs::client::filesystem::IsListWarmupXAttr; using ::curvefs::client::filesystem::IsWarmupXAttr; +using ::curvefs::client::filesystem::IsS3ConfigXAttr; using ::curvefs::client::filesystem::StrAttr; using ::curvefs::client::filesystem::StrEntry; using ::curvefs::client::filesystem::StrMode; @@ -423,6 +424,78 @@ FuseClient* Client() { return g_ClientInstance; } + +void UpdateS3Config(fuse_req_t req, + fuse_ino_t ino, + const char* name, + const char* value) { + auto fs = g_ClientInstance->GetFileSystem(); + + if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) { + LOG(ERROR) << "updating s3 config only works for s3"; + return fs->ReplyError(req, CURVEFS_ERROR::NOT_SUPPORT); + } + + const std::string fsName = g_ClientInstance->GetFsInfo()->fsname(); + const curvefs::common::S3Info oldS3Info = + g_ClientInstance->GetFsInfo()->detail().s3info(); + curvefs::common::S3Info newS3Info(oldS3Info); + + Json::CharReaderBuilder builder; + Json::CharReaderBuilder::strictMode(&builder.settings_); + std::unique_ptr reader(builder.newCharReader()); + Json::Value rootNode; + JSONCPP_STRING errormsg; + if (!reader->parse(value, value + strlen(value), &rootNode, &errormsg)) { + LOG(ERROR) << "Error parsing the input value ' " + << value + << " ': " << errormsg; + return fs->ReplyError(req, CURVEFS_ERROR::IO_ERROR); + } + + FuseS3Client* g_S3ClientInstance = + dynamic_cast(g_ClientInstance); + if (!g_S3ClientInstance) { + LOG(ERROR) << "Dynamic cast from FuseClient to FuseS3Client failed"; + return fs->ReplyError(req, CURVEFS_ERROR::INTERNAL); + } + + if (rootNode.isMember("ak") && + rootNode["ak"].asString() != oldS3Info.ak()) { + newS3Info.set_ak(rootNode["ak"].asString()); + } + + if (rootNode.isMember("sk") && + rootNode["sk"].asString() != oldS3Info.sk()) { + newS3Info.set_sk(rootNode["sk"].asString()); + } + + if (rootNode.isMember("bucketname") && + rootNode["bucketname"].asString() != oldS3Info.bucketname()) { + newS3Info.set_bucketname(rootNode["bucketname"].asString()); + } + + if (rootNode.isMember("endpoint") && + rootNode["endpoint"].asString() != oldS3Info.endpoint()) { + newS3Info.set_endpoint(rootNode["endpoint"].asString()); + } + + if (oldS3Info.SerializeAsString() == newS3Info.SerializeAsString()) { + return fs->ReplyError(req, CURVEFS_ERROR::NODATA); + } + + FsInfo fsInfo; + CURVEFS_ERROR updateStatusCode = + g_S3ClientInstance->UpdateS3Info(fsName, newS3Info, &fsInfo); + if (updateStatusCode != CURVEFS_ERROR::OK) { + return fs->ReplyError(req, updateStatusCode); + } + + g_ClientInstance->SetFsInfo(std::make_shared(fsInfo)); + + return fs->ReplyError(req, CURVEFS_ERROR::OK); +} + void TriggerWarmup(fuse_req_t req, fuse_ino_t ino, const char* name, @@ -921,6 +994,8 @@ void FuseOpSetXattr(fuse_req_t req, if (IsWarmupXAttr(name)) { return TriggerWarmup(req, ino, name, value, size); + } else if (IsS3ConfigXAttr(name)) { + return UpdateS3Config(req, ino, name, value); } rc = client->FuseOpSetXattr(req, ino, name, value, size, flags); return fs->ReplyError(req, rc); diff --git a/curvefs/src/client/curve_fuse_op.h b/curvefs/src/client/curve_fuse_op.h index 231cf26526..c5dbc28977 100644 --- a/curvefs/src/client/curve_fuse_op.h +++ b/curvefs/src/client/curve_fuse_op.h @@ -30,6 +30,7 @@ #include #include #include +#include #include "curvefs/src/client/fuse_common.h" diff --git a/curvefs/src/client/filesystem/error.h b/curvefs/src/client/filesystem/error.h index 75efe38283..d0ebfb2532 100644 --- a/curvefs/src/client/filesystem/error.h +++ b/curvefs/src/client/filesystem/error.h @@ -59,6 +59,7 @@ enum class CURVEFS_ERROR { NOSYS = -20, END_OF_FILE = -21, NOT_A_DIRECTORY = -22, + UPDATE_S3_INFO_FAILED = -23, }; std::string StrErr(CURVEFS_ERROR code); diff --git a/curvefs/src/client/filesystem/xattr.h b/curvefs/src/client/filesystem/xattr.h index 26bc3a6c2e..8cde3e1127 100644 --- a/curvefs/src/client/filesystem/xattr.h +++ b/curvefs/src/client/filesystem/xattr.h @@ -45,6 +45,7 @@ const char XATTR_DIR_RFBYTES[] = "curve.dir.rfbytes"; const char XATTR_DIR_PREFIX[] = "curve.dir"; const char XATTR_WARMUP_OP[] = "curvefs.warmup.op"; const char XATTR_WARMUP_OP_LIST[] = "curvefs.warmup.op.list"; +const char XATTR_S3_CONFIG[] = "curvefs.s3.update.config"; inline bool IsSpecialXAttr(const std::string& key) { static std::map xattrs { @@ -69,6 +70,10 @@ inline bool IsListWarmupXAttr(const std::string& key) { return key == XATTR_WARMUP_OP_LIST; } +inline bool IsS3ConfigXAttr(const std::string& key) { + return key == XATTR_S3_CONFIG; +} + } // namespace filesystem } // namespace client } // namespace curvefs diff --git a/curvefs/src/client/fuse_s3_client.cpp b/curvefs/src/client/fuse_s3_client.cpp index ab2524f76a..88573642be 100644 --- a/curvefs/src/client/fuse_s3_client.cpp +++ b/curvefs/src/client/fuse_s3_client.cpp @@ -389,6 +389,35 @@ CURVEFS_ERROR FuseS3Client::Truncate(InodeWrapper *inode, uint64_t length) { return s3Adaptor_->Truncate(inode, length); } +CURVEFS_ERROR FuseS3Client::UpdateS3Info(const std::string& fsName, + const curvefs::common::S3Info& s3Info, + FsInfo* fsInfo) { + ::curve::common::S3InfoOption s3InfoOption; + ::curvefs::client::common::S3Info2FsS3Option(s3Info, &s3InfoOption); + FSStatusCode updateStatusCode = + mdsClient_->UpdateS3Info(fsName, s3Info, fsInfo); + if (updateStatusCode != FSStatusCode::OK) { + LOG(ERROR) << "Update s3 info error code: (FSStatusCode)" + << updateStatusCode; + return CURVEFS_ERROR::UPDATE_S3_INFO_FAILED; + } + + if (option_.s3Opt.s3AdaptrOpt.ak != s3Info.ak() || + option_.s3Opt.s3AdaptrOpt.sk != s3Info.sk() || + option_.s3Opt.s3AdaptrOpt.s3Address != s3Info.endpoint() || + option_.s3Opt.s3AdaptrOpt.bucketName != s3Info.bucketname()) { + + option_.s3Opt.s3AdaptrOpt.s3Address = s3Info.endpoint(); + option_.s3Opt.s3AdaptrOpt.ak = s3Info.ak(); + option_.s3Opt.s3AdaptrOpt.sk = s3Info.sk(); + option_.s3Opt.s3AdaptrOpt.bucketName = s3Info.bucketname(); + + s3Adaptor_->GetS3Client()->Reinit(option_.s3Opt.s3AdaptrOpt); + } + + return CURVEFS_ERROR::OK; +} + CURVEFS_ERROR FuseS3Client::FuseOpFlush(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) { (void)req; diff --git a/curvefs/src/client/fuse_s3_client.h b/curvefs/src/client/fuse_s3_client.h index 1b4b4a70b4..5eec08ea7e 100644 --- a/curvefs/src/client/fuse_s3_client.h +++ b/curvefs/src/client/fuse_s3_client.h @@ -125,6 +125,10 @@ class FuseS3Client : public FuseClient { CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override; + CURVEFS_ERROR UpdateS3Info(const std::string& fsName, + const curvefs::common::S3Info& s3Info, + FsInfo* fsInfo); + private: bool InitKVCache(const KVClientManagerOpt &opt); diff --git a/curvefs/src/client/metric/client_metric.h b/curvefs/src/client/metric/client_metric.h index f5140b896d..b38cfd0b68 100644 --- a/curvefs/src/client/metric/client_metric.h +++ b/curvefs/src/client/metric/client_metric.h @@ -43,6 +43,7 @@ namespace metric { struct MDSClientMetric { static const std::string prefix; + InterfaceMetric updateS3Info; InterfaceMetric mountFs; InterfaceMetric umountFs; InterfaceMetric getFsInfo; @@ -61,6 +62,7 @@ struct MDSClientMetric { MDSClientMetric() : mountFs(prefix, "mountFs"), umountFs(prefix, "umountFs"), + updateS3Info(prefix, "updateS3Info"), getFsInfo(prefix, "getFsInfo"), getMetaServerInfo(prefix, "getMetaServerInfo"), getMetaServerListInCopysets(prefix, "getMetaServerListInCopysets"), diff --git a/curvefs/src/client/rpcclient/base_client.cpp b/curvefs/src/client/rpcclient/base_client.cpp index a2f75ae177..8a1deb716e 100644 --- a/curvefs/src/client/rpcclient/base_client.cpp +++ b/curvefs/src/client/rpcclient/base_client.cpp @@ -30,6 +30,18 @@ namespace rpcclient { using ::curvefs::mds::space::SpaceService_Stub; +void MDSBaseClient::UpdateS3Info(const std::string& fsName, + const curvefs::common::S3Info& s3Info, + UpdateS3InfoResponse* response, + brpc::Controller* cntl, + brpc::Channel* channel) { + UpdateS3InfoRequest request; + request.set_fsname(fsName); + request.mutable_s3info()->CopyFrom(s3Info); + curvefs::mds::MdsService_Stub stub(channel); + stub.UpdateS3Info(cntl, &request, response, nullptr); +} + void MDSBaseClient::MountFs(const std::string& fsName, const Mountpoint& mountPt, MountFsResponse* response, brpc::Controller* cntl, diff --git a/curvefs/src/client/rpcclient/base_client.h b/curvefs/src/client/rpcclient/base_client.h index 7f3bd8161f..d7af9da029 100644 --- a/curvefs/src/client/rpcclient/base_client.h +++ b/curvefs/src/client/rpcclient/base_client.h @@ -76,6 +76,8 @@ using curvefs::mds::FsInfo; using curvefs::mds::FsStatus; using curvefs::mds::GetFsInfoRequest; using curvefs::mds::GetFsInfoResponse; +using curvefs::mds::UpdateS3InfoRequest; +using curvefs::mds::UpdateS3InfoResponse; using curvefs::mds::MountFsRequest; using curvefs::mds::MountFsResponse; using curvefs::mds::GetLatestTxIdRequest; @@ -143,6 +145,12 @@ class MDSBaseClient { public: virtual ~MDSBaseClient() = default; + virtual void UpdateS3Info(const std::string& fsName, + const curvefs::common::S3Info& s3Info, + UpdateS3InfoResponse* response, + brpc::Controller* cntl, + brpc::Channel* channel); + virtual void MountFs(const std::string& fsName, const Mountpoint& mountPt, MountFsResponse* response, brpc::Controller* cntl, brpc::Channel* channel); diff --git a/curvefs/src/client/rpcclient/mds_client.cpp b/curvefs/src/client/rpcclient/mds_client.cpp index 2f53aa3d7d..356deb25d4 100644 --- a/curvefs/src/client/rpcclient/mds_client.cpp +++ b/curvefs/src/client/rpcclient/mds_client.cpp @@ -60,6 +60,39 @@ MdsClientImpl::Init(const ::curve::client::MetaServerOption &mdsOpt, [&](int addrindex, uint64_t rpctimeoutMS, brpc::Channel *channel, \ brpc::Controller *cntl) -> int +FSStatusCode MdsClientImpl::UpdateS3Info(const std::string& fsName, + const curvefs::common::S3Info& s3Info, + FsInfo* fsInfo) { + auto task = RPCTask { + (void)addrindex; + (void)rpctimeoutMS; + mdsClientMetric_.updateS3Info.qps.count << 1; + LatencyUpdater updater(&mdsClientMetric_.updateS3Info.latency); + UpdateS3InfoResponse response; + mdsbasecli_->UpdateS3Info(fsName, s3Info, &response, cntl, channel); + if (cntl->Failed()) { + mdsClientMetric_.updateS3Info.eps.count << 1; + LOG(WARNING) << "UpdateS3Info Failed, errorcode = " + << cntl->ErrorCode() + << ", error content:" << cntl->ErrorText() + << ", log id = " << cntl->log_id(); + return -cntl->ErrorCode(); + } + + FSStatusCode ret = response.statuscode(); + if (ret != FSStatusCode::OK) { + LOG(WARNING) << "UpdateS3Info: fsname = " << fsName + << ", s3Info = " << s3Info.ShortDebugString() + << ", errcode = " << ret + << ", errmsg = " << FSStatusCode_Name(ret); + } else if (response.has_fsinfo()) { + fsInfo->CopyFrom(response.fsinfo()); + } + return ret; + }; + return ReturnError(rpcexcutor_.DoRPCTask(task, mdsOpt_.mdsMaxRetryMS)); +} + FSStatusCode MdsClientImpl::MountFs(const std::string& fsName, const Mountpoint& mountPt, FsInfo* fsInfo) { auto task = RPCTask { diff --git a/curvefs/src/client/rpcclient/mds_client.h b/curvefs/src/client/rpcclient/mds_client.h index e74ec84c03..e7503d6c30 100644 --- a/curvefs/src/client/rpcclient/mds_client.h +++ b/curvefs/src/client/rpcclient/mds_client.h @@ -60,6 +60,7 @@ namespace curvefs { namespace client { namespace rpcclient { +using curvefs::common::S3Info; using curvefs::mds::GetLatestTxIdRequest; using curvefs::mds::GetLatestTxIdResponse; using curvefs::mds::CommitTxRequest; @@ -75,6 +76,9 @@ class MdsClient { virtual FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt, MDSBaseClient *baseclient) = 0; + virtual FSStatusCode UpdateS3Info(const std::string& fsName, + const S3Info& s3Info, FsInfo* fsInfo) = 0; + virtual FSStatusCode MountFs(const std::string& fsName, const Mountpoint& mountPt, FsInfo* fsInfo) = 0; @@ -171,6 +175,9 @@ class MdsClientImpl : public MdsClient { FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt, MDSBaseClient *baseclient) override; + FSStatusCode UpdateS3Info(const std::string& fsName, const S3Info& s3Info, + FsInfo* fsInfo) override; + FSStatusCode MountFs(const std::string& fsName, const Mountpoint& mountPt, FsInfo* fsInfo) override; diff --git a/curvefs/src/client/s3/client_s3.cpp b/curvefs/src/client/s3/client_s3.cpp index 9b329aefa1..9bceb25b8c 100644 --- a/curvefs/src/client/s3/client_s3.cpp +++ b/curvefs/src/client/s3/client_s3.cpp @@ -28,6 +28,10 @@ void S3ClientImpl::Init(const curve::common::S3AdapterOption &option) { s3Adapter_->Init(option); } +void S3ClientImpl::Reinit(const curve::common::S3AdapterOption &option) { + s3Adapter_->Reinit(option); +} + void S3ClientImpl::Deinit() { s3Adapter_->Deinit(); } diff --git a/curvefs/src/client/s3/client_s3.h b/curvefs/src/client/s3/client_s3.h index 301670a2fa..3fb626c683 100644 --- a/curvefs/src/client/s3/client_s3.h +++ b/curvefs/src/client/s3/client_s3.h @@ -41,6 +41,7 @@ class S3Client { S3Client() {} virtual ~S3Client() {} virtual void Init(const curve::common::S3AdapterOption& option) = 0; + virtual void Reinit(const curve::common::S3AdapterOption& option) = 0; virtual void Deinit() = 0; virtual int Upload(const std::string& name, const char* buf, uint64_t length) = 0; @@ -65,6 +66,7 @@ class S3ClientImpl : public S3Client { } virtual ~S3ClientImpl() {} void Init(const curve::common::S3AdapterOption& option); + void Reinit(const curve::common::S3AdapterOption& option); void Deinit(); int Upload(const std::string& name, const char* buf, uint64_t length); void UploadAsync(std::shared_ptr context); diff --git a/curvefs/src/client/s3/client_s3_cache_manager.cpp b/curvefs/src/client/s3/client_s3_cache_manager.cpp index 1bd098eb71..ed800fab0f 100644 --- a/curvefs/src/client/s3/client_s3_cache_manager.cpp +++ b/curvefs/src/client/s3/client_s3_cache_manager.cpp @@ -2524,6 +2524,10 @@ void DataCache::FlushTaskExecute( LOG(WARNING) << "Put object failed, key: " << context->key; // Retry using s3 no matter what the original was + if (--(context->retries) <= 0) { + s3TaskEvent.Signal(); + return; + } context->type = curve::common::ContextType::S3; s3ClientAdaptor_->GetS3Client()->UploadAsync(context); }; diff --git a/curvefs/src/mds/fs_info_wrapper.h b/curvefs/src/mds/fs_info_wrapper.h index ea33ad5af7..a4db99c7f7 100644 --- a/curvefs/src/mds/fs_info_wrapper.h +++ b/curvefs/src/mds/fs_info_wrapper.h @@ -35,6 +35,7 @@ namespace curvefs { namespace mds { using ::curvefs::common::FSType; +using ::curvefs::common::S3Info; // A wrapper for proto FsInfo class FsInfoWrapper { @@ -66,6 +67,11 @@ class FsInfoWrapper { fsInfo_.set_status(status); } + void SetS3Info(S3Info s3info) { + FsDetail* fsdetail_ = fsInfo_.mutable_detail(); + fsdetail_->mutable_s3info()->CopyFrom(s3info); + } + void SetFsName(const std::string& name) { fsInfo_.set_fsname(name); } diff --git a/curvefs/src/mds/fs_manager.cpp b/curvefs/src/mds/fs_manager.cpp index 7c6a1aa51a..060bef10fd 100644 --- a/curvefs/src/mds/fs_manager.cpp +++ b/curvefs/src/mds/fs_manager.cpp @@ -598,6 +598,36 @@ FSStatusCode FsManager::DeleteFs(const std::string& fsName) { return FSStatusCode::OK; } +FSStatusCode FsManager::UpdateS3Info(const std::string& fsName, + const S3Info& s3Info, FsInfo* fsInfo) { + NameLockGuard lock(nameLock_, fsName); + + // query fs + FsInfoWrapper wrapper; + FSStatusCode ret = fsStorage_->Get(fsName, &wrapper); + if (ret != FSStatusCode::OK) { + LOG(WARNING) << "UpdateS3Info fail, get fs fail, fsName = " << fsName + << ", errCode = " << FSStatusCode_Name(ret); + return ret; + } + + // update s3Info + wrapper.SetS3Info(s3Info); + // for persistence consider + ret = fsStorage_->Update(wrapper); + if (ret != FSStatusCode::OK) { + LOG(WARNING) << "UpdateS3Info fail, update fs fail, fsName = " << fsName + << ", s3Info = " << s3Info.ShortDebugString() + << ", errCode = " << FSStatusCode_Name(ret); + return ret; + } + + // convert fs info + *fsInfo = std::move(wrapper).ProtoFsInfo(); + + return FSStatusCode::OK; +} + FSStatusCode FsManager::MountFs(const std::string& fsName, const Mountpoint& mountpoint, FsInfo* fsInfo) { NameLockGuard lock(nameLock_, fsName); diff --git a/curvefs/src/mds/fs_manager.h b/curvefs/src/mds/fs_manager.h index c1f3ed1e2b..69aac08c94 100644 --- a/curvefs/src/mds/fs_manager.h +++ b/curvefs/src/mds/fs_manager.h @@ -62,10 +62,11 @@ using ::curve::common::Atomic; using ::curve::common::InterruptibleSleeper; using ::curve::common::S3Adapter; using ::curve::common::Thread; + +using ::curvefs::common::S3Info; using ::curvefs::mds::topology::PartitionTxId; using ::curvefs::mds::topology::Topology; using ::curvefs::mds::topology::TopologyManager; - using ::curvefs::mds::dlock::DLock; using ::curvefs::mds::Mountpoint; @@ -126,6 +127,20 @@ class FsManager { */ FSStatusCode DeleteFs(const std::string& fsName); + /** + * @brief Update s3 info and return the update fs info that includes + * updated s3 info. + * + * @param[in] fsName: fsname of fs + * @param[in] s3Info: fsInfo + * @param[out] fsInfo: return the fsInfo + * + * @return If success return OK; + * else return error code + */ + FSStatusCode UpdateS3Info(const std::string& fsName, + const S3Info& s3Info, FsInfo* fsInfo); + /** * @brief Mount fs, mount point can not repeat. It will increate * mountNum. diff --git a/curvefs/src/mds/mds_service.cpp b/curvefs/src/mds/mds_service.cpp index 17c33156c2..e242997ea9 100644 --- a/curvefs/src/mds/mds_service.cpp +++ b/curvefs/src/mds/mds_service.cpp @@ -141,6 +141,37 @@ void MdsServiceImpl::CreateFs(::google::protobuf::RpcController* controller, << ", capacity = " << request->capacity(); } +void MdsServiceImpl::UpdateS3Info(::google::protobuf::RpcController* controller, + const UpdateS3InfoRequest* request, + UpdateS3InfoResponse* response, + ::google::protobuf::Closure* done) { + (void)controller; + brpc::ClosureGuard doneGuard(done); + const std::string &fsName = request->fsname(); + const curvefs::common::S3Info &s3Info = request->s3info(); + + LOG(INFO) << "UpdateS3Info request, fsName = " << fsName + << ", s3Info = " << s3Info.ShortDebugString(); + + FSStatusCode status = + fsManager_->UpdateS3Info(fsName, s3Info, response->mutable_fsinfo()); + + if (status != FSStatusCode::OK) { + response->clear_fsinfo(); + response->set_statuscode(status); + LOG(ERROR) << "UpdateS3Info fail, fsName = " << fsName + << ", s3Info = " << s3Info.ShortDebugString() + << ", errCode = " << FSStatusCode_Name(status); + return; + } + + response->set_statuscode(FSStatusCode::OK); + LOG(INFO) << "UpdateS3Info success, fsName = " << fsName + << ", s3Info in response = " + << response->fsinfo().detail().s3info().ShortDebugString() + << ", mps = " << response->mutable_fsinfo()->mountpoints_size(); +} + void MdsServiceImpl::MountFs(::google::protobuf::RpcController* controller, const MountFsRequest* request, MountFsResponse* response, ::google::protobuf::Closure* done) { diff --git a/curvefs/src/mds/mds_service.h b/curvefs/src/mds/mds_service.h index dbcfaaf6f4..7d01de2a0d 100644 --- a/curvefs/src/mds/mds_service.h +++ b/curvefs/src/mds/mds_service.h @@ -50,6 +50,11 @@ class MdsServiceImpl : public MdsService { CreateFsResponse* response, ::google::protobuf::Closure* done); + void UpdateS3Info(::google::protobuf::RpcController* controller, + const UpdateS3InfoRequest* request, + UpdateS3InfoResponse* response, + ::google::protobuf::Closure* done); + void MountFs(::google::protobuf::RpcController* controller, const MountFsRequest* request, MountFsResponse* response, diff --git a/curvefs/test/client/mock_client_s3.h b/curvefs/test/client/mock_client_s3.h index 5588fd73c5..938049cc73 100644 --- a/curvefs/test/client/mock_client_s3.h +++ b/curvefs/test/client/mock_client_s3.h @@ -41,6 +41,7 @@ class MockS3Client : public S3Client { ~MockS3Client() {} MOCK_METHOD1(Init, void(const curve::common::S3AdapterOption &options)); + MOCK_METHOD1(Reinit, void(const curve::common::S3AdapterOption &options)); MOCK_METHOD0(Deinit, void()); MOCK_METHOD3(Upload, int(const std::string &name, const char *buf, uint64_t length)); diff --git a/curvefs/test/client/rpcclient/mock_mds_client.h b/curvefs/test/client/rpcclient/mock_mds_client.h index c77a2296cc..72443afb9a 100644 --- a/curvefs/test/client/rpcclient/mock_mds_client.h +++ b/curvefs/test/client/rpcclient/mock_mds_client.h @@ -50,6 +50,10 @@ class MockMdsClient : public MdsClient { FSStatusCode(const ::curve::client::MetaServerOption& mdsOpt, MDSBaseClient* baseclient)); + MOCK_METHOD3(UpdateS3Info, + FSStatusCode(const std::string& fsName, + const S3Info& s3Info, FsInfo* fsInfo)); + MOCK_METHOD3(MountFs, FSStatusCode(const std::string& fsName, const Mountpoint& mountPt, FsInfo* fsInfo)); diff --git a/src/common/s3_adapter.h b/src/common/s3_adapter.h index 2adbbfb3bc..7e7b3a32ea 100644 --- a/src/common/s3_adapter.h +++ b/src/common/s3_adapter.h @@ -162,18 +162,21 @@ struct PutObjectAsyncContext : public Aws::Client::AsyncCallerContext { butil::Timer timer; int retCode; // >= 0 success, < 0 fail ContextType type; + size_t retries; explicit PutObjectAsyncContext( std::string key, const char* buffer, size_t bufferSize, PutObjectAsyncCallBack cb = [](const std::shared_ptr&) {}, - ContextType type = ContextType::Unkown) + ContextType type = ContextType::Unkown, + size_t retries = 10) : key(std::move(key)), buffer(buffer), bufferSize(bufferSize), cb(std::move(cb)), type(type), - timer(butil::Timer::STARTED) {} + timer(butil::Timer::STARTED), + retries(retries) {} }; class S3Adapter { diff --git a/tools-v2/pkg/cli/command/curvefs/update/s3info/s3info.go b/tools-v2/pkg/cli/command/curvefs/update/s3info/s3info.go new file mode 100644 index 0000000000..184032b1c3 --- /dev/null +++ b/tools-v2/pkg/cli/command/curvefs/update/s3info/s3info.go @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2022 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveCli + * Created Date: 2024-01-24 + * Author: ken90242 (Ken Han) + */ + + package s3info + + import ( + "errors" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" + cmderror "github.com/opencurve/curve/tools-v2/internal/error" + cobrautil "github.com/opencurve/curve/tools-v2/internal/utils" + mountinfo "github.com/cilium/cilium/pkg/mountinfo" + + "github.com/opencurve/curve/tools-v2/pkg/config" + "github.com/opencurve/curve/tools-v2/pkg/output" + "github.com/spf13/cobra" + "golang.org/x/sys/unix" + ) + + const ( + updateS3InfoExample = `$ curve fs update s3info /mnt/test --s3.ak=ak` + ) + + const ( + CURVEFS_S3_UPDATE_CONFIG_XATTR = "curvefs.s3.update.config" + ) + + type S3InfoObj struct { + AK string `json:"ak,omitempty"` + SK string `json:"sk,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + Bucketname string `json:"bucketname,omitempty"` +} + + type UpdateS3InfoCommand struct { + basecmd.FinalCurveCmd + CtrlFilePath string // path in user system + MountpointInfo *mountinfo.MountInfo + S3InfoByte []byte + } + + var _ basecmd.FinalCurveCmdFunc = (*UpdateS3InfoCommand)(nil) + + func fileNotExists(filePath string) bool { + _, err := os.Stat(filePath) + return os.IsNotExist(err) +} + + func NewUpdateS3InfoCommand() *UpdateS3InfoCommand { + uCmd := &UpdateS3InfoCommand{ + FinalCurveCmd: basecmd.FinalCurveCmd{ + Use: "s3info", + Short: "update s3 info for both MDS and Fuse client", + Example: updateS3InfoExample, + }, + } + basecmd.NewFinalCurveCli(&uCmd.FinalCurveCmd, uCmd) + return uCmd + } + + func NewUpdateCommand() *cobra.Command { + return NewUpdateS3InfoCommand().Cmd + } + + func (uCmd *UpdateS3InfoCommand) AddFlags() { + config.AddS3AkOptionFlag(uCmd.Cmd) + config.AddS3SkOptionFlag(uCmd.Cmd) + config.AddS3EndpointOptionFlag(uCmd.Cmd) + config.AddS3BucknameOptionFlag(uCmd.Cmd) + } + + func (uCmd *UpdateS3InfoCommand) Init(cmd *cobra.Command, args []string) error { + // check args + ak := config.GetS3AkOptionFlag(uCmd.Cmd) + sk := config.GetS3SkOptionFlag(uCmd.Cmd) + endpoint := config.GetS3EndpointOptionFlag(uCmd.Cmd) + bucketName := config.GetS3BucknameOptionFlag(uCmd.Cmd) + + if len(args) == 0 { + return fmt.Errorf("no mountpath has been specified") + } + + // check has curvefs mountpoint + mountpoints, err := cobrautil.GetCurveFSMountPoints() + if err.TypeCode() != cmderror.CODE_SUCCESS { + return err.ToError() + } else if len(mountpoints) == 0 { + return errors.New("no curvefs mountpoint has been found") + } + + absPath, _ := filepath.Abs(args[0]) + uCmd.CtrlFilePath = "" + for _, mountpointInfo := range mountpoints { + rel, err := filepath.Rel(mountpointInfo.MountPoint, absPath) + if err == nil && !strings.HasPrefix(rel, "..") { + // found the mountpoint + if uCmd.MountpointInfo == nil || + len(uCmd.MountpointInfo.MountPoint) < len(mountpointInfo.MountPoint) { + // Prevent the curvefs directory from being mounted under the curvefs directory + // /a/b/c: + // test-1 mount in /a + // test-1 mount in /a/b + // warmup /a/b/c. + uCmd.CtrlFilePath = fmt.Sprintf("%s/%s", absPath, config.CURVEFS_CTRL_FILE) + } + } + } + + var s3Info S3InfoObj + if ak != config.FLAG2DEFAULT[config.CURVEFS_S3_AK] { + s3Info.AK = ak + } + if sk != config.FLAG2DEFAULT[config.CURVEFS_S3_SK] { + s3Info.SK = sk + } + if endpoint != config.FLAG2DEFAULT[config.CURVEFS_S3_ENDPOINT] { + s3Info.Endpoint = endpoint + } + if bucketName != config.FLAG2DEFAULT[config.CURVEFS_S3_BUCKETNAME] { + s3Info.Bucketname = bucketName + } + + jsonData, errStr := json.Marshal(s3Info) + // Serialize the S3InfoObj struct to JSON bytes + if errStr != nil { + convertErr := cmderror.ErrSetxattr() + convertErr.Format(CURVEFS_S3_UPDATE_CONFIG_XATTR, err) + return convertErr.ToError() + } + + uCmd.S3InfoByte = make([]byte, len(jsonData)) + copy(uCmd.S3InfoByte, jsonData) + + return nil + } + + func (uCmd *UpdateS3InfoCommand) Print(cmd *cobra.Command, args []string) error { + return output.FinalCmdOutput(&uCmd.FinalCurveCmd, uCmd) + } + + func (uCmd *UpdateS3InfoCommand) RunCommand(cmd *cobra.Command, args []string) error { + var createFileErr error + var file *os.File + createFileErr = nil + if fileNotExists(uCmd.CtrlFilePath) { + file, createFileErr = os.Create(uCmd.CtrlFilePath) + } + + if createFileErr != nil { + return fmt.Errorf("there was an error creating temporary control file: %s", createFileErr) + } + + defer file.Close() + + err := unix.Setxattr(uCmd.CtrlFilePath, CURVEFS_S3_UPDATE_CONFIG_XATTR, uCmd.S3InfoByte, 0) + if err == unix.ENOTSUP || err == unix.EOPNOTSUPP { + return fmt.Errorf("filesystem does not support extended attributes") + } else if err != nil { + setErr := cmderror.ErrSetxattr() + setErr.Format(CURVEFS_S3_UPDATE_CONFIG_XATTR, err.Error()) + return setErr.ToError() + } + + fmt.Println("s3 info is updated.") + + return nil + } + + func (uCmd *UpdateS3InfoCommand) ResultPlainOutput() error { + return output.FinalCmdOutputPlain(&uCmd.FinalCurveCmd) + } + \ No newline at end of file diff --git a/tools-v2/pkg/cli/command/curvefs/update/update.go b/tools-v2/pkg/cli/command/curvefs/update/update.go index 5882a195b4..b968b0e665 100644 --- a/tools-v2/pkg/cli/command/curvefs/update/update.go +++ b/tools-v2/pkg/cli/command/curvefs/update/update.go @@ -20,6 +20,7 @@ import ( basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvefs/update/fs" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvefs/update/mds" + "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvefs/update/s3info" "github.com/spf13/cobra" ) @@ -33,6 +34,7 @@ func (updateCmd *UpdateCommand) AddSubCommands() { updateCmd.Cmd.AddCommand( fs.NewFsCommand(), mds.NewMdsCommand(), + s3info.NewUpdateCommand(), ) } diff --git a/tools-v2/pkg/config/fs.go b/tools-v2/pkg/config/fs.go index 15af464ade..5b0a1ff7bb 100644 --- a/tools-v2/pkg/config/fs.go +++ b/tools-v2/pkg/config/fs.go @@ -33,6 +33,7 @@ import ( const ( // curvefs + CURVEFS_CTRL_FILE = "__ctrl_file__.tmp" CURVEFS_MDSADDR = "mdsaddr" VIPER_CURVEFS_MDSADDR = "curvefs.mdsAddr" CURVEFS_MDSDUMMYADDR = "mdsdummyaddr" @@ -658,21 +659,37 @@ func AddS3AkOptionFlag(cmd *cobra.Command) { AddStringOptionFlag(cmd, CURVEFS_S3_AK, "s3 ak") } +func GetS3AkOptionFlag(cmd *cobra.Command) string { + return GetFlagString(cmd, CURVEFS_S3_AK) +} + // S3.Sk [option] func AddS3SkOptionFlag(cmd *cobra.Command) { AddStringOptionFlag(cmd, CURVEFS_S3_SK, "s3 sk") } +func GetS3SkOptionFlag(cmd *cobra.Command) string { + return GetFlagString(cmd, CURVEFS_S3_SK) +} + // S3.Endpoint [option] func AddS3EndpointOptionFlag(cmd *cobra.Command) { AddStringOptionFlag(cmd, CURVEFS_S3_ENDPOINT, "s3 endpoint") } +func GetS3EndpointOptionFlag(cmd *cobra.Command) string { + return GetFlagString(cmd, CURVEFS_S3_ENDPOINT) +} + // S3.Buckname [option] func AddS3BucknameOptionFlag(cmd *cobra.Command) { AddStringOptionFlag(cmd, CURVEFS_S3_BUCKETNAME, "s3 buckname") } +func GetS3BucknameOptionFlag(cmd *cobra.Command) string { + return GetFlagString(cmd, CURVEFS_S3_BUCKETNAME) +} + // S3.Blocksize [option] func AddS3BlocksizeOptionFlag(cmd *cobra.Command) { AddStringOptionFlag(cmd, CURVEFS_S3_BLOCKSIZE, "s3 blocksize")