From 25ff5edd65dc0ca2687770b141bfb93b34b3985a Mon Sep 17 00:00:00 2001 From: Sicheng Pan Date: Tue, 31 Dec 2024 11:11:51 -0800 Subject: [PATCH] [ENH] Handle one-off compaction message in compaction manager --- .../src/compactor/compaction_manager.rs | 74 ++++++++++--------- rust/worker/src/compactor/types.rs | 1 - 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index c7c2b44a4bb..3d3a80acf04 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -22,6 +22,7 @@ use chroma_storage::Storage; use chroma_types::CollectionUuid; use futures::stream::FuturesUnordered; use futures::StreamExt; +use std::collections::HashSet; use std::fmt::Debug; use std::fmt::Formatter; use std::time::Duration; @@ -144,36 +145,43 @@ impl CompactionManager { }; } - // TODO: make the return type more informative #[instrument(name = "CompactionManager::compact_batch")] pub(crate) async fn compact_batch( &mut self, - compacted: &mut Vec, - ) -> (u32, u32) { + // The set of collection ids that are allowed to compact + mask: Option>, + ) -> Vec { self.scheduler.schedule().await; - let mut jobs = FuturesUnordered::new(); - for job in self.scheduler.get_jobs() { - let instrumented_span = span!(parent: None, tracing::Level::INFO, "Compacting job", collection_id = ?job.collection_id); - instrumented_span.follows_from(Span::current()); - jobs.push(self.compact(job).instrument(instrumented_span)); - } - tracing::info!("Compacting {} jobs", jobs.len()); - let mut num_completed_jobs = 0; - let mut num_failed_jobs = 0; - while let Some(job) = jobs.next().await { - match job { - Ok(result) => { - tracing::info!("Compaction completed: {:?}", result); - compacted.push(result.compaction_job.collection_id); - num_completed_jobs += 1; + let job_futures = self.scheduler.get_jobs().filter_map(|job| { + match &mask { + Some(allowed_collections) if !allowed_collections.contains(&job.collection_id) => { + None } - Err(e) => { - tracing::info!("Compaction failed {}", e); - num_failed_jobs += 1; + _ => { + let instrumented_span = span!(parent: None, tracing::Level::INFO, "Compacting job", collection_id = ?job.collection_id); + instrumented_span.follows_from(Span::current()); + Some(self.compact(job).instrument(instrumented_span)) } } - } - (num_completed_jobs, num_failed_jobs) + }).collect::>(); + + tracing::info!("Running {} compaction jobs", job_futures.len()); + + job_futures + .filter_map(|result| async move { + match result { + Ok(response) => { + tracing::info!("Compaction completed: {response:?}"); + Some(response.compaction_job.collection_id) + } + Err(err) => { + tracing::error!("Compaction failed {err}"); + None + } + } + }) + .collect() + .await } pub(crate) fn set_dispatcher(&mut self, dispatcher: ComponentHandle) { @@ -303,10 +311,8 @@ impl Handler for CompactionManager { _message: ScheduledCompactionMessage, ctx: &ComponentContext, ) { - tracing::info!("CompactionManager: Performing compaction"); - let mut ids = Vec::new(); - self.compact_batch(&mut ids).await; - + tracing::info!("CompactionManager: Performing scheduled compaction"); + let ids = self.compact_batch(None).await; self.hnsw_index_provider.purge_by_id(&ids).await; // Compaction is done, schedule the next compaction @@ -324,12 +330,13 @@ impl Handler for CompactionManager { type Result = (); async fn handle( &mut self, - _message: OneOffCompactionMessage, + message: OneOffCompactionMessage, _ctx: &ComponentContext, ) { - tracing::info!("CompactionManager: Performing compaction"); - let mut ids = Vec::new(); - self.compact_batch(&mut ids).await; + tracing::info!("CompactionManager: Performing one-off compaction"); + let ids = self + .compact_batch(message.collection_ids.map(HashSet::from_iter)) + .await; self.hnsw_index_provider.purge_by_id(&ids).await; } } @@ -576,10 +583,7 @@ mod tests { let dispatcher_handle = system.start_component(dispatcher); manager.set_dispatcher(dispatcher_handle); manager.set_system(system); - let mut compacted = vec![]; - let (num_completed, number_failed) = manager.compact_batch(&mut compacted).await; - assert_eq!(num_completed, 2); - assert_eq!(number_failed, 0); + let compacted = manager.compact_batch(None).await; assert!( (compacted == vec![collection_uuid_1, collection_uuid_2]) || (compacted == vec![collection_uuid_2, collection_uuid_1]) diff --git a/rust/worker/src/compactor/types.rs b/rust/worker/src/compactor/types.rs index 80de3641588..404b8ef7db9 100644 --- a/rust/worker/src/compactor/types.rs +++ b/rust/worker/src/compactor/types.rs @@ -13,6 +13,5 @@ pub struct ScheduledCompactionMessage {} #[derive(Clone, Debug)] pub struct OneOffCompactionMessage { - #[allow(dead_code)] pub collection_ids: Option>, }