Skip to content

*: expose SetGlobalGCBarrier/DeleteGlobalGCBarrier API for pd client #9361

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
43 changes: 43 additions & 0 deletions client/clients/gc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type GCStatesClient interface {
DeleteGCBarrier(ctx context.Context, barrierID string) (*GCBarrierInfo, error)
// GetGCState gets the current GC state.
GetGCState(ctx context.Context) (GCState, error)
// SetGlobalGCBarrier sets (creates or updates) a global GC barrier.
SetGlobalGCBarrier(ctx context.Context, barrierID string, barrierTS uint64, ttl time.Duration) (*GlobalGCBarrierInfo, error)
// DeleteGlobalGCBarrier deletes a global GC barrier.
DeleteGlobalGCBarrier(ctx context.Context, barrierID string) (*GlobalGCBarrierInfo, error)
// Get the GC states from all keyspaces.
GetAllKeyspacesGCStates(ctx context.Context) (GCStates, error)
}

// InternalController is the interface for controlling GC execution.
Expand Down Expand Up @@ -84,6 +90,16 @@ type GCBarrierInfo struct {
getReqStartTime time.Time
}

// GlobalGCBarrierInfo represents the information of a global GC barrier.
type GlobalGCBarrierInfo struct {
BarrierID string
BarrierTS uint64
TTL time.Duration
// The time when the RPC that fetches the GC barrier info.
// It will be used as the basis for determining whether the barrier is expired.
getReqStartTime time.Time
}

// TTLNeverExpire is a special value for TTL that indicates the barrier never expires.
const TTLNeverExpire = time.Duration(math.MaxInt64)

Expand Down Expand Up @@ -111,6 +127,25 @@ func (b *GCBarrierInfo) isExpiredImpl(now time.Time) bool {
return now.Sub(b.getReqStartTime) > b.TTL
}


// NewGlobalGCBarrierInfo creates a new GCBarrierInfo instance.
func NewGlobalGCBarrierInfo(barrierID string, barrierTS uint64, ttl time.Duration, getReqStartTime time.Time) *GlobalGCBarrierInfo {
return &GlobalGCBarrierInfo{
BarrierID: barrierID,
BarrierTS: barrierTS,
TTL: ttl,
getReqStartTime: getReqStartTime,
}
}

// IsExpired checks whether the barrier is expired.
func (b *GlobalGCBarrierInfo) IsExpired() bool {
if b.TTL == TTLNeverExpire {
return false
}
return time.Now().Sub(b.getReqStartTime) > b.TTL
}

// GCState represents the information of the GC state.
//
//nolint:revive
Expand All @@ -120,3 +155,11 @@ type GCState struct {
GCSafePoint uint64
GCBarriers []*GCBarrierInfo
}

// GCStates represents the information of the GC state for all keyspaces.
type GCStates struct {
// Maps from keyspace id to GC state of that keyspace.
GCStates map[uint32]GCState
// All existing global GC barriers.
GlobalGCBarriers []*GlobalGCBarrierInfo
}
122 changes: 114 additions & 8 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,19 @@ func pbToGCBarrierInfo(pb *pdpb.GCBarrierInfo, reqStartTime time.Time) *gc.GCBar
)
}

func pbToGlobalGCBarrierInfo(pb *pdpb.GlobalGCBarrierInfo, reqStartTime time.Time) *gc.GlobalGCBarrierInfo {
if pb == nil {
return nil
}
ttl := saturatingStdDurationFromSeconds(pb.GetTtlSeconds())
return gc.NewGlobalGCBarrierInfo(
pb.GetBarrierId(),
pb.GetBarrierTs(),
ttl,
reqStartTime,
)
}

// SetGCBarrier sets (creates or updates) a GC barrier.
func (c gcStatesClient) SetGCBarrier(ctx context.Context, barrierID string, barrierTS uint64, ttl time.Duration) (*gc.GCBarrierInfo, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand Down Expand Up @@ -348,18 +361,111 @@ func (c gcStatesClient) GetGCState(ctx context.Context) (gc.GCState, error) {
}

gcState := resp.GetGcState()
return pbToGCState(gcState, start), nil
}

