Skip to content

Commit

Permalink
enhance: add manual alloc segment rpc for datacoord (milvus-io#35002)
Browse files Browse the repository at this point in the history
issue: milvus-io#33285

- segment allocation will move to streamingnode, so a manual alloc
segment rpc is required

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Jul 26, 2024
1 parent 4ee6c69 commit 1cff553
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 11 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=SubCluster --dir=internal/datacoord --filename=mock_subcluster.go --output=internal/datacoord --structname=MockSubCluster --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Broker --dir=internal/datacoord/broker --filename=mock_coordinator_broker.go --output=internal/datacoord/broker --structname=MockBroker --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=WorkerManager --dir=internal/datacoord --filename=mock_worker_manager.go --output=internal/datacoord --structname=MockWorkerManager --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Manager --dir=internal/datacoord --filename=mock_segment_manager.go --output=internal/datacoord --structname=MockManager --with-expecter --inpackage

generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage
Expand Down
60 changes: 59 additions & 1 deletion internal/datacoord/mock_segment_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 20 additions & 9 deletions internal/datacoord/segment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@ func putAllocation(a *Allocation) {
type Manager interface {
// CreateSegment create new segment when segment not exist

// AllocSegment allocates rows and record the allocation.
// Deprecated: AllocSegment allocates rows and record the allocation, will be deprecated after enabling streamingnode.
AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error)
AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error)

// AllocNewGrowingSegment allocates segment for streaming node.
AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error)

// DropSegment drops the segment from manager.
DropSegment(ctx context.Context, segmentID UniqueID)
// FlushImportSegments set importing segment state to Flushed.
Expand Down Expand Up @@ -320,7 +324,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
return nil, err
}
for _, allocation := range newSegmentAllocations {
segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName, commonpb.SegmentState_Growing, datapb.SegmentLevel_L1)
segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName)
if err != nil {
log.Error("Failed to open new segment for segment allocation")
return nil, err
Expand Down Expand Up @@ -417,9 +421,12 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, c
return segment, nil
}

func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID,
channelName string, segmentState commonpb.SegmentState, level datapb.SegmentLevel,
) (*SegmentInfo, error) {
// AllocNewGrowingSegment allocates segment for streaming node.
func (s *SegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) {
return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, segmentID, channelName)
}

func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*SegmentInfo, error) {
log := log.Ctx(ctx)
ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "open-Segment")
defer sp.End()
Expand All @@ -428,29 +435,33 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
log.Error("failed to open new segment while allocID", zap.Error(err))
return nil, err
}
return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, id, channelName)
}

func (s *SegmentManager) openNewSegmentWithGivenSegmentID(ctx context.Context, collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*SegmentInfo, error) {
maxNumOfRows, err := s.estimateMaxNumOfRows(collectionID)
if err != nil {
log.Error("failed to open new segment while estimateMaxNumOfRows", zap.Error(err))
return nil, err
}

segmentInfo := &datapb.SegmentInfo{
ID: id,
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channelName,
NumOfRows: 0,
State: segmentState,
State: commonpb.SegmentState_Growing,
MaxRowNum: int64(maxNumOfRows),
Level: level,
Level: datapb.SegmentLevel_L1,
LastExpireTime: 0,
}
segment := NewSegmentInfo(segmentInfo)
if err := s.meta.AddSegment(ctx, segment); err != nil {
log.Error("failed to add segment to DataCoord", zap.Error(err))
return nil, err
}
s.segments = append(s.segments, id)
s.segments = append(s.segments, segmentID)
log.Info("datacoord: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
Expand Down
4 changes: 4 additions & 0 deletions internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,10 @@ func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID Uniqu
return nil, nil
}

func (s *spySegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) {
return nil, nil
}

func (s *spySegmentManager) allocSegmentForImport(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64, taskID int64) (*Allocation, error) {
return nil, nil
}
Expand Down
22 changes: 22 additions & 0 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,28 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
}, nil
}

// AllocSegment alloc a new growing segment, add it into segment meta.
func (s *Server) AllocSegment(ctx context.Context, req *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) {
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &datapb.AllocSegmentResponse{Status: merr.Status(err)}, nil
}
// !!! SegmentId must be allocated from rootCoord id allocation.
if req.GetCollectionId() == 0 || req.GetPartitionId() == 0 || req.GetVchannel() == "" || req.GetSegmentId() == 0 {
return &datapb.AllocSegmentResponse{Status: merr.Status(merr.ErrParameterInvalid)}, nil
}

// Alloc new growing segment and return the segment info.
segmentInfo, err := s.segmentManager.AllocNewGrowingSegment(ctx, req.GetCollectionId(), req.GetPartitionId(), req.GetSegmentId(), req.GetVchannel())
if err != nil {
return &datapb.AllocSegmentResponse{Status: merr.Status(err)}, nil
}
clonedSegmentInfo := segmentInfo.Clone()
return &datapb.AllocSegmentResponse{
SegmentInfo: clonedSegmentInfo.SegmentInfo,
Status: merr.Success(),
}, nil
}

// GetSegmentStates returns segments state
func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions internal/distributed/datacoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
})
}

func (c *Client) AllocSegment(ctx context.Context, in *datapb.AllocSegmentRequest, opts ...grpc.CallOption) (*datapb.AllocSegmentResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.AllocSegmentResponse, error) {
return client.AllocSegment(ctx, in)
})
}

// GetSegmentStates requests segment state information
//
// ctx is the context to control request deadline and cancellation
Expand Down
5 changes: 5 additions & 0 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
return s.dataCoord.AssignSegmentID(ctx, req)
}

// AllocSegment alloc a new growing segment, add it into segment meta.
func (s *Server) AllocSegment(ctx context.Context, req *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) {
return s.dataCoord.AllocSegment(ctx, req)
}

// GetSegmentStates gets states of segments
func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return s.dataCoord.GetSegmentStates(ctx, req)
Expand Down
55 changes: 55 additions & 0 deletions internal/mocks/mock_datacoord.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 70 additions & 0 deletions internal/mocks/mock_datacoord_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion internal/proto/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ service DataCoord {

rpc Flush(FlushRequest) returns (FlushResponse) {}

rpc AssignSegmentID(AssignSegmentIDRequest) returns (AssignSegmentIDResponse) {}
// AllocSegment alloc a new growing segment, add it into segment meta.
rpc AllocSegment(AllocSegmentRequest) returns (AllocSegmentResponse) {}

rpc AssignSegmentID(AssignSegmentIDRequest) returns (AssignSegmentIDResponse) {
option deprecated = true;
}

rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {}
rpc GetSegmentStates(GetSegmentStatesRequest) returns (GetSegmentStatesResponse) {}
Expand Down Expand Up @@ -168,6 +173,18 @@ message SegmentIDRequest {
SegmentLevel level = 7;
}

message AllocSegmentRequest {
int64 collection_id = 1;
int64 partition_id = 2;
int64 segment_id = 3; // segment id must be allocate from rootcoord idalloc service.
string vchannel = 4;
}

message AllocSegmentResponse {
SegmentInfo segment_info = 1;
common.Status status = 2;
}

message AssignSegmentIDRequest {
int64 nodeID = 1;
string peer_role = 2;
Expand Down

0 comments on commit 1cff553

Please sign in to comment.