Skip to content

Commit

Permalink
enhance: Add Segment Level in milvus segment info APIs (milvus-io#34763)
Browse files Browse the repository at this point in the history
See also milvus-io#34746

This PR add segment level field in response of
`GetPersistentSegmentInfo` and `GetQuerySegmentInfo`

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jul 26, 2024
1 parent e8038a7 commit 4ee6c69
Show file tree
Hide file tree
Showing 16 changed files with 61 additions and 78 deletions.
4 changes: 2 additions & 2 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ require (
github.com/gogo/status v1.1.0
github.com/golang/protobuf v1.5.4
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240613032350-814e4bddd264
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240717062137-3ffb1db01632
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/samber/lo v1.27.0
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.17.1
go.uber.org/atomic v1.10.0
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
google.golang.org/grpc v1.57.1
)

Expand Down Expand Up @@ -108,7 +109,6 @@ require (
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.20.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.20.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240613032350-814e4bddd264 h1:IfydraydTj9bmGRcAsT/uVj9by4k6jmjN/nIM7p7JFk=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240613032350-814e4bddd264/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240717062137-3ffb1db01632 h1:CXig0DNtUsCLzchCFe3PR2KgOdobbz9gK2nSV7195PM=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240717062137-3ffb1db01632/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3 h1:ZBpRWhBa7FTFxW4YYVv9AUESoW1Xyb3KNXTzTqfkZmw=
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3/go.mod h1:jQ2BUZny1COsgv1Qbcv8dmbppW+V9J/c4YQZNb3EOm8=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240717062137-3ffb1db01632
github.com/minio/minio-go/v7 v7.0.61
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53 h1:hLeTFOV/IXUoTbm4slVWFSnR296yALJ8Zo+YCMEvAy0=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240717062137-3ffb1db01632 h1:CXig0DNtUsCLzchCFe3PR2KgOdobbz9gK2nSV7195PM=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240717062137-3ffb1db01632/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
Expand Down
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ message SegmentInfo {
repeated int64 node_ids = 15;
bool enable_index = 16;
bool is_fake = 17;
data.SegmentLevel level = 18;
}

message CollectionInfo {
Expand Down
13 changes: 10 additions & 3 deletions internal/proxy/accesslog/info/grpc_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/runtime/protoiface"

"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proxy/connection"
Expand Down Expand Up @@ -161,12 +163,17 @@ type SizeResponse interface {
}

func (i *GrpcAccessInfo) ResponseSize() string {
message, ok := i.resp.(SizeResponse)
if !ok {
var size int
switch r := i.resp.(type) {
case SizeResponse:
size = r.XXX_Size()
case protoiface.MessageV1:
size = proto.Size(r)
default:
return Unknown
}

return fmt.Sprint(message.XXX_Size())
return fmt.Sprint(size)
}

type BaseResponse interface {
Expand Down
2 changes: 2 additions & 0 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4110,6 +4110,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G
PartitionID: info.PartitionID,
NumRows: info.NumOfRows,
State: info.State,
Level: commonpb.SegmentLevel(info.Level),
}
}
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
Expand Down Expand Up @@ -4182,6 +4183,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue
IndexID: info.IndexID,
State: info.SegmentState,
NodeIds: info.NodeIds,
Level: commonpb.SegmentLevel(info.Level),
}
}

Expand Down
24 changes: 8 additions & 16 deletions internal/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4433,8 +4433,7 @@ func Test_GetCompactionState(t *testing.T) {
proxy := &Proxy{dataCoord: datacoord}
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.GetCompactionState(context.TODO(), nil)
assert.EqualValues(t, &milvuspb.GetCompactionStateResponse{}, resp)
assert.NoError(t, err)
assert.NoError(t, merr.CheckRPCCall(resp, err))
})

