Skip to content

Commit

Permalink
add kvs range duration
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Aug 24, 2023
1 parent 9f099ed commit 5b647be
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
24 changes: 24 additions & 0 deletions internal/observability/metrics/agent/core/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (

brokenIndexStoreCountMetricsName = "agent_core_ngt_broken_index_store_count"
brokenIndexStoreCountMetricsDescription = "How many broken index generations have been stored"

kvsRangeDurationMetricsName = "agent_core_ngt_kvs_range_duration"
kvsRangeDurationMetricsDescription = "The duration of the kvs range method"
)

type ngtMetrics struct {
Expand Down Expand Up @@ -143,6 +146,15 @@ func (n *ngtMetrics) View() ([]*metrics.View, error) {
return nil, err
}

kvsRangeDuration, err := view.New(
view.MatchInstrumentName(kvsRangeDurationMetricsName),
view.WithSetDescription(kvsRangeDurationMetricsDescription),
view.WithSetAggregation(aggregation.LastValue{}),
)
if err != nil {
return nil, err
}

Check warning on line 156 in internal/observability/metrics/agent/core/ngt/ngt.go

View check run for this annotation

Codecov / codecov/patch

internal/observability/metrics/agent/core/ngt/ngt.go#L149-L156

Added lines #L149 - L156 were not covered by tests

return []*metrics.View{
&indexCount,
&uncommittedIndexCount,
Expand All @@ -153,6 +165,7 @@ func (n *ngtMetrics) View() ([]*metrics.View, error) {
&isIndexing,
&isSaving,
&brokenIndexCount,
&kvsRangeDuration,

Check warning on line 168 in internal/observability/metrics/agent/core/ngt/ngt.go

View check run for this annotation

Codecov / codecov/patch

internal/observability/metrics/agent/core/ngt/ngt.go#L168

Added line #L168 was not covered by tests
}, nil
}

Expand Down Expand Up @@ -238,6 +251,15 @@ func (n *ngtMetrics) Register(m metrics.Meter) error {
return err
}

kvsRangeDuration, err := m.AsyncInt64().Gauge(
kvsRangeDurationMetricsName,
metrics.WithDescription(kvsRangeDurationMetricsDescription),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}

Check warning on line 261 in internal/observability/metrics/agent/core/ngt/ngt.go

View check run for this annotation

Codecov / codecov/patch

internal/observability/metrics/agent/core/ngt/ngt.go#L254-L261

Added lines #L254 - L261 were not covered by tests

return m.RegisterCallback(
[]metrics.AsynchronousInstrument{
indexCount,
Expand All @@ -249,6 +271,7 @@ func (n *ngtMetrics) Register(m metrics.Meter) error {
isIndexing,
isSaving,
brokenIndexCount,
kvsRangeDuration,

Check warning on line 274 in internal/observability/metrics/agent/core/ngt/ngt.go

View check run for this annotation

Codecov / codecov/patch

internal/observability/metrics/agent/core/ngt/ngt.go#L274

Added line #L274 was not covered by tests
},
func(ctx context.Context) {
var indexing int64
Expand All @@ -270,6 +293,7 @@ func (n *ngtMetrics) Register(m metrics.Meter) error {
isIndexing.Observe(ctx, int64(indexing))
isSaving.Observe(ctx, int64(saving))
brokenIndexCount.Observe(ctx, int64(n.ngt.BrokenIndexCount()))
kvsRangeDuration.Observe(ctx, n.ngt.KvsRangeDuration())

Check warning on line 296 in internal/observability/metrics/agent/core/ngt/ngt.go

View check run for this annotation

Codecov / codecov/patch

internal/observability/metrics/agent/core/ngt/ngt.go#L296

Added line #L296 was not covered by tests
},
)
}
20 changes: 20 additions & 0 deletions pkg/agent/core/ngt/service/kvs/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/safety"
Expand All @@ -37,6 +38,7 @@ type BidiMap interface {
Range(ctx context.Context, f func(string, uint32, int64) bool)
Len() uint64
Close() error
RangeDuration() int64
}

type valueStructOu struct {
Expand Down Expand Up @@ -66,6 +68,11 @@ const (
// mask = 0xFFF.
)

var (
tmu sync.RWMutex
rangeDur int64
)

// New returns the bidi that satisfies the BidiMap interface.
func New(opts ...Option) BidiMap {
b := &bidi{
Expand Down Expand Up @@ -152,6 +159,8 @@ func (b *bidi) DeleteInverse(val uint32) (key string, ok bool) {

// Range retrieves all set keys and values and calls the callback function f.
func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) {
start := time.Now()

var wg sync.WaitGroup
for i := range b.uo {
idx := i
Expand All @@ -171,6 +180,11 @@ func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) {
}))
}
wg.Wait()

dur := time.Since(start).Nanoseconds()
tmu.Lock()
rangeDur = dur
tmu.Unlock()
}

// Len returns the length of the cache that is set in the bidi.
Expand All @@ -187,3 +201,9 @@ func (b *bidi) Close() error {
}
return b.eg.Wait()
}

func (b *bidi) RangeDuration() int64 {
tmu.RLock()
defer tmu.RUnlock()
return rangeDur

Check warning on line 208 in pkg/agent/core/ngt/service/kvs/kvs.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/kvs/kvs.go#L205-L208

Added lines #L205 - L208 were not covered by tests
}
5 changes: 5 additions & 0 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type NGT interface {
GetDimensionSize() int
Close(ctx context.Context) error
BrokenIndexCount() uint64
KvsRangeDuration() int64
}

type ngt struct {
Expand Down Expand Up @@ -1721,3 +1722,7 @@ func (n *ngt) BrokenIndexCount() uint64 {
func (n *ngt) ListObjectFunc(ctx context.Context, f func(uuid string, oid uint32, ts int64) bool) {
n.kvs.Range(ctx, f)
}

func (n *ngt) KvsRangeDuration() int64 {
return n.kvs.RangeDuration()

Check warning on line 1727 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1726-L1727

Added lines #L1726 - L1727 were not covered by tests
}

0 comments on commit 5b647be

Please sign in to comment.