func pbToGCState(from *pdpb.GCState, reqStartTime time.Time) gc.GCState {
keyspaceID := constants.NullKeyspaceID
if gcState.KeyspaceScope != nil {
keyspaceID = gcState.KeyspaceScope.KeyspaceId
if from.KeyspaceScope != nil {
keyspaceID = from.KeyspaceScope.KeyspaceId
}
gcBarriers := make([]*gc.GCBarrierInfo, 0, len(gcState.GetGcBarriers()))
for _, b := range gcState.GetGcBarriers() {
gcBarriers = append(gcBarriers, pbToGCBarrierInfo(b, start))
gcBarriers := make([]*gc.GCBarrierInfo, 0, len(from.GetGcBarriers()))
for _, b := range from.GetGcBarriers() {
gcBarriers = append(gcBarriers, pbToGCBarrierInfo(b, reqStartTime))
}
return gc.GCState{
KeyspaceID: keyspaceID,
TxnSafePoint: gcState.GetTxnSafePoint(),
GCSafePoint: gcState.GetGcSafePoint(),
TxnSafePoint: from.GetTxnSafePoint(),
GCSafePoint: from.GetGcSafePoint(),
GCBarriers: gcBarriers,
}, nil
}
}

// SetGlobalGCBarrier sets (creates or updates) a GC barrier.
func (c gcStatesClient) SetGlobalGCBarrier(ctx context.Context, barrierID string, barrierTS uint64, ttl time.Duration) (*gc.GlobalGCBarrierInfo, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SetGlobalGCBarrier", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { metrics.CmdDurationSetGlobalGCBarrier.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.client.inner.option.Timeout)
defer cancel()
req := &pdpb.SetGlobalGCBarrierRequest{
Header: c.client.requestHeader(),
BarrierId: barrierID,
BarrierTs: barrierTS,
TtlSeconds: roundUpDurationToSeconds(ttl),
}
protoClient, ctx := c.client.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.SetGlobalGCBarrier(ctx, req)
if err = c.client.respForErr(metrics.CmdFailedDurationSetGlobalGCBarrier, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return pbToGlobalGCBarrierInfo(resp.GetNewBarrierInfo(), start), nil
}

// DeleteGlobalGCBarrier deletes a GC barrier.
func (c gcStatesClient) DeleteGlobalGCBarrier(ctx context.Context, barrierID string) (*gc.GlobalGCBarrierInfo, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.DeleteGlobalGCBarrier", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { metrics.CmdDurationDeleteGlobalGCBarrier.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.client.inner.option.Timeout)
defer cancel()
req := &pdpb.DeleteGlobalGCBarrierRequest{
Header: c.client.requestHeader(),
BarrierId: barrierID,
}
protoClient, ctx := c.client.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.DeleteGlobalGCBarrier(ctx, req)
if err = c.client.respForErr(metrics.CmdFailedDurationDeleteGlobalGCBarrier, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return pbToGlobalGCBarrierInfo(resp.GetDeletedBarrierInfo(), start), nil
}

// Get the GC states from all keyspaces.
func (c gcStatesClient) GetAllKeyspacesGCStates(ctx context.Context) (gc.GCStates, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetAllKeyspacesGCState", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { metrics.CmdDurationGetAllKeyspacesGCStates.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.client.inner.option.Timeout)
defer cancel()
req := &pdpb.GetAllKeyspacesGCStatesRequest{
Header: c.client.requestHeader(),
}
protoClient, ctx := c.client.getClientAndContext(ctx)
if protoClient == nil {
return gc.GCStates{}, errs.ErrClientGetProtoClient
}

resp, err := protoClient.GetAllKeyspacesGCStates(ctx, req)
if err = c.client.respForErr(metrics.CmdFailedDurationGetAllKeyspacesGCStates, start, err, resp.GetHeader()); err != nil {
return gc.GCStates{}, err
}

var ret gc.GCStates
ret.GCStates = make(map[uint32]gc.GCState, len(resp.GetGcStates()))
for _, state := range resp.GetGcStates() {
ret.GCStates[state.KeyspaceScope.KeyspaceId] = pbToGCState(state, start)
}
for _, barrier := range resp.GetGlobalGcBarriers() {
ret.GlobalGCBarriers = append(ret.GlobalGCBarriers, pbToGlobalGCBarrierInfo(barrier, start))
}
return ret, nil
}
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/tiancaiamao/kvproto v0.0.0-20250527075632-19ed6383b311
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20250521074834-db74bf0e3ac1 h1:peVY7sHw2RCDH8lEHEVMYe4M7GXyeXRKR8QDc5f5dM4=
github.com/pingcap/kvproto v0.0.0-20250521074834-db74bf0e3ac1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand All @@ -76,6 +74,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tiancaiamao/kvproto v0.0.0-20250527075632-19ed6383b311 h1:vtKCtyD9oYGlcS6hCRBBvfGQnuNVo76npmyxyq7vEsI=
github.com/tiancaiamao/kvproto v0.0.0-20250527075632-19ed6383b311/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
Expand Down
12 changes: 12 additions & 0 deletions client/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ var (
CmdDurationSetGCBarrier prometheus.Observer
CmdDurationDeleteGCBarrier prometheus.Observer
CmdDurationGetGCState prometheus.Observer
CmdDurationSetGlobalGCBarrier prometheus.Observer
CmdDurationDeleteGlobalGCBarrier prometheus.Observer
CmdDurationGetAllKeyspacesGCStates prometheus.Observer

CmdFailedDurationGetRegion prometheus.Observer
CmdFailedDurationTSOWait prometheus.Observer
Expand All @@ -300,7 +303,10 @@ var (
CmdFailedDurationAdvanceGCSafePoint prometheus.Observer
CmdFailedDurationSetGCBarrier prometheus.Observer
CmdFailedDurationDeleteGCBarrier prometheus.Observer
CmdFailedDurationSetGlobalGCBarrier prometheus.Observer
CmdFailedDurationDeleteGlobalGCBarrier prometheus.Observer
CmdFailedDurationGetGCState prometheus.Observer
CmdFailedDurationGetAllKeyspacesGCStates prometheus.Observer

InternalCmdDurationGetClusterInfo prometheus.Observer
InternalCmdDurationGetMembers prometheus.Observer
Expand Down Expand Up @@ -358,6 +364,9 @@ func initLabelValues() {
CmdDurationSetGCBarrier = cmdDuration.WithLabelValues("set_gc_barrier")
CmdDurationDeleteGCBarrier = cmdDuration.WithLabelValues("delete_gc_barrier")
CmdDurationGetGCState = cmdDuration.WithLabelValues("get_gc_state")
CmdDurationSetGlobalGCBarrier = cmdDuration.WithLabelValues("set_global_gc_barrier")
CmdDurationDeleteGlobalGCBarrier = cmdDuration.WithLabelValues("delete_global_gc_barrier")
CmdDurationGetAllKeyspacesGCStates = cmdDuration.WithLabelValues("get_all_keyspaces_gc_states")

CmdFailedDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
CmdFailedDurationTSOWait = cmdFailedDuration.WithLabelValues("wait")
Expand Down Expand Up @@ -385,6 +394,9 @@ func initLabelValues() {
CmdFailedDurationSetGCBarrier = cmdFailedDuration.WithLabelValues("set_gc_barrier")
CmdFailedDurationDeleteGCBarrier = cmdFailedDuration.WithLabelValues("delete_gc_barrier")
CmdFailedDurationGetGCState = cmdFailedDuration.WithLabelValues("get_gc_state")
CmdFailedDurationSetGlobalGCBarrier = cmdFailedDuration.WithLabelValues("set_global_gc_barrier")
CmdFailedDurationDeleteGlobalGCBarrier = cmdFailedDuration.WithLabelValues("delete_global_gc_barrier")
CmdFailedDurationGetAllKeyspacesGCStates = cmdDuration.WithLabelValues("get_all_keyspaces_gc_states")

InternalCmdDurationGetClusterInfo = internalCmdDuration.WithLabelValues("get_cluster_info")
InternalCmdDurationGetMembers = internalCmdDuration.WithLabelValues("get_members")
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go 1.23.0
require (
github.com/AlekSi/gocov-xml v1.0.0
github.com/BurntSushi/toml v0.3.1
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/aws/aws-sdk-go-v2/config v1.25.12
github.com/aws/aws-sdk-go-v2/credentials v1.16.10
github.com/aws/aws-sdk-go-v2/service/kms v1.26.5
Expand Down Expand Up @@ -222,3 +223,5 @@ require (
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/pingcap/kvproto => github.com/tiancaiamao/kvproto v0.0.0-20250527075632-19ed6383b311
Loading
Loading