Skip to content

Commit

Permalink
More appropriate error handling approach
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Nov 1, 2023
1 parent bf7a35b commit 804d1a5
Show file tree
Hide file tree
Showing 17 changed files with 556 additions and 234 deletions.
6 changes: 6 additions & 0 deletions core/api/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
// Reader is the contract a CDC reader exposes: it can be started, it can be
// stopped, and it offers a channel on which errors are reported.
type Reader interface {
// StartRead starts the reader; ctx is handed to the implementation.
StartRead(ctx context.Context)
// QuitRead stops the reader; ctx is handed to the implementation.
QuitRead(ctx context.Context)
// ErrorChan returns a receive-only channel of errors reported by the reader.
// An implementation may return nil (DefaultReader does); receiving from a
// nil channel blocks forever, so callers should use it inside a select.
ErrorChan() <-chan error
}

// DefaultReader is the base reader; every CDCReader implementation should embed (compose) it.
Expand All @@ -25,3 +26,8 @@ func (d *DefaultReader) StartRead(ctx context.Context) {
// QuitRead is a placeholder implementation: it performs no shutdown work and
// only logs a warning that QuitRead has not been implemented. ctx is unused.
func (d *DefaultReader) QuitRead(ctx context.Context) {
log.Warn("QuitRead is not implemented, please check it")
}

// ErrorChan is a placeholder implementation: it logs a warning that the
// method has not been implemented and returns a nil channel.
//
// A receive from a nil channel never completes, so callers that read the
// result should do so inside a select alongside other cases.
func (d *DefaultReader) ErrorChan() <-chan error {
	// The zero value of a channel type is nil; naming it makes the intent explicit.
	var unimplemented <-chan error
	log.Warn("ErrorChan is not implemented, please check it")
	return unimplemented
}
21 changes: 21 additions & 0 deletions core/api/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"reflect"
"testing"
)

Expand Down Expand Up @@ -46,3 +47,23 @@ func TestDefaultReader_StartRead(t *testing.T) {
})
}
}

// TestDefaultReader_ErrorChan checks that the placeholder ErrorChan of
// DefaultReader returns a nil channel.
func TestDefaultReader_ErrorChan(t *testing.T) {
	type testCase struct {
		name string
		want <-chan error
	}

	cases := []testCase{
		{name: "TestDefaultReader_ErrorChan", want: nil},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			reader := &DefaultReader{}
			got := reader.ErrorChan()
			if reflect.DeepEqual(got, tc.want) {
				return
			}
			t.Errorf("ErrorChan() = %v, want %v", got, tc.want)
		})
	}
}
3 changes: 3 additions & 0 deletions core/api/replicate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ReplicateAPIEvent struct {
CollectionInfo *pb.CollectionInfo
PartitionInfo *pb.PartitionInfo
ReplicateInfo *commonpb.ReplicateInfo
Error error
}

// ReplicateAPIEventType is the integer type used for the Replicate* event-kind
// constants declared below (ReplicateDropCollection, ReplicateCreatePartition, ...).
// NOTE(review): ReplicateError is written as `ReplicateError = 100` with no
// explicit type, which makes it an untyped integer constant rather than a
// ReplicateAPIEventType — confirm that this is intended.
type ReplicateAPIEventType int
Expand All @@ -42,6 +43,8 @@ const (
ReplicateDropCollection
ReplicateCreatePartition
ReplicateDropPartition

ReplicateError = 100
)

