From 2c255b2a10b0eec8a6fe65225b5cce3efd702a1d Mon Sep 17 00:00:00 2001
From: Ricky-chen1
Date: Mon, 30 Sep 2024 22:57:47 +0800
Subject: [PATCH] test: add StartReadCollectionForKafka unit test

Rename the misspelled sendCreateCollectionvent helper to
sendCreateCollectionEvent, let NewReplicateChannelManagerWithFactory
take the downstream as a parameter instead of hard-coding "milvus",
and cover the kafka downstream with dedicated constructor and
StartReadCollection tests.
---
 core/reader/replicate_channel_manager.go      |   6 +-
 core/reader/replicate_channel_manager_test.go | 208 +++++++++++++++++-
 2 files changed, 205 insertions(+), 9 deletions(-)

diff --git a/core/reader/replicate_channel_manager.go b/core/reader/replicate_channel_manager.go
index 8539fd1..89ab934 100644
--- a/core/reader/replicate_channel_manager.go
+++ b/core/reader/replicate_channel_manager.go
@@ -178,7 +178,7 @@ func (r *replicateChannelManager) startReadCollectionForKafka(ctx context.Contex
 	}
 
 	// send api event when the collection is not replicated and ctx is not done
-	if err := r.sendCreateCollectionvent(ctx, info, sourceDBInfo); err != nil {
+	if err := r.sendCreateCollectionEvent(ctx, info, sourceDBInfo); err != nil {
 		return nil, err
 	}
 
@@ -246,7 +246,7 @@ func (r *replicateChannelManager) startReadCollectionForMilvus(ctx context.Conte
 			zap.String("collection_name", info.Schema.Name))
 		return nil, nil
 	}
-	err = r.sendCreateCollectionvent(ctx, info, sourceDBInfo)
+	err = r.sendCreateCollectionEvent(ctx, info, sourceDBInfo)
 	if err != nil {
 		return nil, err
 	}
@@ -270,7 +270,7 @@ func (r *replicateChannelManager) startReadCollectionForMilvus(ctx context.Conte
 	return targetInfo, nil
 }
 
-func (r *replicateChannelManager) sendCreateCollectionvent(ctx context.Context, info *pb.CollectionInfo, sourceDBInfo model.DatabaseInfo) error {
+func (r *replicateChannelManager) sendCreateCollectionEvent(ctx context.Context, info *pb.CollectionInfo, sourceDBInfo model.DatabaseInfo) error {
 	select {
 	case <-ctx.Done():
 		log.Warn("context is done in the start read collection")
diff --git a/core/reader/replicate_channel_manager_test.go b/core/reader/replicate_channel_manager_test.go
index 23fd9f0..88726f9 100644
--- a/core/reader/replicate_channel_manager_test.go
+++ b/core/reader/replicate_channel_manager_test.go
@@ -52,6 +52,7 @@ func NewReplicateChannelManagerWithFactory(mqConfig config.MQConfig,
 	readConfig config.ReaderConfig,
 	metaOp api.MetaOp,
 	msgPackCallback func(string, *msgstream.MsgPack),
+	downstream string,
 ) (api.ChannelManager, error) {
 	streamDispatchClient, err := GetMsgDispatcherClient(factoryCreator, mqConfig, false)
 	if err != nil {
@@ -84,11 +85,11 @@ func NewReplicateChannelManagerWithFactory(mqConfig config.MQConfig,
 		msgPackCallback:   msgPackCallback,
 		addCollectionLock: &deadlock.RWMutex{},
 		addCollectionCnt:  new(int),
-		downstream:        "milvus",
+		downstream:        downstream,
 	}, nil
 }
 
-func TestNewReplicateChannelManager(t *testing.T) {
+func TestNewReplicateChannelManagerForMilvus(t *testing.T) {
 	t.Run("empty config", func(t *testing.T) {
 		_, err := NewReplicateChannelManagerWithFactory(config.MQConfig{}, NewDefaultFactoryCreator(), nil, config.ReaderConfig{
 			MessageBufferSize: 10,
@@ -98,7 +99,8 @@ func TestNewReplicateChannelManager(t *testing.T) {
 				MaxBackOff:  1,
 			},
 		}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
-		})
+		},
+			"milvus")
 		assert.Error(t, err)
 	})
 
@@ -118,7 +120,45 @@ func TestNewReplicateChannelManager(t *testing.T) {
 				MaxBackOff:  1,
 			},
 		}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
-		})
+		}, "milvus")
 		assert.NoError(t, err)
 	})
 }
+
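+// TestNewReplicateChannelManagerForKafka mirrors the Milvus constructor
+// test with downstream set to "kafka": an empty MQ config must fail, and
+// a valid Pulsar config must succeed.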
+func TestNewReplicateChannelManagerForKafka(t *testing.T) {
+	t.Run("empty config", func(t *testing.T) {
+		_, err := NewReplicateChannelManagerWithFactory(config.MQConfig{}, NewDefaultFactoryCreator(), nil, config.ReaderConfig{
+			MessageBufferSize: 10,
+			Retry: config.RetrySettings{
+				RetryTimes:  1,
+				InitBackOff: 1,
+				MaxBackOff:  1,
+			},
+		}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
+		}, "kafka")
+		assert.Error(t, err)
+	})
+
+	t.Run("success", func(t *testing.T) {
+		factoryCreator := mocks.NewFactoryCreator(t)
+		factory := msgstream.NewMockMqFactory()
+		factoryCreator.EXPECT().NewPmsFactory(mock.Anything).Return(factory)
+		_, err := NewReplicateChannelManagerWithFactory(config.MQConfig{
+			Pulsar: config.PulsarConfig{
+				Address: "pulsar://localhost:6650",
+			},
+		}, factoryCreator, nil, config.ReaderConfig{
+			MessageBufferSize: 10,
+			Retry: config.RetrySettings{
+				RetryTimes:  1,
+				InitBackOff: 1,
+				MaxBackOff:  1,
+			},
+		}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
+		}, "kafka")
+		assert.NoError(t, err)
+	})
+}
@@ -167,7 +207,7 @@ func TestChannelUtils(t *testing.T) {
 	})
 }
 
-func TestStartReadCollection(t *testing.T) {
+func TestStartReadCollectionForMilvus(t *testing.T) {
 	util.InitMilvusPkgParam()
 
 	factoryCreator := mocks.NewFactoryCreator(t)
@@ -188,7 +228,7 @@ func TestStartReadCollection(t *testing.T) {
 			MaxBackOff:  1,
 		},
 	}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
-	})
+	}, "milvus")
 	assert.NoError(t, err)
 	manager.SetCtx(context.Background())
 
@@ -353,6 +393,162 @@ func TestStartReadCollection(t *testing.T) {
 	})
 }
 
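+// TestStartReadCollectionForKafka drives the kafka-downstream manager
+// through the same lifecycle as the Milvus variant: cancelled context,
+// channel read start/stop, and the collection/partition flow.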
+func TestStartReadCollectionForKafka(t *testing.T) {
+	util.InitMilvusPkgParam()
+
+	factoryCreator := mocks.NewFactoryCreator(t)
+	factory := msgstream.NewMockFactory(t)
+	factoryCreator.EXPECT().NewPmsFactory(mock.Anything).Return(factory)
+
+	manager, err := NewReplicateChannelManagerWithFactory(config.MQConfig{
+		Pulsar: config.PulsarConfig{
+			Address: "pulsar://localhost:6650",
+		},
+	}, factoryCreator, nil, config.ReaderConfig{
+		MessageBufferSize: 10,
+		Retry: config.RetrySettings{
+			RetryTimes:  1,
+			InitBackOff: 1,
+			MaxBackOff:  1,
+		},
+	}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
+	}, "kafka")
+	assert.NoError(t, err)
+	manager.SetCtx(context.Background())
+
+	t.Run("context cancel", func(t *testing.T) {
+		ctx, cancelFunc := context.WithCancel(context.Background())
+		cancelFunc()
+		err = manager.StartReadCollection(ctx, &pb.CollectionInfo{}, nil)
+		assert.Error(t, err)
+	})
+
+	realManager := manager.(*replicateChannelManager)
+
+	stream := msgstream.NewMockMsgStream(t)
+	streamChan := make(chan *msgstream.MsgPack)
+
+	factory.EXPECT().NewMsgStream(mock.Anything).Return(stream, nil).Maybe()
+	stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+	stream.EXPECT().Chan().Return(streamChan).Maybe()
+	stream.EXPECT().Close().Return().Maybe()
+
+	t.Run("read channel", func(t *testing.T) {
+		{
+			// start read
+			handler, err := realManager.startReadChannel(&model.SourceCollectionInfo{
+				PChannel:     "test_read_channel",
+				VChannel:     "test_read_channel_v0",
+				CollectionID: 11001,
+				ShardNum:     1,
+			}, &model.TargetCollectionInfo{
+				CollectionID:   21001,
+				CollectionName: "read_channel",
+				PartitionInfo: map[string]int64{
+					"_default": 1101,
+				},
+				PChannel:    "ttest_read_channel",
+				VChannel:    "ttest_read_channel_v0",
+				BarrierChan: util.NewOnceWriteChan(make(chan<- uint64)),
+				PartitionBarrierChan: map[int64]*util.OnceWriteChan[uint64]{
+					1101: util.NewOnceWriteChan(make(chan<- uint64)),
+				},
+			})
+			assert.NoError(t, err)
+			handler.startReadChannel()
+			assert.Equal(t, "ttest_read_channel", <-realManager.GetChannelChan())
+
+			_, err = realManager.startReadChannel(&model.SourceCollectionInfo{
+				PChannel:     "test_read_channel_2",
+				VChannel:     "test_read_channel_2_v0",
+				CollectionID: 11002,
+			}, &model.TargetCollectionInfo{
+				CollectionName: "read_channel_2",
+				PChannel:       "ttest_read_channel_2",
+				VChannel:       "ttest_read_channel_2_v0",
+			})
+			assert.NoError(t, err)
+		}
+		{
+			assert.NotNil(t, realManager.GetMsgChan("ttest_read_channel"))
+			assert.Nil(t, realManager.GetMsgChan("no_exist_channel"))
+		}
+		{
+			// stop read
+			realManager.stopReadChannel("no_exist_channel", 11001)
+			realManager.stopReadChannel("test_read_channel", 11001)
+			realManager.stopReadChannel("test_read_channel", 11002)
+		}
+	})
+
+	t.Run("collection and partition", func(t *testing.T) {
+		// start collection
+		{
+			err := realManager.StartReadCollection(context.Background(), &pb.CollectionInfo{
+				ID: 31001,
+				Schema: &schemapb.CollectionSchema{
+					Name: "test",
+				},
+				StartPositions: []*commonpb.KeyDataPair{
+					{
+						Key: "collection_partition_p1",
+					},
+				},
+				PhysicalChannelNames: []string{"collection-partition-p1"},
+				VirtualChannelNames:  []string{"collection-partition-p1_v0"},
+			}, nil)
+			assert.NoError(t, err)
+		}
+
+		// partition not found
+		{
+			realManager.retryOptions = []retry.Option{
+				retry.Attempts(1),
+			}
+			err := realManager.AddPartition(context.Background(), &pb.CollectionInfo{
+				ID: 41,
+				Schema: &schemapb.CollectionSchema{
+					Name: "test",
+				},
+			}, &pb.PartitionInfo{})
+			assert.Error(t, err)
+		}
+
+		// add partition
+		{
+			err := realManager.AddPartition(context.Background(), &pb.CollectionInfo{
+				ID: 31001,
+				Schema: &schemapb.CollectionSchema{
+					Name: "test",
+				},
+			}, &pb.PartitionInfo{
+				PartitionName: "_default2",
+			})
+			assert.NoError(t, err)
+			time.Sleep(100 * time.Millisecond)
+
+			event := <-realManager.GetEventChan()
+			assert.Equal(t, api.ReplicateCreatePartition, event.EventType)
+		}
+
+		// stop read collection
+		{
+			err := realManager.StopReadCollection(context.Background(), &pb.CollectionInfo{
+				ID: 31001,
+				StartPositions: []*commonpb.KeyDataPair{
+					{
+						Key: "collection_partition_p1",
+					},
+				},
+			})
+			assert.NoError(t, err)
+		}
+	})
+}
+
 func noRetry(handler *replicateChannelHandler) {
 	handler.handlerOpts.RetryOptions = util.GetRetryOptions(config.RetrySettings{
 		RetryTimes: 1,