t.Run("get compaction state with unhealthy proxy", func(t *testing.T) {
Expand All @@ -4453,8 +4452,7 @@ func Test_ManualCompaction(t *testing.T) {
proxy := &Proxy{dataCoord: datacoord}
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.ManualCompaction(context.TODO(), nil)
assert.EqualValues(t, &milvuspb.ManualCompactionResponse{}, resp)
assert.NoError(t, err)
assert.NoError(t, merr.CheckRPCCall(resp, err))
})
t.Run("test manual compaction with unhealthy", func(t *testing.T) {
datacoord := &DataCoordMock{}
Expand All @@ -4472,8 +4470,7 @@ func Test_GetCompactionStateWithPlans(t *testing.T) {
proxy := &Proxy{dataCoord: datacoord}
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.GetCompactionStateWithPlans(context.TODO(), nil)
assert.EqualValues(t, &milvuspb.GetCompactionPlansResponse{}, resp)
assert.NoError(t, err)
assert.NoError(t, merr.CheckRPCCall(resp, err))
})
t.Run("test get compaction state with plans with unhealthy proxy", func(t *testing.T) {
datacoord := &DataCoordMock{}
Expand Down Expand Up @@ -4505,8 +4502,7 @@ func Test_GetFlushState(t *testing.T) {
resp, err := proxy.GetFlushState(context.TODO(), &milvuspb.GetFlushStateRequest{
CollectionName: "coll",
})
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{}, resp)
assert.NoError(t, err)
assert.NoError(t, merr.CheckRPCCall(resp, err))
})

t.Run("test get flush state with unhealthy proxy", func(t *testing.T) {
Expand All @@ -4529,8 +4525,7 @@ func TestProxy_GetComponentStates(t *testing.T) {
n.session = &sessionutil.Session{}
n.session.UpdateRegistered(true)
resp, err = n.GetComponentStates(context.Background(), nil)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.NoError(t, merr.CheckRPCCall(resp, err))
}

func TestProxy_Import(t *testing.T) {
Expand Down Expand Up @@ -4575,8 +4570,7 @@ func TestProxy_Import(t *testing.T) {
Files: []string{"a.json"},
}
resp, err := proxy.Import(context.TODO(), req)
assert.NoError(t, err)
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
assert.NoError(t, merr.CheckRPCCall(resp, err))
})

t.Run("GetImportState failed", func(t *testing.T) {
Expand All @@ -4601,8 +4595,7 @@ func TestProxy_Import(t *testing.T) {

req := &milvuspb.GetImportStateRequest{}
resp, err := proxy.GetImportState(context.TODO(), req)
assert.NoError(t, err)
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
assert.NoError(t, merr.CheckRPCCall(resp, err))
})

t.Run("ListImportTasks failed", func(t *testing.T) {
Expand All @@ -4627,8 +4620,7 @@ func TestProxy_Import(t *testing.T) {

req := &milvuspb.ListImportTasksRequest{}
resp, err := proxy.ListImportTasks(context.TODO(), req)
assert.NoError(t, err)
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
assert.NoError(t, merr.CheckRPCCall(resp, err))
})
}

Expand Down
68 changes: 25 additions & 43 deletions internal/querycoordv2/session/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,15 @@ func (suite *ClusterTestSuite) TestLoadSegments() {
Infos: []*querypb.SegmentLoadInfo{{}},
})
suite.NoError(err)
suite.Equal(merr.Success(), status)
merr.Ok(status)

status, err = suite.cluster.LoadSegments(ctx, 1, &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{},
Infos: []*querypb.SegmentLoadInfo{{}},
})
suite.NoError(err)
suite.Equal(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected error",
}, status)
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
suite.Equal("unexpected error", status.GetReason())

_, err = suite.cluster.LoadSegments(ctx, 3, &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{},
Expand All @@ -248,16 +246,14 @@ func (suite *ClusterTestSuite) TestWatchDmChannels() {
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(merr.Success(), status)
merr.Ok(status)

status, err = suite.cluster.WatchDmChannels(ctx, 1, &querypb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected error",
}, status)
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
suite.Equal("unexpected error", status.GetReason())
}

func (suite *ClusterTestSuite) TestUnsubDmChannel() {
Expand All @@ -266,16 +262,14 @@ func (suite *ClusterTestSuite) TestUnsubDmChannel() {
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(merr.Success(), status)
merr.Ok(status)

status, err = suite.cluster.UnsubDmChannel(ctx, 1, &querypb.UnsubDmChannelRequest{
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected error",
}, status)
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
suite.Equal("unexpected error", status.GetReason())
}

func (suite *ClusterTestSuite) TestReleaseSegments() {
Expand All @@ -284,16 +278,14 @@ func (suite *ClusterTestSuite) TestReleaseSegments() {
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(merr.Success(), status)
merr.Ok(status)

status, err = suite.cluster.ReleaseSegments(ctx, 1, &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected error",
}, status)
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
suite.Equal("unexpected error", status.GetReason())
}

func (suite *ClusterTestSuite) TestLoadAndReleasePartitions() {
Expand All @@ -302,31 +294,27 @@ func (suite *ClusterTestSuite) TestLoadAndReleasePartitions() {
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(merr.Success(), status)
merr.Ok(status)

status, err = suite.cluster.LoadPartitions(ctx, 1, &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected error",
}, status)
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
suite.Equal("unexpected error", status.GetReason())

status, err = suite.cluster.ReleasePartitions(ctx, 0, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(merr.Success(), status)
merr.Ok(status)

status, err = suite.cluster.ReleasePartitions(ctx, 1, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected error",
}, status)
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
suite.Equal("unexpected error", status.GetReason())
}

func (suite *ClusterTestSuite) TestGetDataDistribution() {
Expand All @@ -342,10 +330,8 @@ func (suite *ClusterTestSuite) TestGetDataDistribution() {
})

suite.NoError(err)
suite.Equal(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected error",
}, resp.GetStatus())
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
suite.Equal("unexpected error", resp.GetStatus().GetReason())
}

