[ENH] When the log offset is behind sysdb, this can repair it safely. #4722

Open · wants to merge 2 commits into base: main
2 changes: 1 addition & 1 deletion go/pkg/sysdb/coordinator/coordinator.go
@@ -123,7 +123,7 @@ func (s *Coordinator) GetCollectionWithSegments(ctx context.Context, collectionI
 	return s.catalog.GetCollectionWithSegments(ctx, collectionID, false)
 }
 
-func (s *Coordinator) CheckCollection(ctx context.Context, collectionID types.UniqueID) (bool, error) {
+func (s *Coordinator) CheckCollection(ctx context.Context, collectionID types.UniqueID) (bool, int64, error) {
 	return s.catalog.CheckCollection(ctx, collectionID)
 }
14 changes: 5 additions & 9 deletions go/pkg/sysdb/coordinator/table_catalog.go
@@ -403,7 +403,7 @@ func (tc *Catalog) CreateCollection(ctx context.Context, createCollection *model
 
 // Returns true if collection is deleted (either soft-deleted or hard-deleted)
 // and false otherwise.
-func (tc *Catalog) CheckCollection(ctx context.Context, collectionID types.UniqueID) (bool, error) {
+func (tc *Catalog) CheckCollection(ctx context.Context, collectionID types.UniqueID) (bool, int64, error) {
 	tracer := otel.Tracer
 	if tracer != nil {
 		_, span := tracer.Start(ctx, "Catalog.CheckCollection")
@@ -412,18 +412,14 @@ func (tc *Catalog) CheckCollection(ctx context.Context, collectionID types.Uniqu
 
 	collectionInfo, err := tc.metaDomain.CollectionDb(ctx).GetCollectionWithoutMetadata(types.FromUniqueID(collectionID), nil, nil)
 	if err != nil {
-		return false, err
+		return false, 0, err
 	}
 	// Collection is hard deleted.
 	if collectionInfo == nil {
-		return true, nil
+		return true, 0, nil
 	}
-	// Collection is soft deleted.
-	if collectionInfo.IsDeleted {
-		return true, nil
-	}
-	// Collection is not deleted.
-	return false, nil
+
+	return collectionInfo.IsDeleted, collectionInfo.LogPosition, nil
 }
 
 func (tc *Catalog) GetCollection(ctx context.Context, collectionID types.UniqueID, collectionName *string, tenantID string, databaseName string) (*model.Collection, error) {
3 changes: 2 additions & 1 deletion go/pkg/sysdb/grpc/collection_service.go
@@ -253,14 +253,15 @@ func (s *Server) CheckCollections(ctx context.Context, req *coordinatorpb.CheckC
 			log.Error("CheckCollection failed. collection id format error", zap.Error(err), zap.String("collection_id", collectionID))
 			return nil, grpcutils.BuildInternalGrpcError(err.Error())
 		}
-		deleted, err := s.coordinator.CheckCollection(ctx, parsedId)
+		deleted, logPosition, err := s.coordinator.CheckCollection(ctx, parsedId)
 
 		if err != nil {
 			log.Error("CheckCollection failed", zap.Error(err), zap.String("collection_id", collectionID))
 			return nil, grpcutils.BuildInternalGrpcError(err.Error())
 		}
 
 		res.Deleted[i] = deleted
+		res.LogPosition[i] = logPosition
 	}
 	return res, nil
 }
1 change: 1 addition & 0 deletions idl/chromadb/proto/coordinator.proto
@@ -220,6 +220,7 @@ message CheckCollectionsRequest {
 
 message CheckCollectionsResponse {
   repeated bool deleted = 1;
+  repeated int64 log_position = 2;
 }
 
 message UpdateCollectionRequest {
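
The new log_position field is parallel to deleted: entry i corresponds to collection_ids[i] in the request. A minimal sketch of how a client could consume the pair, assuming the prost/tonic-generated types that the repair tool below also uses (check_one is a hypothetical helper, not part of this PR):

use tonic::transport::Channel;

use chroma_types::chroma_proto::sys_db_client::SysDbClient;
use chroma_types::chroma_proto::CheckCollectionsRequest;

// Hypothetical helper: look up a single collection and return
// (deleted, log_position) from the parallel response arrays.
async fn check_one(
    client: &mut SysDbClient<Channel>,
    collection_id: String,
) -> Result<(bool, i64), tonic::Status> {
    let resp = client
        .check_collections(CheckCollectionsRequest {
            collection_ids: vec![collection_id],
        })
        .await?
        .into_inner();
    // Both arrays are indexed in request order, so a one-element
    // request yields one-element arrays.
    Ok((resp.deleted[0], resp.log_position[0]))
}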
54 changes: 54 additions & 0 deletions rust/log-service/src/bin/chroma-update-collection-log-offset.rs
@@ -0,0 +1,54 @@
use tonic::transport::Channel;
use uuid::Uuid;

use chroma_types::chroma_proto::log_service_client::LogServiceClient;
use chroma_types::chroma_proto::sys_db_client::SysDbClient;
use chroma_types::chroma_proto::{CheckCollectionsRequest, UpdateCollectionLogOffsetRequest};
use chroma_types::CollectionUuid;

#[tokio::main]
async fn main() {
    let args = std::env::args().skip(1).collect::<Vec<_>>();
    if args.len() != 3 {
        eprintln!("USAGE: chroma-update-collection-log-offset [LOG-HOST] [SYSDB-HOST] [COLLECTION-UUID]");
        std::process::exit(13);
    }
    let logservice = Channel::from_shared(args[0].clone())
        .expect("could not create channel")
        .connect()
        .await
        .expect("could not connect to log service");
    let mut log_client = LogServiceClient::new(logservice);
    let sysdbservice = Channel::from_shared(args[1].clone())
        .expect("could not create channel")
        .connect()
        .await
        .expect("could not connect to sysdb service");
    let mut sysdb_client = SysDbClient::new(sysdbservice);
    let collection_id = Uuid::parse_str(&args[2])
        .map(CollectionUuid)
        .expect("Failed to parse collection_id");
    let collection_info = sysdb_client
        .check_collections(CheckCollectionsRequest {
            collection_ids: vec![args[2].clone()],
        })
        .await
        .expect("could not fetch collection info")
        .into_inner();
    eprintln!("{collection_info:?}");
    if collection_info.deleted.len() != 1 || collection_info.log_position.len() != 1 {
        eprintln!("got abnormal/non-length-1 results");
        std::process::exit(13);
    }
    if collection_info.deleted[0] {
        eprintln!("cowardly refusing to do anything with a deleted database");
        std::process::exit(13);
    }
    let _resp = log_client
        .update_collection_log_offset(UpdateCollectionLogOffsetRequest {
            collection_id: collection_id.to_string(),
            log_offset: collection_info.log_position[0],
        })
        .await
        .expect("migrate log request should succeed");
}

Review comment from a Contributor on the "deleted database" message: nit: "deleted collection"?
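
A plausible invocation, assuming the file is picked up as a Cargo bin target named after its path (the hosts and UUID here are illustrative, not taken from this PR):

cargo run --bin chroma-update-collection-log-offset -- http://log-service:50051 http://sysdb:50051 1fca8cd3-1b69-4d4c-a113-dbbc03cbdb49

The flow is one-directional by design: the tool copies sysdb's recorded log position onto the log service, and, as the test below demonstrates, the log service ignores any update that would move its offset backwards, so rerunning the tool is harmless.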
122 changes: 122 additions & 0 deletions rust/log-service/src/lib.rs
@@ -3588,4 +3588,126 @@ mod tests {
            .expect("Spawned thread should not fail to join");
        }
    }

    #[tokio::test]
    async fn update_collection_log_offset_never_moves_backwards() {
        use chroma_storage::s3_client_for_test_with_new_bucket;
        use chroma_types::chroma_proto::UpdateCollectionLogOffsetRequest;
        use std::collections::HashMap;
        use tonic::Request;
        use wal3::{LogWriter, LogWriterOptions};

        // Set up test storage using S3 (minio)
        let storage = Arc::new(s3_client_for_test_with_new_bucket().await);

        // Create the dirty log writer
        let dirty_log = LogWriter::open_or_initialize(
            LogWriterOptions::default(),
            Arc::clone(&storage),
            "dirty-test",
            "dirty log writer",
            (),
        )
        .await
        .expect("Failed to create dirty log");
        let dirty_log = Arc::new(dirty_log);

        // Create LogServer manually
        let config = LogServerConfig::default();
        let log_server = LogServer {
            config,
            open_logs: Arc::new(StateHashTable::default()),
            storage,
            dirty_log,
            proxy: None,
            rolling_up: tokio::sync::Mutex::new(()),
            backpressure: Mutex::new(Arc::new(HashSet::default())),
            need_to_compact: Mutex::new(HashMap::default()),
            cache: None,
            metrics: Metrics::new(opentelemetry::global::meter("test")),
        };

        let collection_id = CollectionUuid::new();
        let collection_id_str = collection_id.to_string();

        // Manually initialize a log for this collection to avoid "proxy not initialized" error
        let storage_prefix = storage_prefix_for_log(collection_id);
        let _log_writer = LogWriter::open_or_initialize(
            LogWriterOptions::default(),
            Arc::clone(&log_server.storage),
            &storage_prefix,
            "test log writer",
            (),
        )
        .await
        .expect("Failed to initialize collection log");

        // Step 1: Initialize collection log and set it to offset 100
        let initial_request = UpdateCollectionLogOffsetRequest {
            collection_id: collection_id_str.clone(),
            log_offset: 100,
        };

        let response = log_server
            .update_collection_log_offset(Request::new(initial_request))
            .await;
        assert!(
            response.is_ok(),
            "Initial offset update should succeed: {:?}",
            response.err()
        );

        // Step 2: Verify we can move forward (to offset 150)
        let forward_request = UpdateCollectionLogOffsetRequest {
            collection_id: collection_id_str.clone(),
            log_offset: 150,
        };

        let response = log_server
            .update_collection_log_offset(Request::new(forward_request))
            .await;
        assert!(response.is_ok(), "Forward movement should succeed");

        // Step 3: Attempt to move backwards (to offset 50) - this should be blocked
        let backward_request = UpdateCollectionLogOffsetRequest {
            collection_id: collection_id_str.clone(),
            log_offset: 50,
        };

        let response = log_server
            .update_collection_log_offset(Request::new(backward_request))
            .await;

        // The function should succeed but not actually move the offset backwards
        // (it returns early with OK status when current offset > requested offset)
        assert!(
            response.is_ok(),
            "Backward request should return OK but not move offset"
        );

        // Step 4: Verify that requesting the same offset works
        let same_request = UpdateCollectionLogOffsetRequest {
            collection_id: collection_id_str.clone(),
            log_offset: 150, // Same as current
        };

        let response = log_server
            .update_collection_log_offset(Request::new(same_request))
            .await;
        assert!(response.is_ok(), "Same offset request should succeed");

        // Step 5: Verify we can still move forward after backward attempt was blocked
        let final_forward_request = UpdateCollectionLogOffsetRequest {
            collection_id: collection_id_str,
            log_offset: 200,
        };

        let response = log_server
            .update_collection_log_offset(Request::new(final_forward_request))
            .await;
        assert!(
            response.is_ok(),
            "Forward movement after backward attempt should succeed"
        );
    }
}
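
The invariant the test pins down can be summarized in a few lines. A minimal sketch, not the actual server code (apply_log_offset_update is a hypothetical stand-in for the guard inside update_collection_log_offset):

// Requested offsets at or below the current one are acknowledged but
// ignored, so the offset never moves backwards and repairs are retryable.
fn apply_log_offset_update(current: i64, requested: i64) -> i64 {
    if requested <= current {
        current // return OK without mutating state
    } else {
        requested
    }
}

fn main() {
    assert_eq!(apply_log_offset_update(150, 50), 150); // backward: ignored
    assert_eq!(apply_log_offset_update(150, 150), 150); // same offset: no-op
    assert_eq!(apply_log_offset_update(150, 200), 200); // forward: applied
}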