diff --git a/WORKSPACE b/WORKSPACE
index a423f1c46a..6efa122eea 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -143,6 +143,11 @@ git_repository(
"//:thirdparties/brpc/brpc.patch",
"//:thirdparties/brpc/fix-gcc11.patch",
"//:thirdparties/brpc/0001-bvar-warning-on-conflict-bvar-name.patch",
+ "//:thirdparties/brpc/0002-Support-fork-without-exec.patch",
+ "//:thirdparties/brpc/0003-Add-docs-on-fork-w-o-exec.patch",
+ "//:thirdparties/brpc/0004-not-register-pthread_atfork-in-child-process.patch",
+ "//:thirdparties/brpc/0005-Fix-LatencyRecorder-qps-not-accurate.patch",
+ "//:thirdparties/brpc/0006-fix-1973-1863.patch",
],
patch_args = ["-p1"],
)
diff --git a/curvefs/conf/client.conf b/curvefs/conf/client.conf
index 755d624920..8ba0127ec2 100644
--- a/curvefs/conf/client.conf
+++ b/curvefs/conf/client.conf
@@ -85,6 +85,12 @@ fuseClient.getThreadPool=4
# it gurantee the consistent of file after rename, otherwise you should
# disable it for performance.
fuseClient.enableMultiMountPointRename=true
+
+# the rename transaction models are different between version 1 and version 2
+# the v2 version greatly improves the performance of rename, especially in concurrent scenarios.
+# Note: v1 and v2 are incompatible; a v1 cluster cannot be directly upgraded to v2.
+fuseClient.txVersion=1
+
# splice will bring higher performance in some cases
# but there might be a kernel issue that will cause kernel panic when enabling it
# see https://lore.kernel.org/all/CAAmZXrsGg2xsP1CK+cbuEMumtrqdvD-NKnWzhNcvn71RV3c1yw@mail.gmail.com/
diff --git a/curvefs/conf/metaserver.conf b/curvefs/conf/metaserver.conf
index a16fd4d101..1140bef40d 100644
--- a/curvefs/conf/metaserver.conf
+++ b/curvefs/conf/metaserver.conf
@@ -258,11 +258,17 @@ storage.rocksdb.unordered_write_buffer_size=67108864
# for store inode which exclude its s3chunkinfo list (default: 3)
storage.rocksdb.unordered_max_write_buffer_number=3
# rocksdb column family's write_buffer_size
-# for store dentry and inode's s3chunkinfo list (unit: bytes, default: 128MB)
+# for store dentry and inode's s3chunkinfo list (unit: bytes, default: 64MB)
storage.rocksdb.ordered_write_buffer_size=67108864
# rocksdb column family's max_write_buffer_number
# for store dentry and inode's s3chunkinfo list (default: 3)
storage.rocksdb.ordered_max_write_buffer_number=3
+# rocksdb column family's write_buffer_size
+# for store tx lock and write (unit: bytes, default: 64MB)
+storage.rocksdb.tx_cf_write_buffer_size=67108864
+# rocksdb column family's max_write_buffer_number
+# for store tx lock and write (default: 3)
+storage.rocksdb.tx_cf_max_write_buffer_number=3
# The target number of write history bytes to hold in memory (default: 20MB)
storage.rocksdb.max_write_buffer_size_to_maintain=20971520
# rocksdb memtable prefix bloom size ratio (size=write_buffer_size*memtable_prefix_bloom_size_ratio)
@@ -286,6 +292,8 @@ storage.rocksdb.perf_sampling_ratio=0
# we will sending its with rpc streaming instead of
# padding its into inode (default: 25000, about 25000 * 41 (byte) = 1MB)
storage.s3_meta_inside_inode.limit_size=25000
+# TTL(millisecond) for tx lock
+storage.tx_lock_ttl_ms=5000
# recycle options
# metaserver scan recycle period, default 1h
diff --git a/curvefs/devops/util/tmpl.sh b/curvefs/devops/util/tmpl.sh
index cb1e5ea181..ed69596098 100755
--- a/curvefs/devops/util/tmpl.sh
+++ b/curvefs/devops/util/tmpl.sh
@@ -3,7 +3,7 @@
# Usage:
# tmpl.sh DSV SOURCE DESTINATION
# Example:
-# tmpl.sh = /usr/local/metaserver.conf /tmp/metaserver.con
+# tmpl.sh = /usr/local/metaserver.conf /tmp/metaserver.conf
g_dsv=$1
g_src=$2
diff --git a/curvefs/docker/debian11/entrypoint.sh b/curvefs/docker/debian11/entrypoint.sh
index 0ca397dace..35faec3cad 100755
--- a/curvefs/docker/debian11/entrypoint.sh
+++ b/curvefs/docker/debian11/entrypoint.sh
@@ -104,7 +104,10 @@ function prepare() {
}
function create_directory() {
- chmod 700 "$g_prefix/data"
+ if [ "$g_role" != "monitor" ]; then
+ chmod 700 "$g_prefix/data"
+ fi
+
if [ "$g_role" == "etcd" ]; then
mkdir -p "$g_prefix/data/wal"
elif [ "$g_role" == "metaserver" ]; then
diff --git a/curvefs/docker/openeuler/entrypoint.sh b/curvefs/docker/openeuler/entrypoint.sh
index 0ca397dace..35faec3cad 100644
--- a/curvefs/docker/openeuler/entrypoint.sh
+++ b/curvefs/docker/openeuler/entrypoint.sh
@@ -104,7 +104,10 @@ function prepare() {
}
function create_directory() {
- chmod 700 "$g_prefix/data"
+ if [ "$g_role" != "monitor" ]; then
+ chmod 700 "$g_prefix/data"
+ fi
+
if [ "$g_role" == "etcd" ]; then
mkdir -p "$g_prefix/data/wal"
elif [ "$g_role" == "metaserver" ]; then
diff --git a/curvefs/proto/common.proto b/curvefs/proto/common.proto
index a56402c744..25ba220f42 100644
--- a/curvefs/proto/common.proto
+++ b/curvefs/proto/common.proto
@@ -80,7 +80,7 @@ message PartitionInfo {
// partition manage inodeid range [start, end]
required uint64 start = 5;
required uint64 end = 6;
- required uint64 txId = 7;
+ optional uint64 txId = 7;
optional uint64 nextId = 8;
// status can change from READWRITE to READONLY, but can not chanage from READONLY to READWRITE
// READWRITE/READONLY can change to DELETING, but DELETING can not change to READWRITE/READONLY
diff --git a/curvefs/proto/mds.proto b/curvefs/proto/mds.proto
index a3c6ca1c26..f89c13e733 100644
--- a/curvefs/proto/mds.proto
+++ b/curvefs/proto/mds.proto
@@ -194,6 +194,7 @@ message RefreshSessionRequest {
required string fsName = 2;
required Mountpoint mountpoint = 3;
optional FsDelta fsDelta = 4;
+ optional string mdsAddrs = 5;
}
message RefreshSessionResponse {
@@ -202,6 +203,7 @@ message RefreshSessionResponse {
optional bool enableSumInDir = 3;
optional uint64 fsCapacity = 4;
optional uint64 fsUsedBytes = 5;
+ optional string mdsAddrsOverride = 6;
}
message DLockValue {
@@ -234,6 +236,22 @@ message CommitTxResponse {
required FSStatusCode statusCode = 1;
}
+message SetClientMdsAddrsOverrideRequest {
+ required string clientMdsAddrsOverride = 1;
+}
+
+message SetClientMdsAddrsOverrideResponse {
+ required FSStatusCode statusCode = 1;
+}
+
+message TsoRequest {}
+
+message TsoResponse {
+ required FSStatusCode statusCode = 1;
+ optional uint64 ts = 2; // transaction sequence number
+ optional uint64 timestamp = 3;
+}
+
service MdsService {
// fs interface
rpc CreateFs(CreateFsRequest) returns (CreateFsResponse);
@@ -249,6 +267,11 @@ service MdsService {
rpc GetLatestTxId(GetLatestTxIdRequest) returns (GetLatestTxIdResponse);
rpc CommitTx(CommitTxRequest) returns (CommitTxResponse);
+ rpc Tso(TsoRequest) returns (TsoResponse);
+
// client lease
rpc RefreshSession(RefreshSessionRequest) returns (RefreshSessionResponse);
+
+ // client mds addrs override, for mds migration
+ rpc SetClientMdsAddrsOverride(SetClientMdsAddrsOverrideRequest) returns (SetClientMdsAddrsOverrideResponse);
}
diff --git a/curvefs/proto/metaserver.proto b/curvefs/proto/metaserver.proto
index f0ab8167a2..d22246069e 100644
--- a/curvefs/proto/metaserver.proto
+++ b/curvefs/proto/metaserver.proto
@@ -49,6 +49,15 @@ enum MetaStatusCode {
RPC_STREAM_ERROR = 25;
INODE_S3_META_TOO_LARGE = 26;
STORAGE_CLOSED = 27;
+ // tx v2 related
+ TX_FAILED = 28;
+ TX_WRITE_CONFLICT = 29;
+ TX_KEY_LOCKED = 30;
+ TX_COMMITTED = 31;
+ TX_ROLLBACKED = 32;
+ TX_TIMEOUT = 33;
+ TX_INPROGRESS = 34;
+ TX_MISMATCH = 35;
}
// dentry interface
@@ -59,7 +68,7 @@ message GetDentryRequest {
required uint32 fsId = 4;
required uint64 parentInodeId = 5;
required string name = 6;
- required uint64 txId = 7;
+ optional uint64 txId = 7;
optional uint64 appliedIndex = 8;
}
@@ -74,7 +83,8 @@ message Dentry {
required uint64 inodeId = 2;
required uint64 parentInodeId = 3;
required string name = 4;
- required uint64 txId = 5;
+ // tx v2 reuses txId to carry the transaction ts, for compatibility in metaserver
+ optional uint64 txId = 5;
optional uint32 flag = 6;
optional FsFileType type = 7;
optional uint64 txSequence = 8;
@@ -88,6 +98,7 @@ message GetDentryResponse {
required MetaStatusCode statusCode = 1;
optional Dentry dentry = 2;
optional uint64 appliedIndex = 3;
+ optional TxLock txLock = 4;
}
message ListDentryRequest {
@@ -96,7 +107,7 @@ message ListDentryRequest {
required uint32 partitionId = 3;
required uint32 fsId = 4;
required uint64 dirInodeId = 5;
- required uint64 txId = 6;
+ optional uint64 txId = 6;
optional string last = 7; // the name of last entry
optional uint32 count = 8; // the number of entry required
optional bool onlyDir = 9;
@@ -107,6 +118,7 @@ message ListDentryResponse {
required MetaStatusCode statusCode = 1;
repeated Dentry dentrys = 2;
optional uint64 appliedIndex = 3;
+ optional TxLock txLock = 4;
}
message CreateDentryRequest {
@@ -120,6 +132,7 @@ message CreateDentryRequest {
message CreateDentryResponse {
required MetaStatusCode statusCode = 1;
optional uint64 appliedIndex = 2;
+ optional TxLock txLock = 3;
}
message DeleteDentryRequest {
@@ -127,15 +140,17 @@ message DeleteDentryRequest {
required uint32 copysetId = 2;
required uint32 partitionId = 3;
required uint32 fsId = 4;
- required uint64 txId = 5;
+ optional uint64 txId = 5;
required uint64 parentInodeId = 6;
required string name = 7;
optional FsFileType type = 8;
+ optional Time create = 9;
}
message DeleteDentryResponse {
required MetaStatusCode statusCode = 1;
optional uint64 appliedIndex = 2;
+ optional TxLock txLock = 3;
}
message PrepareRenameTxRequest {
@@ -159,6 +174,85 @@ message PrepareRenameTxResponse {
optional uint64 appliedIndex = 2;
}
+message TxLock { // lock record carried by the tx v2 rename requests/responses
+ required string primaryKey = 1; // client sets it to the serialized Key4Dentry of the source dentry
+ required uint64 startTs = 2; // start ts of the transaction, obtained from the MDS Tso call
+ required uint64 timestamp = 3; // timestamp returned by that same Tso call
+ optional uint32 index = 4; // on TX_KEY_LOCKED the client adds it to its offset into the prewritten dentries
+ optional int32 ttl = 5; // NOTE(review): presumably the lock TTL in ms (cf. storage.tx_lock_ttl_ms) - confirm
+}
+
+enum TxWriteKind {
+ Commit = 1;
+ Rollback = 2;
+}
+
+message TS {
+ required uint64 ts = 1;
+}
+
+message TxWrite {
+ required uint64 startTs = 1;
+ required TxWriteKind kind = 2;
+}
+
+message PrewriteRenameTxRequest {
+ required uint32 poolId = 1;
+ required uint32 copysetId = 2;
+ required uint32 partitionId = 3;
+ repeated Dentry dentrys = 4;
+ required TxLock txLock = 5;
+}
+
+message PrewriteRenameTxResponse {
+ required MetaStatusCode statusCode = 1;
+ repeated Dentry dentrys = 2;
+ optional TxLock txLock = 3;
+ optional uint64 appliedIndex = 4;
+}
+
+message CheckTxStatusRequest {
+ required uint32 poolId = 1;
+ required uint32 copysetId = 2;
+ required uint32 partitionId = 3;
+ required string primaryKey = 4;
+ required uint64 startTs = 5;
+ required uint64 curTimestamp = 6;
+}
+
+message CheckTxStatusResponse {
+ required MetaStatusCode statusCode = 1;
+ optional uint64 appliedIndex = 2;
+}
+
+message ResolveTxLockRequest {
+ required uint32 poolId = 1;
+ required uint32 copysetId = 2;
+ required uint32 partitionId = 3;
+ required Dentry dentry = 4;
+ required uint64 startTs = 5;
+ required uint64 commitTs = 6;
+}
+
+message ResolveTxLockResponse {
+ required MetaStatusCode statusCode = 1;
+ optional uint64 appliedIndex = 2;
+}
+
+message CommitTxRequest {
+ required uint32 poolId = 1;
+ required uint32 copysetId = 2;
+ required uint32 partitionId = 3;
+ repeated Dentry dentrys = 4;
+ required uint64 startTs = 5;
+ required uint64 commitTs = 6;
+}
+
+message CommitTxResponse {
+ required MetaStatusCode statusCode = 1;
+ optional uint64 appliedIndex = 2;
+}
+
// inode interface
message GetInodeRequest {
required uint32 poolId = 1;
@@ -266,7 +360,7 @@ message Inode {
optional uint64 rdev = 16;
// field 17 is left for compatibility
map
s3ChunkInfoMap = 18; // TYPE_S3 only, first is chunk index
- optional uint32 dtime = 19;
+ optional uint64 dtime = 19;
optional uint32 openmpcount = 20; // openmpcount mount points had the file open
map xattr = 21;
repeated uint64 parent = 22;
@@ -538,6 +632,11 @@ service MetaServerService {
rpc CreateDentry(CreateDentryRequest) returns (CreateDentryResponse);
rpc DeleteDentry(DeleteDentryRequest) returns (DeleteDentryResponse);
rpc PrepareRenameTx(PrepareRenameTxRequest) returns (PrepareRenameTxResponse);
+ // tx v2
+ rpc PrewriteRenameTx(PrewriteRenameTxRequest) returns (PrewriteRenameTxResponse);
+ rpc CheckTxStatus(CheckTxStatusRequest) returns (CheckTxStatusResponse);
+ rpc ResolveTxLock(ResolveTxLockRequest) returns (ResolveTxLockResponse);
+ rpc CommitTx(CommitTxRequest) returns (CommitTxResponse);
// inode interface
rpc GetInode(GetInodeRequest) returns (GetInodeResponse);
diff --git a/curvefs/proto/topology.proto b/curvefs/proto/topology.proto
index c9d70682b5..f248655fca 100644
--- a/curvefs/proto/topology.proto
+++ b/curvefs/proto/topology.proto
@@ -61,7 +61,6 @@ message ClusterInfoData {
required string clusterId = 1;
//
map partitionIndexs = 2;
-
}
message PoolData {
diff --git a/curvefs/sdk/README.md b/curvefs/sdk/README.md
index c58aa8feba..6848362017 100644
--- a/curvefs/sdk/README.md
+++ b/curvefs/sdk/README.md
@@ -6,12 +6,12 @@ How to build
``` bash
$ git clone git@github.com:opencurve/curve.git
-$ cd curve
-$ make dep stor=fs
+$ make playground
+$ make ci-dep stor=fs
$ make sdk
```
-It will generate a jar after build success:
+A jar package is generated after a successful build:
```
Build SDK success => /curve/curvefs/sdk/output/curvefs-hadoop-1.0-SNAPSHOT.jar
diff --git a/curvefs/sdk/java/native/BUILD b/curvefs/sdk/java/native/BUILD
index 3361aeea44..13dcef4778 100644
--- a/curvefs/sdk/java/native/BUILD
+++ b/curvefs/sdk/java/native/BUILD
@@ -26,6 +26,8 @@ cc_binary(
copts = CURVE_DEFAULT_COPTS,
linkopts = [
"-Wl,-rpath=/tmp/libcurvefs,--disable-new-dtags",
+ "-L/usr/lib/x86_64-linux-gnu/",
+ "-lhashkit",
],
deps = [
"@com_google_absl//absl/cleanup",
diff --git a/curvefs/sdk/java/pom.xml b/curvefs/sdk/java/pom.xml
index b3f15f585c..a3ed98aea9 100644
--- a/curvefs/sdk/java/pom.xml
+++ b/curvefs/sdk/java/pom.xml
@@ -30,7 +30,25 @@
native/build
+
+ src/main/resources
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ package
+
+ shade
+
+
+
+
+
diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java
index ac42dfaf82..2a79fcc1c4 100644
--- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java
+++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright (c) 2023 NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.opencurve.curve.fs.flink;
import io.opencurve.curve.fs.hadoop.CurveFileSystem;
@@ -10,28 +26,17 @@
import java.net.URI;
public class CurveFileSystemFactory implements FileSystemFactory {
- private org.apache.hadoop.conf.Configuration conf;
-
+ private org.apache.hadoop.conf.Configuration conf = new Configuration();
private static final String CURVE_FS_CONFIG_PREFIXES = "curvefs.";
private static final String FLINK_CONFIG_PREFIXES = "fs.";
public static String SCHEME = "curvefs";
@Override
public void configure(org.apache.flink.configuration.Configuration config) {
- conf = new Configuration();
- if (config != null) {
- for (String key : config.keySet()) {
- if (key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES)) {
- String value = config.getString(key, null);
- if (value != null) {
- if (CurveFileSystem.class.getCanonicalName().equals(value.trim())) {
- SCHEME = key.split("\\.")[1];
- }
- conf.set(key, value);
- }
- }
- }
- }
+ config.keySet()
+ .stream()
+ .filter(key -> key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES))
+ .forEach(key -> conf.set(key, config.getString(key, "")));
}
@Override
@@ -45,4 +50,4 @@ public FileSystem create(URI uri) throws IOException {
fs.initialize(uri, conf);
return new HadoopFileSystem(fs);
}
-}
+}
\ No newline at end of file
diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java
index da68151bbd..d065492109 100644
--- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java
+++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright (c) 2023 NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.opencurve.curve.fs.flink;
import org.apache.flink.connector.file.table.FileSystemTableFactory;
diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java
index 2dd1be8d1d..25ad56564f 100644
--- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java
+++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java
@@ -59,8 +59,6 @@ public class CurveFSInputStream extends FSInputStream {
*/
public CurveFSInputStream(Configuration conf, CurveFSProto curvefs,
int fh, long flength, int bufferSize) {
- // Whoever's calling the constructor is responsible for doing the actual curve_open
- // call and providing the file handle.
fileLength = flength;
fileHandle = fh;
closed = false;
@@ -73,6 +71,7 @@ public CurveFSInputStream(Configuration conf, CurveFSProto curvefs,
/** Curve likes things to be closed before it shuts down,
* so closing the IOStream stuff voluntarily in a finalizer is good
*/
+ @Override
protected void finalize() throws Throwable {
try {
if (!closed) {
@@ -91,7 +90,6 @@ private synchronized boolean fillBuffer() throws IOException {
bufValid = 0;
- // attempt to reset to old position. If it fails, too bad.
curve.lseek(fileHandle, curvePos, CurveFSMount.SEEK_SET);
throw new IOException("Failed to fill read buffer! Error code:" + err);
}
@@ -102,6 +100,7 @@ private synchronized boolean fillBuffer() throws IOException {
/*
* Get the current position of the stream.
*/
+ @Override
public synchronized long getPos() throws IOException {
return curvePos - bufValid + bufPos;
}
@@ -117,6 +116,7 @@ public synchronized int available() throws IOException {
return (int) (fileLength - getPos());
}
+ @Override
public synchronized void seek(long targetPos) throws IOException {
LOG.trace("CurveInputStream.seek: Seeking to position " + targetPos + " on fd "
+ fileHandle);
@@ -142,6 +142,7 @@ public synchronized void seek(long targetPos) throws IOException {
* they'll be dealt with before anybody even tries to call this method!
* @return false.
*/
+ @Override
public synchronized boolean seekToNewSource(long targetPos) {
return false;
}
diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java
index 90366cc210..26d0492142 100644
--- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java
+++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java
@@ -29,6 +29,7 @@
import io.opencurve.curve.fs.libfs.CurveFSStat;
import io.opencurve.curve.fs.libfs.CurveFSStatVFS;
+import java.util.UUID;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
@@ -37,6 +38,7 @@
class CurveFSTalker extends CurveFSProto {
private CurveFSMount mount;
private String fsname = null;
+ private String mountpoint = null;
private boolean inited = false;
private static final String PREFIX_KEY = "curvefs";
@@ -72,14 +74,15 @@ void initialize(URI uri, Configuration conf) throws IOException {
if (null == fsname || fsname.isEmpty()) {
throw new IOException("curvefs.name is not set");
}
- mount.mount(fsname, "/");
+ mountpoint = UUID.randomUUID().toString();
+ mount.mount(fsname, mountpoint);
inited = true;
}
@Override
void shutdown() throws IOException {
if (inited) {
- mount.umount(fsname, "/");
+ mount.umount(fsname, mountpoint);
mount = null;
inited = false;
}
@@ -179,4 +182,4 @@ void chown(Path path, int uid, int gid) throws IOException {
void rename(Path src, Path dst) throws IOException {
mount.rename(tostr(src), tostr(dst));
}
-}
+}
\ No newline at end of file
diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java
index 09df042c1b..fc031d38d8 100644
--- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java
+++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java
@@ -59,12 +59,14 @@ private Path makeAbsolute(Path path) {
return new Path(workingDir, path);
}
+ @Override
public URI getUri() {
return uri;
}
+ @Override
public String getScheme() {
- return uri.getScheme();
+ return "hdfs";
}
@Override
@@ -85,14 +87,12 @@ public void initialize(URI uri, Configuration conf) throws IOException {
this.workingDir = getHomeDirectory();
}
-
+ @Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
path = makeAbsolute(path);
- // throws filenotfoundexception if path is a directory
int fd = curve.open(path, CurveFSMount.O_RDONLY, 0);
- /* get file size */
CurveFSStat stat = new CurveFSStat();
curve.fstat(fd, stat);
@@ -102,10 +102,11 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
@Override
public void close() throws IOException {
- super.close(); // this method does stuff, make sure it's run!
+ super.close();
curve.shutdown();
}
+ @Override
public FSDataOutputStream append(Path path, int bufferSize, Progressable progress) throws IOException {
path = makeAbsolute(path);
@@ -122,6 +123,7 @@ public FSDataOutputStream append(Path path, int bufferSize, Progressable progres
return new FSDataOutputStream(ostream, statistics);
}
+ @Override
public Path getWorkingDirectory() {
return workingDir;
}
@@ -144,6 +146,7 @@ public boolean mkdirs(Path f) throws IOException {
return mkdirs(f, perms);
}
+ @Override
public FileStatus getFileStatus(Path path) throws IOException {
path = makeAbsolute(path);
@@ -160,7 +163,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
return status;
}
-
+ @Override
public FileStatus[] listStatus(Path path) throws IOException {
path = makeAbsolute(path);
@@ -174,12 +177,10 @@ public FileStatus[] listStatus(Path path) throws IOException {
for (int i = 0; i < status.length; i++) {
status[i] = getFileStatus(new Path(path, dirlist[i]));
}
- curve.shutdown();
return status;
} else {
throw new FileNotFoundException("File " + path + " does not exist.");
}
-
}
@Override
@@ -208,9 +209,9 @@ public void setTimes(Path path, long mtime, long atime) throws IOException {
curve.setattr(path, stat, mask);
}
+ @Override
public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress) throws IOException {
-
path = makeAbsolute(path);
boolean exists = exists(path);
@@ -268,6 +269,7 @@ public void setOwner(Path path, String username, String groupname) throws IOExce
}
@Deprecated
+ @Override
public FSDataOutputStream createNonRecursive(Path path, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
@@ -278,7 +280,7 @@ public FSDataOutputStream createNonRecursive(Path path, FsPermission permission,
if (parent != null) {
CurveFSStat stat = new CurveFSStat();
- curve.lstat(parent, stat); // handles FileNotFoundException case
+ curve.lstat(parent, stat);
if (stat.isFile()) {
throw new FileAlreadyExistsException(parent.toString());
}
@@ -314,14 +316,15 @@ public boolean rename(Path src, Path dst) throws IOException {
}
@Deprecated
+ @Override
public boolean delete(Path path) throws IOException {
return delete(path, false);
}
+ @Override
public boolean delete(Path path, boolean recursive) throws IOException {
path = makeAbsolute(path);
- /* path exists? */
FileStatus status;
try {
status = getFileStatus(path);
@@ -329,13 +332,11 @@ public boolean delete(Path path, boolean recursive) throws IOException {
return false;
}
- /* we're done if its a file */
if (status.isFile()) {
curve.unlink(path);
return true;
}
- /* get directory contents */
FileStatus[] dirlist = listStatus(path);
if (dirlist == null) {
return false;
@@ -383,6 +384,6 @@ protected int getDefaultPort() {
@Override
public String getCanonicalServiceName() {
- return null; // Does not support Token
+ return null;
}
}
diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java
deleted file mode 100644
index d488e309dc..0000000000
--- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package io.opencurve.curve.fs.hadoop;
-
-public class Main {
- public static void main(String[] args) {
- System.out.println("Hello world!");
- }
-}
diff --git a/curvefs/sdk/libcurvefs/libcurvefs.cpp b/curvefs/sdk/libcurvefs/libcurvefs.cpp
index d53c0b51b8..fb54c3b509 100644
--- a/curvefs/sdk/libcurvefs/libcurvefs.cpp
+++ b/curvefs/sdk/libcurvefs/libcurvefs.cpp
@@ -143,7 +143,7 @@ int curvefs_open(uintptr_t instance_ptr,
}
}
- uint64_t fd;
+ uint64_t fd = 0;
rc = mount->vfs->Open(path, flags, mode, &fd);
if (rc != CURVEFS_ERROR::OK) {
return SysErr(rc);
@@ -164,7 +164,7 @@ ssize_t curvefs_read(uintptr_t instance_ptr,
int fd,
char* buffer,
size_t count) {
- size_t nread;
+ size_t nread = 0;
auto mount = get_instance(instance_ptr);
auto rc = mount->vfs->Read(fd, buffer, count, &nread);
if (rc != CURVEFS_ERROR::OK) {
diff --git a/curvefs/src/client/BUILD b/curvefs/src/client/BUILD
index 450e8b6ced..33a3b45089 100644
--- a/curvefs/src/client/BUILD
+++ b/curvefs/src/client/BUILD
@@ -99,6 +99,7 @@ cc_library(
"//curvefs/src/common:metric_utils",
"//curvefs/src/common:dynamic_vlog",
"//curvefs/src/common:threading",
+ "//curvefs/src/metaserver:metaserver_storage_conv",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
diff --git a/curvefs/src/client/client_operator.cpp b/curvefs/src/client/client_operator.cpp
index 720ce65ee3..fc41375caf 100644
--- a/curvefs/src/client/client_operator.cpp
+++ b/curvefs/src/client/client_operator.cpp
@@ -20,18 +20,22 @@
* Author: Jingli Chen (Wine93)
*/
+#include "curvefs/src/client/client_operator.h"
+
#include
-#include "src/common/uuid.h"
-#include "curvefs/src/client/client_operator.h"
#include "curvefs/src/client/filesystem/error.h"
+#include "curvefs/src/metaserver/storage/converter.h"
+#include "src/common/uuid.h"
namespace curvefs {
namespace client {
using ::curve::common::UUIDGenerator;
-using ::curvefs::metaserver::DentryFlag;
-using ::curvefs::mds::topology::PartitionTxId;
using ::curvefs::client::filesystem::ToFSError;
+using ::curvefs::mds::topology::PartitionTxId;
+using ::curvefs::metaserver::DentryFlag;
+using ::curvefs::metaserver::TxLock;
+using ::curvefs::metaserver::storage::Key4Dentry;
#define LOG_ERROR(action, rc) \
LOG(ERROR) << action << " failed, retCode = " << rc \
@@ -60,6 +64,7 @@ RenameOperator::RenameOperator(uint32_t fsId,
dstTxId_(0),
oldInodeId_(0),
oldInodeSize_(-1),
+ startTs_(0),
dentryManager_(dentryManager),
inodeManager_(inodeManager),
metaClient_(metaClient),
@@ -77,6 +82,7 @@ std::string RenameOperator::DebugString() {
<< ", srcPartitionId = " << srcPartitionId_
<< ", dstPartitionId = " << dstPartitionId_
<< ", srcTxId = " << srcTxId_ << ", dstTxId_ = " << dstTxId_
+ << ", startTs = " << startTs_
<< ", oldInodeId = " << oldInodeId_
<< ", srcDentry = [" << srcDentry_.ShortDebugString() << "]"
<< ", dstDentry = [" << dstDentry_.ShortDebugString() << "]"
@@ -194,7 +200,6 @@ CURVEFS_ERROR RenameOperator::RecordOldInodeInfo() {
return CURVEFS_ERROR::NOT_EXIST;
}
}
-
return CURVEFS_ERROR::OK;
}
@@ -204,7 +209,6 @@ CURVEFS_ERROR RenameOperator::PrepareRenameTx(
if (rc != MetaStatusCode::OK) {
LOG_ERROR("PrepareRenameTx", rc);
}
-
return ToFSError(rc);
}
@@ -272,6 +276,151 @@ CURVEFS_ERROR RenameOperator::CommitTx() {
return CURVEFS_ERROR::OK;
}
+// Prewrite `dentrys` under `txLockIn`, resolving conflicting tx locks.
+//
+// While the metaserver answers TX_KEY_LOCKED, the index reported in the
+// returned lock is added to the running offset `dcount`; the lock found on
+// dentrys[dcount] is resolved through CheckAndResolveTx and the prewrite is
+// retried with the dentries from that offset on.
+//
+// Returns OK when the prewrite succeeds, INTERNAL otherwise.
+CURVEFS_ERROR RenameOperator::PrewriteRenameTx(
+    const std::vector<Dentry>& dentrys, const TxLock& txLockIn) {
+    TxLock txLockOut;
+    uint32_t dcount = 0;
+    auto rc = metaClient_->PrewriteRenameTx(dentrys, txLockIn, &txLockOut);
+    while (rc == MetaStatusCode::TX_KEY_LOCKED) {
+        dcount += txLockOut.index();
+        // check the offset before indexing: an out-of-range value must end
+        // on the error path below instead of reading past the vector
+        if (dcount >= dentrys.size()) {
+            break;
+        }
+        auto rt = dentryManager_->CheckAndResolveTx(
+            dentrys[dcount], txLockOut, txLockIn.timestamp(),
+            txLockIn.startts());
+        if (rt != MetaStatusCode::OK) {
+            LOG_ERROR("CheckAndResolveTx", rt);
+            return CURVEFS_ERROR::INTERNAL;
+        }
+        rc = metaClient_->PrewriteRenameTx(
+            std::vector<Dentry>(dentrys.begin() + dcount, dentrys.end()),
+            txLockIn, &txLockOut);
+    }
+    if (rc != MetaStatusCode::OK) {
+        LOG_ERROR("PrewriteRenameTx", rc);
+        return CURVEFS_ERROR::INTERNAL;
+    }
+    return CURVEFS_ERROR::OK;
+}
+
+// Prewrite phase of the tx v2 rename.
+//
+// Fetches the start ts from the MDS, builds the delete-marked source dentry
+// and the new destination dentry (both tagged with the start ts), then
+// prewrites them under a lock whose primary key is the serialized source
+// dentry key.
+//
+// Returns OK on success; INTERNAL when Tso or GetPartitionId fails, or the
+// error reported by PrewriteRenameTx.
+CURVEFS_ERROR RenameOperator::PrewriteTx() {
+    uint64_t timestamp = 0;
+    auto rc = mdsClient_->Tso(&startTs_, &timestamp);
+    if (rc != FSStatusCode::OK) {
+        LOG_ERROR("start Tso", rc);
+        return CURVEFS_ERROR::INTERNAL;
+    }
+
+    dentry_ = Dentry(srcDentry_);
+    dentry_.set_flag(DentryFlag::DELETE_MARK_FLAG);
+    dentry_.set_type(srcDentry_.type());
+    dentry_.set_txid(startTs_);
+
+    newDentry_ = Dentry(srcDentry_);
+    newDentry_.set_parentinodeid(newParentId_);
+    newDentry_.set_name(newname_);
+    newDentry_.set_type(srcDentry_.type());
+    newDentry_.set_txid(startTs_);
+
+    Key4Dentry key4Dentry(
+        dentry_.fsid(), dentry_.parentinodeid(), dentry_.name());
+    std::string primaryKey = key4Dentry.SerializeToString();
+
+    TxLock txLockIn;
+    txLockIn.set_primarykey(primaryKey);
+    txLockIn.set_startts(startTs_);
+    txLockIn.set_timestamp(timestamp);
+
+    if (!metaClient_->GetPartitionId(dentry_.fsid(), dentry_.parentinodeid(),
+                                     &srcPartitionId_) ||
+        !metaClient_->GetPartitionId(newDentry_.fsid(),
+                                     newDentry_.parentinodeid(),
+                                     &dstPartitionId_)) {
+        // GetPartitionId is used as a boolean here and `rc` still holds the
+        // successful Tso status, so log the code that is actually returned
+        LOG_ERROR("GetPartitionId", CURVEFS_ERROR::INTERNAL);
+        return CURVEFS_ERROR::INTERNAL;
+    }
+
+    // note: do not prewrite concurrently; the tx write table clear logic
+    // relies on the primary key being prewritten first
+    CURVEFS_ERROR rt = CURVEFS_ERROR::OK;
+    std::vector<Dentry> dentrys{dentry_};
+    if (srcPartitionId_ == dstPartitionId_) {
+        dentrys.push_back(newDentry_);
+        rt = PrewriteRenameTx(dentrys, txLockIn);
+    } else {
+        rt = PrewriteRenameTx(dentrys, txLockIn);
+        if (rt == CURVEFS_ERROR::OK) {
+            dentrys[0] = newDentry_;
+            rt = PrewriteRenameTx(dentrys, txLockIn);
+        }
+    }
+    if (rt != CURVEFS_ERROR::OK) {
+        LOG_ERROR("PrewriteTx", rt);
+        return rt;
+    }
+    return CURVEFS_ERROR::OK;
+}
+
+// Commit phase of the tx v2 rename.
+//
+// Fetches the commit ts from the MDS and commits the dentries prepared by
+// PrewriteTx. When source and destination share a partition, both dentries
+// are committed in one request; otherwise the source dentry (the primary
+// key) is committed first and the destination dentry afterwards.
+//
+// Returns OK on success, INTERNAL when Tso or the commit fails; in the
+// cross-partition case only the result of the first commit is checked.
+CURVEFS_ERROR RenameOperator::CommitTxV2() {
+    uint64_t commitTs = 0;
+    uint64_t timestamp = 0;  // filled by Tso, not used by the commit itself
+    auto rc = mdsClient_->Tso(&commitTs, &timestamp);
+    if (rc != FSStatusCode::OK) {
+        LOG_ERROR("CommitTxV2 Tso", rc);
+        return CURVEFS_ERROR::INTERNAL;
+    }
+
+    MetaStatusCode rt = MetaStatusCode::OK;
+    std::vector<Dentry> dentrys{dentry_};
+    if (srcPartitionId_ == dstPartitionId_) {
+        dentrys.push_back(newDentry_);
+        rt = metaClient_->CommitTx(dentrys, startTs_, commitTs);
+    } else {
+        rt = metaClient_->CommitTx(dentrys, startTs_, commitTs);
+        if (rt == MetaStatusCode::OK) {
+            dentrys[0] = newDentry_;
+            // the result of committing the second key is deliberately ignored
+            metaClient_->CommitTx(dentrys, startTs_, commitTs);
+        }
+    }
+    if (rt != MetaStatusCode::OK) {
+        LOG_ERROR("CommitTx", rt);
+        return CURVEFS_ERROR::INTERNAL;
+    }
+    return CURVEFS_ERROR::OK;
+}
+
CURVEFS_ERROR RenameOperator::LinkInode(uint64_t inodeId, uint64_t parent) {
std::shared_ptr inodeWrapper;
auto rc = inodeManager_->GetInode(inodeId, inodeWrapper);
@@ -413,8 +529,6 @@ CURVEFS_ERROR RenameOperator::UpdateInodeCtime() {
LOG_ERROR("UpdateInodeCtime", rc);
return rc;
}
-
- LOG(INFO) << "UpdateInodeCtime inodeid = " << srcDentry_.inodeid();
return rc;
}
diff --git a/curvefs/src/client/client_operator.h b/curvefs/src/client/client_operator.h
index 0f073a67c4..0408fd1181 100644
--- a/curvefs/src/client/client_operator.h
+++ b/curvefs/src/client/client_operator.h
@@ -27,13 +27,15 @@
#include
#include
-#include "curvefs/src/client/inode_cache_manager.h"
-#include "curvefs/src/client/dentry_cache_manager.h"
+#include "curvefs/src/client/inode_manager.h"
+#include "curvefs/src/client/dentry_manager.h"
#include "curvefs/src/client/rpcclient/mds_client.h"
namespace curvefs {
namespace client {
+using ::curvefs::metaserver::MetaStatusCode;
+using ::curvefs::metaserver::TxLock;
using rpcclient::MdsClient;
class RenameOperator {
@@ -56,6 +58,8 @@ class RenameOperator {
CURVEFS_ERROR LinkDestParentInode();
CURVEFS_ERROR PrepareTx();
CURVEFS_ERROR CommitTx();
+ // v2 transaction: prewrite the source and destination dentries.
+ CURVEFS_ERROR PrewriteTx();
+ // v2 transaction: fetch a commit timestamp from the MDS and commit the
+ // source and destination dentries.
+ CURVEFS_ERROR CommitTxV2();
CURVEFS_ERROR UnlinkSrcParentInode();
void UnlinkOldInode();
CURVEFS_ERROR UpdateInodeParent();
@@ -85,6 +89,9 @@ class RenameOperator {
CURVEFS_ERROR PrepareRenameTx(const std::vector& dentrys);
+    // Prewrite `dentrys` with the lock info in `txLockIn`. PrewriteTx calls
+    // it once when source and destination share a partition, otherwise once
+    // per dentry.
+    CURVEFS_ERROR PrewriteRenameTx(
+        const std::vector<Dentry>& dentrys, const TxLock& txLockIn);
+
CURVEFS_ERROR LinkInode(uint64_t inodeId, uint64_t parent = 0);
CURVEFS_ERROR UnLinkInode(uint64_t inodeId, uint64_t parent = 0);
@@ -107,6 +114,7 @@ class RenameOperator {
// if dest exist, record the size and type of file or empty dir
int64_t oldInodeSize_;
FsFileType oldInodeType_;
+ uint64_t startTs_; // tx sequence number
Dentry srcDentry_;
Dentry dstDentry_;
Dentry dentry_;
diff --git a/curvefs/src/client/common/common.cpp b/curvefs/src/client/common/common.cpp
index b50898a630..1210287fc3 100644
--- a/curvefs/src/client/common/common.cpp
+++ b/curvefs/src/client/common/common.cpp
@@ -45,6 +45,18 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype) {
case MetaServerOpType::PrepareRenameTx:
os << "PrepareRenameTx";
break;
+ case MetaServerOpType::PrewriteRenameTx:
+ os << "PrewriteRenameTx";
+ break;
+ case MetaServerOpType::CheckTxStatus:
+ os << "CheckTxStatus";
+ break;
+ case MetaServerOpType::ResolveTxLock:
+ os << "ResolveTxLock";
+ break;
+ case MetaServerOpType::CommitTx:
+ os << "CommitTx";
+ break;
case MetaServerOpType::GetInode:
os << "GetInode";
break;
diff --git a/curvefs/src/client/common/common.h b/curvefs/src/client/common/common.h
index a76312b117..2a5a13d931 100644
--- a/curvefs/src/client/common/common.h
+++ b/curvefs/src/client/common/common.h
@@ -50,6 +50,10 @@ enum class MetaServerOpType {
CreateDentry,
DeleteDentry,
PrepareRenameTx,
+ PrewriteRenameTx,
+ CheckTxStatus,
+ ResolveTxLock,
+ CommitTx,
GetInode,
BatchGetInodeAttr,
BatchGetXAttr,
diff --git a/curvefs/src/client/common/config.cpp b/curvefs/src/client/common/config.cpp
index dc80177ab3..6125a22af9 100644
--- a/curvefs/src/client/common/config.cpp
+++ b/curvefs/src/client/common/config.cpp
@@ -105,6 +105,8 @@ DEFINE_uint64(fuseClientBurstReadIopsSecs, 180,
"the times that Read burst iops can continue");
DEFINE_validator(fuseClientBurstReadIopsSecs, &pass_uint64);
+// Version of the rename transaction model. It is filled from the
+// fuseClient.txVersion config item and keeps the default 1 otherwise.
+DEFINE_int32(TxVersion, 1, "tx version");
+
void InitMdsOption(Configuration *conf, MdsOption *mdsOpt) {
conf->GetValueFatalIfFail("mdsOpt.mdsMaxRetryMS", &mdsOpt->mdsMaxRetryMS);
conf->GetValueFatalIfFail("mdsOpt.rpcRetryOpt.maxRPCTimeoutMS",
@@ -428,6 +430,8 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) {
&clientOption->dummyServerStartPort);
conf->GetValueFatalIfFail("fuseClient.enableMultiMountPointRename",
&clientOption->enableMultiMountPointRename);
+ conf->GetIntValue("fuseClient.txVersion",
+ &FLAGS_TxVersion);
conf->GetValueFatalIfFail("fuseClient.downloadMaxRetryTimes",
&clientOption->downloadMaxRetryTimes);
conf->GetValueFatalIfFail("fuseClient.warmupThreadsNum",
diff --git a/curvefs/src/client/curve_fuse_op.cpp b/curvefs/src/client/curve_fuse_op.cpp
index dd249c8892..cd2128d805 100644
--- a/curvefs/src/client/curve_fuse_op.cpp
+++ b/curvefs/src/client/curve_fuse_op.cpp
@@ -48,6 +48,7 @@
#include "curvefs/src/common/metric_utils.h"
#include "src/common/configuration.h"
#include "src/common/gflags_helper.h"
+#include "src/common/log_util.h"
using ::curve::common::Configuration;
using ::curvefs::client::CURVEFS_ERROR;
@@ -152,6 +153,7 @@ int InitLog(const char *confPath, const char *argv0) {
FLAGS_vlog_level = FLAGS_v;
// initialize logging module
+ curve::common::DisableLoggingToStdErr();
google::InitGoogleLogging(argv0);
bool succ = InitAccessLog(FLAGS_log_dir);
diff --git a/curvefs/src/client/dentry_cache_manager.cpp b/curvefs/src/client/dentry_cache_manager.cpp
deleted file mode 100644
index afb5e49eef..0000000000
--- a/curvefs/src/client/dentry_cache_manager.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (c) 2021 NetEase Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/*
- * Project: curve
- * Created Date: Thur May 27 2021
- * Author: xuchaojie
- */
-#include "curvefs/src/client/dentry_cache_manager.h"
-
-#include
-#include
-#include
-#include
-#include
-#include
-
-using ::curvefs::metaserver::MetaStatusCode_Name;
-
-namespace curvefs {
-namespace client {
-namespace common {
-DECLARE_bool(enableCto);
-} // namespace common
-} // namespace client
-} // namespace curvefs
-
-namespace curvefs {
-namespace client {
-
-using curve::common::WriteLockGuard;
-using NameLockGuard = ::curve::common::GenericNameLockGuard;
-using ::curvefs::client::filesystem::ToFSError;
-
-CURVEFS_ERROR DentryCacheManagerImpl::GetDentry(uint64_t parent,
- const std::string &name,
- Dentry *out) {
- std::string key = GetDentryCacheKey(parent, name);
- NameLockGuard lock(nameLock_, key);
-
- MetaStatusCode ret = metaClient_->GetDentry(fsId_, parent, name, out);
- if (ret != MetaStatusCode::OK) {
- LOG_IF(ERROR, ret != MetaStatusCode::NOT_FOUND)
- << "metaClient_ GetDentry failed, MetaStatusCode = " << ret
- << ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret)
- << ", parent = " << parent << ", name = " << name;
- return ToFSError(ret);
- }
- return CURVEFS_ERROR::OK;
-}
-
-CURVEFS_ERROR DentryCacheManagerImpl::CreateDentry(const Dentry &dentry) {
- std::string key = GetDentryCacheKey(dentry.parentinodeid(), dentry.name());
- NameLockGuard lock(nameLock_, key);
- MetaStatusCode ret = metaClient_->CreateDentry(dentry);
- if (ret != MetaStatusCode::OK) {
- LOG(ERROR) << "metaClient_ CreateDentry failed, MetaStatusCode = "
- << ret
- << ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret)
- << ", parent = " << dentry.parentinodeid()
- << ", name = " << dentry.name();
- return ToFSError(ret);
- }
-
- return CURVEFS_ERROR::OK;
-}
-
-CURVEFS_ERROR DentryCacheManagerImpl::DeleteDentry(uint64_t parent,
- const std::string &name,
- FsFileType type) {
- std::string key = GetDentryCacheKey(parent, name);
- NameLockGuard lock(nameLock_, key);
-
- MetaStatusCode ret = metaClient_->DeleteDentry(fsId_, parent, name, type);
- if (ret != MetaStatusCode::OK && ret != MetaStatusCode::NOT_FOUND) {
- LOG(ERROR) << "metaClient_ DeleteInode failed, MetaStatusCode = " << ret
- << ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret)
- << ", parent = " << parent << ", name = " << name;
- return ToFSError(ret);
- }
- return CURVEFS_ERROR::OK;
-}
-
-CURVEFS_ERROR DentryCacheManagerImpl::ListDentry(uint64_t parent,
- std::list *dentryList,
- uint32_t limit,
- bool onlyDir,
- uint32_t nlink) {
- dentryList->clear();
- // means no dir under this dir
- if (onlyDir && nlink == 2) {
- LOG(INFO) << "ListDentry parent = " << parent
- << ", onlyDir = 1 and nlink = 2, return directly";
- return CURVEFS_ERROR::OK;
- }
-
- MetaStatusCode ret = MetaStatusCode::OK;
- bool perceed = true;
- std::string last = "";
- do {
- std::list part;
- ret = metaClient_->ListDentry(fsId_, parent, last, limit, onlyDir,
- &part);
- VLOG(6) << "ListDentry fsId = " << fsId_ << ", parent = " << parent
- << ", last = " << last << ", count = " << limit
- << ", onlyDir = " << onlyDir
- << ", ret = " << ret << ", part.size() = " << part.size();
- if (ret != MetaStatusCode::OK) {
- LOG(ERROR) << "metaClient_ ListDentry failed"
- << ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret)
- << ", parent = " << parent << ", last = " << last
- << ", count = " << limit << ", onlyDir = " << onlyDir;
- return ToFSError(ret);
- }
-
- if (!onlyDir) {
- if (part.size() < limit) {
- perceed = false;
- }
- if (!part.empty()) {
- last = part.back().name();
- dentryList->splice(dentryList->end(), part);
- }
- } else {
- // means iterate over the range
- if (part.empty()) {
- perceed = false;
- } else {
- last = part.back().name();
- if (part.back().type() != FsFileType::TYPE_DIRECTORY) {
- part.pop_back();
- }
- dentryList->splice(dentryList->end(), part);
- // means already get all the dir under this dir
- if (nlink - dentryList->size() == 2) {
- perceed = false;
- }
- }
- }
- } while (perceed);
-
- return CURVEFS_ERROR::OK;
-}
-
-} // namespace client
-} // namespace curvefs
diff --git a/curvefs/src/client/dentry_manager.cpp b/curvefs/src/client/dentry_manager.cpp
new file mode 100644
index 0000000000..9c5227ccfb
--- /dev/null
+++ b/curvefs/src/client/dentry_manager.cpp
@@ -0,0 +1,287 @@
+/*
+ * Copyright (c) 2021 NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Project: curve
+ * Created Date: Thur May 27 2021
+ * Author: xuchaojie
+ */
+#include "curvefs/src/client/dentry_manager.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include "curvefs/src/metaserver/storage/converter.h"
+
+using ::curvefs::metaserver::MetaStatusCode_Name;
+using ::curvefs::metaserver::storage::Key4Dentry;
+
+namespace curvefs {
+namespace client {
+namespace common {
+DECLARE_bool(enableCto);
+} // namespace common
+} // namespace client
+} // namespace curvefs
+
+namespace curvefs {
+namespace client {
+
+using curve::common::WriteLockGuard;
+using NameLockGuard = ::curve::common::GenericNameLockGuard;
+using ::curvefs::client::filesystem::ToFSError;
+
+/**
+ * Ask the metaserver for the status of the transaction identified by
+ * `primaryKey` and `startTs`.
+ *
+ * The serialized primary key is parsed back into a Key4Dentry to obtain the
+ * fsId and parent inode id the request is sent with. If parsing fails,
+ * PARSE_FROM_STRING_FAILED is returned and no request is made.
+ */
+MetaStatusCode DentryCacheManagerImpl::CheckTxStatus(
+    const std::string primaryKey, uint64_t startTs, uint64_t curTimestamp) {
+    Key4Dentry parsedKey;
+    const bool parsed = parsedKey.ParseFromString(primaryKey);
+    if (parsed) {
+        return metaClient_->CheckTxStatus(parsedKey.fsId,
+                                          parsedKey.parentInodeId, primaryKey,
+                                          startTs, curTimestamp);
+    }
+    LOG(ERROR) << "CheckTxStatus parse primary key failed, primaryKey = "
+               << primaryKey;
+    return MetaStatusCode::PARSE_FROM_STRING_FAILED;
+}
+
+/**
+ * Forward a lock-resolution request for `dentry` to the metaserver client
+ * and hand its status back unchanged. CheckAndResolveTx passes `commitTs`
+ * only for a committed transaction; otherwise the declaration's default
+ * value is used.
+ */
+MetaStatusCode DentryCacheManagerImpl::ResolveTxLock(const Dentry &dentry,
+    uint64_t startTs, uint64_t commitTs) {
+    const MetaStatusCode status =
+        metaClient_->ResolveTxLock(dentry, startTs, commitTs);
+    return status;
+}
+
+/**
+ * @brief Check the transaction that owns `txLock` and resolve the lock it
+ *        left on `dentry`.
+ *
+ * The transaction status is queried with the lock's primary key and start
+ * timestamp:
+ *  - TX_COMMITTED: the lock is resolved with `commitTs`;
+ *  - TX_ROLLBACKED / TX_TIMEOUT: the lock is resolved without a commit
+ *    timestamp (ResolveTxLock's default is used);
+ *  - any other status is logged and returned as is.
+ *
+ * @param dentry     dentry whose key is locked
+ * @param txLock     lock info reported by the metaserver
+ * @param timestamp  current timestamp passed to the status check
+ * @param commitTs   commit timestamp used when the tx is already committed
+ * @return the ResolveTxLock status, or the unexpected CheckTxStatus status.
+ */
+MetaStatusCode DentryCacheManagerImpl::CheckAndResolveTx(const Dentry& dentry,
+    const TxLock& txLock, uint64_t timestamp, uint64_t commitTs) {
+    auto rt = CheckTxStatus(txLock.primarykey(), txLock.startts(), timestamp);
+    // every branch returns, so nothing follows the switch
+    switch (rt) {
+        case MetaStatusCode::TX_COMMITTED:
+            return ResolveTxLock(dentry, txLock.startts(), commitTs);
+        case MetaStatusCode::TX_ROLLBACKED:
+        case MetaStatusCode::TX_TIMEOUT:
+            return ResolveTxLock(dentry, txLock.startts());
+        default:
+            LOG(ERROR) << "CheckTxStatus unexpected rt = "
+                       << MetaStatusCode_Name(rt);
+            return rt;
+    }
+}
+
+/**
+ * @brief Fetch the dentry `name` under `parent` from the metaserver.
+ *
+ * While the metaserver answers TX_KEY_LOCKED, a timestamp is requested from
+ * the MDS, CheckAndResolveTx() is called for the reported lock and the
+ * lookup is retried. mdsClient_ is dereferenced on that path, so Init() must
+ * have been called before.
+ *
+ * @param parent     parent inode id
+ * @param name       dentry name
+ * @param[out] out   receives the dentry on success
+ * @return CURVEFS_ERROR::OK on success; CURVEFS_ERROR::INTERNAL if Tso() or
+ *         CheckAndResolveTx() fails; otherwise ToFSError() of the
+ *         metaserver status (NOT_FOUND is not logged as an error).
+ */
+CURVEFS_ERROR DentryCacheManagerImpl::GetDentry(uint64_t parent,
+                                                const std::string &name,
+                                                Dentry *out) {
+    std::string key = GetDentryCacheKey(parent, name);
+    NameLockGuard lock(nameLock_, key);
+    TxLock txLockOut;
+    MetaStatusCode ret = metaClient_->GetDentry(fsId_, parent, name, out,
+                                                &txLockOut);
+    while (ret == MetaStatusCode::TX_KEY_LOCKED) {
+        uint64_t ts = 0;
+        uint64_t timestamp = 0;
+        if (mdsClient_->Tso(&ts, &timestamp) != FSStatusCode::OK) {
+            LOG(ERROR) << "GetDentry Tso failed, parent = " << parent
+                       << ", name = " << name;
+            return CURVEFS_ERROR::INTERNAL;
+        }
+        // only the key fields are needed to address the locked dentry
+        Dentry dentry;
+        dentry.set_fsid(fsId_);
+        dentry.set_parentinodeid(parent);
+        dentry.set_name(name);
+        MetaStatusCode rc = CheckAndResolveTx(dentry, txLockOut, timestamp, ts);
+        if (rc != MetaStatusCode::OK) {
+            LOG(ERROR) << "GetDentry CheckAndResolveTx failed, rc = "
+                       << MetaStatusCode_Name(rc)
+                       << ", parent = " << parent << ", name = " << name;
+            return CURVEFS_ERROR::INTERNAL;
+        }
+        ret = metaClient_->GetDentry(fsId_, parent, name, out, &txLockOut);
+    }
+
+    if (ret != MetaStatusCode::OK) {
+        LOG_IF(ERROR, ret != MetaStatusCode::NOT_FOUND)
+            << "metaClient_ GetDentry failed, MetaStatusCode = " << ret
+            << ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret)
+            << ", parent = " << parent << ", name = " << name;
+        return ToFSError(ret);
+    }
+    return CURVEFS_ERROR::OK;
+}
+
+/**
+ * @brief Create `dentry` on the metaserver.
+ *
+ * While the metaserver answers TX_KEY_LOCKED, a timestamp is requested from
+ * the MDS, CheckAndResolveTx() is called for the reported lock and the
+ * create is retried. mdsClient_ is dereferenced on that path, so Init() must
+ * have been called before.
+ *
+ * @return CURVEFS_ERROR::OK on success; CURVEFS_ERROR::INTERNAL if Tso() or
+ *         CheckAndResolveTx() fails; otherwise ToFSError() of the
+ *         metaserver status.
+ */
+CURVEFS_ERROR DentryCacheManagerImpl::CreateDentry(const Dentry &dentry) {
+    std::string key = GetDentryCacheKey(dentry.parentinodeid(), dentry.name());
+    NameLockGuard lock(nameLock_, key);
+    TxLock txLockOut;
+    MetaStatusCode ret = metaClient_->CreateDentry(dentry, &txLockOut);
+    while (ret == MetaStatusCode::TX_KEY_LOCKED) {
+        uint64_t ts = 0;
+        uint64_t timestamp = 0;
+        if (mdsClient_->Tso(&ts, &timestamp) != FSStatusCode::OK) {
+            LOG(ERROR) << "CreateDentry Tso failed, dentry = "
+                       << dentry.ShortDebugString();
+            return CURVEFS_ERROR::INTERNAL;
+        }
+        MetaStatusCode rc = CheckAndResolveTx(dentry, txLockOut, timestamp, ts);
+        if (rc != MetaStatusCode::OK) {
+            LOG(ERROR) << "CreateDentry CheckAndResolveTx failed, rc = "
+                       << MetaStatusCode_Name(rc) << ", dentry = "
+                       << dentry.ShortDebugString();
+            return CURVEFS_ERROR::INTERNAL;
+        }
+        ret = metaClient_->CreateDentry(dentry, &txLockOut);
+    }
+    if (ret != MetaStatusCode::OK) {
+        LOG(ERROR) << "metaClient_ CreateDentry failed"
+                   << ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret)
+                   << ", parent = " << dentry.parentinodeid()
+                   << ", name = " << dentry.name();
+        return ToFSError(ret);
+    }
+    return CURVEFS_ERROR::OK;
+}
+
+/**
+ * @brief Delete the dentry `name` under `parent` on the metaserver.
+ *
+ * While the metaserver answers TX_KEY_LOCKED, a timestamp is requested from
+ * the MDS, CheckAndResolveTx() is called for the reported lock and the
+ * delete is retried. mdsClient_ is dereferenced on that path, so Init() must
+ * have been called before.
+ *
+ * @param parent  parent inode id
+ * @param name    dentry name
+ * @param type    file type forwarded to the metaserver request
+ * @return CURVEFS_ERROR::OK on success or when the dentry does not exist
+ *         (NOT_FOUND); CURVEFS_ERROR::INTERNAL if Tso() or
+ *         CheckAndResolveTx() fails; otherwise ToFSError() of the
+ *         metaserver status.
+ */
+CURVEFS_ERROR DentryCacheManagerImpl::DeleteDentry(uint64_t parent,
+                                                   const std::string &name,
+                                                   FsFileType type) {
+    std::string key = GetDentryCacheKey(parent, name);
+    NameLockGuard lock(nameLock_, key);
+
+    TxLock txLockOut;
+    MetaStatusCode ret = metaClient_->DeleteDentry(
+        fsId_, parent, name, type, &txLockOut);
+    while (ret == MetaStatusCode::TX_KEY_LOCKED) {
+        uint64_t ts = 0;
+        uint64_t timestamp = 0;
+        if (mdsClient_->Tso(&ts, &timestamp) != FSStatusCode::OK) {
+            LOG(ERROR) << "DeleteDentry Tso failed, parent = " << parent
+                       << ", name = " << name;
+            return CURVEFS_ERROR::INTERNAL;
+        }
+        // only the key fields are needed to address the locked dentry
+        Dentry dentry;
+        dentry.set_fsid(fsId_);
+        dentry.set_parentinodeid(parent);
+        dentry.set_name(name);
+        MetaStatusCode rc = CheckAndResolveTx(dentry, txLockOut, timestamp, ts);
+        if (rc != MetaStatusCode::OK) {
+            LOG(ERROR) << "DeleteDentry CheckAndResolveTx failed, rc = "
+                       << MetaStatusCode_Name(rc) << ", parent = " << parent
+                       << ", name = " << name;
+            return CURVEFS_ERROR::INTERNAL;
+        }
+        ret = metaClient_->DeleteDentry(fsId_, parent, name, type, &txLockOut);
+    }
+
+    // a missing dentry is treated as an already completed delete
+    if (ret != MetaStatusCode::OK && ret != MetaStatusCode::NOT_FOUND) {
+        LOG(ERROR) << "metaClient_ DeleteDentry failed, MetaStatusCode = "
+                   << ret
+                   << ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret)
+                   << ", parent = " << parent << ", name = " << name;
+        return ToFSError(ret);
+    }
+    return CURVEFS_ERROR::OK;
+}
+
+/**
+ * @brief List the dentries under `parent`, page by page.
+ *
+ * `dentryList` is cleared first. The metaserver is then queried repeatedly
+ * with at most `limit` entries per request, each request starting after the
+ * name of the last entry seen so far.
+ *
+ * When a request answers TX_KEY_LOCKED, the last entry of the returned page
+ * is taken as the locked dentry: it is dropped from the page, its lock is
+ * checked and resolved through CheckAndResolveTx(), and the remaining
+ * entries of the page are kept before listing continues.
+ *
+ * The loop stops
+ *  - when !onlyDir: after a page shorter than `limit` that was not locked;
+ *  - when onlyDir: after an empty, not locked page, or as soon as
+ *    nlink - dentryList->size() == 2.
+ *
+ * @param parent           parent inode id
+ * @param[out] dentryList  receives the listed dentries
+ * @param limit            max number of entries requested per call
+ * @param onlyDir          only directories are wanted
+ * @param nlink            nlink of the parent, only used when onlyDir is set
+ * @return CURVEFS_ERROR::OK on success; CURVEFS_ERROR::INTERNAL if Tso() or
+ *         CheckAndResolveTx() fails or a locked reply carries no entry;
+ *         otherwise ToFSError() of the metaserver status.
+ */
+CURVEFS_ERROR DentryCacheManagerImpl::ListDentry(uint64_t parent,
+                                                 std::list<Dentry> *dentryList,
+                                                 uint32_t limit,
+                                                 bool onlyDir,
+                                                 uint32_t nlink) {
+    dentryList->clear();
+    // means no dir under this dir
+    if (onlyDir && nlink == 2) {
+        LOG(INFO) << "ListDentry parent = " << parent
+                  << ", onlyDir = 1 and nlink = 2, return directly";
+        return CURVEFS_ERROR::OK;
+    }
+
+    MetaStatusCode ret = MetaStatusCode::OK;
+    bool proceed = true;
+    std::string last;
+    TxLock txLockOut;
+    do {
+        std::list<Dentry> part;
+        ret = metaClient_->ListDentry(fsId_, parent, last, limit, onlyDir,
+                                      &part, &txLockOut);
+        VLOG(6) << "ListDentry fsId = " << fsId_ << ", parent = " << parent
+                << ", last = " << last << ", count = " << limit
+                << ", onlyDir = " << onlyDir
+                << ", ret = " << ret << ", part.size() = " << part.size();
+        if (ret == MetaStatusCode::TX_KEY_LOCKED) {
+            uint64_t ts = 0;
+            uint64_t timestamp = 0;
+            if (mdsClient_->Tso(&ts, &timestamp) != FSStatusCode::OK) {
+                LOG(ERROR) << "ListDentry Tso failed, parent = " << parent;
+                return CURVEFS_ERROR::INTERNAL;
+            }
+            Dentry dentry;
+            dentry.set_fsid(fsId_);
+            dentry.set_parentinodeid(parent);
+            if (part.empty()) {
+                LOG(ERROR) << "ListDentry tx key locked, but part is empty"
+                           << ", parent = " << parent;
+                return CURVEFS_ERROR::INTERNAL;
+            }
+            // the last entry of the page is the locked one: drop it here and
+            // let the next request start from the entry before it
+            dentry.set_name(part.back().name());
+            part.pop_back();
+            MetaStatusCode rc = CheckAndResolveTx(
+                dentry, txLockOut, timestamp, ts);
+            if (rc != MetaStatusCode::OK) {
+                LOG(ERROR) << "ListDentry CheckAndResolveTx failed, rc = "
+                           << MetaStatusCode_Name(rc)
+                           << ", parent = " << parent;
+                return CURVEFS_ERROR::INTERNAL;
+            }
+        } else if (ret != MetaStatusCode::OK) {
+            LOG(ERROR) << "metaClient_ ListDentry failed"
+                       << ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret)
+                       << ", parent = " << parent << ", last = " << last
+                       << ", count = " << limit << ", onlyDir = " << onlyDir;
+            return ToFSError(ret);
+        }
+
+        if (!onlyDir) {
+            if (part.size() < limit && ret != MetaStatusCode::TX_KEY_LOCKED) {
+                proceed = false;
+            }
+            if (!part.empty()) {
+                last = part.back().name();
+                dentryList->splice(dentryList->end(), part);
+            }
+        } else {
+            // means iterate over the range
+            if (part.empty() && ret != MetaStatusCode::TX_KEY_LOCKED) {
+                proceed = false;
+            } else {
+                if (!part.empty()) {
+                    last = part.back().name();
+                    if (part.back().type() != FsFileType::TYPE_DIRECTORY) {
+                        part.pop_back();
+                    }
+                    dentryList->splice(dentryList->end(), part);
+                }
+                // means already get all the dir under this dir
+                if (nlink - dentryList->size() == 2) {
+                    proceed = false;
+                }
+            }
+        }
+    } while (proceed);
+
+    return CURVEFS_ERROR::OK;
+}
+
+} // namespace client
+} // namespace curvefs
diff --git a/curvefs/src/client/dentry_cache_manager.h b/curvefs/src/client/dentry_manager.h
similarity index 76%
rename from curvefs/src/client/dentry_cache_manager.h
rename to curvefs/src/client/dentry_manager.h
index 84f9f20f53..0014a28349 100644
--- a/curvefs/src/client/dentry_cache_manager.h
+++ b/curvefs/src/client/dentry_manager.h
@@ -21,8 +21,8 @@
* Author: xuchaojie
*/
-#ifndef CURVEFS_SRC_CLIENT_DENTRY_CACHE_MANAGER_H_
-#define CURVEFS_SRC_CLIENT_DENTRY_CACHE_MANAGER_H_
+#ifndef CURVEFS_SRC_CLIENT_DENTRY_MANAGER_H_
+#define CURVEFS_SRC_CLIENT_DENTRY_MANAGER_H_
#include
#include
@@ -33,6 +33,7 @@
#include
#include "curvefs/src/client/rpcclient/metaserver_client.h"
+#include "curvefs/src/client/rpcclient/mds_client.h"
#include "src/common/concurrent/concurrent.h"
#include "src/common/concurrent/name_lock.h"
#include "curvefs/src/client/filesystem/error.h"
@@ -44,6 +45,7 @@ namespace client {
using rpcclient::MetaServerClient;
using rpcclient::MetaServerClientImpl;
+using rpcclient::MdsClient;
using ::curvefs::client::filesystem::CURVEFS_ERROR;
static const char* kDentryKeyDelimiter = ":";
@@ -57,6 +59,8 @@ class DentryCacheManager {
fsId_ = fsId;
}
+    // Hand over the MDS client. DentryCacheManagerImpl uses it to request
+    // timestamps (Tso) while resolving transaction locks.
+    virtual void Init(std::shared_ptr<MdsClient> mdsClient) = 0;
+
virtual CURVEFS_ERROR GetDentry(uint64_t parent,
const std::string &name, Dentry *out) = 0;
@@ -70,6 +74,9 @@ class DentryCacheManager {
std::list *dentryList, uint32_t limit,
bool onlyDir = false, uint32_t nlink = 0) = 0;
+ // Check the status of the transaction that holds `txLock` and resolve the
+ // lock on `dentry` accordingly. In DentryCacheManagerImpl `timestamp` is
+ // passed to the status check and `commitTs` is used when the transaction
+ // is found to be committed.
+ virtual MetaStatusCode CheckAndResolveTx(const Dentry& dentry,
+ const TxLock& txLock, uint64_t timestamp, uint64_t commitTs) = 0;
+
protected:
uint32_t fsId_;
};
@@ -83,6 +90,10 @@ class DentryCacheManagerImpl : public DentryCacheManager {
const std::shared_ptr &metaClient)
: metaClient_(metaClient) {}
+    // Store the MDS client. It is dereferenced for Tso() whenever a dentry
+    // operation hits TX_KEY_LOCKED, so call this before using the manager.
+    void Init(std::shared_ptr<MdsClient> mdsClient) override {
+        mdsClient_ = mdsClient;
+    }
+
CURVEFS_ERROR GetDentry(uint64_t parent,
const std::string &name, Dentry *out) override;
@@ -96,11 +107,22 @@ class DentryCacheManagerImpl : public DentryCacheManager {
std::list *dentryList, uint32_t limit,
bool dirOnly = false, uint32_t nlink = 0) override;
+ // Query the status of the transaction owning `txLock`; resolve the lock on
+ // `dentry` with `commitTs` if it is committed, or without a commit
+ // timestamp if it is rolled back or timed out. Any other status is
+ // returned unchanged.
+ MetaStatusCode CheckAndResolveTx(const Dentry& dentry, const TxLock& txLock,
+ uint64_t timestamp, uint64_t commitTs) override;
+
std::string GetDentryCacheKey(uint64_t parent, const std::string &name) {
return std::to_string(parent) + kDentryKeyDelimiter + name;
}
private:
+ // Parse `primaryKey` into a Key4Dentry and ask the metaserver for the
+ // status of the transaction started at `startTs`.
+ MetaStatusCode CheckTxStatus(const std::string primaryKey, uint64_t startTs,
+ uint64_t curTimestamp);
+
+ // Forward a lock-resolution request for `dentry` to the metaserver client.
+ // CheckAndResolveTx omits `commitTs` for rolled-back / timed-out
+ // transactions, in which case the default 0 is sent.
+ MetaStatusCode ResolveTxLock(const Dentry& dentry, uint64_t startTs,
+ uint64_t commitTs = 0);
+
+ private:
+ std::shared_ptr mdsClient_;
std::shared_ptr metaClient_;
curve::common::GenericNameLock nameLock_;
};
@@ -108,4 +130,4 @@ class DentryCacheManagerImpl : public DentryCacheManager {
} // namespace client
} // namespace curvefs
-#endif // CURVEFS_SRC_CLIENT_DENTRY_CACHE_MANAGER_H_
+#endif // CURVEFS_SRC_CLIENT_DENTRY_MANAGER_H_
diff --git a/curvefs/src/client/filesystem/defer_sync.cpp b/curvefs/src/client/filesystem/defer_sync.cpp
index 2b33e33ba4..3a91709f9d 100644
--- a/curvefs/src/client/filesystem/defer_sync.cpp
+++ b/curvefs/src/client/filesystem/defer_sync.cpp
@@ -30,14 +30,94 @@ namespace curvefs {
namespace client {
namespace filesystem {
-DeferSync::DeferSync(DeferSyncOption option)
- : option_(option),
+using ::curve::common::LockGuard;
+using ::curve::common::ReadLockGuard;
+using ::curve::common::WriteLockGuard;
+using ::curvefs::client::filesystem::AttrCtime;
+
+// Early-return helper for DeferInodes members that return bool: makes the
+// enclosing function return false when the cto_ member is set.
+#define RETURN_FALSE_IF_CTO_ON() \
+ do { \
+ if (cto_) { \
+ return false; \
+ } \
+ } while (0)
+
+DeferInodes::DeferInodes(bool cto)
+ : cto_(cto),
+ rwlock_(),
+ inodes_() {}
+
+/**
+ * @brief Record `inode` as deferred, replacing any entry already stored
+ *        under the same inode id.
+ * @return false without storing anything when cto is on, true otherwise.
+ */
+bool DeferInodes::Add(const std::shared_ptr<InodeWrapper>& inode) {
+    RETURN_FALSE_IF_CTO_ON();
+    WriteLockGuard lk(rwlock_);
+    // insert, or overwrite the entry that already exists for this id
+    inodes_[inode->GetInodeId()] = inode;
+    return true;
+}
+
+/**
+ * @brief Look up the deferred inode stored for `ino`.
+ * @param[out] inode  set to the stored inode when found
+ * @return true if found; false if cto is on or no entry exists
+ *         (`*inode` is left untouched in that case).
+ */
+bool DeferInodes::Get(Ino ino, std::shared_ptr<InodeWrapper>* inode) {
+    RETURN_FALSE_IF_CTO_ON();
+    ReadLockGuard lk(rwlock_);
+    auto iter = inodes_.find(ino);
+    if (iter == inodes_.end()) {
+        return false;
+    }
+    *inode = iter->second;
+    return true;
+}
+
+/**
+ * @brief Drop the deferred entry of `inode` unless a newer one replaced it.
+ *
+ * The entry is looked up by the inode id read from `inode`'s attributes and
+ * is erased only if the ctime of `inode` is not older than the ctime of the
+ * stored inode.
+ *
+ * NOTE(review): GetInodeAttrLocked() is called here without taking the
+ * inode's own lock -- presumably callers already hold it; confirm.
+ *
+ * @return true if an entry was erased; false if cto is on, no entry exists,
+ *         or the stored inode is newer than `inode`.
+ */
+bool DeferInodes::Remove(const std::shared_ptr<InodeWrapper>& inode) {
+    RETURN_FALSE_IF_CTO_ON();
+    WriteLockGuard lk(rwlock_);
+    InodeAttr attr;
+    inode->GetInodeAttrLocked(&attr);
+    auto iter = inodes_.find(attr.inodeid());
+    if (iter == inodes_.end()) {
+        return false;
+    }
+
+    InodeAttr defered;
+    iter->second->GetInodeAttrLocked(&defered);
+    if (AttrCtime(attr) < AttrCtime(defered)) {
+        // the old deferred inode has already been replaced by the latest
+        // one, which must stay in the table until it has been synced.
+        return false;
+    }
+    inodes_.erase(iter);
+    return true;
+}
+
+// Number of deferred inodes currently stored, read under the read lock.
+size_t DeferInodes::Size() {
+    ReadLockGuard guard(rwlock_);
+    const size_t count = inodes_.size();
+    return count;
+}
+
+// Keep the deferred-inode table and the inode being synced, so that Run()
+// can remove the inode from the table once the sync has finished.
+SyncInodeClosure::SyncInodeClosure(const std::shared_ptr<DeferInodes>& inodes,
+                                   const std::shared_ptr<InodeWrapper>& inode)
+    : inodes_(inodes), inode_(inode) {}
+
+/**
+ * Completion callback of the inode sync. If the status code is OK or
+ * NOT_FOUND, the inode is removed from the deferred-inode table; any other
+ * status leaves it there. The closure deletes itself when Run() returns.
+ */
+void SyncInodeClosure::Run() {
+    // take ownership of this heap-allocated closure so it is freed on exit
+    std::unique_ptr<SyncInodeClosure> self_guard(this);
+    MetaStatusCode rc = GetStatusCode();
+    if (rc == MetaStatusCode::OK || rc == MetaStatusCode::NOT_FOUND) {
+        inodes_->Remove(inode_);
+    }
+}
+
+// `cto` is stored and also passed to the DeferInodes table created here.
+DeferSync::DeferSync(bool cto, DeferSyncOption option)
+ : cto_(cto),
+ option_(option),
mutex_(),
running_(false),
thread_(),
sleeper_(),
- inodes_() {
-}
+ pending_(),
+ inodes_(std::make_shared<DeferInodes>(cto)) {}
void DeferSync::Start() {
if (!running_.exchange(true)) {
@@ -55,20 +135,32 @@ void DeferSync::Stop() {
}
}
+/**
+ * @brief Create the completion closure for syncing `inode`.
+ * @return nullptr when cto is on; otherwise a heap-allocated
+ *         SyncInodeClosure, which deletes itself in Run().
+ */
+SyncInodeClosure* DeferSync::NewSyncInodeClosure(
+    const std::shared_ptr<InodeWrapper>& inode) {
+    // NOTE: we only store the defer inodes in nocto scenario,
+    // which means we don't need to remove the inode from defer inodes
+    // even if the inode already synced done in cto scenario.
+    if (cto_) {
+        return nullptr;
+    }
+    return new SyncInodeClosure(inodes_, inode);
+}
+
void DeferSync::SyncTask() {
- std::vector> inodes;
+ std::vector> syncing;
for ( ;; ) {
bool running = sleeper_.wait_for(std::chrono::seconds(option_.delay));
{
LockGuard lk(mutex_);
- inodes.swap(inodes_);
+ syncing.swap(pending_);
}
- for (const auto& inode : inodes) {
+ for (const auto& inode : syncing) {
+ auto closure = NewSyncInodeClosure(inode);
UniqueLock lk(inode->GetUniqueLock());
- inode->Async(nullptr, true);
+ inode->Async(closure, true);
}
- inodes.clear();
+ syncing.clear();
if (!running) {
break;
@@ -78,18 +170,12 @@ void DeferSync::SyncTask() {
void DeferSync::Push(const std::shared_ptr& inode) {
LockGuard lk(mutex_);
- inodes_.emplace_back(inode);
+ pending_.emplace_back(inode);
+ inodes_->Add(inode);
}
-bool DeferSync::IsDefered(Ino ino, InodeAttr* attr) {
- LockGuard lk(mutex_);
- for (const auto& inode : inodes_) {
- if (inode->GetInodeId() == ino) {
- inode->GetInodeAttr(attr);
- return true;
- }
- }
- return false;
+// Look `ino` up in the deferred-inode table; on a hit `*inode` receives the
+// stored inode and true is returned.
+bool DeferSync::IsDefered(Ino ino, std::shared_ptr<InodeWrapper>* inode) {
+    return inodes_->Get(ino, inode);
}
} // namespace filesystem
diff --git a/curvefs/src/client/filesystem/defer_sync.h b/curvefs/src/client/filesystem/defer_sync.h
index 41264a7e00..17472ce11e 100644
--- a/curvefs/src/client/filesystem/defer_sync.h
+++ b/curvefs/src/client/filesystem/defer_sync.h
@@ -27,23 +27,56 @@
#include
#include
+#include "absl/container/btree_map.h"
#include "src/common/interruptible_sleeper.h"
#include "curvefs/src/client/common/config.h"
+#include "curvefs/src/client/rpcclient/task_excutor.h"
#include "curvefs/src/client/filesystem/meta.h"
namespace curvefs {
namespace client {
namespace filesystem {
-using ::curvefs::client::common::DeferSyncOption;
-
+using ::curve::common::RWLock;
using ::curve::common::Mutex;
-using ::curve::common::LockGuard;
using ::curve::common::InterruptibleSleeper;
+using ::curvefs::client::common::DeferSyncOption;
+using ::curvefs::client::rpcclient::MetaServerClientDone;
+
+// Table of inodes whose sync is deferred, keyed by inode id and guarded by
+// a read-write lock.
+// NOTE: we only store the defer inodes in nocto scenario.
+class DeferInodes {
+ public:
+    explicit DeferInodes(bool cto);
+
+    // Store `inode`, replacing an existing entry for the same inode id.
+    // Returns false (and stores nothing) when cto is on.
+    bool Add(const std::shared_ptr<InodeWrapper>& inode);
+
+    // Fetch the inode stored for `ino` into `*inode`; false if absent or
+    // when cto is on.
+    bool Get(Ino ino, std::shared_ptr<InodeWrapper>* inode);
+
+    // Erase the entry of `inode` unless the stored inode has a newer ctime.
+    // Returns true only if an entry was erased.
+    bool Remove(const std::shared_ptr<InodeWrapper>& inode);
+
+    // Number of stored inodes.
+    size_t Size();
+
+ private:
+    bool cto_;
+    RWLock rwlock_;
+    absl::btree_map<Ino, std::shared_ptr<InodeWrapper>> inodes_;
+};
+
+// Completion closure for an inode sync: when the sync ends with OK or
+// NOT_FOUND it removes the inode from the deferred-inode table. Run()
+// deletes the closure, so instances must be heap-allocated.
+class SyncInodeClosure : public MetaServerClientDone {
+ public:
+    explicit SyncInodeClosure(const std::shared_ptr<DeferInodes>& inodes,
+                              const std::shared_ptr<InodeWrapper>& inode);
+
+    void Run() override;
+
+ private:
+    std::shared_ptr<DeferInodes> inodes_;
+    std::shared_ptr<InodeWrapper> inode_;
+};
class DeferSync {
public:
- explicit DeferSync(DeferSyncOption option);
+ explicit DeferSync(bool cto, DeferSyncOption option);
void Start();
@@ -51,18 +84,26 @@ class DeferSync {
void Push(const std::shared_ptr& inode);
- bool IsDefered(Ino ino, InodeAttr* attr);
+    // Returns true and fills `*inode` if `ino` is in the deferred-inode table.
+    bool IsDefered(Ino ino, std::shared_ptr<InodeWrapper>* inode);
private:
+    // Returns nullptr when cto is on, otherwise a new heap-allocated closure
+    // for syncing `inode`.
+    SyncInodeClosure* NewSyncInodeClosure(
+        const std::shared_ptr<InodeWrapper>& inode);
+
void SyncTask();
private:
+ friend class SyncInodeClosure;
+
+ private:
+ bool cto_;
DeferSyncOption option_;
Mutex mutex_;
std::atomic running_;
std::thread thread_;
InterruptibleSleeper sleeper_;
- std::vector> inodes_;
+ std::vector> pending_;
+ std::shared_ptr inodes_;
};
} // namespace filesystem
diff --git a/curvefs/src/client/filesystem/filesystem.cpp b/curvefs/src/client/filesystem/filesystem.cpp
index 2aad563051..4546ade2af 100644
--- a/curvefs/src/client/filesystem/filesystem.cpp
+++ b/curvefs/src/client/filesystem/filesystem.cpp
@@ -31,7 +31,8 @@ namespace filesystem {
FileSystem::FileSystem(FileSystemOption option, ExternalMember member)
: option_(option), member(member) {
- deferSync_ = std::make_shared(option.deferSyncOption);
+ deferSync_ = std::make_shared(option.cto,
+ option.deferSyncOption);
negative_ = std::make_shared(option.lookupCacheOption);
dirCache_ = std::make_shared(option.dirCacheOption);
openFiles_ = std::make_shared(option_.openFilesOption,
@@ -257,11 +258,6 @@ CURVEFS_ERROR FileSystem::Lookup(Ino parent,
CURVEFS_ERROR FileSystem::GetAttr(Ino ino, AttrOut* attrOut) {
InodeAttr attr;
- if (!option_.cto && deferSync_->IsDefered(ino, &attr)) {
- *attrOut = AttrOut(attr);
- return CURVEFS_ERROR::OK;
- }
-
auto rc = rpc_->GetAttr(ino, &attr);
if (rc == CURVEFS_ERROR::OK) {
*attrOut = AttrOut(attr);
@@ -319,7 +315,7 @@ CURVEFS_ERROR FileSystem::Open(Ino ino, FileInfo* fi) {
bool yes = openFiles_->IsOpened(ino, &inode);
if (yes) {
openFiles_->Open(ino, inode);
- // fi->keep_cache = 1;
+ // fi->keep_cache = 1; // FIXME(Wine93): let it works.
return CURVEFS_ERROR::OK;
}
diff --git a/curvefs/src/client/filesystem/package.h b/curvefs/src/client/filesystem/package.h
index 47f9fccc36..8eb7a14b23 100644
--- a/curvefs/src/client/filesystem/package.h
+++ b/curvefs/src/client/filesystem/package.h
@@ -25,8 +25,8 @@
#include
-#include "curvefs/src/client/dentry_cache_manager.h"
-#include "curvefs/src/client/inode_cache_manager.h"
+#include "curvefs/src/client/dentry_manager.h"
+#include "curvefs/src/client/inode_manager.h"
#include "curvefs/src/client/xattr_manager.h"
namespace curvefs {
diff --git a/curvefs/src/client/fuse_client.cpp b/curvefs/src/client/fuse_client.cpp
index 6f1c0b4771..f1e19721e1 100644
--- a/curvefs/src/client/fuse_client.cpp
+++ b/curvefs/src/client/fuse_client.cpp
@@ -104,6 +104,8 @@ DECLARE_uint64(fuseClientAvgReadBytes);
DECLARE_uint64(fuseClientBurstReadBytes);
DECLARE_uint64(fuseClientBurstReadBytesSecs);
DECLARE_bool(fs_disableXattr);
+
+DECLARE_int32(TxVersion);
} // namespace common
} // namespace client
} // namespace curvefs
@@ -158,6 +160,8 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
metaCache, mdsClient_,
&enableSumInDir_);
+ dentryManager_->Init(mdsClient_);
+
xattrManager_ = std::make_shared(inodeManager_,
dentryManager_, option_.listDentryLimit, option_.listDentryThreads);
@@ -368,35 +372,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req,
return HandleOpenFlags(req, ino, fi, fileOut);
}
-CURVEFS_ERROR FuseClient::UpdateParentMCTimeAndNlink(
- fuse_ino_t parent, FsFileType type, NlinkChange nlink) {
-
- std::shared_ptr parentInodeWrapper;
- auto ret = inodeManager_->GetInode(parent, parentInodeWrapper);
- if (ret != CURVEFS_ERROR::OK) {
- LOG(ERROR) << "inodeManager get inode fail, ret = " << ret
- << ", inodeid = " << parent;
- return ret;
- }
-
- {
- curve::common::UniqueLock lk = parentInodeWrapper->GetUniqueLock();
- parentInodeWrapper->UpdateTimestampLocked(kModifyTime | kChangeTime);
-
- if (FsFileType::TYPE_DIRECTORY == type) {
- parentInodeWrapper->UpdateNlinkLocked(nlink);
- }
-
- if (option_.fileSystemOption.deferSyncOption.deferDirMtime) {
- inodeManager_->ShipToFlush(parentInodeWrapper);
- } else {
- return parentInodeWrapper->SyncAttr();
- }
- }
-
- return CURVEFS_ERROR::OK;
-}
-
CURVEFS_ERROR FuseClient::MakeNode(
fuse_req_t req,
fuse_ino_t parent,
@@ -535,15 +510,6 @@ CURVEFS_ERROR FuseClient::DeleteNode(uint64_t ino, fuse_ino_t parent,
return ret;
}
- ret = UpdateParentMCTimeAndNlink(parent, type, NlinkChange::kSubOne);
- if (ret != CURVEFS_ERROR::OK) {
- LOG(ERROR) << "UpdateParentMCTimeAndNlink failed"
- << ", parent: " << parent
- << ", name: " << name
- << ", type: " << type;
- return ret;
- }
-
std::shared_ptr inodeWrapper;
ret = inodeManager_->GetInode(ino, inodeWrapper);
if (ret != CURVEFS_ERROR::OK) {
@@ -933,38 +899,63 @@ CURVEFS_ERROR FuseClient::FuseOpRename(fuse_req_t req, fuse_ino_t parent,
return CURVEFS_ERROR::NAME_TOO_LONG;
}
- auto renameOp =
- RenameOperator(fsInfo_->fsid(), fsInfo_->fsname(),
- parent, name, newparent, newname,
- dentryManager_, inodeManager_, metaClient_, mdsClient_,
- option_.enableMultiMountPointRename);
+ auto renameOp = RenameOperator(fsInfo_->fsid(), fsInfo_->fsname(), parent,
+ name, newparent, newname, dentryManager_, inodeManager_, metaClient_,
+ mdsClient_, option_.enableMultiMountPointRename);
- curve::common::LockGuard lg(renameMutex_);
CURVEFS_ERROR rc = CURVEFS_ERROR::OK;
- VLOG(3) << "FuseOpRename [start]: " << renameOp.DebugString();
- RETURN_IF_UNSUCCESS(GetTxId);
- RETURN_IF_UNSUCCESS(Precheck);
- RETURN_IF_UNSUCCESS(RecordOldInodeInfo);
- // Do not move LinkDestParentInode behind CommitTx.
- // If so, the nlink will be lost when the machine goes down
- RETURN_IF_UNSUCCESS(LinkDestParentInode);
- RETURN_IF_UNSUCCESS(PrepareTx);
- RETURN_IF_UNSUCCESS(CommitTx);
- VLOG(3) << "FuseOpRename [success]: " << renameOp.DebugString();
- // Do not check UnlinkSrcParentInode, beause rename is already success
- renameOp.UnlinkSrcParentInode();
- renameOp.UnlinkOldInode();
- if (parent != newparent) {
- renameOp.UpdateInodeParent();
- }
- renameOp.UpdateInodeCtime();
- renameOp.UpdateCache();
+ if (common::FLAGS_TxVersion == 1) {
+ curve::common::LockGuard lg(renameMutex_);
+ VLOG(3) << "FuseOpRename [start]: " << renameOp.DebugString();
+ RETURN_IF_UNSUCCESS(GetTxId);
+ RETURN_IF_UNSUCCESS(Precheck);
+ RETURN_IF_UNSUCCESS(RecordOldInodeInfo);
+ // Do not move LinkDestParentInode behind CommitTx.
+ // If so, the nlink will be lost when the machine goes down
+ RETURN_IF_UNSUCCESS(LinkDestParentInode);
+ RETURN_IF_UNSUCCESS(PrepareTx);
+ RETURN_IF_UNSUCCESS(CommitTx);
+ VLOG(3) << "FuseOpRename [success]: " << renameOp.DebugString();
+ // Do not check UnlinkSrcParentInode, because rename has already succeeded
+ renameOp.UnlinkSrcParentInode();
+ renameOp.UnlinkOldInode();
+ if (parent != newparent) {
+ renameOp.UpdateInodeParent();
+ }
+ renameOp.UpdateInodeCtime();
+ renameOp.UpdateCache();
- if (enableSumInDir_.load()) {
- xattrManager_->UpdateParentXattrAfterRename(
- parent, newparent, newname, &renameOp);
- }
+ if (enableSumInDir_.load()) {
+ xattrManager_->UpdateParentXattrAfterRename(
+ parent, newparent, newname, &renameOp);
+ }
+ } else if (common::FLAGS_TxVersion == 2) {
+ VLOG(3) << "FuseOpRename [start]: " << renameOp.DebugString();
+ RETURN_IF_UNSUCCESS(Precheck);
+ RETURN_IF_UNSUCCESS(RecordOldInodeInfo);
+ // Do not move LinkDestParentInode behind CommitTx.
+ // If so, the nlink will be lost when the machine goes down
+ RETURN_IF_UNSUCCESS(LinkDestParentInode);
+ RETURN_IF_UNSUCCESS(PrewriteTx);
+ RETURN_IF_UNSUCCESS(CommitTxV2);
+ VLOG(3) << "FuseOpRename [success]: " << renameOp.DebugString();
+ // Do not check UnlinkSrcParentInode, because rename has already succeeded
+ renameOp.UnlinkSrcParentInode();
+ renameOp.UnlinkOldInode();
+ if (parent != newparent) {
+ renameOp.UpdateInodeParent();
+ }
+ renameOp.UpdateInodeCtime();
+ if (enableSumInDir_.load()) {
+ xattrManager_->UpdateParentXattrAfterRename(
+ parent, newparent, newname, &renameOp);
+ }
+ } else {
+ LOG(ERROR) << "FuseOpRename not support tx version: "
+ << common::FLAGS_TxVersion;
+ return CURVEFS_ERROR::NOT_SUPPORT;
+ }
return rc;
}
@@ -1175,6 +1166,13 @@ CURVEFS_ERROR FuseClient::FuseOpListXattr(fuse_req_t req, fuse_ino_t ino,
// +1 because, the format is key\0key\0
*realSize += it.first.length() + 1;
}
+ // add summary xattr key
+ if (inodeAttr.type() == FsFileType::TYPE_DIRECTORY) {
+ *realSize += strlen(XATTR_DIR_RFILES) + 1;
+ *realSize += strlen(XATTR_DIR_RSUBDIRS) + 1;
+ *realSize += strlen(XATTR_DIR_RENTRIES) + 1;
+ *realSize += strlen(XATTR_DIR_RFBYTES) + 1;
+ }
if (size == 0) {
return CURVEFS_ERROR::OK;
@@ -1188,6 +1186,16 @@ CURVEFS_ERROR FuseClient::FuseOpListXattr(fuse_req_t req, fuse_ino_t ino,
memcpy(value, it.first.c_str(), tsize);
value += tsize;
}
+ if (inodeAttr.type() == FsFileType::TYPE_DIRECTORY) {
+ memcpy(value, XATTR_DIR_RFILES, strlen(XATTR_DIR_RFILES) + 1);
+ value += strlen(XATTR_DIR_RFILES) + 1;
+ memcpy(value, XATTR_DIR_RSUBDIRS, strlen(XATTR_DIR_RSUBDIRS) + 1);
+ value += strlen(XATTR_DIR_RSUBDIRS) + 1;
+ memcpy(value, XATTR_DIR_RENTRIES, strlen(XATTR_DIR_RENTRIES) + 1);
+ value += strlen(XATTR_DIR_RENTRIES) + 1;
+ memcpy(value, XATTR_DIR_RFBYTES, strlen(XATTR_DIR_RFBYTES) + 1);
+ value += strlen(XATTR_DIR_RFBYTES) + 1;
+ }
return CURVEFS_ERROR::OK;
}
return CURVEFS_ERROR::OUT_OF_RANGE;
diff --git a/curvefs/src/client/fuse_client.h b/curvefs/src/client/fuse_client.h
index 13adc3c591..aae53041cb 100644
--- a/curvefs/src/client/fuse_client.h
+++ b/curvefs/src/client/fuse_client.h
@@ -41,13 +41,13 @@
#include "curvefs/src/client/client_operator.h"
#include "curvefs/src/client/common/common.h"
#include "curvefs/src/client/common/config.h"
-#include "curvefs/src/client/dentry_cache_manager.h"
+#include "curvefs/src/client/dentry_manager.h"
#include "curvefs/src/client/dir_buffer.h"
#include "curvefs/src/client/filesystem/error.h"
#include "curvefs/src/client/filesystem/filesystem.h"
#include "curvefs/src/client/filesystem/meta.h"
#include "curvefs/src/client/fuse_common.h"
-#include "curvefs/src/client/inode_cache_manager.h"
+#include "curvefs/src/client/inode_manager.h"
#include "curvefs/src/client/lease/lease_excutor.h"
#include "curvefs/src/client/metric/client_metric.h"
#include "curvefs/src/client/rpcclient/mds_client.h"
@@ -415,9 +415,6 @@ class FuseClient {
private:
virtual void FlushData() = 0;
- CURVEFS_ERROR UpdateParentMCTimeAndNlink(
- fuse_ino_t parent, FsFileType type, NlinkChange nlink);
-
std::string GenerateNewRecycleName(fuse_ino_t ino,
fuse_ino_t parent, const char* name) {
std::string newName(name);
diff --git a/curvefs/src/client/inode_cache_manager.cpp b/curvefs/src/client/inode_manager.cpp
similarity index 80%
rename from curvefs/src/client/inode_cache_manager.cpp
rename to curvefs/src/client/inode_manager.cpp
index e8601c1526..d3aa1c9fac 100644
--- a/curvefs/src/client/inode_cache_manager.cpp
+++ b/curvefs/src/client/inode_manager.cpp
@@ -21,7 +21,7 @@
* Author: xuchaojie
*/
-#include "curvefs/src/client/inode_cache_manager.h"
+#include "curvefs/src/client/inode_manager.h"
#include
#include
@@ -30,6 +30,8 @@
#include
#include "curvefs/proto/metaserver.pb.h"
#include "curvefs/src/client/filesystem/error.h"
+#include "curvefs/src/client/filesystem/utils.h"
+#include "curvefs/src/client/filesystem/defer_sync.h"
#include "curvefs/src/client/inode_wrapper.h"
using ::curvefs::metaserver::Inode;
@@ -47,10 +49,73 @@ namespace curvefs {
namespace client {
using ::curvefs::client::filesystem::ToFSError;
+using ::curvefs::client::filesystem::AttrCtime;
using NameLockGuard = ::curve::common::GenericNameLockGuard;
using curvefs::client::common::FLAGS_enableCto;
+#define RETURN_IF_CTO_ON() \
+ do { \
+ if (cto_) { \
+ return; \
+ } \
+ } while (0)
+
+DeferWatcher::DeferWatcher(bool cto, std::shared_ptr deferSync)
+ : cto_(cto),  // when true, the Pre/PostGetAttrs hooks return immediately
+ deferSync_(deferSync),  // queried via IsDefered() in PreGetAttrs
+ deferAttrs_() {}  // starts empty; filled by PreGetAttrs
+
+void DeferWatcher::PreGetAttrs(const std::set& inos) {  // snapshot deferred attrs
+ RETURN_IF_CTO_ON();  // nothing is recorded when cto_ is set
+ InodeAttr attr;
+ std::shared_ptr inode;
+ for (const auto& ino : inos) {
+ bool yes = deferSync_->IsDefered(ino, &inode);
+ if (!yes) {
+ continue;  // IsDefered() returned false: no snapshot for this inode
+ }
+ inode->GetInodeAttr(&attr);
+ deferAttrs_.emplace(ino, attr);  // emplace keeps an already recorded entry
+ }
+}
+
+bool DeferWatcher::TryUpdate(InodeAttr* attr) {  // true iff *attr was replaced
+ Ino ino = attr->inodeid();
+ auto iter = deferAttrs_.find(ino);
+ if (iter == deferAttrs_.end()) {
+ return false;  // PreGetAttrs recorded no snapshot for this inode
+ }
+ // presumably AttrCtime() yields the attr's change time -- TODO confirm
+ auto& defered = iter->second;
+ if (AttrCtime(*attr) > AttrCtime(defered)) {
+ return false;  // passed-in attr compares newer: leave it untouched
+ }
+ *attr = defered;  // otherwise the recorded snapshot wins (ties included)
+ return true;
+}
+
+void DeferWatcher::PostGetAttrs(std::list* attrs) {  // overlay snapshots on *attrs
+ RETURN_IF_CTO_ON();  // attrs are left as they are when cto_ is set
+ if (deferAttrs_.size() == 0) {
+ return;  // nothing was recorded by PreGetAttrs
+ }
+ for (auto& attr : *attrs) {
+ TryUpdate(&attr);  // result ignored; unmatched attrs stay unchanged
+ }
+}
+
+void DeferWatcher::PostGetAttrs(std::map* attrs) {  // map overload: patches values
+ RETURN_IF_CTO_ON();  // attrs are left as they are when cto_ is set
+ if (deferAttrs_.size() == 0) {
+ return;  // nothing was recorded by PreGetAttrs
+ }
+ for (auto& item : *attrs) {
+ auto& attr = item.second;  // only the mapped attr is touched, never the key
+ TryUpdate(&attr);  // result ignored; unmatched attrs stay unchanged
+ }
+}
+
#define GET_INODE_REMOTE(FSID, INODEID, OUT, STREAMING) \
MetaStatusCode ret = metaClient_->GetInode(FSID, INODEID, OUT, STREAMING); \
if (ret != MetaStatusCode::OK) { \
@@ -76,6 +141,11 @@ InodeCacheManagerImpl::GetInode(uint64_t inodeId,
return CURVEFS_ERROR::OK;
}
+ bool cto = FLAGS_enableCto;
+ if (!cto && deferSync_->IsDefered(inodeId, &out)) {
+ return CURVEFS_ERROR::OK;
+ }
+
// get inode from metaserver
Inode inode;
bool streaming = false;
@@ -97,6 +167,11 @@ CURVEFS_ERROR InodeCacheManagerImpl::GetInodeAttr(uint64_t inodeId,
std::set inodeIds;
std::list attrs;
inodeIds.emplace(inodeId);
+
+ bool cto = FLAGS_enableCto;
+ auto watcher = std::make_shared(cto, deferSync_);
+ watcher->PreGetAttrs(inodeIds);
+
MetaStatusCode ret =
metaClient_->BatchGetInodeAttr(fsId_, inodeIds, &attrs);
if (MetaStatusCode::OK != ret) {
@@ -113,6 +188,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::GetInodeAttr(uint64_t inodeId,
return CURVEFS_ERROR::INTERNAL;
}
+ watcher->PostGetAttrs(&attrs);
*out = *attrs.begin();
return CURVEFS_ERROR::OK;
}
@@ -124,6 +200,10 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttr(
return CURVEFS_ERROR::OK;
}
+ bool cto = FLAGS_enableCto;
+ auto watcher = std::make_shared(cto, deferSync_);
+ watcher->PreGetAttrs(*inodeIds);
+
MetaStatusCode ret = metaClient_->BatchGetInodeAttr(fsId_, *inodeIds,
attrs);
if (MetaStatusCode::OK != ret) {
@@ -131,6 +211,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttr(
<< ret << ", MetaStatusCode_Name = "
<< MetaStatusCode_Name(ret);
}
+ watcher->PostGetAttrs(attrs);
return ToFSError(ret);
}
@@ -144,6 +225,10 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttrAsync(
return CURVEFS_ERROR::OK;
}
+ bool cto = FLAGS_enableCto;
+ auto watcher = std::make_shared(cto, deferSync_);
+ watcher->PreGetAttrs(*inodeIds);
+
// split inodeIds by partitionId and batch limit
std::vector> inodeGroups;
if (!metaClient_->SplitRequestInodes(fsId_, *inodeIds, &inodeGroups)) {
@@ -168,6 +253,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttrAsync(
// wait for all sudrequest finished
cond->Wait();
+ watcher->PostGetAttrs(attrs);
return CURVEFS_ERROR::OK;
}
diff --git a/curvefs/src/client/inode_cache_manager.h b/curvefs/src/client/inode_manager.h
similarity index 91%
rename from curvefs/src/client/inode_cache_manager.h
rename to curvefs/src/client/inode_manager.h
index ce0ebb3d17..093b5e750d 100644
--- a/curvefs/src/client/inode_cache_manager.h
+++ b/curvefs/src/client/inode_manager.h
@@ -21,8 +21,8 @@
* Author: xuchaojie
*/
-#ifndef CURVEFS_SRC_CLIENT_INODE_CACHE_MANAGER_H_
-#define CURVEFS_SRC_CLIENT_INODE_CACHE_MANAGER_H_
+#ifndef CURVEFS_SRC_CLIENT_INODE_MANAGER_H_
+#define CURVEFS_SRC_CLIENT_INODE_MANAGER_H_
#include
#include
@@ -46,6 +46,7 @@
#include "curvefs/src/client/common/config.h"
#include "curvefs/src/client/filesystem/openfile.h"
#include "curvefs/src/client/filesystem/defer_sync.h"
+#include "absl/container/btree_map.h"
using ::curve::common::LRUCache;
using ::curve::common::CacheMetrics;
@@ -66,6 +67,7 @@ using rpcclient::BatchGetInodeAttrDone;
using curve::common::CountDownEvent;
using metric::S3ChunkInfoMetric;
using common::RefreshDataOption;
+using ::curvefs::client::filesystem::Ino;
using ::curvefs::client::filesystem::OpenFiles;
using ::curvefs::client::filesystem::DeferSync;
@@ -116,6 +118,25 @@ class InodeCacheManager {
uint32_t fsId_;
};
+class DeferWatcher {  // overlays attrs of deferred inodes onto fetched attrs
+ public:
+ DeferWatcher(bool cto, std::shared_ptr deferSync);
+ // Record the attr of each inode in |inos| that deferSync reports as deferred.
+ void PreGetAttrs(const std::set& inos);
+ // Replace each fetched attr by its recorded one unless the fetched is newer.
+ void PostGetAttrs(std::list* attrs);
+ // Map overload of the call above; it patches the mapped values.
+ void PostGetAttrs(std::map* attrs);
+
+ private:
+ bool TryUpdate(InodeAttr* attr);  // true iff *attr was overwritten
+
+ private:
+ bool cto_;  // when true, PreGetAttrs/PostGetAttrs do nothing
+ std::shared_ptr deferSync_;
+ absl::btree_map deferAttrs_;  // snapshots looked up by inode id in TryUpdate
+};
+
class InodeCacheManagerImpl : public InodeCacheManager,
public std::enable_shared_from_this {
public:
@@ -221,4 +242,4 @@ class BatchGetInodeAttrAsyncDone : public BatchGetInodeAttrDone {
} // namespace client
} // namespace curvefs
-#endif // CURVEFS_SRC_CLIENT_INODE_CACHE_MANAGER_H_
+#endif // CURVEFS_SRC_CLIENT_INODE_MANAGER_H_
diff --git a/curvefs/src/client/kvclient/kvclient_manager.cpp b/curvefs/src/client/kvclient/kvclient_manager.cpp
index c87630758b..9e677cc212 100644
--- a/curvefs/src/client/kvclient/kvclient_manager.cpp
+++ b/curvefs/src/client/kvclient/kvclient_manager.cpp
@@ -62,10 +62,12 @@ void KVClientManager::Uninit() {
}
void KVClientManager::Set(std::shared_ptr task) {
+ kvClientManagerMetric_->setQueueSize << 1;
threadPool_.Enqueue([task, this]() {
std::string error_log;
task->res =
client_->Set(task->key, task->value, task->length, &error_log);
+ kvClientManagerMetric_->setQueueSize << -1;
if (task->res) {
kvClientManagerMetric_->count << 1;
}
@@ -96,12 +98,14 @@ void UpdateHitMissMetric(memcached_return_t retCode,
}
void KVClientManager::Get(std::shared_ptr task) {
+ kvClientManagerMetric_->getQueueSize << 1;
threadPool_.Enqueue([task, this]() {
std::string error_log;
memcached_return_t retCode;
task->res = client_->Get(task->key, task->value, task->offset,
task->valueLength, &error_log, &task->length,
&retCode);
+ kvClientManagerMetric_->getQueueSize << -1;
UpdateHitMissMetric(retCode, kvClientManagerMetric_.get());
OnReturn(&kvClientManagerMetric_->get, task);
});
diff --git a/curvefs/src/client/kvclient/memcache_client.h b/curvefs/src/client/kvclient/memcache_client.h
index 57e82a7f44..7ca3ea1248 100644
--- a/curvefs/src/client/kvclient/memcache_client.h
+++ b/curvefs/src/client/kvclient/memcache_client.h
@@ -166,9 +166,10 @@ class MemCachedClient : public KVClient {
*errorlog = ResError(ue);
if (ue != MEMCACHED_NOTFOUND) {
- LOG(ERROR) << "Get key = " << key << " error = " << *errorlog
- << ", get_value_len = " << value_length
- << ", expect_value_len = " << length;
+ LOG_EVERY_N(WARNING, 1000) << "Get key = " << key
+ << " error = " << *errorlog << ", get_value_len = "
+ << value_length << ", expect_value_len = " << length;
+ free(res);
memcached_free(tcli);
tcli = nullptr;
}
diff --git a/curvefs/src/client/lease/lease_excutor.cpp b/curvefs/src/client/lease/lease_excutor.cpp
index 08dc400c88..13972db561 100644
--- a/curvefs/src/client/lease/lease_excutor.cpp
+++ b/curvefs/src/client/lease/lease_excutor.cpp
@@ -31,6 +31,10 @@ using curvefs::mds::topology::PartitionTxId;
namespace curvefs {
namespace client {
+namespace common {
+DECLARE_int32(TxVersion);
+} // namespace common
+
LeaseExecutor::~LeaseExecutor() {
if (task_) {
task_->Stop();
@@ -71,26 +75,32 @@ void LeaseExecutor::Stop() {
}
bool LeaseExecutor::RefreshLease() {
+ // for tx v2, txIds and latestTxIdList will be empty here
// get partition txid list
std::vector txIds;
- metaCache_->GetAllTxIds(&txIds);
-
+ if (common::FLAGS_TxVersion == 1) {
+ metaCache_->GetAllTxIds(&txIds);
+ }
// refresh from mds
std::vector latestTxIdList;
- FSStatusCode ret = mdsCli_->RefreshSession(txIds, &latestTxIdList,
- fsName_, mountpoint_,
- enableSumInDir_);
+ std::string mdsAddrs = mdsCli_->GetMdsAddrs();
+ std::string mdsAddrsOverride;
+ FSStatusCode ret =
+ mdsCli_->RefreshSession(txIds, &latestTxIdList, fsName_, mountpoint_,
+ enableSumInDir_, mdsAddrs, &mdsAddrsOverride);
+
if (ret != FSStatusCode::OK) {
LOG(ERROR) << "LeaseExecutor refresh session fail, ret = " << ret
<< ", errorName = " << FSStatusCode_Name(ret);
return true;
}
-
// update to metacache
std::for_each(latestTxIdList.begin(), latestTxIdList.end(),
[&](const PartitionTxId &item) {
metaCache_->SetTxId(item.partitionid(), item.txid());
});
+ // update mds addrs; NOTE(review): confirm an empty override is ignored
+ mdsCli_->SetMdsAddrs(mdsAddrsOverride);
return true;
}
diff --git a/curvefs/src/client/metric/client_metric.cpp b/curvefs/src/client/metric/client_metric.cpp
index d6c2592ebb..b9db382123 100644
--- a/curvefs/src/client/metric/client_metric.cpp
+++ b/curvefs/src/client/metric/client_metric.cpp
@@ -75,17 +75,18 @@ void AsyncContextCollectMetrics(
std::shared_ptr s3Metric,
const std::shared_ptr& context) {
if (s3Metric.get() != nullptr) {
- CollectMetrics(&s3Metric->adaptorReadS3, context->actualLen,
- context->timer.u_elapsed());
+
+ CollectMetrics(&s3Metric->adaptorAsyncReadS3, context->actualLen,
+ butil::cpuwide_time_us() - context->start);
switch (context->type) {
case curve::common::ContextType::Disk:
- CollectMetrics(&s3Metric->readFromDiskCache, context->actualLen,
- context->timer.u_elapsed());
+ CollectMetrics(&s3Metric->asyncReadDiskCache, context->actualLen,
+ butil::cpuwide_time_us() - context->start);
break;
case curve::common::ContextType::S3:
- CollectMetrics(&s3Metric->readFromS3, context->actualLen,
- context->timer.u_elapsed());
+ CollectMetrics(&s3Metric->asyncReadFromS3, context->actualLen,
+ butil::cpuwide_time_us() - context->start);
break;
default:
break;
diff --git a/curvefs/src/client/metric/client_metric.h b/curvefs/src/client/metric/client_metric.h
index bfbf0f3373..a55dc7e8c7 100644
--- a/curvefs/src/client/metric/client_metric.h
+++ b/curvefs/src/client/metric/client_metric.h
@@ -55,6 +55,7 @@ struct MDSClientMetric {
InterfaceMetric refreshSession;
InterfaceMetric getLatestTxId;
InterfaceMetric commitTx;
+ InterfaceMetric tso;
InterfaceMetric allocOrGetMemcacheCluster;
MDSClientMetric()
@@ -70,6 +71,7 @@ struct MDSClientMetric {
refreshSession(prefix, "refreshSession"),
getLatestTxId(prefix, "getLatestTxId"),
commitTx(prefix, "commitTx"),
+ tso(prefix, "tso"),
allocOrGetMemcacheCluster(prefix, "allocOrGetMemcacheCluster") {}
};
@@ -93,6 +95,10 @@ struct MetaServerClientMetric {
// tnx
InterfaceMetric prepareRenameTx;
+ InterfaceMetric prewriteRenameTx;
+ InterfaceMetric checkTxStatus;
+ InterfaceMetric resolveTxLock;
+ InterfaceMetric commitTx;
// volume extent
InterfaceMetric updateVolumeExtent;
@@ -100,9 +106,11 @@ struct MetaServerClientMetric {
InterfaceMetric updateDeallocatableBlockGroup;
MetaServerClientMetric()
- : getDentry(prefix, "getDentry"), listDentry(prefix, "listDentry"),
+ : getDentry(prefix, "getDentry"),
+ listDentry(prefix, "listDentry"),
createDentry(prefix, "createDentry"),
- deleteDentry(prefix, "deleteDentry"), getInode(prefix, "getInode"),
+ deleteDentry(prefix, "deleteDentry"),
+ getInode(prefix, "getInode"),
batchGetInodeAttr(prefix, "batchGetInodeAttr"),
batchGetXattr(prefix, "batchGetXattr"),
createInode(prefix, "createInode"),
@@ -110,10 +118,14 @@ struct MetaServerClientMetric {
deleteInode(prefix, "deleteInode"),
appendS3ChunkInfo(prefix, "appendS3ChunkInfo"),
prepareRenameTx(prefix, "prepareRenameTx"),
+ prewriteRenameTx(prefix, "prewriteRenameTx"),
+ checkTxStatus(prefix, "checkTxStatus"),
+ resolveTxLock(prefix, "resolveTxLock"),
+ commitTx(prefix, "commitTx"),
updateVolumeExtent(prefix, "updateVolumeExtent"),
getVolumeExtent(prefix, "getVolumeExtent"),
- updateDeallocatableBlockGroup(prefix,
- "updateDeallocatableBlockGroup") {}
+ updateDeallocatableBlockGroup(
+ prefix, "updateDeallocatableBlockGroup") {}
};
struct InflightGuard {
@@ -239,6 +251,17 @@ struct S3Metric {
std::string fsName;
InterfaceMetric adaptorWrite;
InterfaceMetric adaptorRead;
+
+ InterfaceMetric adaptorDequeue;
+ InterfaceMetric adaptorProcess;
+
+ InterfaceMetric adaptorAsyncReadS3;
+ InterfaceMetric asyncReadDiskCache;
+ InterfaceMetric asyncReadFromS3;
+
+ InterfaceMetric waitDownloading;
+
+
InterfaceMetric adaptorWriteS3;
InterfaceMetric adaptorWriteDiskCache;
InterfaceMetric adaptorReadS3;
@@ -258,11 +281,21 @@ struct S3Metric {
bvar::Status readSize;
bvar::Status writeSize;
+ bvar::Adder readAllHitsMemCounts;
+ bvar::Adder readRequestCounts;
+ bvar::Adder s3ReadRequestCounts;
+
explicit S3Metric(const std::string& name = "")
: fsName(!name.empty() ? name
: prefix + curve::common::ToHexString(this)),
adaptorWrite(prefix, fsName + "_adaptor_write"),
adaptorRead(prefix, fsName + "_adaptor_read"),
+ adaptorDequeue(prefix, fsName + "_adaptor_dequeue"),
+ adaptorProcess(prefix, fsName + "_adaptor_process"),
+ adaptorAsyncReadS3(prefix, fsName + "_adaptor_async_read"),
+ asyncReadDiskCache(prefix, fsName + "_async_read_from_disk"),
+ asyncReadFromS3(prefix, fsName + "_async_read_from_s3"),
+ waitDownloading(prefix, fsName + "_wait_download"),
adaptorWriteS3(prefix, fsName + "_adaptor_write_s3"),
adaptorWriteDiskCache(prefix, fsName + "_adaptor_write_disk_cache"),
adaptorReadS3(prefix, fsName + "_adaptor_read_s3"),
@@ -274,7 +307,11 @@ struct S3Metric {
writeToKVCache(prefix, fsName + "_write_to_kv_cache"),
readFromKVCache(prefix, fsName + "_read_from_kv_cache"),
readSize(prefix, fsName + "_adaptor_read_size", 0),
- writeSize(prefix, fsName + "_adaptor_write_size", 0) {}
+ writeSize(prefix, fsName + "_adaptor_write_size", 0) {
+ readAllHitsMemCounts.expose_as(prefix, "read_all_hits_mem");
+ readRequestCounts.expose_as(prefix, "read_request_counts");
+ s3ReadRequestCounts.expose_as(prefix, "s3_read_request_counts");
+ }
};
template
@@ -317,6 +354,10 @@ struct KVClientManagerMetric {
bvar::Adder hit;
// kvcache miss
bvar::Adder miss;
+ // kvcache getQueueSize
+ bvar::Adder getQueueSize;
+ // kvcache setQueueSize
+ bvar::Adder setQueueSize;
explicit KVClientManagerMetric(const std::string& name = "")
: fsName(!name.empty() ? name
@@ -325,7 +366,9 @@ struct KVClientManagerMetric {
set(prefix, fsName + "_set"),
count(prefix, fsName + "_count"),
hit(prefix, fsName + "_hit"),
- miss(prefix, fsName + "_miss") {}
+ miss(prefix, fsName + "_miss"),
+ getQueueSize(prefix, fsName + "_get_queue_size"),
+ setQueueSize(prefix, fsName + "_set_queue_size") {}
};
struct MemcacheClientMetric {
diff --git a/curvefs/src/client/rpcclient/BUILD b/curvefs/src/client/rpcclient/BUILD
index 13cdfccf1c..cd39cfa2f5 100644
--- a/curvefs/src/client/rpcclient/BUILD
+++ b/curvefs/src/client/rpcclient/BUILD
@@ -39,5 +39,6 @@ cc_library(
"//src/client:curve_client",
"@com_google_absl//absl/cleanup",
"@com_google_absl//absl/types:optional",
+ "@com_google_absl//absl/strings",
],
)
diff --git a/curvefs/src/client/rpcclient/base_client.cpp b/curvefs/src/client/rpcclient/base_client.cpp
index ca504f5201..a2f75ae177 100644
--- a/curvefs/src/client/rpcclient/base_client.cpp
+++ b/curvefs/src/client/rpcclient/base_client.cpp
@@ -167,6 +167,12 @@ void MDSBaseClient::CommitTx(const CommitTxRequest& request,
stub.CommitTx(cntl, &request, response, nullptr);
}
+void MDSBaseClient::Tso(const TsoRequest& request, TsoResponse* response,
+ brpc::Controller* cntl, brpc::Channel* channel) {
+ curvefs::mds::MdsService_Stub stub(channel);  // stub bound to |channel|
+ stub.Tso(cntl, &request, response, nullptr);  // done == nullptr; presumably a blocking call -- TODO confirm
+}
+
// TODO(all): do we really need pass `fsId` all the time?
// each curve-fuse process only mount one filesystem
void MDSBaseClient::AllocateVolumeBlockGroup(
diff --git a/curvefs/src/client/rpcclient/base_client.h b/curvefs/src/client/rpcclient/base_client.h
index f43d2854ee..7f3bd8161f 100644
--- a/curvefs/src/client/rpcclient/base_client.h
+++ b/curvefs/src/client/rpcclient/base_client.h
@@ -57,11 +57,11 @@ using curvefs::metaserver::GetInodeResponse;
using curvefs::metaserver::Inode;
using curvefs::metaserver::ListDentryRequest;
using curvefs::metaserver::ListDentryResponse;
-using curvefs::metaserver::PrepareRenameTxRequest;
-using curvefs::metaserver::PrepareRenameTxResponse;
+using curvefs::metaserver::ManageInodeType;
+using curvefs::metaserver::PrewriteRenameTxRequest;
+using curvefs::metaserver::PrewriteRenameTxResponse;
using curvefs::metaserver::UpdateInodeRequest;
using curvefs::metaserver::UpdateInodeResponse;
-using curvefs::metaserver::ManageInodeType;
using curvefs::common::FSType;
using curvefs::common::PartitionInfo;
@@ -84,6 +84,8 @@ using curvefs::mds::CommitTxRequest;
using curvefs::mds::CommitTxResponse;
using curvefs::mds::RefreshSessionRequest;
using curvefs::mds::RefreshSessionResponse;
+using curvefs::mds::TsoRequest;
+using curvefs::mds::TsoResponse;
using curvefs::mds::UmountFsRequest;
using curvefs::mds::UmountFsResponse;
@@ -197,6 +199,9 @@ class MDSBaseClient {
brpc::Controller* cntl,
brpc::Channel* channel);
+ virtual void Tso(const TsoRequest& request, TsoResponse* response,
+ brpc::Controller* cntl, brpc::Channel* channel);
+
virtual void AllocateVolumeBlockGroup(uint32_t fsId,
uint32_t count,
const std::string& owner,
diff --git a/curvefs/src/client/rpcclient/mds_client.cpp b/curvefs/src/client/rpcclient/mds_client.cpp
index 3c52bf3a5a..2f53aa3d7d 100644
--- a/curvefs/src/client/rpcclient/mds_client.cpp
+++ b/curvefs/src/client/rpcclient/mds_client.cpp
@@ -25,6 +25,7 @@
#include