func (suite *ClusterTestSuite) TestGetMetrics() {
Expand All @@ -356,10 +342,8 @@ func (suite *ClusterTestSuite) TestGetMetrics() {

resp, err = suite.cluster.GetMetrics(ctx, 1, &milvuspb.GetMetricsRequest{})
suite.NoError(err)
suite.Equal(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected error",
}, resp.GetStatus())
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
suite.Equal("unexpected error", resp.GetStatus().GetReason())
}

func (suite *ClusterTestSuite) TestSyncDistribution() {
Expand All @@ -368,16 +352,14 @@ func (suite *ClusterTestSuite) TestSyncDistribution() {
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(merr.Success(), status)
merr.Ok(status)

status, err = suite.cluster.SyncDistribution(ctx, 1, &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{},
})
suite.NoError(err)
suite.Equal(&commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected error",
}, status)
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
suite.Equal("unexpected error", status.GetReason())
}

func (suite *ClusterTestSuite) TestGetComponentStates() {
Expand Down
1 change: 1 addition & 0 deletions internal/querycoordv2/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met
NodeIds: make([]int64, 0),
SegmentState: commonpb.SegmentState_Sealed,
IndexInfos: make([]*querypb.FieldIndexInfo, 0),
Level: first.Level,
}
for _, indexInfo := range first.IndexInfo {
info.IndexName = indexInfo.IndexName
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/binlog_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func assertTestData(t *testing.T, i int, value *Value) {

f106 := typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4})

assert.EqualValues(t, &Value{
assert.EqualExportedValues(t, &Value{
int64(i),
&Int64PrimaryKey{Value: int64(i)},
int64(i),
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
if v == nil {
return 8
}
return uint64(v.(*schemapb.ScalarField).XXX_Size())
return uint64(proto.Size(v.(*schemapb.ScalarField)))
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240717062137-3ffb1db01632
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2
Expand Down
Loading

0 comments on commit 4ee6c69

Please sign in to comment.