Skip to content

Commit

Permalink
[ENH] Handle one-off compaction message in compaction manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Sicheng Pan committed Dec 31, 2024
1 parent b4533be commit 30b27ce
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 36 deletions.
74 changes: 39 additions & 35 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use chroma_storage::Storage;
use chroma_types::CollectionUuid;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU32;
Expand Down Expand Up @@ -148,36 +149,43 @@ impl CompactionManager {
};
}

// TODO: make the return type more informative
// NOTE(review): this span is a rendered diff. Lines from the previous and the updated
// implementation are interleaved without +/- markers, so it does not compile as shown.
/// Runs the compaction jobs handed out by the scheduler.
///
/// Updated form: `mask` is an optional allow-list of collection ids. With `None`,
/// every job returned by `scheduler.get_jobs()` is run; with `Some(set)`, jobs whose
/// `collection_id` is not in the set are skipped. The return value holds the
/// collection ids of the jobs that finished with `Ok`; failed jobs are logged and
/// left out.
#[instrument(name = "CompactionManager::compact_batch")]
pub(crate) async fn compact_batch(
&mut self,
// Previous signature (presumably the lines removed by this commit): an out-parameter
// for the compacted ids plus a `(completed, failed)` pair of counters.
compacted: &mut Vec<CollectionUuid>,
) -> (u32, u32) {
// Optional allow-list of collection ids; `None` places no restriction on the jobs
mask: Option<HashSet<CollectionUuid>>,
) -> Vec<CollectionUuid> {
// presumably refreshes the scheduler's job list before `get_jobs()` is read (confirm)
self.scheduler.schedule().await;
let mut jobs = FuturesUnordered::new();
for job in self.scheduler.get_jobs() {
let instrumented_span = span!(parent: None, tracing::Level::INFO, "Compacting job", collection_id = ?job.collection_id);
instrumented_span.follows_from(Span::current());
jobs.push(self.compact(job).instrument(instrumented_span));
}
tracing::info!("Compacting {} jobs", jobs.len());
let mut num_completed_jobs = 0;
let mut num_failed_jobs = 0;
while let Some(job) = jobs.next().await {
match job {
Ok(result) => {
tracing::info!("Compaction completed: {:?}", result);
compacted.push(result.compaction_job.collection_id);
num_completed_jobs += 1;
// Updated form: build one instrumented future per job that passes the mask.
let job_futures = self.scheduler.get_jobs().filter_map(|job| {
match &mask {
// A mask is present and does not list this collection: skip the job.
Some(allowed_collections) if !allowed_collections.contains(&job.collection_id) => {
None
}
Err(e) => {
tracing::info!("Compaction failed {}", e);
num_failed_jobs += 1;
// No mask, or the collection is listed in it: run the job.
_ => {
// Each job gets a span with no parent (`parent: None`) that is linked back to
// the caller's span through `follows_from`.
let instrumented_span = span!(parent: None, tracing::Level::INFO, "Compacting job", collection_id = ?job.collection_id);
instrumented_span.follows_from(Span::current());
Some(self.compact(job).instrument(instrumented_span))
}
}
}
(num_completed_jobs, num_failed_jobs)
}).collect::<FuturesUnordered<_>>();

tracing::info!("Running {} compaction jobs", job_futures.len());

// Drain the futures, keeping only the collection ids of the successful compactions.
job_futures
.filter_map(|result| async move {
match result {
Ok(response) => {
tracing::info!("Compaction completed: {response:?}");
Some(response.compaction_job.collection_id)
}
Err(err) => {
// Failures are logged at error level and dropped from the returned list.
tracing::error!("Compaction failed {err}");
None
}
}
})
.collect()
.await
}

pub(crate) fn set_dispatcher(&mut self, dispatcher: ComponentHandle<Dispatcher>) {
Expand Down Expand Up @@ -307,10 +315,8 @@ impl Handler<ScheduledCompactionMessage> for CompactionManager {
_message: ScheduledCompactionMessage,
ctx: &ComponentContext<CompactionManager>,
) {
tracing::info!("CompactionManager: Performing compaction");
let mut ids = Vec::new();
self.compact_batch(&mut ids).await;

tracing::info!("CompactionManager: Performing scheduled compaction");
let ids = self.compact_batch(None).await;
self.hnsw_index_provider.purge_by_id(&ids).await;

// Compaction is done, schedule the next compaction
Expand All @@ -328,12 +334,13 @@ impl Handler<OneOffCompactionMessage> for CompactionManager {
type Result = ();
// NOTE(review): rendered diff. The previous body (`_message`, `compact_batch(&mut ids)`)
// and the updated body (`message`, masked `compact_batch`) are interleaved below.
/// Handles a one-off compaction request.
///
/// Updated form: runs `compact_batch` with `message.collection_ids` converted to a
/// `HashSet` as the mask (`None` stays `None`), then passes the returned ids to
/// `hnsw_index_provider.purge_by_id`.
async fn handle(
&mut self,
_message: OneOffCompactionMessage,
message: OneOffCompactionMessage,
_ctx: &ComponentContext<CompactionManager>,
) {
tracing::info!("CompactionManager: Performing compaction");
let mut ids = Vec::new();
self.compact_batch(&mut ids).await;
tracing::info!("CompactionManager: Performing one-off compaction");
// `Option<Vec<_>>` -> `Option<HashSet<_>>`: the requested ids become the allow-list.
let ids = self
.compact_batch(message.collection_ids.map(HashSet::from_iter))
.await;
// Hand the ids returned by `compact_batch` to the HNSW index provider's purge.
self.hnsw_index_provider.purge_by_id(&ids).await;
}
}
Expand Down Expand Up @@ -580,10 +587,7 @@ mod tests {
let dispatcher_handle = system.start_component(dispatcher);
manager.set_dispatcher(dispatcher_handle);
manager.set_system(system);
let mut compacted = vec![];
let (num_completed, number_failed) = manager.compact_batch(&mut compacted).await;
assert_eq!(num_completed, 2);
assert_eq!(number_failed, 0);
let compacted = manager.compact_batch(None).await;
assert!(
(compacted == vec![collection_uuid_1, collection_uuid_2])
|| (compacted == vec![collection_uuid_2, collection_uuid_1])
Expand Down
1 change: 0 additions & 1 deletion rust/worker/src/compactor/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,5 @@ pub struct ScheduledCompactionMessage {}

/// Message that triggers a one-off compaction in the `CompactionManager`.
#[derive(Clone, Debug)]
pub struct OneOffCompactionMessage {
// NOTE(review): the diff header reports a single deletion in this file, presumably
// this attribute, since the field is now read by the one-off compaction handler.
#[allow(dead_code)]
/// Collections to compact. The handler turns `Some(ids)` into an allow-list for
/// `compact_batch`; `None` is passed through as no restriction.
pub collection_ids: Option<Vec<CollectionUuid>>,
}

0 comments on commit 30b27ce

Please sign in to comment.