Skip to content

Commit e5ea351

Browse files
committed
[ENH]: Globalize dead letter queue in SysDB
1 parent 8963e1d commit e5ea351

File tree

18 files changed

+177
-147
lines changed

18 files changed

+177
-147
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -331,3 +331,7 @@ func (s *Coordinator) FinishDatabaseDeletion(ctx context.Context, req *coordinat
331331
}
332332
return res, nil
333333
}
334+
335+
// IncrementCompactionFailureCount forwards the request for collectionID to the
// coordinator's catalog and returns the catalog's (int32, error) result
// unchanged. No validation or error wrapping happens at this layer.
func (s *Coordinator) IncrementCompactionFailureCount(ctx context.Context, collectionID types.UniqueID) (int32, error) {
336+
return s.catalog.IncrementCompactionFailureCount(ctx, collectionID)
337+
}

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ type Collection struct {
2828
VersionFileName string
2929
CreatedAt time.Time
3030
DatabaseId types.UniqueID
31+
CompactionFailureCount int32
3132
}
3233

3334
type CollectionToGc struct {

go/pkg/sysdb/coordinator/model_db_convert.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd
4141
CreatedAt: collectionAndMetadata.Collection.CreatedAt,
4242
UpdatedAt: collectionAndMetadata.Collection.UpdatedAt.Unix(),
4343
DatabaseId: types.MustParse(collectionAndMetadata.Collection.DatabaseID),
44+
CompactionFailureCount: collectionAndMetadata.Collection.CompactionFailureCount,
4445
}
4546
collection.Metadata = convertCollectionMetadataToModel(collectionAndMetadata.CollectionMetadata)
4647
collections = append(collections, collection)

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2465,3 +2465,7 @@ func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantI
24652465
func (tc *Catalog) FinishDatabaseDeletion(ctx context.Context, cutoffTime time.Time) (uint64, error) {
24662466
return tc.metaDomain.DatabaseDb(ctx).FinishDatabaseDeletion(cutoffTime)
24672467
}
2468+
2469+
// IncrementCompactionFailureCount converts collectionID to its string form and
// delegates to the collection DB obtained from the meta domain for ctx,
// returning that call's (int32, error) result unchanged.
func (tc *Catalog) IncrementCompactionFailureCount(ctx context.Context, collectionID types.UniqueID) (int32, error) {
2470+
return tc.metaDomain.CollectionDb(ctx).IncrementCompactionFailureCount(collectionID.String())
2471+
}

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -767,3 +767,22 @@ func (s *Server) BatchGetCollectionSoftDeleteStatus(ctx context.Context, req *co
767767
}
768768
return res, nil
769769
}
770+
771+
// IncrementCompactionFailureCount is the gRPC handler for
// IncrementCompactionFailureCountRequest. It parses req.CollectionId into a
// unique ID, asks the coordinator to increment the failure count for that
// collection, and returns the resulting count in the response's FailureCount
// field. Both a parse failure and a coordinator failure are logged and
// returned as an internal gRPC error with a nil response.
func (s *Server) IncrementCompactionFailureCount(ctx context.Context, req *coordinatorpb.IncrementCompactionFailureCountRequest) (*coordinatorpb.IncrementCompactionFailureCountResponse, error) {
772+
collectionID := req.CollectionId
773+
parsedCollectionID, err := types.ToUniqueID(&collectionID)
774+
if err != nil {
775+
// NOTE(review): a malformed collection id is reported through
// BuildInternalGrpcError; confirm whether an invalid-argument status
// would be more appropriate for a client-supplied value.
log.Error("IncrementCompactionFailureCount failed. collection id format error", zap.Error(err), zap.String("collection_id", collectionID))
776+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
777+
}
778+
779+
failureCount, err := s.coordinator.IncrementCompactionFailureCount(ctx, parsedCollectionID)
780+
if err != nil {
781+
log.Error("IncrementCompactionFailureCount failed", zap.Error(err), zap.String("collection_id", collectionID))
782+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
783+
}
784+
785+
return &coordinatorpb.IncrementCompactionFailureCountResponse{
786+
FailureCount: failureCount,
787+
}, nil
788+
}

go/pkg/sysdb/grpc/proto_model_convert.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,8 @@ func convertCollectionToProto(collection *model.Collection) *coordinatorpb.Colle
6262
Seconds: collection.UpdatedAt,
6363
Nanos: 0,
6464
},
65-
DatabaseId: &dbId,
65+
DatabaseId: &dbId,
66+
CompactionFailureCount: collection.CompactionFailureCount,
6667
}
6768