// DefaultChannelManager is a field-less type.
// NOTE(review): presumably the placeholder base for channel-manager
// implementations, analogous to DefaultReader — confirm against its methods.
type DefaultChannelManager struct{}
Expand Down
43 changes: 43 additions & 0 deletions core/mocks/reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 38 additions & 21 deletions core/reader/collection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package reader
import (
"context"
"sync"
"time"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/samber/lo"
"go.uber.org/zap"

Expand Down Expand Up @@ -57,7 +55,7 @@ type CollectionReader struct {
channelSeekPositions map[string]*msgpb.MsgPosition
replicateCollectionMap util.Map[int64, *pb.CollectionInfo]
replicateChannelMap util.Map[string, struct{}]
replicateChannelChan chan string
errChan chan error
shouldReadFunc ShouldReadFunc
startOnce sync.Once
quitOnce sync.Once
Expand All @@ -70,16 +68,18 @@ func NewCollectionReader(id string, channelManager api.ChannelManager, metaOp ap
metaOp: metaOp,
channelSeekPositions: seekPosition,
shouldReadFunc: shouldReadFunc,
replicateChannelChan: make(chan string, 10),
errChan: make(chan error),
}
return reader, nil
}

func (reader *CollectionReader) StartRead(ctx context.Context) {
reader.startOnce.Do(func() {
reader.metaOp.SubscribeCollectionEvent(reader.id, func(info *pb.CollectionInfo) bool {
log.Info("has watched to read collection", zap.String("name", info.Schema.Name))
collectionLog := log.With(zap.String("collection_name", info.Schema.Name), zap.Int64("collection_id", info.ID))
collectionLog.Info("has watched to read collection")
if !reader.shouldReadFunc(info) {
collectionLog.Info("the collection should not be read")
return false
}
startPositions := make([]*msgpb.MsgPosition, 0)
Expand All @@ -90,16 +90,19 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
})
}
if err := reader.channelManager.StartReadCollection(ctx, info, startPositions); err != nil {
log.Warn("fail to start to replicate the collection data in the watch process", zap.Int64("id", info.ID), zap.Error(err))
collectionLog.Warn("fail to start to replicate the collection data in the watch process", zap.Any("info", info), zap.Error(err))
reader.sendError(err)
}
reader.replicateCollectionMap.Store(info.ID, info)
log.Info("has started to read collection", zap.String("name", info.Schema.Name))
collectionLog.Info("has started to read collection")
return true
})
reader.metaOp.SubscribePartitionEvent(reader.id, func(info *pb.PartitionInfo) bool {
partitionLog := log.With(zap.Int64("collection_id", info.CollectionID), zap.Int64("partition_id", info.PartitionID), zap.String("partition_name", info.PartitionName))
partitionLog.Info("has watched to read partition")
collectionName := reader.metaOp.GetCollectionNameByID(ctx, info.CollectionID)
if collectionName == "" {
log.Info("the collection name is empty", zap.Int64("collection_id", info.CollectionID), zap.String("partition_name", info.PartitionName))
partitionLog.Info("the collection name is empty")
return true
}
tmpCollectionInfo := &pb.CollectionInfo{
Expand All @@ -109,17 +112,16 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
},
}
if !reader.shouldReadFunc(tmpCollectionInfo) {
partitionLog.Info("the partition should not be read", zap.String("name", collectionName))
return true
}

var err error
err = retry.Do(ctx, func() error {
err = reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
return err
}, retry.Sleep(time.Second))
err := reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
if err != nil {
log.Panic("fail to add partition", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName), zap.Error(err))
partitionLog.Warn("fail to add partition", zap.String("collection_name", collectionName), zap.Any("partition", info), zap.Error(err))
reader.sendError(err)
}
partitionLog.Info("has started to add partition")
return false
})
reader.metaOp.WatchCollection(ctx, nil)
Expand All @@ -130,12 +132,15 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
})
if err != nil {
log.Warn("get all collection failed", zap.Error(err))
reader.sendError(err)
return
}
seekPositions := lo.Values(reader.channelSeekPositions)
for _, info := range existedCollectionInfos {
log.Info("exist collection", zap.String("name", info.Schema.Name))
if err := reader.channelManager.StartReadCollection(ctx, info, seekPositions); err != nil {
log.Warn("fail to start to replicate the collection data", zap.Int64("id", info.ID), zap.Error(err))
log.Warn("fail to start to replicate the collection data", zap.Any("collection", info), zap.Error(err))
reader.sendError(err)
}
reader.replicateCollectionMap.Store(info.ID, info)
}
Expand All @@ -155,22 +160,29 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
log.Info("the collection is not in the watch list", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName))
return true
}
var err error
err = retry.Do(ctx, func() error {
err = reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
return err
}, retry.Sleep(time.Second))
err := reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
if err != nil {
log.Panic("fail to add partition", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName), zap.Error(err))
log.Warn("fail to add partition", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName), zap.Error(err))
reader.sendError(err)
}
return false
})
if err != nil {
log.Warn("get all partition failed", zap.Error(err))
reader.sendError(err)
}
})
}

