From f12e368a762a8fcd67103eb7867b620fd4678de5 Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Tue, 27 Aug 2024 10:36:59 +0800
Subject: [PATCH] fix: Fill nil schema so that Milvus can watch channel for those upgraded from 2.2 to 2.4 #35695 (#35694)

See also: [#35701 ](https://github.com/milvus-io/milvus/issues/35701)

---------

Signed-off-by: yangxuan
---
Review notes (text between "---" and the diffstat is ignored by git am):

* StateChannel.UpdateWatchInfo: the nil-schema fill and its log now run
  before c.Info is overwritten, so "old watch info" logs the previous
  value instead of the clone of the new one.
* fillChannelWatchInfo: cancel() is called right after GetCollection
  returns instead of being deferred, so the timeout context is released
  as soon as the lookup finishes.
* NOTE(review): collInfo is dereferenced without a nil check; confirm
  that GetCollection never returns (nil, nil).
* NOTE(review): StateChannel.String now prints the schema although the
  comment above it says the schema may be too large to print; confirm
  this is intended.
* Line breaks and indentation were reconstructed from a
  whitespace-collapsed copy of this patch; the post-image hashes on the
  "index" lines predate the two code changes listed above.

 internal/datacoord/channel.go              | 11 +++-
 internal/datacoord/channel_manager.go      | 13 +++-
 internal/datacoord/channel_manager_test.go | 75 ++++++++++++++++++++++
 internal/datacoord/channel_store_test.go   |  4 +-
 internal/datacoord/policy_test.go          |  6 +-
 5 files changed, 105 insertions(+), 4 deletions(-)

diff --git a/internal/datacoord/channel.go b/internal/datacoord/channel.go
index 6618a6cf22e85..c13309ba537ef 100644
--- a/internal/datacoord/channel.go
+++ b/internal/datacoord/channel.go
@@ -74,6 +74,9 @@ func (ch *channelMeta) UpdateWatchInfo(info *datapb.ChannelWatchInfo) {
 		zap.Any("old watch info", ch.WatchInfo),
 		zap.Any("new watch info", info))
 	ch.WatchInfo = proto.Clone(info).(*datapb.ChannelWatchInfo)
+	if ch.Schema == nil {
+		ch.Schema = info.GetSchema()
+	}
 }
 
 func (ch *channelMeta) GetWatchInfo() *datapb.ChannelWatchInfo {
@@ -221,7 +224,7 @@ func (c *StateChannel) Clone() *StateChannel {
 
 func (c *StateChannel) String() string {
 	// schema maybe too large to print
-	return fmt.Sprintf("Name: %s, CollectionID: %d, StartPositions: %v", c.Name, c.CollectionID, c.StartPositions)
+	return fmt.Sprintf("Name: %s, CollectionID: %d, StartPositions: %v, Schema: %v", c.Name, c.CollectionID, c.StartPositions, c.Schema)
 }
 
 func (c *StateChannel) GetName() string {
@@ -259,6 +262,12 @@ func (c *StateChannel) UpdateWatchInfo(info *datapb.ChannelWatchInfo) {
 	}
 
+	if c.Schema == nil {
+		log.Info("Channel updating watch info for nil schema in old info",
+			zap.Any("old watch info", c.Info),
+			zap.Any("new watch info", info))
+		c.Schema = info.GetSchema()
+	}
 	c.Info = proto.Clone(info).(*datapb.ChannelWatchInfo)
 }
 
 func (c *StateChannel) Assign(nodeID int64) {
diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go
index c8b3a03e075f2..b4ebdb118fb38 100644
--- a/internal/datacoord/channel_manager.go
+++ b/internal/datacoord/channel_manager.go
@@ -700,11 +700,22 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
 			return err
 		}
 
+		schema := ch.GetSchema()
+		if schema == nil {
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			collInfo, err := m.h.GetCollection(ctx, ch.GetCollectionID())
+			cancel()
+			if err != nil {
+				return err
+			}
+			schema = collInfo.Schema
+		}
+
 		info := &datapb.ChannelWatchInfo{
 			Vchan:   reduceVChanSize(vcInfo),
 			StartTs: startTs,
 			State:   inferStateByOpType(op.Type),
-			Schema:  ch.GetSchema(),
+			Schema:  schema,
 			OpID:    opID,
 		}
 		ch.UpdateWatchInfo(info)
diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go
index 97f874466b8f6..6ed8aaf258439 100644
--- a/internal/datacoord/channel_manager_test.go
+++ b/internal/datacoord/channel_manager_test.go
@@ -28,6 +28,7 @@ import (
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	kvmock "github.com/milvus-io/milvus/internal/kv/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -707,6 +708,80 @@ func (s *ChannelManagerSuite) TestStartup() {
 	s.checkAssignment(m, 2, "ch3", ToWatch)
 }
 
+func (s *ChannelManagerSuite) TestStartupNilSchema() {
+	chNodes := map[string]int64{
+		"ch1": 1,
+		"ch2": 1,
+		"ch3": 3,
+	}
+	var keys, values []string
+	for channel, nodeID := range chNodes {
+		keys = append(keys, fmt.Sprintf("channel_store/%d/%s", nodeID, channel))
+		info := generateWatchInfo(channel, datapb.ChannelWatchState_ToRelease)
+		info.Schema = nil
+		bs, err := proto.Marshal(info)
+		s.Require().NoError(err)
+		values = append(values, string(bs))
+	}
+	s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once()
+	s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false)
+	m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
+	s.Require().NoError(err)
+	err = m.Startup(context.TODO(), nil, []int64{1, 3})
+	s.Require().NoError(err)
+
+	for ch, node := range chNodes {
+		channel, got := m.GetChannel(node, ch)
+		s.Require().True(got)
+		s.Nil(channel.GetSchema())
+		s.Equal(ch, channel.GetName())
+		log.Info("Recovered nil schema channel", zap.Any("channel", channel))
+	}
+
+	s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(
+		&collectionInfo{ID: 111, Schema: &schemapb.CollectionSchema{Name: "coll111"}},
+		nil,
+	)
+
+	err = m.DeleteNode(1)
+	s.Require().NoError(err)
+
+	err = m.DeleteNode(3)
+	s.Require().NoError(err)
+
+	s.checkAssignment(m, bufferID, "ch1", Standby)
+	s.checkAssignment(m, bufferID, "ch2", Standby)
+	s.checkAssignment(m, bufferID, "ch3", Standby)
+
+	for ch := range chNodes {
+		channel, got := m.GetChannel(bufferID, ch)
+		s.Require().True(got)
+		s.NotNil(channel.GetSchema())
+		s.Equal(ch, channel.GetName())
+
+		s.NotNil(channel.GetWatchInfo())
+		s.NotNil(channel.GetWatchInfo().Schema)
+		log.Info("Recovered non-nil schema channel", zap.Any("channel", channel))
+	}
+
+	err = m.AddNode(7)
+	s.Require().NoError(err)
+	s.checkAssignment(m, 7, "ch1", ToWatch)
+	s.checkAssignment(m, 7, "ch2", ToWatch)
+	s.checkAssignment(m, 7, "ch3", ToWatch)
+
+	for ch := range chNodes {
+		channel, got := m.GetChannel(7, ch)
+		s.Require().True(got)
+		s.NotNil(channel.GetSchema())
+		s.Equal(ch, channel.GetName())
+
+		s.NotNil(channel.GetWatchInfo())
+		s.NotNil(channel.GetWatchInfo().Schema)
+		log.Info("non-nil schema channel", zap.Any("channel", channel))
+	}
+}
+
 func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
 	chNodes := map[string]int64{
 		"ch1": 1,
diff --git a/internal/datacoord/channel_store_test.go b/internal/datacoord/channel_store_test.go
index 0e3ccd37851b9..191a0e040f024 100644
--- a/internal/datacoord/channel_store_test.go
+++ b/internal/datacoord/channel_store_test.go
@@ -11,6 +11,7 @@ import (
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/kv/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/kv/predicates"
@@ -38,7 +39,8 @@ func generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.Chan
 		Vchan: &datapb.VchannelInfo{
 			ChannelName: name,
 		},
-		State: state,
+		Schema: &schemapb.CollectionSchema{},
+		State:  state,
 	}
 }
 
diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go
index 150116d09b6b7..422e12ccfa527 100644
--- a/internal/datacoord/policy_test.go
+++ b/internal/datacoord/policy_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/stretchr/testify/suite"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
 )
@@ -36,7 +37,10 @@ func getChannel(name string, collID int64) *StateChannel {
 	return &StateChannel{
 		Name:         name,
 		CollectionID: collID,
-		Info:         &datapb.ChannelWatchInfo{Vchan: &datapb.VchannelInfo{}},
+		Info: &datapb.ChannelWatchInfo{
+			Vchan: &datapb.VchannelInfo{ChannelName: name, CollectionID: collID},
+		},
+		Schema: &schemapb.CollectionSchema{Name: "coll1"},
 	}
 }
 