diff --git a/Cargo.lock b/Cargo.lock index 57ca85b1335..7ddbf0dbea2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6834,6 +6834,7 @@ dependencies = [ "chroma-storage", "chroma-system", "chroma-types", + "clap", "criterion", "fastrace", "fastrace-opentelemetry", diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index 7419b59915a..842efadb195 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -4,13 +4,17 @@ version = "0.1.0" edition = "2021" [[bin]] -name = "query_service" -path = "src/bin/query_service.rs" +name = "compaction_client" +path = "src/bin/compaction_client.rs" [[bin]] name = "compaction_service" path = "src/bin/compaction_service.rs" +[[bin]] +name = "query_service" +path = "src/bin/query_service.rs" + [dependencies] rand = "0.8.5" murmur3 = "0.5.2" @@ -47,6 +51,7 @@ prost-types = { workspace = true } num_cpus = { workspace = true } flatbuffers = { workspace = true } tantivy = { workspace = true } +clap = { workspace = true } chroma-blockstore = { workspace = true } chroma-error = { workspace = true } diff --git a/rust/worker/src/bin/compaction_client.rs b/rust/worker/src/bin/compaction_client.rs new file mode 100644 index 00000000000..616bfe4688a --- /dev/null +++ b/rust/worker/src/bin/compaction_client.rs @@ -0,0 +1,11 @@ +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +#[tokio::main] +async fn main() { + worker::compaction_client_entrypoint().await; +} diff --git a/rust/worker/src/compactor/compaction_client.rs b/rust/worker/src/compactor/compaction_client.rs new file mode 100644 index 00000000000..88123322c39 --- /dev/null +++ b/rust/worker/src/compactor/compaction_client.rs @@ -0,0 +1,66 @@ +use chroma_types::chroma_proto::{ + compactor_client::CompactorClient, CollectionIds, CompactionRequest, +}; +use clap::{Parser, Subcommand}; +use thiserror::Error; +use tonic::transport::Channel; +use uuid::Uuid; + +/// Error for compaction client +#[derive(Debug, Error)] +pub enum CompactionClientError { + #[error("Compactor failed: {0}")] + Compactor(String), + #[error("Unable to connect to compactor: {0}")] + Connection(#[from] tonic::transport::Error), +} + +/// Tool to control compaction service +#[derive(Debug, Parser)] +#[command(version, about, long_about = None)] +pub struct CompactionClient { + /// Url of the target compactor + #[arg(short, long)] + url: String, + /// Subcommand for compaction + #[command(subcommand)] + command: CompactionCommand, +} + +#[derive(Debug, Subcommand)] +pub enum CompactionCommand { + /// Trigger a one-off compaction + Compact { + /// Specify Uuids of the collections to compact. If unspecified, no compaction will occur unless --all flag is specified + #[arg(short, long)] + id: Vec, + /// Compact all collections available. If specified, the Uuids specified with --id will be ignored + #[arg(short, long)] + all: bool, + }, +} + +impl CompactionClient { + async fn grpc_client(&self) -> Result, CompactionClientError> { + Ok(CompactorClient::connect(self.url.clone()).await?) + } + + pub async fn run(&self) -> Result<(), CompactionClientError> { + match &self.command { + CompactionCommand::Compact { id, all } => { + let mut client = self.grpc_client().await?; + let response = client + .compact(CompactionRequest { + ids: (!all).then_some(CollectionIds { + ids: id.iter().map(ToString::to_string).collect(), + }), + }) + .await; + if let Err(status) = response { + return Err(CompactionClientError::Compactor(status.to_string())); + } + } + }; + Ok(()) + } +} diff --git a/rust/worker/src/compactor/compaction_server.rs b/rust/worker/src/compactor/compaction_server.rs index 62fed472eb1..08c0d94781d 100644 --- a/rust/worker/src/compactor/compaction_server.rs +++ b/rust/worker/src/compactor/compaction_server.rs @@ -20,7 +20,7 @@ pub struct CompactionServer { impl CompactionServer { pub async fn run(self) -> Result<(), Box> { let addr = format!("[::]:{}", self.port).parse().unwrap(); - tracing::info!("Compaction server listing at {addr}"); + tracing::info!("Compaction server listening at {addr}"); let server = Server::builder().add_service(CompactorServer::new(self)); server .serve_with_shutdown(addr, async { diff --git a/rust/worker/src/compactor/mod.rs b/rust/worker/src/compactor/mod.rs index a636a4ba2db..1c07272a7bb 100644 --- a/rust/worker/src/compactor/mod.rs +++ b/rust/worker/src/compactor/mod.rs @@ -7,4 +7,5 @@ mod types; pub(crate) use compaction_manager::*; pub(crate) use types::*; +pub mod compaction_client; pub mod compaction_server; diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index 8ea3326f6b4..57b159c3ae6 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -7,6 +7,8 @@ mod tracing; mod utils; use chroma_config::Configurable; +use clap::Parser; +use compactor::compaction_client::CompactionClient; use compactor::compaction_server::CompactionServer; use memberlist::MemberlistProvider; @@ -139,7 +141,7 @@ pub async fn compaction_service_entrypoint() { }; let server_join_handle = tokio::spawn(async move { - let _ = CompactionServer::run(compaction_server).await; + let _ = compaction_server.run().await; }); let mut sigterm = match signal(SignalKind::terminate()) { @@ -167,3 +169,10 @@ pub async fn compaction_service_entrypoint() { }; println!("Server stopped"); } + +pub async fn compaction_client_entrypoint() { + let client = CompactionClient::parse(); + if let Err(e) = client.run().await { + eprintln!("{e}"); + } +}