Skip to content

Commit

Permalink
fix: Fill nil schema so that Milvus can watch channel for those upgra…
Browse files Browse the repository at this point in the history
…ded from 2.2 to 2.4 milvus-io#35695 (milvus-io#35694)

See also: [milvus-io#35701 ](milvus-io#35701)

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Aug 27, 2024
1 parent 3e1052f commit f12e368
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 4 deletions.
11 changes: 10 additions & 1 deletion internal/datacoord/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func (ch *channelMeta) UpdateWatchInfo(info *datapb.ChannelWatchInfo) {
zap.Any("old watch info", ch.WatchInfo),
zap.Any("new watch info", info))
ch.WatchInfo = proto.Clone(info).(*datapb.ChannelWatchInfo)
if ch.Schema == nil {
ch.Schema = info.GetSchema()
}
}

func (ch *channelMeta) GetWatchInfo() *datapb.ChannelWatchInfo {
Expand Down Expand Up @@ -221,7 +224,7 @@ func (c *StateChannel) Clone() *StateChannel {

func (c *StateChannel) String() string {
// schema maybe too large to print
return fmt.Sprintf("Name: %s, CollectionID: %d, StartPositions: %v", c.Name, c.CollectionID, c.StartPositions)
return fmt.Sprintf("Name: %s, CollectionID: %d, StartPositions: %v, Schema: %v", c.Name, c.CollectionID, c.StartPositions, c.Schema)
}

func (c *StateChannel) GetName() string {
Expand Down Expand Up @@ -259,6 +262,12 @@ func (c *StateChannel) UpdateWatchInfo(info *datapb.ChannelWatchInfo) {
}

c.Info = proto.Clone(info).(*datapb.ChannelWatchInfo)
if c.Schema == nil {
log.Info("Channel updating watch info for nil schema in old info",
zap.Any("old watch info", c.Info),
zap.Any("new watch info", info))
c.Schema = info.GetSchema()
}
}

func (c *StateChannel) Assign(nodeID int64) {
Expand Down
13 changes: 12 additions & 1 deletion internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,11 +700,22 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
return err
}

schema := ch.GetSchema()
if schema == nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
collInfo, err := m.h.GetCollection(ctx, ch.GetCollectionID())
if err != nil {
return err
}
schema = collInfo.Schema
}

info := &datapb.ChannelWatchInfo{
Vchan: reduceVChanSize(vcInfo),
StartTs: startTs,
State: inferStateByOpType(op.Type),
Schema: ch.GetSchema(),
Schema: schema,
OpID: opID,
}
ch.UpdateWatchInfo(info)
Expand Down
75 changes: 75 additions & 0 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
kvmock "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -707,6 +708,80 @@ func (s *ChannelManagerSuite) TestStartup() {
s.checkAssignment(m, 2, "ch3", ToWatch)
}

func (s *ChannelManagerSuite) TestStartupNilSchema() {
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
"ch3": 3,
}
var keys, values []string
for channel, nodeID := range chNodes {
keys = append(keys, fmt.Sprintf("channel_store/%d/%s", nodeID, channel))
info := generateWatchInfo(channel, datapb.ChannelWatchState_ToRelease)
info.Schema = nil
bs, err := proto.Marshal(info)
s.Require().NoError(err)
values = append(values, string(bs))
}
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once()
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
err = m.Startup(context.TODO(), nil, []int64{1, 3})
s.Require().NoError(err)

for ch, node := range chNodes {
channel, got := m.GetChannel(node, ch)
s.Require().True(got)
s.Nil(channel.GetSchema())
s.Equal(ch, channel.GetName())
log.Info("Recovered nil schema channel", zap.Any("channel", channel))
}

s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(
&collectionInfo{ID: 111, Schema: &schemapb.CollectionSchema{Name: "coll111"}},
nil,
)

err = m.DeleteNode(1)
s.Require().NoError(err)

err = m.DeleteNode(3)
s.Require().NoError(err)

s.checkAssignment(m, bufferID, "ch1", Standby)
s.checkAssignment(m, bufferID, "ch2", Standby)
s.checkAssignment(m, bufferID, "ch3", Standby)

for ch := range chNodes {
channel, got := m.GetChannel(bufferID, ch)
s.Require().True(got)
s.NotNil(channel.GetSchema())
s.Equal(ch, channel.GetName())

s.NotNil(channel.GetWatchInfo())
s.NotNil(channel.GetWatchInfo().Schema)
log.Info("Recovered non-nil schema channel", zap.Any("channel", channel))
}

err = m.AddNode(7)
s.Require().NoError(err)
s.checkAssignment(m, 7, "ch1", ToWatch)
s.checkAssignment(m, 7, "ch2", ToWatch)
s.checkAssignment(m, 7, "ch3", ToWatch)

for ch := range chNodes {
channel, got := m.GetChannel(7, ch)
s.Require().True(got)
s.NotNil(channel.GetSchema())
s.Equal(ch, channel.GetName())

s.NotNil(channel.GetWatchInfo())
s.NotNil(channel.GetWatchInfo().Schema)
log.Info("non-nil schema channel", zap.Any("channel", channel))
}
}

func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
chNodes := map[string]int64{
"ch1": 1,
Expand Down
4 changes: 3 additions & 1 deletion internal/datacoord/channel_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/kv/predicates"
Expand Down Expand Up @@ -38,7 +39,8 @@ func generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.Chan
Vchan: &datapb.VchannelInfo{
ChannelName: name,
},
State: state,
Schema: &schemapb.CollectionSchema{},
State: state,
}
}

Expand Down
6 changes: 5 additions & 1 deletion internal/datacoord/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
)
Expand All @@ -36,7 +37,10 @@ func getChannel(name string, collID int64) *StateChannel {
return &StateChannel{
Name: name,
CollectionID: collID,
Info: &datapb.ChannelWatchInfo{Vchan: &datapb.VchannelInfo{}},
Info: &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{ChannelName: name, CollectionID: collID},
},
Schema: &schemapb.CollectionSchema{Name: "coll1"},
}
}

Expand Down

0 comments on commit f12e368

Please sign in to comment.