Skip to content

Commit

Permalink
Add corrector test
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Sep 25, 2023
1 parent 55bc821 commit 004bf81
Show file tree
Hide file tree
Showing 5 changed files with 579 additions and 11 deletions.
2 changes: 2 additions & 0 deletions internal/errors/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ package errors
var ErrIndexReplicaOne = New("nothing to correct when index replica is 1")

var ErrNoAvailableAgentToInsert = New("no available agent to insert replica")

var ErrFailedToCorrectReplicaNum = New("failed to correct replica number after correction process")
11 changes: 6 additions & 5 deletions internal/net/grpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,29 @@ import (

type contextKey string

const grpcMethodContextKey contextKey = "grpc_method"
// exported only for testing
const GrpcMethodContextKey contextKey = "grpc_method"

// WrapGRPCMethod returns a copy of parent in which the method associated with key (grpcMethodContextKey).
func WrapGRPCMethod(ctx context.Context, method string) context.Context {
m := FromGRPCMethod(ctx)
if m == "" {
return context.WithValue(ctx, grpcMethodContextKey, method)
return context.WithValue(ctx, GrpcMethodContextKey, method)

Check warning on line 30 in internal/net/grpc/context.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/context.go#L30

Added line #L30 was not covered by tests
}
if strings.HasSuffix(m, method) {
return ctx
}
return context.WithValue(ctx, grpcMethodContextKey, m+"/"+method)
return context.WithValue(ctx, GrpcMethodContextKey, m+"/"+method)

Check warning on line 35 in internal/net/grpc/context.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/context.go#L35

Added line #L35 was not covered by tests
}

// WithGRPCMethod returns a copy of parent in which the method associated with key (grpcMethodContextKey).
func WithGRPCMethod(ctx context.Context, method string) context.Context {
return context.WithValue(ctx, grpcMethodContextKey, method)
return context.WithValue(ctx, GrpcMethodContextKey, method)

Check warning on line 40 in internal/net/grpc/context.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/context.go#L40

Added line #L40 was not covered by tests
}

// FromGRPCMethod returns the value associated with this context for key (grpcMethodContextKey).
func FromGRPCMethod(ctx context.Context) string {
if v := ctx.Value(grpcMethodContextKey); v != nil {
if v := ctx.Value(GrpcMethodContextKey); v != nil {

Check warning on line 45 in internal/net/grpc/context.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/context.go#L45

Added line #L45 was not covered by tests
if method, ok := v.(string); ok {
return method
}
Expand Down
113 changes: 113 additions & 0 deletions internal/test/mock/grpc_testify_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (

"github.com/stretchr/testify/mock"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/internal/backoff"
"github.com/vdaas/vald/internal/net/grpc/pool"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

Expand Down Expand Up @@ -63,3 +66,113 @@ func (losm *ListObjectStreamMock) Send(res *payload.Object_List_Response) error
args := losm.Called(res)
return args.Error(0)
}

type ClientInternal struct {
mock.Mock
}

type (
CallOption = grpc.CallOption
DialOption = pool.DialOption
ClientConn = pool.ClientConn
)

func (c *ClientInternal) StartConnectionMonitor(ctx context.Context) (<-chan error, error) {
args := c.Called(ctx)
return args.Get(0).(<-chan error), args.Error(1)
}

func (c *ClientInternal) Connect(ctx context.Context, addr string, dopts ...DialOption) (pool.Conn, error) {
args := c.Called(ctx, addr, dopts)
return args.Get(0).(pool.Conn), args.Error(1)
}

func (c *ClientInternal) IsConnected(ctx context.Context, addr string) bool {
args := c.Called(ctx, addr)
return args.Bool(0)
}

func (c *ClientInternal) Disconnect(ctx context.Context, addr string) error {
args := c.Called(ctx, addr)
return args.Error(0)
}

func (c *ClientInternal) Range(ctx context.Context,
f func(ctx context.Context,
addr string,
conn *ClientConn,
copts ...CallOption) error) error {
args := c.Called(ctx, f)
return args.Error(0)
}

func (c *ClientInternal) RangeConcurrent(ctx context.Context,
concurrency int,
f func(ctx context.Context,
addr string,
conn *ClientConn,
copts ...CallOption) error) error {
args := c.Called(ctx, concurrency, f)
return args.Error(0)
}

func (c *ClientInternal) OrderedRange(ctx context.Context,
order []string,
f func(ctx context.Context,
addr string,
conn *ClientConn,
copts ...CallOption) error) error {
args := c.Called(ctx, order, f)
return args.Error(0)
}

func (c *ClientInternal) OrderedRangeConcurrent(ctx context.Context,
order []string,
concurrency int,
f func(ctx context.Context,
addr string,
conn *ClientConn,
copts ...CallOption) error) error {
args := c.Called(ctx, order, concurrency, f)
return args.Error(0)
}

func (c *ClientInternal) Do(ctx context.Context, addr string,
f func(ctx context.Context,
conn *ClientConn,
copts ...CallOption) (interface{}, error)) (interface{}, error) {
args := c.Called(ctx, addr, f)
return args.Get(0), args.Error(1)
}

func (c *ClientInternal) RoundRobin(ctx context.Context, f func(ctx context.Context,
conn *ClientConn,
copts ...CallOption) (interface{}, error)) (interface{}, error) {
args := c.Called(ctx, f)
return args.Get(0), args.Error(1)
}

func (c *ClientInternal) GetDialOption() []DialOption {
args := c.Called()
return args.Get(0).([]DialOption)
}

func (c *ClientInternal) GetCallOption() []CallOption {
args := c.Called()
return args.Get(0).([]CallOption)
}

func (c *ClientInternal) GetBackoff() backoff.Backoff {
args := c.Called()
return args.Get(0).(backoff.Backoff)
}

func (c *ClientInternal) ConnectedAddrs() []string {
args := c.Called()
return args.Get(0).([]string)
}

func (c *ClientInternal) Close(ctx context.Context) error {
args := c.Called(ctx)
return args.Error(0)
}
17 changes: 11 additions & 6 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ import (
"github.com/vdaas/vald/pkg/index/job/correction/config"
)

const (
insertMethod = "core.v1.Vald/Insert"
updateMethod = "core.v1.Vald/Update"
deleteMethod = "core.v1.Vald/Delete"
)

type Corrector interface {
Start(ctx context.Context) (<-chan error, error)
PreStop(ctx context.Context) error
Expand Down Expand Up @@ -385,7 +391,6 @@ func (c *correct) correctReplica(
}

// when there are less replicas than the correct number, add the extra replicas
// TODO: refine this logic. pretty complicated
if diff < 0 {
log.Infof("replica shortage of vector %s. inserting to other agents...", targetReplica.vec.GetId())
if len(availableAddrs) == 0 {
Expand All @@ -404,7 +409,7 @@ func (c *correct) correctReplica(
}

if diff < 0 {
return fmt.Errorf("failed to insert the sufficient amount of index to meet the replica setting")
return errors.ErrFailedToCorrectReplicaNum
}

return nil
Expand Down Expand Up @@ -433,15 +438,15 @@ func (c *correct) correctReplica(
}

if diff > 0 {
return fmt.Errorf("failed to delete the sufficient amount of index to meet the replica setting")
return errors.ErrFailedToCorrectReplicaNum
}

Check warning on line 442 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L441-L442

Added lines #L441 - L442 were not covered by tests

return nil
}

func (c *correct) updateObject(ctx context.Context, addr string, vector *payload.Object_Vector) error {
res, err := c.discoverer.GetClient().
Do(grpc.WithGRPCMethod(ctx, "core.v1.Vald/Update"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
Do(grpc.WithGRPCMethod(ctx, updateMethod), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
// TODO: use UpdateTimestamp when it's implemented because here we just want to update only the timestamp but not the vector
return vald.NewUpdateClient(conn).Update(ctx, &payload.Update_Request{
Vector: vector,
Expand All @@ -467,7 +472,7 @@ func (c *correct) updateObject(ctx context.Context, addr string, vector *payload

func (c *correct) insertObject(ctx context.Context, addr string, vector *payload.Object_Vector) error {
res, err := c.discoverer.GetClient().
Do(grpc.WithGRPCMethod(ctx, "core.v1.Vald/Insert"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
Do(grpc.WithGRPCMethod(ctx, insertMethod), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
return vald.NewInsertClient(conn).Insert(ctx, &payload.Insert_Request{
Vector: vector,
// FIXME: this should be deleted after Config.Timestamp deprecation
Expand All @@ -489,7 +494,7 @@ func (c *correct) insertObject(ctx context.Context, addr string, vector *payload

func (c *correct) deleteObject(ctx context.Context, addr string, vector *payload.Object_Vector) error {
res, err := c.discoverer.GetClient().
Do(grpc.WithGRPCMethod(ctx, "core.v1.Vald/Delete"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
Do(grpc.WithGRPCMethod(ctx, deleteMethod), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
return vald.NewRemoveClient(conn).Remove(ctx, &payload.Remove_Request{
Id: &payload.Object_ID{
Id: vector.GetId(),
Expand Down
Loading

0 comments on commit 004bf81

Please sign in to comment.