6869
if collection.RootCollectionID != nil {

go/pkg/sysdb/metastore/db/dao/collection.go

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -550,6 +550,7 @@ func (s *collectionDb) UpdateLogPositionAndVersionInfo(
550550
"size_bytes_post_compaction": sizeBytesPostCompaction,
551551
"last_compaction_time_secs": lastCompactionTimeSecs,
552552
"num_versions": numVersions,
553+
"compaction_failure_count": 0, // Reset on successful compaction
553554
}
554555

555556
if schemaStr != nil {
@@ -592,9 +593,9 @@ func (s *collectionDb) UpdateLogPositionVersionTotalRecordsAndLogicalSize(collec
592593
version := currentCollectionVersion + 1
593594
// only writing if schemaStr is not nil to avoid overwriting the schemaStr
594595
if schemaStr != nil {
595-
err = s.db.Model(&dbmodel.Collection{}).Where("id = ?", collectionID).Updates(map[string]interface{}{"log_position": logPosition, "version": version, "total_records_post_compaction": totalRecordsPostCompaction, "size_bytes_post_compaction": sizeBytesPostCompaction, "last_compaction_time_secs": lastCompactionTimeSecs, "tenant": tenant, "schema_str": schemaStr}).Error
596+
err = s.db.Model(&dbmodel.Collection{}).Where("id = ?", collectionID).Updates(map[string]interface{}{"log_position": logPosition, "version": version, "total_records_post_compaction": totalRecordsPostCompaction, "size_bytes_post_compaction": sizeBytesPostCompaction, "last_compaction_time_secs": lastCompactionTimeSecs, "tenant": tenant, "schema_str": schemaStr, "compaction_failure_count": 0}).Error
596597
} else {
597-
err = s.db.Model(&dbmodel.Collection{}).Where("id = ?", collectionID).Updates(map[string]interface{}{"log_position": logPosition, "version": version, "total_records_post_compaction": totalRecordsPostCompaction, "size_bytes_post_compaction": sizeBytesPostCompaction, "last_compaction_time_secs": lastCompactionTimeSecs, "tenant": tenant}).Error
598+
err = s.db.Model(&dbmodel.Collection{}).Where("id = ?", collectionID).Updates(map[string]interface{}{"log_position": logPosition, "version": version, "total_records_post_compaction": totalRecordsPostCompaction, "size_bytes_post_compaction": sizeBytesPostCompaction, "last_compaction_time_secs": lastCompactionTimeSecs, "tenant": tenant, "compaction_failure_count": 0}).Error
598599
}
599600
if err != nil {
600601
return 0, err
@@ -714,3 +715,16 @@ func (s *collectionDb) BatchGetCollectionSoftDeleteStatus(collectionIDs []string
714715
}
715716
return result, nil
716717
}
718+
719+
// IncrementCompactionFailureCount adds one to the compaction_failure_count
// column of the collection row whose id equals collectionID, then reads the
// row back with First on the same chain and returns its
// CompactionFailureCount. On any error from the chain it logs the failure and
// returns 0 together with that error.
//
// NOTE(review): First is chained onto the *gorm.DB returned by the
// UpdateColumn finisher, so the read-back depends on the WHERE condition
// carrying over to the second statement; GORM's method-chaining docs caution
// against reusing a DB value after a finisher — confirm this is intended.
// NOTE(review): the increment and the read-back are separate statements;
// presumably a concurrent increment could be observed in between — verify
// whether callers need the exact post-increment value.
func (s *collectionDb) IncrementCompactionFailureCount(collectionID string) (int32, error) {
720+
var collection dbmodel.Collection
721+
err := s.db.Model(&dbmodel.Collection{}).
722+
Where("id = ?", collectionID).
723+
// gorm.Expr makes the database compute the new value from the current
// column value instead of writing a value computed in Go.
UpdateColumn("compaction_failure_count", gorm.Expr("compaction_failure_count + 1")).
724+
First(&collection).Error
725+
if err != nil {
726+
log.Error("IncrementCompactionFailureCount failed", zap.Error(err), zap.String("collectionID", collectionID))
727+
return 0, err
728+
}
729+
return collection.CompactionFailureCount, nil
730+
}

go/pkg/sysdb/metastore/db/dbmodel/collection.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ type Collection struct {
2828
NumVersions uint32 `gorm:"num_versions;type:integer;default:0"`
2929
OldestVersionTs time.Time `gorm:"oldest_version_ts;type:timestamp"`
3030
Tenant string `gorm:"tenant"`
31+
CompactionFailureCount int32 `gorm:"compaction_failure_count;default:0"`
3132
}
3233

3334
type CollectionToGc struct {
@@ -73,4 +74,5 @@ type ICollectionDb interface {
7374
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath *string, newLineageFilePath string) error
7475
BatchGetCollectionVersionFilePaths(collectionIDs []string) (map[string]string, error)
7576
BatchGetCollectionSoftDeleteStatus(collectionIDs []string) (map[string]bool, error)
77+
IncrementCompactionFailureCount(collectionID string) (int32, error)
7678
}

go/pkg/sysdb/metastore/db/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
h1:TI+7Y8LdtEPUUsspBjfrE9KdjS8XS8SLfKUoSJhmhEI=
1+
h1:Qu6wEENNffsixNyG6bzBEGihu8sC5xa6dtQB37QF7Gg=
22
20240313233558.sql h1:Gv0TiSYsqGoOZ2T2IWvX4BOasauxool8PrBOIjmmIdg=
33
20240321194713.sql h1:kVkNpqSFhrXGVGFFvL7JdK3Bw31twFcEhI6A0oCFCkg=
44
20240327075032.sql h1:nlr2J74XRU8erzHnKJgMr/tKqJxw9+R6RiiEBuvuzgo=
@@ -26,3 +26,4 @@ h1:TI+7Y8LdtEPUUsspBjfrE9KdjS8XS8SLfKUoSJhmhEI=
2626
20251114125442.sql h1:oRHN+AO+xYnYa3aF0QzSa3T/TRi8ETCydp2sDT/nSnI=
2727
20251114134400.sql h1:N30qnVNjR+d4RoArJ11YrixyIsNODxqXFpgiREEhczs=
2828
20251116154842.sql h1:G0qy4MPDayH+Y9/Dm9PS2xwvyLt+nmIw90uJTzZSJUM=
29+
20251209143000.sql h1:r7locVeKMfzp+/olpFQHx1Shy7juhR5Y+xWZw3vel5M=

idl/chromadb/proto/chroma.proto

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,9 @@ message Collection {
7474
// This is the database id of the collection.
7575
optional string database_id = 17;
7676
optional string schema_str = 18;
77+
// Number of consecutive compaction failures for this collection
78+
// Defaults to 0.
79+
int32 compaction_failure_count = 19;
7780
}
7881

7982
message Database {

0 commit comments

Comments
 (0)