// sendError makes a single non-blocking attempt to hand err to whoever is
// receiving from errChan.
//
// errChan is created unbuffered, so the send only succeeds when a receiver is
// already waiting; otherwise the default branch runs and err is not delivered,
// only logged. err may be nil: QuitRead sends nil when the reader quits.
func (reader *CollectionReader) sendError(err error) {
	// Both outcomes log the same context, so build the fields once.
	fields := []zap.Field{zap.String("id", reader.id), zap.Error(err)}

	select {
	case reader.errChan <- err:
		log.Info("send the error", fields...)
	default:
		// Nobody is receiving right now; drop the error instead of blocking.
		log.Info("skip the error, because it will quit soon", fields...)
	}
}

func (reader *CollectionReader) QuitRead(ctx context.Context) {
reader.quitOnce.Do(func() {
reader.replicateCollectionMap.Range(func(_ int64, value *pb.CollectionInfo) bool {
Expand All @@ -182,5 +194,10 @@ func (reader *CollectionReader) QuitRead(ctx context.Context) {
})
reader.metaOp.UnsubscribeEvent(reader.id, api.CollectionEventType)
reader.metaOp.UnsubscribeEvent(reader.id, api.PartitionEventType)
reader.sendError(nil)
})
}

// ErrorChan exposes the reader's error channel as receive-only. Values arrive
// on it via sendError, which QuitRead also calls with nil, so a received value
// may be nil.
func (reader *CollectionReader) ErrorChan() <-chan error {
return reader.errChan
}
18 changes: 16 additions & 2 deletions core/reader/collection_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/zilliztech/milvus-cdc/core/pb"
)

// Before running this test, start the etcd server.
func TestCollectionReader(t *testing.T) {
etcdOp, err := NewEtcdOp(nil, "", "", "")
assert.NoError(t, err)
Expand Down Expand Up @@ -86,11 +87,20 @@ func TestCollectionReader(t *testing.T) {
return !strings.Contains(ci.Schema.Name, "test")
})
assert.NoError(t, err)
go func() {
select {
case <-time.After(time.Second):
t.Fail()
case err := <-reader.ErrorChan():
assert.Error(t, err)
}
}()
reader.StartRead(context.Background())
channelManager.EXPECT().StartReadCollection(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
channelManager.EXPECT().AddPartition(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
// add collection

{
// filter collection
field3 := &schemapb.FieldSchema{
FieldID: 100,
Name: "age",
Expand All @@ -106,6 +116,7 @@ func TestCollectionReader(t *testing.T) {
collectionBytes, _ := proto.Marshal(collectionInfo)
_, _ = realOp.etcdClient.Put(context.Background(), realOp.collectionPrefix()+"/1/100003", string(collectionBytes))

// filter partition
{
info := &pb.PartitionInfo{
State: pb.PartitionState_PartitionCreated,
Expand All @@ -116,8 +127,9 @@ func TestCollectionReader(t *testing.T) {
_, _ = realOp.etcdClient.Put(context.Background(), realOp.partitionPrefix()+"/100003/300047", getStringForMessage(info))
}
}
// add partition

{
// put collection
field3 := &schemapb.FieldSchema{
FieldID: 100,
Name: "age",
Expand All @@ -138,6 +150,8 @@ func TestCollectionReader(t *testing.T) {
}
collectionBytes, _ := proto.Marshal(collectionInfo)
_, _ = realOp.etcdClient.Put(context.Background(), realOp.collectionPrefix()+"/1/100004", string(collectionBytes))

// put partition
info := &pb.PartitionInfo{
State: pb.PartitionState_PartitionCreated,
PartitionName: "foo",
Expand Down
Loading

0 comments on commit 804d1a5

Please sign in to comment.