Handle the cdc and source milvus chaos
Signed-off-by: SimFG <[email protected]>
SimFG committed Jan 25, 2024
1 parent 296dbac commit 354f3ba
Showing 6 changed files with 201 additions and 52 deletions.
2 changes: 1 addition & 1 deletion core/model/reader.go
@@ -37,7 +37,7 @@ type TargetCollectionInfo struct {
PChannel string
VChannel string
BarrierChan chan<- uint64
PartitionBarrierChan map[int64]chan<- uint64
PartitionBarrierChan map[int64]chan<- uint64 // id is the source partition id
Dropped bool
DroppedPartition map[int64]struct{} // id is the source partition id
}
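The added comment pins down that `PartitionBarrierChan`, like `DroppedPartition` below it, is keyed by the source partition id. As a minimal sketch of how such a map of write-only barrier channels gets signaled (hypothetical ids and timestamp, not code from this repo):

```go
package main

import "fmt"

func main() {
	// Hypothetical source partition ids; each gets a write-only barrier channel.
	barriers := make(map[int64]chan<- uint64)
	done := make(chan struct{})

	for _, id := range []int64{100, 101} {
		ch := make(chan uint64, 1)
		barriers[id] = ch // stored as chan<- uint64, matching PartitionBarrierChan
		go func(id int64, in <-chan uint64) {
			ts := <-in // e.g. the end timestamp of a drop-partition message
			fmt.Println("partition", id, "passed the barrier at ts", ts)
			done <- struct{}{}
		}(id, ch)
	}

	for _, ch := range barriers {
		ch <- 42 // signal every partition's barrier
	}
	for range barriers {
		<-done // wait until all partitions have reported
	}
}
```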
3 changes: 2 additions & 1 deletion core/reader/etcd_op.go
@@ -251,7 +251,8 @@ func (e *EtcdOp) WatchPartition(ctx context.Context, filter api.PartitionFilter)
info := &pb.PartitionInfo{}
err := proto.Unmarshal(event.Kv.Value, info)
if err != nil {
log.Warn("fail to unmarshal the partition info", zap.String("key", partitionKey), zap.Error(err))
log.Warn("fail to unmarshal the partition info",
zap.String("key", partitionKey), zap.String("value", util.Base64Encode(event.Kv.Value)), zap.Error(err))
continue
}
if info.State != pb.PartitionState_PartitionCreated ||
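The warning now includes the raw etcd value, base64-encoded, so a corrupt or unexpectedly formatted protobuf payload can be recovered straight from the logs. `util.Base64Encode` is presumably a thin wrapper over the standard library, along these lines (sketch, not the repo's actual code):

```go
package util

import "encoding/base64"

// Base64Encode renders arbitrary bytes (e.g. a protobuf value read from etcd)
// as printable text so they can be embedded in a structured log field.
func Base64Encode(value []byte) string {
	return base64.StdEncoding.EncodeToString(value)
}
```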
215 changes: 176 additions & 39 deletions core/reader/replicate_channel_manager.go
@@ -60,15 +60,18 @@ type replicateChannelManager struct {

channelLock sync.RWMutex
channelHandlerMap map[string]*replicateChannelHandler
forwardLock sync.RWMutex
channelForwardMap map[string]struct{}

collectionLock sync.Mutex
replicateCollections map[int64]chan struct{}

partitionLock sync.Mutex
replicatePartitions map[int64]map[int64]chan struct{}

channelChan chan string
apiEventChan chan *api.ReplicateAPIEvent
channelChan chan string
apiEventChan chan *api.ReplicateAPIEvent
forwardReplicateChannel chan string

msgPackCallback func(string, *msgstream.MsgPack)

@@ -104,13 +107,15 @@ func NewReplicateChannelManager(mqConfig config.MQConfig,
InitBackOff: readConfig.Retry.InitBackOff,
MaxBackOff: readConfig.Retry.InitBackOff,
}),
messageBufferSize: readConfig.MessageBufferSize,
channelHandlerMap: make(map[string]*replicateChannelHandler),
replicateCollections: make(map[int64]chan struct{}),
replicatePartitions: make(map[int64]map[int64]chan struct{}),
channelChan: make(chan string, 10),
apiEventChan: make(chan *api.ReplicateAPIEvent, 10),
msgPackCallback: msgPackCallback,
messageBufferSize: readConfig.MessageBufferSize,
channelHandlerMap: make(map[string]*replicateChannelHandler),
channelForwardMap: make(map[string]struct{}),
replicateCollections: make(map[int64]chan struct{}),
replicatePartitions: make(map[int64]map[int64]chan struct{}),
channelChan: make(chan string, 10),
apiEventChan: make(chan *api.ReplicateAPIEvent, 10),
forwardReplicateChannel: make(chan string),
msgPackCallback: msgPackCallback,
}, nil
}

@@ -199,6 +204,7 @@ func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info
r.collectionLock.Lock()
if _, ok := r.replicateCollections[info.ID]; ok {
r.collectionLock.Unlock()
log.Info("the collection is already replicated", zap.String("collection_name", info.Schema.Name))
return nil
}
barrier := NewBarrier(len(info.StartPositions), func(msgTs uint64, b *Barrier) {
@@ -213,6 +219,10 @@
},
ReplicateParam: api.ReplicateParam{Database: sourceDBInfo.Name},
}:
r.droppedCollections.Store(info.ID, struct{}{})
for _, name := range info.PhysicalChannelNames {
r.stopReadChannel(name, info.ID)
}
}
})
r.replicateCollections[info.ID] = barrier.CloseChan
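`NewBarrier` is defined elsewhere in this repo; the shape implied by this call site is roughly the sketch below: wait for `len(info.StartPositions)` reports (one per channel), then fire the callback once, unless `CloseChan` is closed first. Hypothetical implementation, for orientation only:

```go
// Barrier (sketch): fires f once all `count` readers have reported a ts.
type Barrier struct {
	BarrierChan chan uint64   // each channel reader reports a timestamp here
	CloseChan   chan struct{} // closing this aborts the wait
}

func NewBarrier(count int, f func(msgTs uint64, b *Barrier)) *Barrier {
	b := &Barrier{
		BarrierChan: make(chan uint64, count),
		CloseChan:   make(chan struct{}),
	}
	go func() {
		var last uint64
		for i := 0; i < count; i++ {
			select {
			case ts := <-b.BarrierChan:
				last = ts // all readers are expected to converge on the same ts
			case <-b.CloseChan:
				return
			}
		}
		f(last, b)
	}()
	return b
}
```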
@@ -460,26 +470,125 @@ func (r *replicateChannelManager) startReadChannel(sourceInfo *model.SourceColle
channelHandler.forwardMsgFunc = r.forwardMsg
channelHandler.isDroppedCollection = r.isDroppedCollection
channelHandler.isDroppedPartition = r.isDroppedPartition
hasReplicateForTargetChannel := false
for _, handler := range r.channelHandlerMap {
handler.recordLock.RLock()
if handler.targetPChannel == targetInfo.PChannel {
hasReplicateForTargetChannel = true
}
handler.recordLock.RUnlock()
}
if hasReplicateForTargetChannel {
log.Info("channel already has replicate for target channel", zap.String("channel_name", sourceInfo.PChannelName))
go func() {
tick := time.NewTicker(5 * time.Second)
defer tick.Stop()
for {
select {
case <-tick.C:
log.Info("wait the new replicate channel", zap.String("target_pchannel", targetInfo.PChannel))
case targetChannel := <-r.forwardReplicateChannel:
isRepeatedChannel := false
r.channelLock.RLock()
for _, handler := range r.channelHandlerMap {
handler.recordLock.RLock()
if handler.targetPChannel == targetChannel {
isRepeatedChannel = true
}
handler.recordLock.RUnlock()
}
r.channelLock.RUnlock()
if isRepeatedChannel {
continue
}
log.Info("success to get the new replicate channel", zap.String("target_pchannel", targetInfo.PChannel))
channelHandler.targetPChannel = targetChannel
r.forwardLock.Lock()
r.channelForwardMap[channelHandler.targetPChannel] = struct{}{}
r.forwardLock.Unlock()
channelHandler.startReadChannel()
return
}
}
}()
} else {
channelHandler.startReadChannel()
r.forwardLock.Lock()
r.channelForwardMap[channelHandler.targetPChannel] = struct{}{}
r.forwardLock.Unlock()
}
r.channelHandlerMap[sourceInfo.PChannelName] = channelHandler
r.channelChan <- sourceInfo.PChannelName
return nil
}
if channelHandler.targetPChannel != targetInfo.PChannel {
log.Info("diff target pchannel", zap.String("target_channel", targetInfo.PChannel), zap.String("handler_channel", channelHandler.targetPChannel))
go func() {
r.forwardLock.Lock()
_, hasForward := r.channelForwardMap[targetInfo.PChannel]
if !hasForward {
r.channelForwardMap[targetInfo.PChannel] = struct{}{}
}
r.forwardLock.Unlock()
if !hasForward {
tick := time.NewTicker(5 * time.Second)
defer tick.Stop()
for {
select {
case <-tick.C:
log.Info("forward the diff replicate channel", zap.String("target_channel", targetInfo.PChannel))
case r.forwardReplicateChannel <- targetInfo.PChannel:
log.Info("success to forward the diff replicate channel", zap.String("target_channel", targetInfo.PChannel))
return
}
}
}
}()
}
channelHandler.AddCollection(sourceInfo.CollectionID, targetInfo)
return nil
}
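Both branches above hinge on the same handshake: `forwardReplicateChannel` is unbuffered, so a send completes only when a waiting handler receives it, and the 5-second ticker exists purely to emit heartbeat logs while blocked. The pattern in isolation (a sketch using stdlib `log` instead of zap; names are illustrative):

```go
package reader

import (
	"log"
	"time"
)

// waitForReplicateChannel blocks until another handler hands over a target
// pchannel on the unbuffered rendezvous channel, or until stop is closed.
// The ticker branch only logs progress; it does not affect the handshake.
func waitForReplicateChannel(rendezvous <-chan string, stop <-chan struct{}) (string, bool) {
	tick := time.NewTicker(5 * time.Second)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			log.Println("still waiting for a free target channel")
		case name := <-rendezvous:
			return name, true // claimed a target pchannel from another handler
		case <-stop:
			return "", false
		}
	}
}
```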

func (r *replicateChannelManager) forwardMsg(targetPChannel string, msg *msgstream.MsgPack) {
r.channelLock.RLock()
defer r.channelLock.RUnlock()

var handler *replicateChannelHandler

for _, channelHandler := range r.channelHandlerMap {
if channelHandler.targetPChannel == targetPChannel {
handler = channelHandler
break
// forwardReplicateChannelFunc := func() {
// tick := time.NewTicker(5 * time.Second)
// defer tick.Stop()
// for {
// select {
// case <-tick.C:
// log.Info("wait the replicate channel handler ready", zap.String("target_pchannel", targetPChannel))
// case r.forwardReplicateChannel <- targetPChannel:
// log.Info("success to forward replicate channel", zap.String("target_pchannel", targetPChannel))
// return
// }
// }
// }
// forwardReplicateChannelFunc()

_ = retry.Do(r.replicateCtx, func() error {
r.channelLock.RLock()
defer r.channelLock.RUnlock()

for _, channelHandler := range r.channelHandlerMap {
if channelHandler.targetPChannel == targetPChannel {
handler = channelHandler
break
}
}
}
if handler == nil {
select {
case r.forwardReplicateChannel <- targetPChannel:
log.Info("success to forward replicate channel", zap.String("target_pchannel", targetPChannel))
default:
}
return errors.Newf("channel %s not found when forward the msg", targetPChannel)
}
return nil
}, r.retryOptions...)

if handler == nil {
r.apiEventChan <- &api.ReplicateAPIEvent{
EventType: api.ReplicateError,
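With this change `forwardMsg` no longer fails on the first missed lookup: it retries under `r.retryOptions`, and between attempts offers the target pchannel on the rendezvous channel (non-blocking `select` with `default`) so that a waiting handler can claim it. For orientation, a `retry.Do`-style helper behaves roughly like this generic stand-in (not the milvus library's code):

```go
package reader

import (
	"context"
	"time"
)

// doWithRetry mimics a retry.Do-style helper: run fn up to `attempts` times,
// sleeping `backOff` between failures, and stop early if ctx is cancelled.
func doWithRetry(ctx context.Context, attempts int, backOff time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil // success, stop retrying
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // context cancelled, give up early
		case <-time.After(backOff):
			// fall through to the next attempt
		}
	}
	return err // last error after exhausting all attempts
}
```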
Expand Down Expand Up @@ -510,6 +619,7 @@ func (r *replicateChannelManager) stopReadChannel(pChannelName string, collectio
return
}
channelHandler.RemoveCollection(collectionID)
// because the channel may be reused to forward messages, do NOT close it
// if channelHandler.IsEmpty() {
// channelHandler.Close()
// }
@@ -524,7 +634,7 @@ type replicateChannelHandler struct {
metaOp api.MetaOp
// key: source milvus collectionID value: *model.TargetCollectionInfo
recordLock sync.RWMutex
collectionRecords map[int64]*model.TargetCollectionInfo
collectionRecords map[int64]*model.TargetCollectionInfo // key is source collection id
collectionNames map[string]int64
msgPackChan chan *msgstream.MsgPack
apiEventChan chan *api.ReplicateAPIEvent
@@ -566,26 +676,34 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection
collectionName := collectionInfo.Schema.Name
partitionName := partitionInfo.PartitionName

partitionLog := log.With(
zap.String("collection_name", collectionName),
zap.Int64("collection_id", collectionID),
zap.Int64("partition_id", partitionID),
zap.String("partition_name", partitionName),
)

targetInfo, err := r.getCollectionTargetInfo(collectionID)
if err != nil {
partitionLog.Warn("fail to get collection target info", zap.Error(err))
return err
}
if targetInfo == nil || targetInfo.Dropped {
log.Info("the collection is dropping or dropped, skip the partition",
zap.String("collection_name", collectionInfo.Schema.Name), zap.String("partition_name", partitionInfo.PartitionName))
partitionLog.Info("the collection is dropping or dropped, skip the partition")
return nil
}
r.recordLock.Lock()
defer r.recordLock.Unlock()
if targetInfo.PartitionBarrierChan[partitionID] != nil {
log.Info("the partition barrier chan is not nil", zap.Int64("collection_id", collectionID), zap.String("partition_name", partitionName), zap.Int64("partition_id", partitionID))
log.Info("the partition barrier chan is not nil")
return nil
}
targetInfo.PartitionBarrierChan[partitionID] = barrierChan
if partitionInfo.State == pb.PartitionState_PartitionDropping ||
partitionInfo.State == pb.PartitionState_PartitionDropped {
targetInfo.DroppedPartition[partitionID] = struct{}{}
}
log.Info("add partition info done")

// TODO use goroutine pool
go func() {
@@ -642,6 +760,10 @@ func (r *replicateChannelHandler) RemovePartitionInfo(collectionID int64, name s
delete(targetInfo.PartitionInfo, name)
}
delete(targetInfo.PartitionBarrierChan, id)
targetInfo.DroppedPartition[id] = struct{}{}

log.Info("remove partition info", zap.String("collection_name", targetInfo.CollectionName), zap.Int64("collection_id", collectionID),
zap.String("partition_name", name), zap.Int64("partition_id", id))
}
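Recording the id in `DroppedPartition` acts as a tombstone: the source stream can keep delivering messages for a partition after its drop, and the checks added to `handlePack` below skip them with a cheap set lookup, in the spirit of:

```go
package reader

// isDropped is the shape of the tombstone check: a set-membership test on the
// ids recorded in DroppedPartition (the real check is the isDroppedPartition
// callback wired into the handler).
func isDropped(dropped map[int64]struct{}, partitionID int64) bool {
	_, ok := dropped[partitionID]
	return ok
}
```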

func (r *replicateChannelHandler) IsEmpty() bool {
@@ -678,6 +800,10 @@ func (r *replicateChannelHandler) startReadChannel() {

func (r *replicateChannelHandler) getCollectionTargetInfo(collectionID int64) (*model.TargetCollectionInfo, error) {
r.recordLock.RLock()
if r.isDroppedCollection != nil && r.isDroppedCollection(collectionID) {
r.recordLock.RUnlock()
return nil, nil
}
targetInfo, ok := r.collectionRecords[collectionID]
r.recordLock.RUnlock()
if ok {
Expand Down Expand Up @@ -765,17 +891,6 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
}
beginTS := pack.BeginTs

minTS := GetTSManager().GetMinTS(r.pChannelName)
if minTS == 0 {
r.sendErrEvent(errors.Newf("fail to get channel ts, channel: %s", r.pChannelName))
log.Warn("fail to get channel ts", zap.String("channel", r.pChannelName))
return nil
}
endTS := minTS
if beginTS > endTS {
beginTS = endTS
}

needTsMsg := false
pChannel := r.targetPChannel
for _, msg := range pack.Msgs {
@@ -856,24 +971,35 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
continue
}
if r.isDroppedPartition != nil && r.isDroppedPartition(realMsg.PartitionID) {
log.Info("skip delete msg because partition has been dropped", zap.Int64("partition_id", realMsg.PartitionID))
log.Info("skip drop partition msg because partition has been dropped", zap.Int64("partition_id", realMsg.PartitionID))
continue
}
if r.isDroppingPartition(realMsg.PartitionID, info) {
log.Info("skip drop partition msg because partition is dropping", zap.Int64("partition_id", realMsg.PartitionID))
continue
}
realMsg.CollectionID = info.CollectionID
if realMsg.PartitionName == "" {
err = errors.Newf("empty partition name")
log.Warn("invalid drop partition message", zap.Any("msg", msg))
log.Warn("invalid drop partition message, empty partition name", zap.Any("msg", msg))
}
_ = retry.Do(r.replicateCtx, func() error {
var partitionBarrierChan chan<- uint64
retryErr := retry.Do(r.replicateCtx, func() error {
err = nil
if info.PartitionBarrierChan[realMsg.PartitionID] == nil {
r.recordLock.RLock()
partitionBarrierChan = info.PartitionBarrierChan[realMsg.PartitionID]
if partitionBarrierChan == nil {
err = errors.Newf("not found the partition info [%d]", realMsg.PartitionID)
log.Warn("invalid drop partition message", zap.Any("msg", msg))
}
r.recordLock.RUnlock()
return err
}, r.retryOptions...)
if retryErr != nil && err == nil {
err = retryErr
}
if err == nil {
info.PartitionBarrierChan[realMsg.PartitionID] <- msg.EndTs()
partitionBarrierChan <- msg.EndTs()
if realMsg.PartitionName != "" {
realMsg.PartitionID, err = r.getPartitionID(sourceCollectionID, info, realMsg.PartitionName)
}
@@ -899,7 +1025,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
}
log.Info("receive msg", logFields...)
if pChannel != info.PChannel {
log.Info("forward the msg", zap.Any("msg", msg), zap.String("pChannel", pChannel), zap.String("info_pChannel", info.PChannel))
log.Info("forward the msg", zap.Any("msg_type", msg.Type().String()), zap.String("pChannel", pChannel), zap.String("info_pChannel", info.PChannel))
r.forwardMsgFunc(info.PChannel, &msgstream.MsgPack{
BeginTs: msg.BeginTs(),
EndTs: msg.EndTs(),
@@ -916,6 +1042,18 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
log.Warn("not support msg type", zap.Any("msg", msg))
}
}

minTS := GetTSManager().GetMinTS(r.pChannelName, len(newPack.Msgs) == 0)
if minTS == 0 {
r.sendErrEvent(errors.Newf("fail to get channel ts, channel: %s", r.pChannelName))
log.Warn("fail to get channel ts", zap.String("channel", r.pChannelName))
return nil
}
endTS := minTS
if beginTS > endTS {
beginTS = endTS
}
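Sampling the channel's minimum timestamp after the message loop lets `GetMinTS` be told whether the pack ended up empty (the new second argument). The clamp keeps positions monotonic, i.e. begin never exceeds end; in isolation:

```go
package reader

// alignPackRange clamps a pack's begin timestamp so that begin <= end always
// holds, with end pinned to the channel-wide minimum timestamp.
func alignPackRange(beginTS, minTS uint64) (begin, end uint64) {
	begin, end = beginTS, minTS
	if begin > end {
		begin = end
	}
	return begin, end
}
```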

for _, position := range newPack.StartPositions {
position.ChannelName = pChannel
position.Timestamp = beginTS
Expand Down Expand Up @@ -1003,6 +1141,5 @@ func newReplicateChannelHandler(ctx context.Context,
}
channelHandler.AddCollection(sourceInfo.CollectionID, targetInfo)
GetTSManager().CollectTS(channelHandler.pChannelName, math.MaxUint64)
channelHandler.startReadChannel()
return channelHandler, nil
}