Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
refactor
  • Loading branch information
ykadowak committed Oct 3, 2023
1 parent 74bd1c6 commit d687013
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 68 deletions.
126 changes: 59 additions & 67 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/sync/errgroup"
"github.com/vdaas/vald/pkg/index/job/correction/config"
Expand Down Expand Up @@ -106,9 +107,8 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) {
return nil, err
}

Check warning on line 108 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L105-L108

Added lines #L105 - L108 were not covered by tests

// For debugging
c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool {
log.Debugf("index info: addr(%s), stored(%d), uncommitted(%d)", addr, info.GetStored(), info.GetUncommitted())
log.Infof("index info: addr(%s), stored(%d), uncommitted(%d)", addr, info.GetStored(), info.GetUncommitted())
return true
})

Check warning on line 113 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L110-L113

Added lines #L110 - L113 were not covered by tests

Expand All @@ -133,30 +133,22 @@ func (c *correct) correct(ctx context.Context) (err error) {
// This is used to know which agents possibly have the same index as the target replica.
// We can say this because, thanks to caching, there is no way that the target replica is
// in the agent that has already been corrected.
leftAgentAddrs := make([]string, len(c.agentAddrs))
n := copy(leftAgentAddrs, c.agentAddrs)
if n != len(c.agentAddrs) {
return fmt.Errorf("failed to copy agentAddrs")
}

// Vector with time after this should not be processed
correctionStartTime, err := getCorrectionStartTime(ctx)
correctionStartTime, err := correctionStartTime(ctx)
if err != nil {
log.Errorf("cannot determine correction start time: %w", err)
return err
}

Check warning on line 142 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L131-L142

Added lines #L131 - L142 were not covered by tests

curTargetAgent := 0
if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
// current address is the leftAgentAddrs[0] because this is OrderedRange and
// leftAgentAddrs is copied from c.agentAddrs
leftAgentAddrs = leftAgentAddrs[1:]

vc := vald.NewValdClient(conn)
stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{})
if err != nil {
return err
}
defer func() {
curTargetAgent++
}()

Check warning on line 151 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L144-L151

Added lines #L144 - L151 were not covered by tests

// context and errgroup for stream.Recv and correction
sctx, scancel := context.WithCancel(ctx)
Expand All @@ -173,6 +165,12 @@ func (c *correct) correct(ctx context.Context) (err error) {
var mu sync.Mutex
log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency)

vc := vald.NewValdClient(conn)
stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{})
if err != nil {
return err
}

Check warning on line 172 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L154-L172

Added lines #L154 - L172 were not covered by tests

// The number of items to be received in advance is not known in advance.
// This is because there is a possibility of new items being inserted during processing.
for {
Expand All @@ -181,9 +179,26 @@ func (c *correct) correct(ctx context.Context) (err error) {
if !errors.Is(sctx.Err(), context.Canceled) {
log.Errorf("context done unexpectedly: %v", sctx.Err())
}

Check warning on line 181 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L176-L181

Added lines #L176 - L181 were not covered by tests
goto Finalize

// Finalize
err = seg.Wait()
if err != nil {
log.Errorf("err group returned error: %v", err)
}

Check warning on line 187 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L184-L187

Added lines #L184 - L187 were not covered by tests

berr := bolteg.Wait()
if berr != nil {
log.Errorf("bbolt err group returned error: %v", err)
err = errors.Join(err, berr)
} else {
log.Info("bbolt all batch finished")
}

Check warning on line 195 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L189-L195

Added lines #L189 - L195 were not covered by tests

log.Infof("correction finished for agent %s", addr)
return err

Check warning on line 198 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L197-L198

Added lines #L197 - L198 were not covered by tests

default:
seg.Go(func() error {
seg.Go(safety.RecoverFunc(func() error {
mu.Lock()
// As long as we don't stream.Recv() from the stream, we do not consume the memory of the message.
// So by limiting the number of this errgroup.Go instances, we can limit the memory usage
Expand Down Expand Up @@ -235,7 +250,7 @@ func (c *correct) correct(ctx context.Context) (err error) {
addr: addr,
vec: vec,
},
leftAgentAddrs,
curTargetAgent,
); err != nil {
log.Errorf("failed to check consistency: %v", err)
return nil // continue other processes
Expand All @@ -245,25 +260,9 @@ func (c *correct) correct(ctx context.Context) (err error) {
c.checkedID.AsyncSet(bolteg, []byte(id), nil)

return nil

Check warning on line 262 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L260-L262

Added lines #L260 - L262 were not covered by tests
})
}))
}
}

Finalize:
err = seg.Wait()
if err != nil {
log.Errorf("err group returned error: %v", err)
}

berr := bolteg.Wait()
if berr != nil {
log.Errorf("bolt err group returned error: %v", err)
err = errors.Join(err, berr)
}
log.Info("bbolt all batch finished")

log.Infof("correction finished for agent %s", addr)
return err
},
); err != nil {
log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err)
Expand All @@ -279,39 +278,23 @@ type vectorReplica struct {
}

