diff --git a/.golangci.yml b/.golangci.yml
index e4bb6246..0cbcb801 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -2,6 +2,7 @@ run:
   skip-dirs:
     - pb
     - mocks
+    - tool
 
 linters:
   disable-all: true
diff --git a/Makefile b/Makefile
index f2640ac7..cdd2e29d 100644
--- a/Makefile
+++ b/Makefile
@@ -11,7 +11,7 @@ lint-fix:
 	@echo "Running gofumpt fix"
 	@gofumpt -l -w ./
 	@echo "Running gci fix"
-	@gci write ./ --skip-generated -s standard -s default -s "prefix(github.com/zilliztech)" --custom-order
+	@gci write ./ -s standard -s default -s "prefix(github.com/zilliztech)" --custom-order --skip-generated
 
 static-check:
 	@echo "Running go-lint check:"
diff --git a/codecov.yml b/codecov.yml
index 500593b4..a3e262c3 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -39,4 +39,7 @@ ignore:
   - "examples/*"
   - "tests/*"
   - "**/mocks/*"
-  - "*_gen.go"
\ No newline at end of file
+  - "*_gen.go"
+  - "server/main/*"
+  - "server/tool/*"
+  - "server/configs/*"
\ No newline at end of file
diff --git a/core/reader/channel_reader.go b/core/reader/channel_reader.go
index d8c840a3..81235fa7 100644
--- a/core/reader/channel_reader.go
+++ b/core/reader/channel_reader.go
@@ -59,7 +59,8 @@ var _ api.Reader = (*ChannelReader)(nil)
 func NewChannelReader(channelName, seekPosition string,
 	mqConfig config.MQConfig,
 	dataHandler func(context.Context, *msgstream.MsgPack) bool,
-	creator FactoryCreator) (api.Reader, error) {
+	creator FactoryCreator,
+) (api.Reader, error) {
 	channelReader := &ChannelReader{
 		factoryCreator: creator,
 		channelName:    channelName,
diff --git a/core/reader/etcd_op.go b/core/reader/etcd_op.go
index b17be5ea..4d2b9615 100644
--- a/core/reader/etcd_op.go
+++ b/core/reader/etcd_op.go
@@ -554,7 +554,6 @@ func (e *EtcdOp) internalGetAllPartition(ctx context.Context, filters []api.Part
 		existedPartitionInfos = append(existedPartitionInfos, info)
 	}
 	return existedPartitionInfos, nil
-
 }
 
 func (e *EtcdOp) GetAllPartition(ctx context.Context, filter api.PartitionFilter) ([]*pb.PartitionInfo, error) {
diff --git a/core/reader/replicate_channel_manager.go b/core/reader/replicate_channel_manager.go
index 6d9f45c3..cd8e68ce 100644
--- a/core/reader/replicate_channel_manager.go
+++ b/core/reader/replicate_channel_manager.go
@@ -493,9 +493,9 @@ func (r *replicateChannelManager) stopReadChannel(pChannelName string, collectio
 		return
 	}
 	channelHandler.RemoveCollection(collectionID)
-	//if channelHandler.IsEmpty() {
+	// if channelHandler.IsEmpty() {
 	//	channelHandler.Close()
-	//}
+	// }
 }
 
 type replicateChannelHandler struct {
@@ -747,7 +747,6 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
 		Msgs: make([]msgstream.TsMsg, 0),
 	}
 	beginTS := pack.BeginTs
-	endTS := pack.EndTs
 
 	minTS := GetTSManager().GetMinTS(r.pChannelName)
 	if minTS == 0 {
@@ -755,7 +754,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
 		log.Warn("fail to get channel ts", zap.String("channel", r.pChannelName))
 		return nil
 	}
-	endTS = minTS
+	endTS := minTS
 	if beginTS > endTS {
 		beginTS = endTS
 	}
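Note on the handlePack hunk above: `endTS` used to be initialized from `pack.EndTs` and then unconditionally overwritten with the channel's minimum timetick, so the first assignment was a dead store. A simplified, self-contained sketch of the clamping that remains (plain uint64 stand-ins, not the real handler types):

```go
package main

import "fmt"

// clampPackRange mirrors the begin/end adjustment in handlePack: the end
// timestamp comes from the slowest channel's minimum timetick, and beginTS
// is clamped so the range never inverts. minTS == 0 means "no timetick yet".
func clampPackRange(beginTS, minTS uint64) (uint64, uint64, bool) {
	if minTS == 0 {
		return 0, 0, false // the handler logs a warning and skips the pack
	}
	endTS := minTS
	if beginTS > endTS {
		beginTS = endTS
	}
	return beginTS, endTS, true
}

func main() {
	begin, end, ok := clampPackRange(105, 100)
	fmt.Println(begin, end, ok) // 100 100 true
}
```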
"github.com/zilliztech/milvus-cdc/core/pb" + "github.com/zilliztech/milvus-cdc/core/util" ) func TestNewReplicateChannelManager(t *testing.T) { @@ -255,6 +257,9 @@ func TestStartReadCollection(t *testing.T) { } err := realManager.AddPartition(context.Background(), &pb.CollectionInfo{ ID: 41, + Schema: &schemapb.CollectionSchema{ + Name: "test", + }, }, &pb.PartitionInfo{}) assert.Error(t, err) } @@ -297,6 +302,14 @@ func TestStartReadCollection(t *testing.T) { }) } +func noRetry(handler *replicateChannelHandler) { + handler.retryOptions = util.GetRetryOptions(config.RetrySettings{ + RetryTimes: 1, + MaxBackOff: 1, + InitBackOff: 1, + }) +} + func TestReplicateChannelHandler(t *testing.T) { t.Run("fail to new msg stream", func(t *testing.T) { factory := msgstream.NewMockFactory(t) @@ -337,11 +350,12 @@ func TestReplicateChannelHandler(t *testing.T) { streamChan := make(chan *msgstream.MsgPack) close(streamChan) stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - stream.EXPECT().Close().Return().Once() + stream.EXPECT().Close().Return().Maybe() stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once() stream.EXPECT().Chan().Return(streamChan).Once() handler, err := newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{PChannelName: "test_p", SeekPosition: &msgstream.MsgPosition{ChannelName: "test_p", MsgID: []byte("test")}}, &model.TargetCollectionInfo{PChannel: "test_p"}, api.TargetAPI(nil), &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory}) assert.NoError(t, err) + noRetry(handler) time.Sleep(100 * time.Microsecond) handler.Close() }) @@ -355,9 +369,9 @@ func TestReplicateChannelHandler(t *testing.T) { factory.EXPECT().NewTtMsgStream(mock.Anything).Return(stream, nil) stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - stream.EXPECT().Close().Return().Once() + stream.EXPECT().Close().Return().Maybe() stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once() - stream.EXPECT().Chan().Return(streamChan).Once().Maybe() + stream.EXPECT().Chan().Return(streamChan) handler, err := newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{ PChannelName: "test_p", @@ -370,6 +384,7 @@ func TestReplicateChannelHandler(t *testing.T) { CollectionName: "foo", }, targetClient, &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory}) assert.NoError(t, err) + noRetry(handler) time.Sleep(100 * time.Millisecond) assert.True(t, handler.containCollection("foo")) handler.Close() @@ -389,10 +404,10 @@ func TestReplicateChannelHandler(t *testing.T) { factory.EXPECT().NewTtMsgStream(mock.Anything).Return(stream, nil) stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - stream.EXPECT().Close().Return().Once() + stream.EXPECT().Close().Return().Maybe() stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once() - stream.EXPECT().Chan().Return(streamChan).Once() - targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once() + stream.EXPECT().Chan().Return(streamChan) + targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error 1")).Once() targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(&model.CollectionInfo{ Partitions: map[string]int64{ "p1": 10001, @@ -406,6 +421,11 @@ func 
+func noRetry(handler *replicateChannelHandler) {
+	handler.retryOptions = util.GetRetryOptions(config.RetrySettings{
+		RetryTimes:  1,
+		MaxBackOff:  1,
+		InitBackOff: 1,
+	})
+}
+
 func TestReplicateChannelHandler(t *testing.T) {
 	t.Run("fail to new msg stream", func(t *testing.T) {
 		factory := msgstream.NewMockFactory(t)
@@ -337,11 +350,12 @@ func TestReplicateChannelHandler(t *testing.T) {
 		streamChan := make(chan *msgstream.MsgPack)
 		close(streamChan)
 		stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
-		stream.EXPECT().Close().Return().Once()
+		stream.EXPECT().Close().Return().Maybe()
 		stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
 		stream.EXPECT().Chan().Return(streamChan).Once()
 		handler, err := newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{PChannelName: "test_p", SeekPosition: &msgstream.MsgPosition{ChannelName: "test_p", MsgID: []byte("test")}}, &model.TargetCollectionInfo{PChannel: "test_p"}, api.TargetAPI(nil), &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
 		assert.NoError(t, err)
+		noRetry(handler)
 		time.Sleep(100 * time.Microsecond)
 		handler.Close()
 	})
@@ -355,9 +369,9 @@ func TestReplicateChannelHandler(t *testing.T) {
 
 		factory.EXPECT().NewTtMsgStream(mock.Anything).Return(stream, nil)
 		stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
-		stream.EXPECT().Close().Return().Once()
+		stream.EXPECT().Close().Return().Maybe()
 		stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
-		stream.EXPECT().Chan().Return(streamChan).Once().Maybe()
+		stream.EXPECT().Chan().Return(streamChan)
 		handler, err := newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{
 			PChannelName: "test_p",
@@ -370,6 +384,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 			CollectionName: "foo",
 		}, targetClient, &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
 		assert.NoError(t, err)
+		noRetry(handler)
 		time.Sleep(100 * time.Millisecond)
 		assert.True(t, handler.containCollection("foo"))
 		handler.Close()
@@ -389,10 +404,10 @@ func TestReplicateChannelHandler(t *testing.T) {
 
 		factory.EXPECT().NewTtMsgStream(mock.Anything).Return(stream, nil)
 		stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
-		stream.EXPECT().Close().Return().Once()
+		stream.EXPECT().Close().Return().Maybe()
 		stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
-		stream.EXPECT().Chan().Return(streamChan).Once()
-		targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once()
+		stream.EXPECT().Chan().Return(streamChan)
+		targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error 1")).Once()
 		targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(&model.CollectionInfo{
 			Partitions: map[string]int64{
 				"p1": 10001,
@@ -406,6 +421,11 @@ func TestReplicateChannelHandler(t *testing.T) {
 			return newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{CollectionID: 1, PChannelName: "test_p", SeekPosition: &msgstream.MsgPosition{ChannelName: "test_p", MsgID: []byte("test")}}, &model.TargetCollectionInfo{CollectionID: 100, CollectionName: "test", PChannel: "test_p"}, api.TargetAPI(targetClient), &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
 		}()
 		assert.NoError(t, err)
+		handler.retryOptions = util.GetRetryOptions(config.RetrySettings{
+			RetryTimes:  3,
+			MaxBackOff:  1,
+			InitBackOff: 1,
+		})
 		time.Sleep(100 * time.Millisecond)
 		handler.Close()
 
@@ -418,7 +438,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 				},
 			})
 		}()
-		_ = handler.AddPartitionInfo(&pb.CollectionInfo{
+		err = handler.AddPartitionInfo(&pb.CollectionInfo{
 			ID: 2,
 			Schema: &schemapb.CollectionSchema{
 				Name: "test2",
@@ -427,6 +447,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 			PartitionID:   2001,
 			PartitionName: "p2",
 		}, make(chan<- uint64))
+		assert.NoError(t, err)
 		time.Sleep(1500 * time.Millisecond)
 		handler.RemovePartitionInfo(2, "p2", 10002)
@@ -434,7 +455,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 		assert.NotNil(t, handler.msgPackChan)
 
 		// test updateTargetPartitionInfo
-		targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once()
+		targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error 2")).Once()
 		targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(&model.CollectionInfo{
 			Partitions: map[string]int64{
 				"p1": 30001,
@@ -456,6 +477,8 @@ func TestReplicateChannelHandler(t *testing.T) {
 	})
 
 	t.Run("handle msg pack", func(t *testing.T) {
+		GetTSManager().EmptyTS()
+
 		factory := msgstream.NewMockFactory(t)
 		stream := msgstream.NewMockMsgStream(t)
 		targetClient := mocks.NewTargetAPI(t)
@@ -463,7 +486,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 
 		factory.EXPECT().NewTtMsgStream(mock.Anything).Return(stream, nil)
 		stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
-		stream.EXPECT().Close().Return().Once()
+		stream.EXPECT().Close().Return().Maybe()
 		stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
 		stream.EXPECT().Chan().Return(streamChan)
@@ -487,6 +510,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 			},
 		}, targetClient, &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
 		assert.NoError(t, err)
+		noRetry(handler)
 
 		done := make(chan struct{})
 		go func() {
@@ -551,6 +575,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 		}()
 
 		// create collection msg / create partition msg / timetick msg
+		log.Info("create collection msg / create partition msg / timetick msg")
 		streamChan <- &msgstream.MsgPack{
 			BeginTs: 1,
 			EndTs:   2,
@@ -617,6 +642,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 		}
 
 		// insert msg
+		log.Info("insert msg")
 		streamChan <- &msgstream.MsgPack{
 			BeginTs: 1,
 			EndTs:   2,
@@ -651,6 +677,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 		}
 
 		// delete msg
+		log.Info("delete msg")
 		streamChan <- &msgstream.MsgPack{
 			BeginTs: 1,
 			EndTs:   2,
@@ -699,6 +726,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 		}
 
 		// drop collection msg
+		log.Info("drop collection msg")
 		streamChan <- &msgstream.MsgPack{
 			BeginTs: 1,
 			EndTs:   2,
@@ -730,6 +758,7 @@ func TestReplicateChannelHandler(t *testing.T) {
 		}
 
 		// drop partition msg
+		log.Info("drop partition msg")
 		streamChan <- &msgstream.MsgPack{
 			BeginTs: 1,
 			EndTs:   2,
@@ -762,6 +791,8 @@ func TestReplicateChannelHandler(t *testing.T) {
 			},
 		}
 
+		// close
+		log.Info("close")
 		handler.Close()
 		close(streamChan)
 		<-done
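The `noRetry` helper and the explicit `retryOptions` overrides above exist so mocked failures don't hold the tests in backoff. A small sketch of how these options behave, assuming milvus' `retry.Do` helper (from `pkg/util/retry`, the same package the options come from) is used the way the handler uses it:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/retry"

	"github.com/zilliztech/milvus-cdc/core/config"
	"github.com/zilliztech/milvus-cdc/core/util"
)

func main() {
	// The same settings the tests use: one attempt, 1s backoff bounds.
	opts := util.GetRetryOptions(config.RetrySettings{
		RetryTimes:  1,
		InitBackOff: 1,
		MaxBackOff:  1,
	})
	// With Attempts(1) the callback runs once and the error surfaces
	// immediately instead of being retried with exponential backoff.
	err := retry.Do(context.Background(), func() error {
		return errors.New("mock error")
	}, opts...)
	fmt.Println(err)
}
```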
&msgstream.MsgPack{ BeginTs: 1, EndTs: 2, @@ -762,6 +791,8 @@ func TestReplicateChannelHandler(t *testing.T) { }, } + // close + log.Info("close") handler.Close() close(streamChan) <-done diff --git a/core/reader/ts_manager.go b/core/reader/ts_manager.go index 119160f4..a237121a 100644 --- a/core/reader/ts_manager.go +++ b/core/reader/ts_manager.go @@ -116,3 +116,14 @@ func (m *tsManager) GetMinTS(channelName string) uint64 { TSMetricVec.WithLabelValues(channelName).Set(float64(msgTime)) return minTS } + +// EmptyTS Only for test +func (m *tsManager) EmptyTS() { + for k := range m.channelTS.GetUnsafeMap() { + m.channelTS.Delete(k) + } + for k := range m.channelRef.GetUnsafeMap() { + m.channelRef.Delete(k) + } + m.retryOptions = util.NoRetryOption() +} diff --git a/core/util/retry_option.go b/core/util/retry_option.go index 8b5bbc13..054e69e5 100644 --- a/core/util/retry_option.go +++ b/core/util/retry_option.go @@ -46,3 +46,11 @@ func GetRetryOptions(c config.RetrySettings) []retry.Option { retry.MaxSleepTime(time.Duration(c.MaxBackOff) * time.Second), } } + +func NoRetryOption() []retry.Option { + return []retry.Option{ + retry.Attempts(1), + retry.Sleep(time.Second), + retry.MaxSleepTime(time.Second), + } +} diff --git a/core/writer/channel_writer.go b/core/writer/channel_writer.go index 3b8fc1f2..1eefd9de 100644 --- a/core/writer/channel_writer.go +++ b/core/writer/channel_writer.go @@ -59,7 +59,8 @@ type ChannelWriter struct { func NewChannelWriter(dataHandler api.DataHandler, writerConfig config.WriterConfig, - droppedObjs map[string]map[string]uint64) api.Writer { + droppedObjs map[string]map[string]uint64, +) api.Writer { w := &ChannelWriter{ dataHandler: dataHandler, messageManager: NewReplicateMessageManager(dataHandler, writerConfig.MessageBufferSize), diff --git a/core/writer/milvus_handler_test.go b/core/writer/milvus_handler_test.go index 7fc6af58..b51e29e4 100644 --- a/core/writer/milvus_handler_test.go +++ b/core/writer/milvus_handler_test.go @@ -36,6 +36,8 @@ import ( "google.golang.org/grpc" "github.com/zilliztech/milvus-cdc/core/api" + "github.com/zilliztech/milvus-cdc/core/config" + "github.com/zilliztech/milvus-cdc/core/util" ) func TestDataHandler(t *testing.T) { @@ -68,7 +70,13 @@ func TestDataHandler(t *testing.T) { dataHandler, err := NewMilvusDataHandler(AddressOption("localhost:50051")) assert.NoError(t, err) dataHandler.ignorePartition = true - ctx := context.Background() + dataHandler.retryOptions = util.GetRetryOptions(config.RetrySettings{ + RetryTimes: 1, + MaxBackOff: 1, + InitBackOff: 1, + }) + ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancelFunc() // create collection t.Run("create collection", func(t *testing.T) { @@ -384,31 +392,35 @@ func TestDataHandler(t *testing.T) { t.Run("flush", func(t *testing.T) { { - milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{ + call := milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{ Status: &commonpb.Status{}, Value: true, - }, nil).Once() - milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{}}, nil).Once() + }, nil).Maybe() + flushCall := milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{}}, nil).Once() err := dataHandler.Flush(ctx, &api.FlushParam{ FlushRequest: milvuspb.FlushRequest{ CollectionNames: []string{"foo"}, }, }) assert.NoError(t, err) + 
diff --git a/core/writer/milvus_handler_test.go b/core/writer/milvus_handler_test.go
index 7fc6af58..b51e29e4 100644
--- a/core/writer/milvus_handler_test.go
+++ b/core/writer/milvus_handler_test.go
@@ -36,6 +36,8 @@ import (
 	"google.golang.org/grpc"
 
 	"github.com/zilliztech/milvus-cdc/core/api"
+	"github.com/zilliztech/milvus-cdc/core/config"
+	"github.com/zilliztech/milvus-cdc/core/util"
 )
 
 func TestDataHandler(t *testing.T) {
@@ -68,7 +70,13 @@ func TestDataHandler(t *testing.T) {
 	dataHandler, err := NewMilvusDataHandler(AddressOption("localhost:50051"))
 	assert.NoError(t, err)
 	dataHandler.ignorePartition = true
-	ctx := context.Background()
+	dataHandler.retryOptions = util.GetRetryOptions(config.RetrySettings{
+		RetryTimes:  1,
+		MaxBackOff:  1,
+		InitBackOff: 1,
+	})
+	ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Minute)
+	defer cancelFunc()
 
 	// create collection
 	t.Run("create collection", func(t *testing.T) {
@@ -384,31 +392,35 @@ func TestDataHandler(t *testing.T) {
 
 	t.Run("flush", func(t *testing.T) {
 		{
-			milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{
+			call := milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{
 				Status: &commonpb.Status{},
 				Value:  true,
-			}, nil).Once()
-			milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{}}, nil).Once()
+			}, nil).Maybe()
+			flushCall := milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{}}, nil).Once()
 			err := dataHandler.Flush(ctx, &api.FlushParam{
 				FlushRequest: milvuspb.FlushRequest{
 					CollectionNames: []string{"foo"},
 				},
 			})
 			assert.NoError(t, err)
+			call.Unset()
+			flushCall.Unset()
 		}
 		{
-			milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{
+			call := milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{
 				Status: &commonpb.Status{},
 				Value:  true,
-			}, nil).Once()
-			milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{Code: 500, ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil).Once()
+			}, nil).Maybe()
+			flushCall := milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{Code: 500, ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil).Once()
 			err := dataHandler.Flush(ctx, &api.FlushParam{
 				FlushRequest: milvuspb.FlushRequest{
 					CollectionNames: []string{"foo"},
 				},
 			})
 			assert.Error(t, err)
+			call.Unset()
+			flushCall.Unset()
 		}
 	})
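The flush subtests above move from strict one-shot expectations to the Maybe/Unset pattern, so retried calls don't trip call counts and the next block can re-stub the same method. A minimal standalone illustration with testify's mock package (the `fakeMilvus` type is hypothetical, not repo code):

```go
package main

import (
	"fmt"

	"github.com/stretchr/testify/mock"
)

type fakeMilvus struct{ mock.Mock }

func (f *fakeMilvus) HasCollection(name string) bool {
	args := f.Called(name)
	return args.Bool(0)
}

func main() {
	f := &fakeMilvus{}
	// Maybe() marks the expectation optional: it may run any number of
	// times (including zero) without failing AssertExpectations.
	call := f.On("HasCollection", "foo").Return(true).Maybe()
	fmt.Println(f.HasCollection("foo")) // true
	// Unset() removes the expectation so a later block can re-stub the
	// same method with a different return value.
	call.Unset()
	f.On("HasCollection", "foo").Return(false)
	fmt.Println(f.HasCollection("foo")) // false
}
```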
- "selected": true, + "selected": false, "text": "prometheus", "value": "prometheus" }, @@ -929,7 +1019,7 @@ }, { "current": { - "selected": true, + "selected": false, "text": "chaos-testing", "value": "chaos-testing" }, @@ -956,8 +1046,8 @@ { "current": { "selected": true, - "text": "milvus-cdc-77-69c79c655c-b6vkr", - "value": "milvus-cdc-77-69c79c655c-b6vkr" + "text": "milvus-cdc-13-7849b6f758-tqtm4", + "value": "milvus-cdc-13-7849b6f758-tqtm4" }, "datasource": { "type": "prometheus", @@ -1015,13 +1105,9 @@ { "allValue": ".*", "current": { - "selected": true, - "text": [ - "4ce0077db777454a915b912c5e568285" - ], - "value": [ - "4ce0077db777454a915b912c5e568285" - ] + "selected": false, + "text": "All", + "value": "$__all" }, "datasource": { "type": "prometheus", @@ -1047,13 +1133,9 @@ { "allValue": ".*", "current": { - "selected": true, - "text": [ - "cdc-test-upstream-77-rootcoord-dml_0" - ], - "value": [ - "cdc-test-upstream-77-rootcoord-dml_0" - ] + "selected": false, + "text": "All", + "value": "$__all" }, "datasource": { "type": "prometheus", @@ -1079,13 +1161,13 @@ ] }, "time": { - "from": "now-30m", + "from": "now-3h", "to": "now" }, "timepicker": {}, "timezone": "browser", "title": "cdc", "uid": "d8ecc7e9-236f-4ed3-9835-e76e9a9f8ff8", - "version": 14, + "version": 21, "weekStart": "" } \ No newline at end of file diff --git a/server/configs/collection_start_position.yaml b/server/configs/collection_start_position.yaml new file mode 100644 index 00000000..73263a22 --- /dev/null +++ b/server/configs/collection_start_position.yaml @@ -0,0 +1,3 @@ +etcdAddress: 127.0.0.1:2379 +rootPath: by-dev +collectionID: 1 \ No newline at end of file diff --git a/server/configs/msg_count.yaml b/server/configs/msg_count.yaml new file mode 100644 index 00000000..e5143de9 --- /dev/null +++ b/server/configs/msg_count.yaml @@ -0,0 +1,23 @@ +etcdAddress: + - localhost:2379 +taskPositionPrefix: cdc/task_position +taskPositionKey: 90de65ec2d004298985a8122a77247dd/445993766148522019 +timeout: 600 +countMode: true +messageDetail: 0 +pulsar: + address: pulsar://localhost:6650 + webAddress: localhost:80 + maxMessageSize: 5242880 + tenant: public + namespace: default +#kafka: +# address: 10.104.34.53:9092 +collectionID: 1 +dataKey: 1 +pkFieldName: pk +taskPositionMode: false + +taskPositions: + - name: by-dev-rootcoord-dml_0 + position: AAAAAAAAAAAAAAAAAAAAAA== \ No newline at end of file diff --git a/server/tool/collection_start_position/main.go b/server/tool/collection_start_position/main.go new file mode 100644 index 00000000..64f4cb3a --- /dev/null +++ b/server/tool/collection_start_position/main.go @@ -0,0 +1,82 @@ +/* + * Licensed to the LF AI & Data foundation under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * // + * http://www.apache.org/licenses/LICENSE-2.0 + * // + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/server/tool/collection_start_position/main.go b/server/tool/collection_start_position/main.go
new file mode 100644
index 00000000..64f4cb3a
--- /dev/null
+++ b/server/tool/collection_start_position/main.go
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the LF AI & Data foundation under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * //
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * //
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"context"
+	"encoding/base64"
+	"fmt"
+	"os"
+
+	"github.com/golang/protobuf/proto"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"sigs.k8s.io/yaml"
+
+	"github.com/zilliztech/milvus-cdc/core/pb"
+)
+
+var GlobalConfig StartCollectionConfig
+
+type StartCollectionConfig struct {
+	EtcdAddress  string
+	RootPath     string
+	CollectionID string
+}
+
+func main() {
+	fileContent, err := os.ReadFile("./configs/collection_start_position.yaml")
+	if err != nil {
+		panic(err)
+	}
+
+	var positionConfig StartCollectionConfig
+	err = yaml.Unmarshal(fileContent, &positionConfig)
+	if err != nil {
+		panic(err)
+	}
+	GlobalConfig = positionConfig
+
+	etcdClient, _ := clientv3.New(clientv3.Config{
+		Endpoints: []string{GlobalConfig.EtcdAddress},
+	})
+	getResp, err := etcdClient.Get(context.Background(),
+		fmt.Sprintf("%s/meta/root-coord/database/collection-info/1/%s", GlobalConfig.RootPath, GlobalConfig.CollectionID))
+	if err != nil {
+		panic(err)
+	}
+	bytesData := getResp.Kvs[0].Value
+
+	collectionInfo := &pb.CollectionInfo{}
+	err = proto.Unmarshal(bytesData, collectionInfo)
+	if err != nil {
+		panic(err)
+	}
+	for _, position := range collectionInfo.StartPositions {
+		msgPosition := &pb.MsgPosition{
+			ChannelName: position.Key,
+			MsgID:       position.Data,
+		}
+		positionData, err := proto.Marshal(msgPosition)
+		if err != nil {
+			panic(err)
+		}
+		base64Position := base64.StdEncoding.EncodeToString(positionData)
+		fmt.Println("channelName: ", position.Key, " position: ", base64Position)
+	}
+}
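The tool above prints each collection start position as base64-encoded pb.MsgPosition bytes. A small companion sketch that reverses the encoding (the raw value is a placeholder taken from msg_count.yaml, not real output):

```go
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/golang/protobuf/proto"

	"github.com/zilliztech/milvus-cdc/core/pb"
)

func main() {
	// Placeholder: paste a position string printed by the tool above.
	raw := "AAAAAAAAAAAAAAAAAAAAAA=="
	data, err := base64.StdEncoding.DecodeString(raw)
	if err != nil {
		panic(err)
	}
	// Undo proto.Marshal to recover the channel name and raw message ID.
	pos := &pb.MsgPosition{}
	if err := proto.Unmarshal(data, pos); err != nil {
		panic(err)
	}
	fmt.Println(pos.GetChannelName(), pos.GetMsgID())
}
```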
diff --git a/server/tool/msg_count/main.go b/server/tool/msg_count/main.go
new file mode 100644
index 00000000..cea11ba3
--- /dev/null
+++ b/server/tool/msg_count/main.go
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the LF AI & Data foundation under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * //
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * //
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"context"
+	"encoding/base64"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/goccy/go-json"
+	"github.com/golang/protobuf/proto"
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/mq/msgstream"
+	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/tsoutil"
+	"github.com/samber/lo"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"sigs.k8s.io/yaml"
+
+	"github.com/zilliztech/milvus-cdc/core/config"
+	"github.com/zilliztech/milvus-cdc/core/reader"
+	"github.com/zilliztech/milvus-cdc/core/util"
+	"github.com/zilliztech/milvus-cdc/server/model"
+	"github.com/zilliztech/milvus-cdc/server/model/meta"
+)
+
+var GlobalConfig PositionConfig
+
+type PositionConfig struct {
+	EtcdAddress        []string
+	TaskPositionPrefix string
+	TaskPositionKey    string
+	PkFieldName        string
+	Timeout            int
+	CountMode          bool
+	TaskPositionMode   bool
+	MessageDetail      int
+	Pulsar             config.PulsarConfig
+	Kafka              config.KafkaConfig
+	TaskPositions      []model.ChannelInfo
+
+	CollectionID int64
+	Data         string
+}
+
+func main() {
+	paramtable.Init()
+	log.ReplaceGlobals(zap.NewNop(), &log.ZapProperties{
+		Core:   zapcore.NewNopCore(),
+		Syncer: zapcore.AddSync(ioutil.Discard),
+		Level:  zap.NewAtomicLevel(),
+	})
+
+	fileContent, err := os.ReadFile("./configs/msg_count.yaml")
+	if err != nil {
+		panic(err)
+	}
+	var positionConfig PositionConfig
+	err = yaml.Unmarshal(fileContent, &positionConfig)
+	if err != nil {
+		panic(err)
+	}
+	GlobalConfig = positionConfig
+
+	if positionConfig.TaskPositionMode {
+		markPrintln("task position mode")
+		timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(positionConfig.Timeout)*time.Second)
+		defer cancelFunc()
+
+		for _, position := range positionConfig.TaskPositions {
+			kd, err := decodePosition(position.Name, position.Position)
+			if err != nil {
+				panic(err)
+			}
+
+			GetMQMessageDetail(timeoutCtx, positionConfig, position.Name, kd)
+		}
+		return
+	}
+
+	client, err := clientv3.New(clientv3.Config{Endpoints: positionConfig.EtcdAddress})
+	if err != nil {
+		panic(err)
+	}
+
+	timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(positionConfig.Timeout)*time.Second)
+	defer cancelFunc()
+	var getResp *clientv3.GetResponse
+	if positionConfig.TaskPositionKey != "" {
+		getResp, err = client.Get(timeoutCtx, fmt.Sprintf("%s/%s", positionConfig.TaskPositionPrefix, positionConfig.TaskPositionKey))
+	} else {
+		getResp, err = client.Get(timeoutCtx, positionConfig.TaskPositionPrefix, clientv3.WithPrefix())
+	}
+	if err != nil {
+		panic(err)
+	}
+	if len(getResp.Kvs) == 0 {
+		panic("task position not exist")
+	}
+	for _, kv := range getResp.Kvs {
+		GetCollectionPositionDetail(timeoutCtx, positionConfig, kv.Value)
+		markPrintln("++++++++++++++++++++++++++")
+	}
+}
+
+func decodePosition(pchannel, position string) (*commonpb.KeyDataPair, error) {
+	positionBytes, err := base64.StdEncoding.DecodeString(position)
+	if err != nil {
+		return nil, err
+	}
+	msgPosition := &msgpb.MsgPosition{}
+	err = proto.Unmarshal(positionBytes, msgPosition)
+	if err != nil {
+		return nil, err
+	}
+	return &commonpb.KeyDataPair{
+		Key:  pchannel,
+		Data: msgPosition.MsgID,
+	}, nil
+}
+
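+// GetCollectionPositionDetail unmarshals a TaskCollectionPosition JSON value
+// read from etcd and walks every channel position it contains.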
+func GetCollectionPositionDetail(ctx context.Context, config PositionConfig, v []byte) {
+	taskPosition := &meta.TaskCollectionPosition{}
+	err := json.Unmarshal(v, taskPosition)
+	if err != nil {
+		panic(err)
+	}
+	markPrintln("task id:", taskPosition.TaskID)
+	markPrintln("collection id:", taskPosition.CollectionID)
+	markPrintln("collection name:", taskPosition.CollectionName)
+	markPrintln("====================")
+	for s, pair := range taskPosition.Positions {
+		GetMQMessageDetail(ctx, config, s, pair.DataPair)
+	}
+}
+
+func GetMQMessageDetail(ctx context.Context, config PositionConfig, pchannel string, kd *commonpb.KeyDataPair) {
+	//if config.IncludeCurrent {
+	//	markPrintln("include current position")
+	//	GetCurrentMsgInfo(ctx, config, pchannel, &msgstream.MsgPosition{
+	//		ChannelName: pchannel,
+	//		MsgID:       kd.GetData(),
+	//	})
+	//}
+
+	msgStream := MsgStream(config, false)
+	defer msgStream.Close()
+
+	consumeSubName := pchannel + strconv.Itoa(rand.Int())
+	initialPosition := mqwrapper.SubscriptionPositionUnknown
+	// initialPosition := mqwrapper.SubscriptionPositionEarliest
+	err := msgStream.AsConsumer(ctx, []string{pchannel}, consumeSubName, initialPosition)
+	if err != nil {
+		msgStream.Close()
+		panic(err)
+	}
+
+	// not including the current msg in this position
+	err = msgStream.Seek(ctx, []*msgstream.MsgPosition{
+		{
+			ChannelName: pchannel,
+			MsgID:       kd.GetData(),
+		},
+	})
+	if err != nil {
+		msgStream.Close()
+		panic(err)
+	}
+
+	select {
+	case <-ctx.Done():
+		markPrintln(ctx.Err())
+	case msgpack := <-msgStream.Chan():
+		endTs := msgpack.EndTs
+		end := msgpack.EndPositions[0]
+		msgTime := tsoutil.PhysicalTime(endTs)
+		markPrintln("channel name:", pchannel)
+		markPrintln("msg time:", msgTime)
+		markPrintln("end position:", util.Base64MsgPosition(end))
+		currentMsgCount := make(map[string]int)
+		MsgCount(msgpack, currentMsgCount, config.MessageDetail, config.PkFieldName)
+		markPrintln("msg info, count:", currentMsgCount)
+		if config.CountMode {
+			msgCount := make(map[string]int)
+			MsgCount(msgpack, msgCount, config.MessageDetail, config.PkFieldName)
+			MsgCountForStream(ctx, msgStream, config, pchannel, msgCount)
+		}
+
+		markPrintln("====================")
+	}
+}
+
+func MsgCountForStream(ctx context.Context, msgStream msgstream.MsgStream, config PositionConfig, pchannel string, msgCount map[string]int) {
+	GetLatestMsgInfo(ctx, config, pchannel)
+
+	latestMsgID, err := msgStream.GetLatestMsgID(pchannel)
+	if err != nil {
+		msgStream.Close()
+		markPrintln("current count:", msgCount)
+		panic(err)
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			markPrintln("count timeout, err: ", ctx.Err())
+			markPrintln("current count:", msgCount)
+			return
+		case msgpack := <-msgStream.Chan():
+			end := msgpack.EndPositions[0]
+			ok, err := latestMsgID.LessOrEqualThan(end.GetMsgID())
+			if err != nil {
+				msgStream.Close()
+				markPrintln("less or equal err, current count:", msgCount)
+				panic(err)
+			}
+			MsgCount(msgpack, msgCount, config.MessageDetail, config.PkFieldName)
+			if ok {
+				markPrintln("has count the latest msg, current count:", msgCount)
+				return
+			}
+		}
+	}
+}
+
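+// GetLatestMsgInfo subscribes at the latest position on a separate tt stream
+// and waits up to 3s for one pack, printing the newest end position so
+// counting progress can be compared against it.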
+func GetLatestMsgInfo(ctx context.Context, config PositionConfig, pchannel string) {
+	msgStream := MsgStream(config, true)
+	defer msgStream.Close()
+
+	consumeSubName := pchannel + strconv.Itoa(rand.Int())
+	initialPosition := mqwrapper.SubscriptionPositionLatest
+	err := msgStream.AsConsumer(ctx, []string{pchannel}, consumeSubName, initialPosition)
+	if err != nil {
+		msgStream.Close()
+		panic(err)
+	}
+
+	timeoutCtx, cancelFunc := context.WithTimeout(ctx, 3*time.Second)
+	defer cancelFunc()
+
+	select {
+	case <-timeoutCtx.Done():
+		markPrintln("get latest msg info timeout, err: ", timeoutCtx.Err())
+	case msgpack := <-msgStream.Chan():
+		endTs := msgpack.EndTs
+		end := msgpack.EndPositions[0]
+		msgTime := tsoutil.PhysicalTime(endTs)
+		markPrintln("latest channel name:", pchannel)
+		markPrintln("latest msg time:", msgTime)
+		markPrintln("latest end position:", util.Base64MsgPosition(end))
+	}
+}
+
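+// MsgCount accumulates per-message-type counts into msgCount, plus
+// insert_count/delete_count row totals. The detail level selects extra
+// output: 0 counts only, 1 also prints insert/delete pks with timestamps,
+// 2 additionally prints time tick and end-position info, and 3 prints only
+// the rows whose pk matches GlobalConfig.Data.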
pks\"] [pks=\"[%s]\"]", strings.Join(deleteMsg.GetPrimaryKeys().GetStrId().GetData(), ",")), ", timestamps:", times) + } + } + } + markPrintln(fmt.Sprintf("channel_name=%s ,delete_data_len=%d", msgpack.EndPositions[0].GetChannelName(), deleteMsg.GetNumRows())) + msgCount["delete_count"] += int(deleteMsg.GetNumRows()) + } else if msg.Type() == commonpb.MsgType_TimeTick { + if detail > 1 && detail != 3 { + timeTickMsg := msg.(*msgstream.TimeTickMsg) + markPrintln("time tick msg info, ts:", tsoutil.PhysicalTime(timeTickMsg.EndTimestamp)) + } + } + } + if detail > 1 && detail != 3 { + markPrintln("msg count, end position:", util.Base64MsgPosition(msgpack.EndPositions[0]), ", endts:", tsoutil.PhysicalTime(msgpack.EndTs)) + } +} + +func GetArrayString(n []int64) string { + s := make([]string, len(n)) + for i, v := range n { + s[i] = strconv.FormatInt(v, 10) + } + return strings.Join(s, ",") +} + +func MsgStream(config PositionConfig, isTTStream bool) msgstream.MsgStream { + var factory msgstream.Factory + factoryCreator := reader.NewDefaultFactoryCreator() + + if config.Pulsar.Address != "" { + factory = factoryCreator.NewPmsFactory(&config.Pulsar) + } else if config.Kafka.Address != "" { + factory = factoryCreator.NewKmsFactory(&config.Kafka) + } else { + panic(errors.New("fail to get the msg stream, check the mqConfig param")) + } + if isTTStream { + stream, err := factory.NewTtMsgStream(context.Background()) + if err != nil { + panic(err) + } + return stream + } + stream, err := factory.NewMsgStream(context.Background()) + if err != nil { + panic(err) + } + return stream +} + +func markPrintln(a ...any) { + // a = append(a, "cdc-position-mark") + fmt.Println(a...) +}