Skip to content

Commit

Permalink
migrate experimental-txn-mode-write-with-shared-buffer flag to featur…
Browse files Browse the repository at this point in the history
…e gate.

Signed-off-by: ishan16696 <[email protected]>
  • Loading branch information
ishan16696 committed Dec 18, 2024
1 parent b5c620a commit fb62034
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 94 deletions.
9 changes: 4 additions & 5 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,11 +583,10 @@ func NewConfig() *Config {
LogRotationConfigJSON: DefaultLogRotationConfig,
EnableGRPCGateway: true,

ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
ExperimentalMemoryMlock: false,
ExperimentalTxnModeWriteWithSharedBuffer: true,
ExperimentalStopGRPCServiceOnDefrag: false,
ExperimentalMaxLearners: membership.DefaultMaxLearners,
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
ExperimentalMemoryMlock: false,
ExperimentalStopGRPCServiceOnDefrag: false,
ExperimentalMaxLearners: membership.DefaultMaxLearners,

CompactHashCheckTime: DefaultCompactHashCheckTime,
// TODO: delete in v3.7
Expand Down
121 changes: 97 additions & 24 deletions server/embed/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,26 @@ func TestConfigFileOtherFields(t *testing.T) {

func TestConfigFileFeatureGates(t *testing.T) {
testCases := []struct {
name string
serverFeatureGatesJSON string
experimentalStopGRPCServiceOnDefrag string
experimentalInitialCorruptCheck string
experimentalCompactHashCheckEnabled string
expectErr bool
expectedFeatures map[featuregate.Feature]bool
name string
serverFeatureGatesJSON string
experimentalStopGRPCServiceOnDefrag string
experimentalInitialCorruptCheck string
experimentalCompactHashCheckEnabled string
experimentalTxnModeWriteWithSharedBuffer string
expectErr bool
expectedFeatures map[featuregate.Feature]bool
}{
{
name: "default",
expectedFeatures: map[featuregate.Feature]bool{
features.DistributedTracing: false,
features.StopGRPCServiceOnDefrag: false,
features.InitialCorruptCheck: false,
features.DistributedTracing: false,
features.StopGRPCServiceOnDefrag: false,
features.InitialCorruptCheck: false,
features.TxnModeWriteWithSharedBuffer: false,
},
},
{
name: "cannot set both experimental flag and feature gate flag",
name: "cannot set both experimental flag and feature gate flag for StopGRPCServiceOnDefrag",
serverFeatureGatesJSON: "StopGRPCServiceOnDefrag=true",
experimentalStopGRPCServiceOnDefrag: "false",
expectErr: true,
Expand All @@ -122,6 +124,12 @@ func TestConfigFileFeatureGates(t *testing.T) {
experimentalInitialCorruptCheck: "false",
expectErr: true,
},
{
name: "cannot set both experimental flag and feature gate flag for TxnModeWriteWithSharedBuffer",
serverFeatureGatesJSON: "TxnModeWriteWithSharedBuffer=true",
experimentalTxnModeWriteWithSharedBuffer: "false",
expectErr: true,
},
{
name: "ok to set different experimental flag and feature gate flag",
serverFeatureGatesJSON: "DistributedTracing=true",
Expand All @@ -133,21 +141,35 @@ func TestConfigFileFeatureGates(t *testing.T) {
},
},
{
name: "can set feature gate to true from experimental flag",
name: "ok to set different multiple experimental flags and feature gate flags",
serverFeatureGatesJSON: "StopGRPCServiceOnDefrag=true,TxnModeWriteWithSharedBuffer=true",
experimentalCompactHashCheckEnabled: "true",
experimentalInitialCorruptCheck: "true",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: true,
features.CompactHashCheck: true,
features.InitialCorruptCheck: true,
features.TxnModeWriteWithSharedBuffer: true,
},
},
{
name: "can set feature gate StopGRPCServiceOnDefrag to true from experimental flag",
experimentalStopGRPCServiceOnDefrag: "true",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: true,
features.DistributedTracing: false,
features.InitialCorruptCheck: false,
features.StopGRPCServiceOnDefrag: true,
features.DistributedTracing: false,
features.InitialCorruptCheck: false,
features.TxnModeWriteWithSharedBuffer: false,
},
},
{
name: "can set feature gate to false from experimental flag",
name: "can set feature gate StopGRPCServiceOnDefrag to false from experimental flag",
experimentalStopGRPCServiceOnDefrag: "false",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: false,
features.DistributedTracing: false,
features.InitialCorruptCheck: false,
features.StopGRPCServiceOnDefrag: false,
features.DistributedTracing: false,
features.InitialCorruptCheck: false,
features.TxnModeWriteWithSharedBuffer: false,
},
},
{
Expand All @@ -168,6 +190,28 @@ func TestConfigFileFeatureGates(t *testing.T) {
features.InitialCorruptCheck: false,
},
},
{
name: "can set feature gate TxnModeWriteWithSharedBuffer to true from experimental flag",
experimentalTxnModeWriteWithSharedBuffer: "true",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: false,
features.DistributedTracing: false,
features.InitialCorruptCheck: false,
features.CompactHashCheck: false,
features.TxnModeWriteWithSharedBuffer: true,
},
},
{
name: "can set feature gate TxnModeWriteWithSharedBuffer to false from experimental flag",
experimentalTxnModeWriteWithSharedBuffer: "false",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: false,
features.DistributedTracing: false,
features.InitialCorruptCheck: false,
features.CompactHashCheck: false,
features.TxnModeWriteWithSharedBuffer: false,
},
},
{
name: "can set feature gate StopGRPCServiceOnDefrag to true from feature gate flag",
serverFeatureGatesJSON: "StopGRPCServiceOnDefrag=true",
Expand All @@ -187,14 +231,34 @@ func TestConfigFileFeatureGates(t *testing.T) {
},
},
{
name: "can set feature gate to false from feature gate flag",
name: "can set feature gate StopGRPCServiceOnDefrag to false from feature gate flag",
serverFeatureGatesJSON: "StopGRPCServiceOnDefrag=false",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: false,
features.DistributedTracing: false,
features.InitialCorruptCheck: false,
},
},
{
name: "can set feature gate TxnModeWriteWithSharedBuffer to true from feature gate flag",
serverFeatureGatesJSON: "TxnModeWriteWithSharedBuffer=true",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: false,
features.DistributedTracing: false,
features.InitialCorruptCheck: false,
features.TxnModeWriteWithSharedBuffer: true,
},
},
{
name: "can set feature gate TxnModeWriteWithSharedBuffer to false from feature gate flag",
serverFeatureGatesJSON: "TxnModeWriteWithSharedBuffer=false",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: false,
features.DistributedTracing: false,
features.InitialCorruptCheck: false,
features.TxnModeWriteWithSharedBuffer: false,
},
},
{
name: "cannot set both experimental flag and feature gate flag for ExperimentalCompactHashCheckEnabled",
serverFeatureGatesJSON: "CompactHashCheck=true",
Expand Down Expand Up @@ -232,10 +296,11 @@ func TestConfigFileFeatureGates(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
yc := struct {
ExperimentalStopGRPCServiceOnDefrag *bool `json:"experimental-stop-grpc-service-on-defrag,omitempty"`
ExperimentalInitialCorruptCheck *bool `json:"experimental-initial-corrupt-check,omitempty"`
ExperimentalCompactHashCheckEnabled *bool `json:"experimental-compact-hash-check-enabled,omitempty"`
ServerFeatureGatesJSON string `json:"feature-gates"`
ExperimentalStopGRPCServiceOnDefrag *bool `json:"experimental-stop-grpc-service-on-defrag,omitempty"`
ExperimentalInitialCorruptCheck *bool `json:"experimental-initial-corrupt-check,omitempty"`
ExperimentalCompactHashCheckEnabled *bool `json:"experimental-compact-hash-check-enabled,omitempty"`
ExperimentalTxnModeWriteWithSharedBuffer *bool `json:"experimental-txn-mode-write-with-shared-buffer,omitempty"`
ServerFeatureGatesJSON string `json:"feature-gates"`
}{
ServerFeatureGatesJSON: tc.serverFeatureGatesJSON,
}
Expand All @@ -248,6 +313,14 @@ func TestConfigFileFeatureGates(t *testing.T) {
yc.ExperimentalInitialCorruptCheck = &experimentalInitialCorruptCheck
}

if tc.experimentalTxnModeWriteWithSharedBuffer != "" {
experimentalTxnModeWriteWithSharedBuffer, err := strconv.ParseBool(tc.experimentalTxnModeWriteWithSharedBuffer)
if err != nil {
t.Fatal(err)
}
yc.ExperimentalTxnModeWriteWithSharedBuffer = &experimentalTxnModeWriteWithSharedBuffer
}

if tc.experimentalStopGRPCServiceOnDefrag != "" {
experimentalStopGRPCServiceOnDefrag, err := strconv.ParseBool(tc.experimentalStopGRPCServiceOnDefrag)
if err != nil {
Expand Down
107 changes: 53 additions & 54 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,60 +168,59 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)

srvcfg := config.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.AdvertiseClientUrls,
PeerURLs: cfg.AdvertisePeerUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
DiscoveryCfg: cfg.DiscoveryCfg,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckTime: cfg.CompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
Name: cfg.Name,
ClientURLs: cfg.AdvertiseClientUrls,
PeerURLs: cfg.AdvertisePeerUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
DiscoveryCfg: cfg.DiscoveryCfg,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckTime: cfg.CompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
ExperimentalMaxLearners: cfg.ExperimentalMaxLearners,
V2Deprecation: cfg.V2DeprecationEffective(),
Expand Down
2 changes: 1 addition & 1 deletion server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ Experimental feature:
Duration of periodical watch progress notification.
--experimental-warning-apply-duration '100ms'
Warning is generated if requests take more than this duration.
--experimental-txn-mode-write-with-shared-buffer 'true'
--experimental-txn-mode-write-with-shared-buffer 'true'. It's deprecated, and will be decommissioned in v3.7. Use '--feature-gates=TxnModeWriteWithSharedBuffer=true' instead.
Enable the write transaction to use a shared buffer in its readonly check operations.
--experimental-bootstrap-defrag-threshold-megabytes
Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {

func (s *EtcdServer) NewUberApplier() apply.UberApplier {
return apply.NewUberApplier(s.lg, s.be, s.KV(), s.alarmStore, s.authStore, s.lessor, s.cluster, s, s, s.consistIndex,
s.Cfg.WarningApplyDuration, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.Cfg.QuotaBackendBytes)
s.Cfg.WarningApplyDuration, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.Cfg.QuotaBackendBytes)
}

func verifySnapshotIndex(snapshot raftpb.Snapshot, cindex uint64) {
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
// When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid extra overhead of copying buffer.
var mode mvcc.ReadTxMode
if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer*/ {
if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer)*/ {
mode = mvcc.SharedBufReadTxMode
} else {
mode = mvcc.ConcurrentReadTxMode
Expand Down
3 changes: 2 additions & 1 deletion server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
apply2 "go.etcd.io/etcd/server/v3/etcdserver/apply"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.etcd.io/etcd/server/v3/features"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp"
"go.etcd.io/etcd/server/v3/storage/mvcc"
Expand Down Expand Up @@ -183,7 +184,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}(time.Now())

get := func() {
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.KV(), s.lessor)
}
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
Expand Down
Loading

0 comments on commit fb62034

Please sign in to comment.