From ba90182ada8bfb24bec6629ddae483196e636f6b Mon Sep 17 00:00:00 2001 From: SimFG Date: Tue, 5 Nov 2024 11:28:32 +0800 Subject: [PATCH] fix get_should_read func function exclude logic error Signed-off-by: SimFG --- server/cdc_impl.go | 10 +++++----- server/cdc_impl_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ server/server.go | 16 ++++++++++++++++ 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/server/cdc_impl.go b/server/cdc_impl.go index 3622057..cbcc994 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -1061,7 +1061,7 @@ func (e *MetaCDC) getChannelReader(info *meta.TaskInfo, replicateEntity *Replica } } else { // skip the msg when db or collection name is not matched - collectionInfos := GetCollectionInfos(info, msgDatabaseName) + collectionInfos := GetCollectionInfos(info, msgDatabaseName, msgCollectionName) if collectionInfos == nil { return true } @@ -1353,7 +1353,7 @@ func GetShouldReadFunc(taskInfo *meta.TaskInfo) cdcreader.ShouldReadFunc { log.Info("database is dropped", zap.String("database", databaseInfo.Name), zap.String("collection", currentCollectionName)) return false } - taskCollectionInfos := GetCollectionInfos(taskInfo, databaseInfo.Name) + taskCollectionInfos := GetCollectionInfos(taskInfo, databaseInfo.Name, currentCollectionName) if taskCollectionInfos == nil { return false } @@ -1361,7 +1361,7 @@ func GetShouldReadFunc(taskInfo *meta.TaskInfo) cdcreader.ShouldReadFunc { } } -func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string) []model.CollectionInfo { +func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string, collectionName string) []model.CollectionInfo { var taskCollectionInfos []model.CollectionInfo if len(taskInfo.CollectionInfos) > 0 { if dbName != cdcreader.DefaultDatabase { @@ -1373,8 +1373,8 @@ func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string) []model.Collecti taskCollectionInfos = taskInfo.DBCollections[dbName] if taskCollectionInfos == nil { isExclude := lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool { - db, _ := getCollectionNameFromFull(s) - return db == dbName + db, collection := getCollectionNameFromFull(s) + return db == dbName && collection == collectionName }) if isExclude { return nil diff --git a/server/cdc_impl_test.go b/server/cdc_impl_test.go index 2646291..48299ad 100644 --- a/server/cdc_impl_test.go +++ b/server/cdc_impl_test.go @@ -835,6 +835,46 @@ func TestShouldReadCollection(t *testing.T) { }, })) }) + + t.Run("db collections", func(t *testing.T) { + f := GetShouldReadFunc(&meta.TaskInfo{ + DBCollections: map[string][]model.CollectionInfo{ + "*": { + { + Name: "*", + }, + }, + }, + ExcludeCollections: []string{"default.foo"}, + }) + assert.False(t, f( + &coremodel.DatabaseInfo{ + Name: cdcreader.DefaultDatabase, + }, + &pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "foo", + }, + })) + assert.True(t, f( + &coremodel.DatabaseInfo{ + Name: cdcreader.DefaultDatabase, + }, + &pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "hoo", + }, + })) + assert.True(t, f( + &coremodel.DatabaseInfo{ + Name: "kind", + }, + &pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "foo", + }, + })) + }) } func TestList(t *testing.T) { diff --git a/server/server.go b/server/server.go index 4717d12..6ab1010 100644 --- a/server/server.go +++ b/server/server.go @@ -126,6 +126,7 @@ func (c *CDCServer) handleRequest(cdcRequest *modelrequest.CDCRequest, writer ht c.handleError(writer, fmt.Sprintf("fail to decode the %s request, error: %s", requestType, err.Error()), http.StatusInternalServerError) return nil } + log.Info("request receive", zap.String("type", requestType), zap.String("data", GetRequestInfo(requestModel))) response, err := handler.handle(c.api, requestModel) if err != nil { code := http.StatusInternalServerError @@ -138,3 +139,18 @@ func (c *CDCServer) handleRequest(cdcRequest *modelrequest.CDCRequest, writer ht return response } + +func GetRequestInfo(request any) string { + r := request + if _, ok := request.(*modelrequest.CreateRequest); ok { + // deep copy + create := &modelrequest.CreateRequest{} + buffer, _ := json.Marshal(request) + _ = json.Unmarshal(buffer, create) + create.MilvusConnectParam.Password = "" + create.MilvusConnectParam.Token = "" + r = create + } + requestBytes, _ := json.Marshal(r) + return string(requestBytes) +}