Skip to content

Commit 0985e21

Browse files
Sicheng-Pangithub-actions[bot]
authored andcommitted
[ENH] Load blocks given keys under P0 priority (#6006)
## Description of changes _Summarize the changes made by this PR._ - Improvements & Bug fixes - Rename `prefetch_*` to `load_*` to better indicate its usage. Prefetch is for lower priority data warmup, and should be used in a fire and forget fashion. Load higher priority and could be blocked on. - Make `load_blocks` use `P0` priority instead of `P1` when fetching blocks. Previously prefetch use `P1` priority and led to long tail latency on log materialization. - New functionality - N/A ## Test plan _How are these changes tested?_ - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 7816887 commit 0985e21

File tree

6 files changed

+15
-12
lines changed

6 files changed

+15
-12
lines changed

rust/blockstore/src/arrow/blockfile.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
526526
if !self.block_manager.cached(block_id).await
527527
&& !self.loaded_blocks.read().contains_key(block_id)
528528
{
529-
futures.push(self.get_block(*block_id, StorageRequestPriority::P1));
529+
futures.push(self.get_block(*block_id, StorageRequestPriority::P0));
530530
}
531531
}
532532
join_all(futures).await;

rust/blockstore/src/types/reader.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,10 @@ impl<
137137
}
138138
}
139139

140-
pub async fn load_blocks_for_keys(&self, keys: impl IntoIterator<Item = (String, K)>) {
140+
// NOTE(sicheng): This loads the underlying data concurrently
141+
pub async fn load_data_for_keys(&self, keys: impl IntoIterator<Item = (String, K)>) {
141142
match self {
142-
BlockfileReader::MemoryBlockfileReader(_reader) => unimplemented!(),
143+
BlockfileReader::MemoryBlockfileReader(_) => (),
143144
BlockfileReader::ArrowBlockfileReader(reader) => {
144145
reader.load_blocks_for_keys(keys).await
145146
}

rust/index/src/fulltext/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ impl NgramLiteralProvider<FullTextIndexError> for FullTextIndexReader<'_> {
440440
6
441441
}
442442

443-
async fn prefetch_ngrams<'me, Ngrams>(&'me self, ngrams: Ngrams)
443+
async fn load_ngrams<'me, Ngrams>(&'me self, ngrams: Ngrams)
444444
where
445445
Ngrams: IntoIterator<Item = &'me str> + Send + Sync,
446446
{

rust/segment/src/blockfile_record.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -913,15 +913,15 @@ impl RecordSegmentReader<'_> {
913913
self.id_to_user_id.count().await
914914
}
915915

916-
pub async fn prefetch_id_to_data(&self, keys: &[u32]) {
916+
pub async fn load_id_to_data(&self, keys: impl Iterator<Item = u32>) {
917917
self.id_to_data
918-
.load_blocks_for_keys(keys.iter().map(|k| ("".to_string(), *k)))
918+
.load_data_for_keys(keys.map(|k| ("".to_string(), k)))
919919
.await
920920
}
921921

922-
pub(crate) async fn prefetch_user_id_to_id(&self, keys: &[&str]) {
922+
pub async fn load_user_id_to_id(&self, keys: impl Iterator<Item = &str>) {
923923
self.user_id_to_id
924-
.load_blocks_for_keys(keys.iter().map(|k| ("".to_string(), *k)))
924+
.load_data_for_keys(keys.map(|k| ("".to_string(), k)))
925925
.await
926926
}
927927

rust/segment/src/types.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ pub async fn materialize_logs(
597597
user_ids.sort_unstable();
598598
user_ids.dedup();
599599
async {
600-
reader.prefetch_user_id_to_id(&user_ids).await;
600+
reader.load_user_id_to_id(user_ids.iter().cloned()).await;
601601

602602
let mut existing_offset_ids = Vec::with_capacity(user_ids.len());
603603
for user_id in user_ids {
@@ -610,7 +610,9 @@ pub async fn materialize_logs(
610610
};
611611
}
612612

613-
reader.prefetch_id_to_data(&existing_offset_ids).await;
613+
reader
614+
.load_id_to_data(existing_offset_ids.iter().cloned())
615+
.await;
614616
Ok::<_, LogMaterializerError>(())
615617
}
616618
.instrument(Span::current())

rust/types/src/regex/literal_expr.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ pub trait NgramLiteralProvider<E, const N: usize = 3> {
7979
// Return the max branching factor during the search
8080
fn maximum_branching_factor(&self) -> usize;
8181

82-
async fn prefetch_ngrams<'me, Ngrams>(&'me self, _ngrams: Ngrams)
82+
async fn load_ngrams<'me, Ngrams>(&'me self, _ngrams: Ngrams)
8383
where
8484
Ngrams: IntoIterator<Item = &'me str> + Send + Sync,
8585
{
@@ -180,7 +180,7 @@ pub trait NgramLiteralProvider<E, const N: usize = 3> {
180180
return Ok(HashSet::new());
181181
}
182182

183-
self.prefetch_ngrams(
183+
self.load_ngrams(
184184
ngram_vec
185185
.iter()
186186
.flat_map(|ngrams| ngrams.iter().map(|ngram| ngram.as_str())),

0 commit comments

Comments
 (0)