Skip to content

Commit 323b15e

Browse files
committed
Cleanup
1 parent 171ce7f commit 323b15e

File tree

4 files changed

+24
-50
lines changed

4 files changed

+24
-50
lines changed

rust/blockstore/src/arrow/blockfile.rs

-38
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@ use crate::arrow::root::CURRENT_VERSION;
1111
use crate::arrow::sparse_index::SparseIndexWriter;
1212
use crate::key::CompositeKey;
1313
use crate::key::KeyWrapper;
14-
use crate::Key;
1514
use chroma_error::ChromaError;
1615
use chroma_error::ErrorCodes;
1716
use futures::future::join_all;
18-
use futures::stream::FuturesUnordered;
1917
use futures::{Stream, StreamExt, TryStreamExt};
2018
use parking_lot::Mutex;
2119
use std::collections::HashSet;
@@ -344,42 +342,6 @@ impl ArrowUnorderedBlockfileWriter {
344342
Ok(())
345343
}
346344

347-
pub(crate) async fn prefetch<K: Key>(
348-
&self,
349-
keys: impl IntoIterator<Item = (String, K)>,
350-
) -> Result<(), Box<dyn ChromaError>> {
351-
let mut block_ids = HashSet::new();
352-
353-
for key in keys {
354-
let block_id = self
355-
.root
356-
.sparse_index
357-
.get_target_block_id(&CompositeKey::new(key.0, key.1));
358-
block_ids.insert(block_id);
359-
}
360-
361-
let mut block_fetches = FuturesUnordered::new();
362-
for block_id in &block_ids {
363-
block_fetches.push(self.block_manager.get(block_id));
364-
}
365-
366-
while let Some(block) = block_fetches.next().await {
367-
match block {
368-
Ok(_) => {}
369-
Err(err) => match err {
370-
GetError::StorageGetError(chroma_storage::GetError::NoSuchKey(_)) => {
371-
// Ignore
372-
}
373-
_ => {
374-
return Err(Box::new(err) as Box<dyn ChromaError>);
375-
}
376-
},
377-
}
378-
}
379-
380-
Ok(())
381-
}
382-
383345
pub(crate) fn id(&self) -> Uuid {
384346
self.id
385347
}

rust/blockstore/src/arrow/ordered_blockfile_writer.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,11 @@ use super::{
1010
flusher::ArrowBlockfileFlusher,
1111
types::{ArrowWriteableKey, ArrowWriteableValue},
1212
};
13-
use crate::arrow::provider::GetError;
1413
use crate::arrow::root::CURRENT_VERSION;
1514
use crate::arrow::sparse_index::SparseIndexWriter;
1615
use crate::key::CompositeKey;
17-
use crate::Key;
1816
use chroma_error::ChromaError;
1917
use chroma_error::ErrorCodes;
20-
use futures::stream::FuturesUnordered;
21-
use futures::StreamExt;
2218
use itertools::Itertools;
2319
use std::collections::HashSet;
2420
use std::collections::VecDeque;
@@ -156,11 +152,16 @@ impl ArrowOrderedBlockfileWriter {
156152
.load(std::sync::atomic::Ordering::Relaxed);
157153
let block_cache_hit_rate = 100.0 * num_block_cache_hits as f64
158154
/ (num_block_cache_hits + num_block_cache_misses) as f64;
159-
tracing::debug!(
160-
block_cache_hit_rate = block_cache_hit_rate,
161-
"Block cache hit rate: {:.2}%",
162-
block_cache_hit_rate,
163-
);
155+
if block_cache_hit_rate.is_nan() {
156+
// This can happen if there are no cache hits or misses
157+
tracing::debug!("Block cache hit rate: N/A");
158+
} else {
159+
tracing::debug!(
160+
block_cache_hit_rate = block_cache_hit_rate,
161+
"Block cache hit rate: {:.2}%",
162+
block_cache_hit_rate,
163+
);
164+
}
164165

165166
let mut split_block_deltas = Vec::new();
166167
for delta in inner.completed_block_deltas.drain(..) {

rust/index/src/fulltext/types.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,10 @@ impl FullTextIndexWriter {
166166
&self,
167167
blockfile_provider: &BlockfileProvider,
168168
) -> Result<(), FullTextIndexError> {
169-
const PERCENT_TO_PREFETCH: f32 = 0.001;
169+
// `token_instances` can be millions of items. Imagine there are 10k documents, each with 1000 tokens. There are then ~10000*1000*3=30M token instances.
170+
// In comparison, the number of blocks required to store these is much, much smaller. Say (worst case) each token instance consumes 12 bytes of a block (trigram, offset ID, position) and a block is 8MB. Then a single block can store ~660k token instances.
171+
// Given this, and assuming that token instances are relatively uniformly distributed across blocks, we can prefetch a very small percentage of the key space and the cache hit rate will still be at or near 100%.
172+
const PERCENT_TO_PREFETCH: f32 = 0.0005;
170173

171174
if let Some(id) = self.posting_lists_blockfile_writer.forked_from_id() {
172175
let mut keys = vec![];

rust/worker/src/execution/orchestration/compact.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,11 @@ impl Handler<TaskResult<ApplyLogToSegmentWriterOutput, ApplyLogToSegmentWriterOp
818818
};
819819

820820
if num_tasks_left == 0 {
821-
let writers = self.get_segment_writers().await.unwrap();
821+
let writers = self.get_segment_writers().await;
822+
let writers = match self.ok_or_terminate(writers, ctx) {
823+
Some(writers) => writers,
824+
None => return,
825+
};
822826

823827
if message.segment_id == writers.metadata.id {
824828
let span = self.get_segment_writer_span(&ChromaSegmentWriter::MetadataSegment(
@@ -862,7 +866,11 @@ impl Handler<TaskResult<PrefetchForMetadataWriterOutput, PrefetchForMetadataWrit
862866
None => return,
863867
};
864868

865-
let writers = self.get_segment_writers().await.unwrap();
869+
let writers = self.get_segment_writers().await;
870+
let writers = match self.ok_or_terminate(writers, ctx) {
871+
Some(writers) => writers,
872+
None => return,
873+
};
866874

867875
self.dispatch_segment_writer_commit(
868876
ChromaSegmentWriter::MetadataSegment(writers.metadata),

0 commit comments

Comments (0)