Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add StartReadCollectionForKafka unit test #138

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (r *replicateChannelManager) startReadCollectionForKafka(ctx context.Contex
}

// send api event when the collection is not replicated and ctx is not done
if err := r.sendCreateCollectionvent(ctx, info, sourceDBInfo); err != nil {
if err := r.sendCreateCollectionEvent(ctx, info, sourceDBInfo); err != nil {
return nil, err
}

Expand Down Expand Up @@ -246,7 +246,7 @@ func (r *replicateChannelManager) startReadCollectionForMilvus(ctx context.Conte
zap.String("collection_name", info.Schema.Name))
return nil, nil
}
err = r.sendCreateCollectionvent(ctx, info, sourceDBInfo)
err = r.sendCreateCollectionEvent(ctx, info, sourceDBInfo)
if err != nil {
return nil, err
}
Expand All @@ -270,7 +270,7 @@ func (r *replicateChannelManager) startReadCollectionForMilvus(ctx context.Conte
return targetInfo, nil
}

func (r *replicateChannelManager) sendCreateCollectionvent(ctx context.Context, info *pb.CollectionInfo, sourceDBInfo *model.DatabaseInfo) error {
func (r *replicateChannelManager) sendCreateCollectionEvent(ctx context.Context, info *pb.CollectionInfo, sourceDBInfo *model.DatabaseInfo) error {
select {
case <-ctx.Done():
log.Warn("context is done in the start read collection")
Expand Down
205 changes: 199 additions & 6 deletions core/reader/replicate_channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewReplicateChannelManagerWithFactory(mqConfig config.MQConfig,
readConfig config.ReaderConfig,
metaOp api.MetaOp,
msgPackCallback func(string, *msgstream.MsgPack),
downstream string,
) (api.ChannelManager, error) {
streamDispatchClient, err := GetMsgDispatcherClient(factoryCreator, mqConfig, false)
if err != nil {
Expand Down Expand Up @@ -84,11 +85,11 @@ func NewReplicateChannelManagerWithFactory(mqConfig config.MQConfig,
msgPackCallback: msgPackCallback,
addCollectionLock: &deadlock.RWMutex{},
addCollectionCnt: new(int),
downstream: "milvus",
downstream: downstream,
}, nil
}

func TestNewReplicateChannelManager(t *testing.T) {
func TestNewReplicateChannelManagerForMilvus(t *testing.T) {
t.Run("empty config", func(t *testing.T) {
_, err := NewReplicateChannelManagerWithFactory(config.MQConfig{}, NewDefaultFactoryCreator(), nil, config.ReaderConfig{
MessageBufferSize: 10,
Expand All @@ -98,7 +99,8 @@ func TestNewReplicateChannelManager(t *testing.T) {
MaxBackOff: 1,
},
}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
})
},
"milvus")
assert.Error(t, err)
})

Expand All @@ -118,7 +120,42 @@ func TestNewReplicateChannelManager(t *testing.T) {
MaxBackOff: 1,
},
}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
})
}, "milvus")
assert.NoError(t, err)
})
}

func TestNewReplicateChannelManagerForKafka(t *testing.T) {
t.Run("empty config", func(t *testing.T) {
_, err := NewReplicateChannelManagerWithFactory(config.MQConfig{}, NewDefaultFactoryCreator(), nil, config.ReaderConfig{
MessageBufferSize: 10,
Retry: config.RetrySettings{
RetryTimes: 1,
InitBackOff: 1,
MaxBackOff: 1,
},
}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
}, "kafka")
assert.Error(t, err)
})

t.Run("success", func(t *testing.T) {
factoryCreator := mocks.NewFactoryCreator(t)
factory := msgstream.NewMockMqFactory()
factoryCreator.EXPECT().NewPmsFactory(mock.Anything).Return(factory)
_, err := NewReplicateChannelManagerWithFactory(config.MQConfig{
Pulsar: config.PulsarConfig{
Address: "pulsar://localhost:6650",
},
}, factoryCreator, nil, config.ReaderConfig{
MessageBufferSize: 10,
Retry: config.RetrySettings{
RetryTimes: 1,
InitBackOff: 1,
MaxBackOff: 1,
},
}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
}, "kafka")
assert.NoError(t, err)
})
}
Expand Down Expand Up @@ -167,7 +204,7 @@ func TestChannelUtils(t *testing.T) {
})
}

func TestStartReadCollection(t *testing.T) {
func TestStartReadCollectionForMilvus(t *testing.T) {
util.InitMilvusPkgParam()

factoryCreator := mocks.NewFactoryCreator(t)
Expand All @@ -188,7 +225,7 @@ func TestStartReadCollection(t *testing.T) {
MaxBackOff: 1,
},
}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
})
}, "milvus")
assert.NoError(t, err)
manager.SetCtx(context.Background())

Expand Down Expand Up @@ -305,6 +342,162 @@ func TestStartReadCollection(t *testing.T) {
VirtualChannelNames: []string{"collection-partition-p1_v0"},
}, nil)
assert.NoError(t, err)
channel := <-realManager.GetChannelChan()
assert.Equal(t, "collection-partition-p2", channel)
}

// partition not found
{
realManager.retryOptions = []retry.Option{
retry.Attempts(1),
}
err := realManager.AddPartition(context.Background(), &model.DatabaseInfo{}, &pb.CollectionInfo{
ID: 41,
Schema: &schemapb.CollectionSchema{
Name: "test",
},
}, &pb.PartitionInfo{})
assert.Error(t, err)
}