// Validate len(addrs) >= 2 before calling this function
func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorReplica, leftAgentAddrs []string) error {
// availableAddrs is the agents' addr that doesn't have the target replica thus is available to insert the replica
// to fix the index replica number if required.
availableAddrs := make([]string, 0, len(c.agentAddrs)-1)
for _, addr := range c.agentAddrs {
if addr != targetReplica.addr {
availableAddrs = append(availableAddrs, addr)
}
}
// idxだけ渡せば良い?c.addrsに全ての情報があるので?
func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorReplica, targetAgentIdx int) error {
// leftAgentAddrs is the agents' addr that hasn't been corrected yet.
leftAgentAddrs := c.agentAddrs[targetAgentIdx+1:]

// Vector with time after this should not be processed
correctionStartTime, err := getCorrectionStartTime(ctx)
correctionStartTime, err := correctionStartTime(ctx)
if err != nil {
log.Errorf("cannot determine correction start time: %w", err)
return err
}

Check warning on line 291 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L282-L291

Added lines #L282 - L291 were not covered by tests

foundReplicas := make([]*vectorReplica, 0, len(availableAddrs))
foundReplicas := make([]*vectorReplica, 0, len(c.agentAddrs))
var mu sync.Mutex
if err := c.discoverer.GetClient().OrderedRangeConcurrent(ctx, leftAgentAddrs, len(leftAgentAddrs),
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
// To avoid GetObject to myself. To maintain backward compatibility for withoug cache operation
if addr == targetReplica.addr {
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
default:
}
vc := vald.NewValdClient(conn)
vec, err := vc.GetObject(ctx, &payload.Object_VectorRequest{
vec, err := vald.NewValdClient(conn).GetObject(ctx, &payload.Object_VectorRequest{
Id: &payload.Object_ID{
Id: targetReplica.vec.GetId(),
},
Expand Down Expand Up @@ -344,12 +327,6 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep
addr: addr,
vec: vec,
})

// Remove this addr from availableAddrs because this addr has the target replica
// and not available to insert the replica to fix the index replica number
slices.DeleteFunc(availableAddrs, func(availableAddr string) bool {
return availableAddr == addr
})
mu.Unlock()

return nil

Check warning on line 332 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L325-L332

Added lines #L325 - L332 were not covered by tests
Expand All @@ -364,7 +341,7 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep
}

Check warning on line 341 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L339-L341

Added lines #L339 - L341 were not covered by tests

// check replica number
if err := c.correctReplica(ctx, targetReplica, foundReplicas, availableAddrs); err != nil {
if err := c.correctReplica(ctx, targetReplica, foundReplicas); err != nil {
return fmt.Errorf("failed to fix index replica: %w", err)
}

Check warning on line 346 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L344-L346

Added lines #L344 - L346 were not covered by tests

Expand Down Expand Up @@ -409,11 +386,12 @@ func (c *correct) correctTimestamp(ctx context.Context, targetReplica *vectorRep
return nil
}

// correctReplica corrects the number of replicas of the target vector.
// skipcq: GO-R1005
func (c *correct) correctReplica(
ctx context.Context,
targetReplica *vectorReplica,
foundReplicas []*vectorReplica,
availableAddrs []string,
) error {
// diff < 0 means there is less replica than the correct number
existReplica := len(foundReplicas) + 1
Expand All @@ -423,6 +401,20 @@ func (c *correct) correctReplica(
return nil
}

// availableAddrs = c.agentAddrs - foundReplicas - targetReplica.addr
availableAddrs := make([]string, 0, len(c.agentAddrs))
for _, addr := range c.agentAddrs {
if addr == targetReplica.addr {
continue
}
if slices.ContainsFunc(foundReplicas, func(replica *vectorReplica) bool {
return replica.addr == addr
}) {
continue
}
availableAddrs = append(availableAddrs, addr)
}

// when there are less replicas than the correct number, add the extra replicas
if diff < 0 {
log.Infof("replica shortage of vector %s. inserting to other agents...", targetReplica.vec.GetId())
Expand Down Expand Up @@ -596,7 +588,7 @@ func embedTime(ctx context.Context) context.Context {
return context.WithValue(ctx, correctionStartTimeKey, time.Now())

Check warning on line 588 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L583-L588

Added lines #L583 - L588 were not covered by tests
}

func getCorrectionStartTime(ctx context.Context) (time.Time, error) {
func correctionStartTime(ctx context.Context) (time.Time, error) {
v := ctx.Value(correctionStartTimeKey)
if t, ok := v.(time.Time); ok {
return t, nil
Expand Down
10 changes: 9 additions & 1 deletion pkg/index/job/correction/service/corrector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,17 @@ func Test_correct_correctReplica(t *testing.T) {
},
},
}

// agentAddrs = availableAddrs + target.addr + found.addr
// skipcq: CRT-D0001
c.agentAddrs = append(test.args.availableAddrs, test.args.target.addr)
for _, found := range test.args.found {
c.agentAddrs = append(c.agentAddrs, found.addr)
}

t.Run(test.name, func(tt *testing.T) {
tt.Parallel()
err := c.correctReplica(context.Background(), test.args.target, test.args.found, test.args.availableAddrs)
err := c.correctReplica(context.Background(), test.args.target, test.args.found)
if test.want.err != nil {
require.ErrorIs(t, test.want.err, err)
}
Expand Down

0 comments on commit d687013

Please sign in to comment.