Skip to content

Commit f007e0a

Browse files
committed
[ENH] Load blocks given keys under P0 priority
1 parent 7c106c4 commit f007e0a

File tree

6 files changed

+15
-12
lines changed

6 files changed

+15
-12
lines changed

rust/blockstore/src/arrow/blockfile.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
533533
if !self.block_manager.cached(block_id).await
534534
&& !self.loaded_blocks.read().contains_key(block_id)
535535
{
536-
futures.push(self.get_block(*block_id, StorageRequestPriority::P1));
536+
futures.push(self.get_block(*block_id, StorageRequestPriority::P0));
537537
}
538538
}
539539
join_all(futures).await;

rust/blockstore/src/types/reader.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,10 @@ impl<
137137
}
138138
}
139139

140-
pub async fn load_blocks_for_keys(&self, keys: impl IntoIterator<Item = (String, K)>) {
140+
// NOTE(sicheng): This loads the underlying data concurrently
141+
pub async fn load_data_for_keys(&self, keys: impl IntoIterator<Item = (String, K)>) {
141142
match self {
142-
BlockfileReader::MemoryBlockfileReader(_reader) => unimplemented!(),
143+
BlockfileReader::MemoryBlockfileReader(_) => (),
143144
BlockfileReader::ArrowBlockfileReader(reader) => {
144145
reader.load_blocks_for_keys(keys).await
145146
}

rust/index/src/fulltext/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ impl NgramLiteralProvider<FullTextIndexError> for FullTextIndexReader<'_> {
440440
6
441441
}
442442

443-
async fn prefetch_ngrams<'me, Ngrams>(&'me self, ngrams: Ngrams)
443+
async fn load_ngrams<'me, Ngrams>(&'me self, ngrams: Ngrams)
444444
where
445445
Ngrams: IntoIterator<Item = &'me str> + Send + Sync,
446446
{

rust/segment/src/blockfile_record.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -923,15 +923,15 @@ impl RecordSegmentReader<'_> {
923923
self.id_to_user_id.count().await
924924
}
925925

926-
pub async fn prefetch_id_to_data(&self, keys: &[u32]) {
926+
pub async fn load_id_to_data(&self, keys: impl Iterator<Item = u32>) {
927927
self.id_to_data
928-
.load_blocks_for_keys(keys.iter().map(|k| ("".to_string(), *k)))
928+
.load_data_for_keys(keys.map(|k| ("".to_string(), k)))
929929
.await
930930
}
931931

932-
pub(crate) async fn prefetch_user_id_to_id(&self, keys: &[&str]) {
932+
pub async fn load_user_id_to_id(&self, keys: impl Iterator<Item = &str>) {
933933
self.user_id_to_id
934-
.load_blocks_for_keys(keys.iter().map(|k| ("".to_string(), *k)))
934+
.load_data_for_keys(keys.map(|k| ("".to_string(), k)))
935935
.await
936936
}
937937

rust/segment/src/types.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ pub async fn materialize_logs(
597597
user_ids.sort_unstable();
598598
user_ids.dedup();
599599
async {
600-
reader.prefetch_user_id_to_id(&user_ids).await;
600+
reader.load_user_id_to_id(user_ids.iter().cloned()).await;
601601

602602
let mut existing_offset_ids = Vec::with_capacity(user_ids.len());
603603
for user_id in user_ids {
@@ -610,7 +610,9 @@ pub async fn materialize_logs(
610610
};
611611
}
612612

613-
reader.prefetch_id_to_data(&existing_offset_ids).await;
613+
reader
614+
.load_id_to_data(existing_offset_ids.iter().cloned())
615+
.await;
614616
Ok::<_, LogMaterializerError>(())
615617
}
616618
.instrument(Span::current())

rust/types/src/regex/literal_expr.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ pub trait NgramLiteralProvider<E, const N: usize = 3> {
7979
// Return the max branching factor during the search
8080
fn maximum_branching_factor(&self) -> usize;
8181

82-
async fn prefetch_ngrams<'me, Ngrams>(&'me self, _ngrams: Ngrams)
82+
async fn load_ngrams<'me, Ngrams>(&'me self, _ngrams: Ngrams)
8383
where
8484
Ngrams: IntoIterator<Item = &'me str> + Send + Sync,
8585
{
@@ -180,7 +180,7 @@ pub trait NgramLiteralProvider<E, const N: usize = 3> {
180180
return Ok(HashSet::new());
181181
}
182182

183-
self.prefetch_ngrams(
183+
self.load_ngrams(
184184
ngram_vec
185185
.iter()
186186
.flat_map(|ngrams| ngrams.iter().map(|ngram| ngram.as_str())),

0 commit comments

Comments
 (0)