// add partition
{
err := realManager.AddPartition(context.Background(), &model.DatabaseInfo{}, &pb.CollectionInfo{
ID: 31001,
Schema: &schemapb.CollectionSchema{
Name: "test",
},
}, &pb.PartitionInfo{
PartitionName: "_default2",
})
assert.NoError(t, err)
time.Sleep(100 * time.Millisecond)

event := <-realManager.GetEventChan()
assert.Equal(t, api.ReplicateCreatePartition, event.EventType)
}

// stop read collection
{
err := realManager.StopReadCollection(context.Background(), &pb.CollectionInfo{
ID: 31001,
StartPositions: []*commonpb.KeyDataPair{
{
Key: "collection_partition_p1",
},
},
})
assert.NoError(t, err)
}
})
}

func TestStartReadCollectionForKafka(t *testing.T) {
util.InitMilvusPkgParam()

factoryCreator := mocks.NewFactoryCreator(t)
factory := msgstream.NewMockFactory(t)
factoryCreator.EXPECT().NewPmsFactory(mock.Anything).Return(factory)

manager, err := NewReplicateChannelManagerWithFactory(config.MQConfig{
Pulsar: config.PulsarConfig{
Address: "pulsar://localhost:6650",
},
}, factoryCreator, nil, config.ReaderConfig{
MessageBufferSize: 10,
Retry: config.RetrySettings{
RetryTimes: 1,
InitBackOff: 1,
MaxBackOff: 1,
},
}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
}, "kafka")
assert.NoError(t, err)
manager.SetCtx(context.Background())

t.Run("context cancel", func(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()
err = manager.StartReadCollection(ctx, &model.DatabaseInfo{}, &pb.CollectionInfo{}, nil)
assert.Error(t, err)
})

realManager := manager.(*replicateChannelManager)
stream := msgstream.NewMockMsgStream(t)
streamChan := make(chan *msgstream.MsgPack)

factory.EXPECT().NewMsgStream(mock.Anything).Return(stream, nil).Maybe()
stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
stream.EXPECT().Chan().Return(streamChan).Maybe()
stream.EXPECT().Close().Return().Maybe()

t.Run("read channel", func(t *testing.T) {
{
// start read
handler, err := realManager.startReadChannel(&model.SourceCollectionInfo{
PChannel: "kafka_test_read_channel",
VChannel: "kafka_test_read_channel_v0",
CollectionID: 11001,
ShardNum: 1,
}, &model.TargetCollectionInfo{
CollectionID: 21001,
CollectionName: "read_channel",
PartitionInfo: map[string]int64{
"_default": 1101,
},
PChannel: "kafka_ttest_read_channel",
VChannel: "kafka_ttest_read_channel_v0",
BarrierChan: util.NewOnceWriteChan(make(chan<- uint64)),
PartitionBarrierChan: map[int64]*util.OnceWriteChan[uint64]{
1101: util.NewOnceWriteChan(make(chan<- uint64)),
},
})
assert.NoError(t, err)
handler.startReadChannel()
assert.Equal(t, "kafka_ttest_read_channel", <-realManager.GetChannelChan())

_, err = realManager.startReadChannel(&model.SourceCollectionInfo{
PChannel: "kafka_test_read_channel_2",
VChannel: "kafka_test_read_channel_2_v0",
CollectionID: 11002,
}, &model.TargetCollectionInfo{
CollectionName: "kafka_read_channel_2",
PChannel: "kafka_ttest_read_channel_2",
VChannel: "kafka_ttest_read_channel_2_v0",
})
assert.NoError(t, err)
}
{
assert.NotNil(t, realManager.GetMsgChan("kafka_ttest_read_channel"))
assert.Nil(t, realManager.GetMsgChan("no_exist_channel"))
}
{
// stop read
realManager.stopReadChannel("no_exist_channel", 11001)
realManager.stopReadChannel("kafka_test_read_channel", 11001)
realManager.stopReadChannel("kafka_test_read_channel", 11002)
}
})

t.Run("collection and partition", func(t *testing.T) {
// start read collection
{
err := realManager.StartReadCollection(context.Background(), &model.DatabaseInfo{}, &pb.CollectionInfo{
ID: 31001,
Schema: &schemapb.CollectionSchema{
Name: "test",
},
StartPositions: []*commonpb.KeyDataPair{
{
Key: "collection_partition_p1",
},
},
PhysicalChannelNames: []string{"collection-partition-p1"},
VirtualChannelNames: []string{"collection-partition-p1_v0"},
}, nil)
assert.NoError(t, err)
event := <-realManager.GetEventChan()
assert.Equal(t, api.ReplicateCreateCollection, event.EventType)
}

// partition not found
Expand Down
3 changes: 3 additions & 0 deletions server/model/request/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ func GetTask(taskInfo *meta.TaskInfo) Task {
taskInfo.MilvusConnectParam.Username = ""
taskInfo.MilvusConnectParam.Password = ""
taskInfo.MilvusConnectParam.Token = ""
taskInfo.KafkaConnectParam.SASL.Username = ""
taskInfo.KafkaConnectParam.SASL.Password = ""
return Task{
TaskID: taskInfo.TaskID,
MilvusConnectParam: taskInfo.MilvusConnectParam,
KafkaConnectParam: taskInfo.KafkaConnectParam,
CollectionInfos: taskInfo.CollectionInfos,
State: taskInfo.State.String(),
LastPauseReason: taskInfo.Reason,
Expand Down
Loading