Skip to content

Commit

Permalink
fix get_should_read func function exclude logic error
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Nov 5, 2024
1 parent e64af8b commit ba90182
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 5 deletions.
10 changes: 5 additions & 5 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ func (e *MetaCDC) getChannelReader(info *meta.TaskInfo, replicateEntity *Replica
}
} else {
// skip the msg when db or collection name is not matched
collectionInfos := GetCollectionInfos(info, msgDatabaseName)
collectionInfos := GetCollectionInfos(info, msgDatabaseName, msgCollectionName)
if collectionInfos == nil {
return true
}
Expand Down Expand Up @@ -1353,15 +1353,15 @@ func GetShouldReadFunc(taskInfo *meta.TaskInfo) cdcreader.ShouldReadFunc {
log.Info("database is dropped", zap.String("database", databaseInfo.Name), zap.String("collection", currentCollectionName))
return false
}
taskCollectionInfos := GetCollectionInfos(taskInfo, databaseInfo.Name)
taskCollectionInfos := GetCollectionInfos(taskInfo, databaseInfo.Name, currentCollectionName)
if taskCollectionInfos == nil {
return false
}
return MatchCollection(taskInfo, taskCollectionInfos, databaseInfo.Name, currentCollectionName)
}
}

func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string) []model.CollectionInfo {
func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string, collectionName string) []model.CollectionInfo {
var taskCollectionInfos []model.CollectionInfo
if len(taskInfo.CollectionInfos) > 0 {
if dbName != cdcreader.DefaultDatabase {
Expand All @@ -1373,8 +1373,8 @@ func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string) []model.Collecti
taskCollectionInfos = taskInfo.DBCollections[dbName]
if taskCollectionInfos == nil {
isExclude := lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool {
db, _ := getCollectionNameFromFull(s)
return db == dbName
db, collection := getCollectionNameFromFull(s)
return db == dbName && collection == collectionName
})
if isExclude {
return nil
Expand Down
40 changes: 40 additions & 0 deletions server/cdc_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,46 @@ func TestShouldReadCollection(t *testing.T) {
},
}))
})

t.Run("db collections", func(t *testing.T) {
f := GetShouldReadFunc(&meta.TaskInfo{
DBCollections: map[string][]model.CollectionInfo{
"*": {
{
Name: "*",
},
},
},
ExcludeCollections: []string{"default.foo"},
})
assert.False(t, f(
&coremodel.DatabaseInfo{
Name: cdcreader.DefaultDatabase,
},
&pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{
Name: "foo",
},
}))
assert.True(t, f(
&coremodel.DatabaseInfo{
Name: cdcreader.DefaultDatabase,
},
&pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{
Name: "hoo",
},
}))
assert.True(t, f(
&coremodel.DatabaseInfo{
Name: "kind",
},
&pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{
Name: "foo",
},
}))
})
}

func TestList(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (c *CDCServer) handleRequest(cdcRequest *modelrequest.CDCRequest, writer ht
c.handleError(writer, fmt.Sprintf("fail to decode the %s request, error: %s", requestType, err.Error()), http.StatusInternalServerError)
return nil
}
log.Info("request receive", zap.String("type", requestType), zap.String("data", GetRequestInfo(requestModel)))
response, err := handler.handle(c.api, requestModel)
if err != nil {
code := http.StatusInternalServerError
Expand All @@ -138,3 +139,18 @@ func (c *CDCServer) handleRequest(cdcRequest *modelrequest.CDCRequest, writer ht

return response
}

func GetRequestInfo(request any) string {
r := request
if _, ok := request.(*modelrequest.CreateRequest); ok {
// deep copy
create := &modelrequest.CreateRequest{}
buffer, _ := json.Marshal(request)
_ = json.Unmarshal(buffer, create)
create.MilvusConnectParam.Password = ""
create.MilvusConnectParam.Token = ""
r = create
}
requestBytes, _ := json.Marshal(r)
return string(requestBytes)
}

0 comments on commit ba90182

Please sign in to comment.