Skip to content

Commit b0e9a41

Browse files
committed
[ENH] Implement compaction client
1 parent fe94965 commit b0e9a41

File tree

7 files changed

+97
-4
lines changed

7 files changed

+97
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/worker/Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@ version = "0.1.0"
44
edition = "2021"
55

66
[[bin]]
7-
name = "query_service"
8-
path = "src/bin/query_service.rs"
7+
name = "compaction_client"
8+
path = "src/bin/compaction_client.rs"
99

1010
[[bin]]
1111
name = "compaction_service"
1212
path = "src/bin/compaction_service.rs"
1313

14+
[[bin]]
15+
name = "query_service"
16+
path = "src/bin/query_service.rs"
17+
1418
[dependencies]
1519
rand = "0.8.5"
1620
murmur3 = "0.5.2"
@@ -47,6 +51,7 @@ prost-types = { workspace = true }
4751
num_cpus = { workspace = true }
4852
flatbuffers = { workspace = true }
4953
tantivy = { workspace = true }
54+
clap = { workspace = true }
5055

5156
chroma-blockstore = { workspace = true }
5257
chroma-error = { workspace = true }
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#[cfg(not(target_env = "msvc"))]
2+
use tikv_jemallocator::Jemalloc;
3+
4+
#[cfg(not(target_env = "msvc"))]
5+
#[global_allocator]
6+
static GLOBAL: Jemalloc = Jemalloc;
7+
8+
#[tokio::main]
9+
async fn main() {
10+
worker::compaction_client_entrypoint().await;
11+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use chroma_types::chroma_proto::{
2+
compactor_client::CompactorClient, CollectionIds, CompactionRequest,
3+
};
4+
use clap::{Parser, Subcommand};
5+
use thiserror::Error;
6+
use tonic::transport::Channel;
7+
use uuid::Uuid;
8+
9+
/// Error for compaction client
10+
#[derive(Debug, Error)]
11+
pub enum CompactionClientError {
12+
#[error("Compactor failed: {0}")]
13+
Compactor(String),
14+
#[error("Unable to connect to compactor: {0}")]
15+
Connection(#[from] tonic::transport::Error),
16+
}
17+
18+
/// Tool to control compaction service
19+
#[derive(Debug, Parser)]
20+
#[command(version, about, long_about = None)]
21+
pub struct CompactionClient {
22+
/// Url of the target compactor
23+
#[arg(short, long)]
24+
url: String,
25+
/// Subcommand for compaction
26+
#[command(subcommand)]
27+
command: CompactionCommand,
28+
}
29+
30+
#[derive(Debug, Subcommand)]
31+
pub enum CompactionCommand {
32+
/// Trigger a one-off compaction
33+
Compact {
34+
/// Specify Uuids of the collections to compact. If unspecified, no compaction will occur unless --all flag is specified
35+
#[arg(short, long)]
36+
id: Vec<Uuid>,
37+
/// Compact all collections available. If specified, the Uuids specified with --id will be ignored
38+
#[arg(short, long)]
39+
all: bool,
40+
},
41+
}
42+
43+
impl CompactionClient {
44+
async fn grpc_client(&self) -> Result<CompactorClient<Channel>, CompactionClientError> {
45+
Ok(CompactorClient::connect(self.url.clone()).await?)
46+
}
47+
48+
pub async fn run(&self) -> Result<(), CompactionClientError> {
49+
match &self.command {
50+
CompactionCommand::Compact { id, all } => {
51+
let mut client = self.grpc_client().await?;
52+
let response = client
53+
.compact(CompactionRequest {
54+
ids: (!all).then_some(CollectionIds {
55+
ids: id.iter().map(ToString::to_string).collect(),
56+
}),
57+
})
58+
.await;
59+
if let Err(status) = response {
60+
return Err(CompactionClientError::Compactor(status.to_string()));
61+
}
62+
}
63+
};
64+
Ok(())
65+
}
66+
}

rust/worker/src/compactor/compaction_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub struct CompactionServer {
2020
impl CompactionServer {
2121
pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
2222
let addr = format!("[::]:{}", self.port).parse().unwrap();
23-
tracing::info!("Compaction server listing at {addr}");
23+
tracing::info!("Compaction server listening at {addr}");
2424
let server = Server::builder().add_service(CompactorServer::new(self));
2525
server
2626
.serve_with_shutdown(addr, async {

rust/worker/src/compactor/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ mod types;
77
pub(crate) use compaction_manager::*;
88
pub(crate) use types::*;
99

10+
pub mod compaction_client;
1011
pub mod compaction_server;

rust/worker/src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ mod tracing;
77
mod utils;
88

99
use chroma_config::Configurable;
10+
use clap::Parser;
11+
use compactor::compaction_client::CompactionClient;
1012
use compactor::compaction_server::CompactionServer;
1113
use memberlist::MemberlistProvider;
1214

@@ -139,7 +141,7 @@ pub async fn compaction_service_entrypoint() {
139141
};
140142

141143
let server_join_handle = tokio::spawn(async move {
142-
let _ = CompactionServer::run(compaction_server).await;
144+
let _ = compaction_server.run().await;
143145
});
144146

145147
let mut sigterm = match signal(SignalKind::terminate()) {
@@ -167,3 +169,10 @@ pub async fn compaction_service_entrypoint() {
167169
};
168170
println!("Server stopped");
169171
}
172+
173+
pub async fn compaction_client_entrypoint() {
174+
let client = CompactionClient::parse();
175+
if let Err(e) = client.run().await {
176+
eprintln!("{e}");
177+
}
178+
}

0 commit comments

Comments
 (0)