Skip to content

Commit

Permalink
Fix the ut test and delete the comment
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Jan 26, 2024
1 parent 6b3ab31 commit 5cc6c6e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 25 deletions.
20 changes: 0 additions & 20 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,21 +552,6 @@ func (r *replicateChannelManager) startReadChannel(sourceInfo *model.SourceColle
func (r *replicateChannelManager) forwardMsg(targetPChannel string, msg *msgstream.MsgPack) {
var handler *replicateChannelHandler

//forwardReplicateChannelFunc := func() {
// tick := time.NewTicker(5 * time.Second)
// defer tick.Stop()
// for {
// select {
// case <-tick.C:
// log.Info("wait the replicate channel handler ready", zap.String("target_pchannel", targetPChannel))
// case r.forwardReplicateChannel <- targetPChannel:
// log.Info("success to forward replicate channel", zap.String("target_pchannel", targetPChannel))
// return
// }
// }
//}
//forwardReplicateChannelFunc()

_ = retry.Do(r.replicateCtx, func() error {
r.channelLock.RLock()
defer r.channelLock.RUnlock()
Expand All @@ -582,7 +567,6 @@ func (r *replicateChannelManager) forwardMsg(targetPChannel string, msg *msgstre
case r.forwardReplicateChannel <- targetPChannel:
log.Info("success to forward replicate channel", zap.String("target_pchannel", targetPChannel))
default:

}
return errors.Newf("channel %s not found when forward the msg", targetPChannel)
}
Expand Down Expand Up @@ -974,10 +958,6 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
log.Info("skip drop partition msg because partition has been dropped", zap.Int64("partition_id", realMsg.PartitionID))
continue
}
//if r.isDroppingPartition(realMsg.PartitionID, info) {
// log.Info("skip drop partition msg because partition is dropping", zap.Int64("partition_id", realMsg.PartitionID))
// continue
//}
realMsg.CollectionID = info.CollectionID
if realMsg.PartitionName == "" {
err = errors.Newf("empty partition name")
Expand Down
14 changes: 9 additions & 5 deletions core/reader/replicate_channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,11 @@ func TestStartReadCollection(t *testing.T) {
assert.Equal(t, "test_read_channel", <-realManager.channelChan)

err = realManager.startReadChannel(&model.SourceCollectionInfo{
PChannelName: "test_read_channel",
PChannelName: "test_read_channel_2",
CollectionID: 11002,
}, &model.TargetCollectionInfo{
CollectionName: "read_channel_2",
PChannel: "ttest_read_channel_2",
})
assert.NoError(t, err)
}
Expand Down Expand Up @@ -354,8 +355,9 @@ func TestReplicateChannelHandler(t *testing.T) {
stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Close().Return().Maybe()
stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Chan().Return(streamChan)
handler, err := newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{PChannelName: "test_p", SeekPosition: &msgstream.MsgPosition{ChannelName: "test_p", MsgID: []byte("test")}}, &model.TargetCollectionInfo{PChannel: "test_p"}, api.TargetAPI(nil), &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
handler, err := newReplicateChannelHandler(context.Background(),
&model.SourceCollectionInfo{PChannelName: "test_p", SeekPosition: &msgstream.MsgPosition{ChannelName: "test_p", MsgID: []byte("test")}},
&model.TargetCollectionInfo{PChannel: "test_p"}, api.TargetAPI(nil), &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
assert.NoError(t, err)
noRetry(handler)
time.Sleep(100 * time.Microsecond)
Expand All @@ -373,7 +375,6 @@ func TestReplicateChannelHandler(t *testing.T) {
stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Close().Return().Maybe()
stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Chan().Return(streamChan)

handler, err := newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{
PChannelName: "test_p",
Expand All @@ -393,6 +394,7 @@ func TestReplicateChannelHandler(t *testing.T) {

handler.AddCollection(1, &model.TargetCollectionInfo{
CollectionName: "test",
PChannel: "p1",
})
handler.RemoveCollection(1)
})
Expand All @@ -408,7 +410,6 @@ func TestReplicateChannelHandler(t *testing.T) {
stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Close().Return().Maybe()
stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Chan().Return(streamChan)
targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error 1")).Once()
targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(&model.CollectionInfo{
Partitions: map[string]int64{
Expand Down Expand Up @@ -438,6 +439,7 @@ func TestReplicateChannelHandler(t *testing.T) {
PartitionBarrierChan: map[int64]chan<- uint64{
1001: make(chan<- uint64),
},
DroppedPartition: make(map[int64]struct{}),
})
}()
err = handler.AddPartitionInfo(&pb.CollectionInfo{
Expand Down Expand Up @@ -510,9 +512,11 @@ func TestReplicateChannelHandler(t *testing.T) {
PartitionBarrierChan: map[int64]chan<- uint64{
1021: partitionBarrierChan,
},
DroppedPartition: make(map[int64]struct{}),
}, targetClient, &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
assert.NoError(t, err)
noRetry(handler)
handler.startReadChannel()
done := make(chan struct{})

go func() {
Expand Down

0 comments on commit 5cc6c6e

Please sign in to comment.