[ENH] Add GCS client to storage #5869
Conversation
Reviewer Checklist
Please leverage this checklist to ensure your code review is thorough before approving:
- Testing, Bugs, Errors, Logs, Documentation
- System Compatibility
- Quality
Force-pushed from fa1312c to 6ff4bb1, then from 6ff4bb1 to 44658a3.
rust/storage/src/object_storage.rs (outdated)

```rust
/// Serializable wrapper for UpdateVersion
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SerializableUpdateVersion {
```
Should this be a OneOf somehow?
Both `e_tag` and `version` might be populated, depending on the object_store impl.
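For context, a minimal sketch of why two independent `Option` fields fit better than a OneOf-style enum here; the field names are assumed to mirror object_store's `UpdateVersion`:

```rust
use serde::{Deserialize, Serialize};

/// Serializable wrapper for UpdateVersion. A OneOf/enum would force exactly
/// one of the two values, but some object_store backends return both at
/// once, so two independent Options mirror the upstream type.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SerializableUpdateVersion {
    /// Backend-assigned entity tag, where supported (e.g. S3, GCS).
    e_tag: Option<String>,
    /// Backend-assigned object version, where supported.
    version: Option<String>,
}
```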
```rust
}

#[tracing::instrument(skip(options, storage))]
pub async fn copy_parquet(
```
note: this is dead code
Add first-class Google Cloud Storage backend to Rust storage layer

This PR introduces native Google Cloud Storage (GCS) support to the Rust storage crate, allowing Chroma to read and write data directly to a GCS bucket in the same unified way it already supports local filesystems and Amazon S3. The change wires a new GCS client (via the object_store crate) into the storage layer.

Key Changes
• Introduced the ObjectStorage backend in rust/storage/src/object_storage.rs

Affected Areas
• rust/storage/src/object_storage.rs

This summary was automatically generated by @propel-code-bot
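For a rough picture of the wiring, this is approximately how a GCS-backed `object_store` client is constructed (a sketch, not the PR's exact code; the bucket name is a placeholder and credentials are assumed to come from the environment):

```rust
use std::sync::Arc;

use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::ObjectStore;

/// Builds a GCS client; from_env() picks up credentials such as
/// GOOGLE_SERVICE_ACCOUNT or ambient workload identity.
fn build_gcs_store(bucket: &str) -> object_store::Result<Arc<dyn ObjectStore>> {
    let store = GoogleCloudStorageBuilder::from_env()
        .with_bucket_name(bucket)
        .build()?;
    Ok(Arc::new(store))
}
```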
@@ -0,0 +1,431 @@
```rust
//! Object storage backend implementation using object_store.
```
Would be good to note that this is mostly copied code
```rust
    }
}

#[tracing::instrument(skip(options, storage))]
```
Why is this removed? (curious)
Previously we used it to copy the log, but it seems we no longer use it.
```rust
}

#[async_trait]
impl Configurable<StorageConfig> for ObjectStorage {
```
Can we add good tests for various key behaviors?
TODO: add in a follow-up due to credential plumbing
Will do it in a follow-up because this needs some setup.
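For reference, a sketch of the kind of test the follow-up could add, exercising conditional-put behavior against object_store's `InMemory` backend as a stand-in for GCS (assumes a recent object_store API where `put_opts` takes a `PutPayload`):

```rust
#[cfg(test)]
mod tests {
    use object_store::{
        memory::InMemory, path::Path, ObjectStore, PutMode, PutOptions, PutPayload,
    };

    #[tokio::test]
    async fn create_mode_fails_if_object_exists() {
        let store = InMemory::new();
        let path = Path::from("test/key");
        let opts = PutOptions::from(PutMode::Create);

        // First conditional create succeeds on an empty store.
        store
            .put_opts(&path, PutPayload::from(vec![b'a']), opts.clone())
            .await
            .expect("first create should succeed");

        // A second create against the same key must surface AlreadyExists.
        let err = store
            .put_opts(&path, PutPayload::from(vec![b'b']), opts)
            .await
            .expect_err("second create should fail");
        assert!(matches!(err, object_store::Error::AlreadyExists { .. }));
    }
}
```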
```rust
pub async fn put_file(
    &self,
    key: &str,
    path: &str,
    options: PutOptions,
) -> Result<ETag, StorageError> {
    let bytes = tokio::fs::read(path)
        .await
        .map_err(|e| StorageError::Generic {
            source: Arc::new(e),
        })?;
    self.oneshot_put(key, bytes, options).await
}
```
[PerformanceOptimization]
The current implementation of `put_file` reads the entire file into memory before uploading using `tokio::fs::read`. This can lead to high memory consumption for large files.
Consider using a streaming approach for uploads. You could check the file's metadata for its size and, if it's large and no conditional options (`if_match` or `if_not_exists`) are set, use `put_multipart` with a stream from the file. For smaller files or conditional puts (which `put_multipart` doesn't support), you can retain the current `oneshot_put` logic.
This would make file uploads more memory-efficient and robust for files of any size.
File: rust/storage/src/object_storage.rs
Line: 358
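A minimal sketch of the streaming half of that suggestion, written against `object_store` directly rather than the PR's `ObjectStorage` wrapper (the function name, chunk size, and error type are illustrative, and it assumes an object_store version that ships the `WriteMultipart` helper):

```rust
use object_store::{path::Path, ObjectStore, WriteMultipart};
use tokio::io::AsyncReadExt;

/// Streams a local file to the store without buffering it whole.
/// Conditional puts (if_match / if_not_exists) would keep using the
/// one-shot path, since multipart uploads have no conditional modes.
async fn stream_put_file(
    store: &dyn ObjectStore,
    key: &str,
    path: &str,
) -> Result<object_store::PutResult, Box<dyn std::error::Error + Send + Sync>> {
    let mut file = tokio::fs::File::open(path).await?;
    let upload = store.put_multipart(&Path::from(key)).await?;
    let mut writer = WriteMultipart::new(upload);
    let mut buf = vec![0u8; 1024 * 1024]; // read in 1 MiB chunks
    loop {
        let n = file.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        // write() buffers internally and uploads parts as they fill.
        writer.write(&buf[..n]);
    }
    // finish() flushes remaining data and completes the multipart upload.
    Ok(writer.finish().await?)
}
```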
this code path should be unused and will be cleaned up later
Force-pushed from a3db0de to 83750d4.
```rust
    }
}

/// Convert ETag to UpdateVersion via deserialization
```
Let's discuss this with Robert.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
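For the discussion, a sketch of the round trip being described, assuming `ETag` is a newtype over the JSON-serialized `SerializableUpdateVersion` (the newtype layout and error type are assumptions):

```rust
use object_store::UpdateVersion;

/// Assumed shape of the crate's ETag for this sketch.
struct ETag(pub String);

impl TryFrom<&ETag> for UpdateVersion {
    type Error = serde_json::Error;

    fn try_from(e_tag: &ETag) -> Result<Self, Self::Error> {
        // Deserialize the wrapper that was serialized into the ETag at
        // put time, then rebuild the object_store type so it can drive
        // conditional (if-match) requests.
        let parsed: SerializableUpdateVersion = serde_json::from_str(&e_tag.0)?;
        Ok(UpdateVersion {
            e_tag: parsed.e_tag,
            version: parsed.version,
        })
    }
}
```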
```rust
    }
}

pub async fn put_file(
```
TODO: clean up with direct_hnsw
```rust
    (&update_version).try_into()
}

async fn oneshot_put(
```
[CriticalError]
**Critical: No idempotency checks in `put` operations**
Direct `PutMode::Create` usage without existence checks can cause race conditions:
```rust
if options.if_not_exists {
    put_options.mode = PutMode::Create; // Will fail if object exists
}
```
With concurrent requests:
- Request A: `put(key, data1, if_not_exists=true)` → succeeds
- Request B: `put(key, data2, if_not_exists=true)` → fails with AlreadyExists

But Request B might be a retry of the same operation, causing non-idempotent failures.
**Fix**: Add idempotency key support:
```rust
pub struct PutOptions {
    pub idempotency_key: Option<String>,
    // ... existing fields
}

// Check if same content already exists
if let Some(idem_key) = &options.idempotency_key {
    if let Ok(meta) = self.store.head(&key.into()).await {
        if meta.custom_metadata.get("idempotency_key") == Some(idem_key) {
            return Ok(/* existing etag */);
        }
    }
}
```
File: rust/storage/src/object_storage.rs
Line: 310

```rust
    key: &str,
    options: GetOptions,
) -> Result<(Vec<u8>, ETag), StorageError> {
    if options.request_parallelism {
        self.multipart_get(key).await
    } else {
        self.oneshot_get(key).await
    }
}

pub async fn multipart_put(
```
[CriticalError]
**Critical: Potential data corruption on concurrent multipart uploads**
Multiple concurrent uploads to the same key with `multipart_put` can corrupt data:
```rust
pub async fn multipart_put(
    &self,
    key: &str,
    mut bytes: Vec<u8>,
    _options: PutOptions, // Options ignored!
) -> Result<ETag, StorageError> {
    let mut upload_handle = self.store.put_multipart(&key.into()).await?;
```
If two callers upload to the same key simultaneously:
- Upload A: Creates multipart upload for `key`
- Upload B: Creates multipart upload for `key`
- Both complete successfully, but final state is non-deterministic

**The `_options` parameter is completely ignored** - no `if_not_exists` or `if_match` checks!
**Fix**:
```rust
pub async fn multipart_put(
    &self,
    key: &str,
    mut bytes: Vec<u8>,
    options: PutOptions, // Don't ignore!
) -> Result<ETag, StorageError> {
    if options.if_not_exists {
        if self.store.head(&key.into()).await.is_ok() {
            return Err(StorageError::AlreadyExists {
                path: key.to_string(),
                source: /* ... */
            });
        }
    }
    // ... rest of upload
}
```
File: rust/storage/src/object_storage.rs
Line: 274

```rust
) -> Result<Option<ETag>, StorageError> {
    match self {
        Storage::S3(s3) => s3.put_bytes(key, bytes, options).await,
        Storage::Object(obj) => obj.put(key, bytes.clone(), options).await.map(Some),
```
[PerformanceOptimization]
The `put_bytes` function takes ownership of `bytes`, so it can be moved directly into `obj.put`. The `.clone()` is unnecessary here and causes an extra allocation and copy.
File: rust/storage/src/lib.rs
Line: 394
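The suggested fix is a one-line change to the match arm, moving `bytes` instead of cloning it:

```rust
Storage::Object(obj) => obj.put(key, bytes, options).await.map(Some),
```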
Description of changes
Summarize the changes made by this PR.
Adds a GCS backend via the `object_store` crate.

Test plan
How are these changes tested?
`pytest` for python, `yarn test` for js, `cargo test` for rust

Migration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?