Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/blockstore/src/arrow/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
if !self.block_manager.cached(block_id).await
&& !self.loaded_blocks.read().contains_key(block_id)
{
futures.push(self.get_block(*block_id, StorageRequestPriority::P1));
futures.push(self.get_block(*block_id, StorageRequestPriority::P0));
}
}
join_all(futures).await;
Expand Down
5 changes: 3 additions & 2 deletions rust/blockstore/src/types/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ impl<
}
}

pub async fn load_blocks_for_keys(&self, keys: impl IntoIterator<Item = (String, K)>) {
// NOTE(sicheng): This loads the underlying data concurrently
pub async fn load_data_for_keys(&self, keys: impl IntoIterator<Item = (String, K)>) {
match self {
BlockfileReader::MemoryBlockfileReader(_reader) => unimplemented!(),
BlockfileReader::MemoryBlockfileReader(_) => (),
BlockfileReader::ArrowBlockfileReader(reader) => {
reader.load_blocks_for_keys(keys).await
}
Expand Down
2 changes: 1 addition & 1 deletion rust/index/src/fulltext/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ impl NgramLiteralProvider<FullTextIndexError> for FullTextIndexReader<'_> {
6
}

async fn prefetch_ngrams<'me, Ngrams>(&'me self, ngrams: Ngrams)
async fn load_ngrams<'me, Ngrams>(&'me self, ngrams: Ngrams)
where
Ngrams: IntoIterator<Item = &'me str> + Send + Sync,
{
Expand Down
8 changes: 4 additions & 4 deletions rust/segment/src/blockfile_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,15 +923,15 @@ impl RecordSegmentReader<'_> {
self.id_to_user_id.count().await
}

pub async fn prefetch_id_to_data(&self, keys: &[u32]) {
pub async fn load_id_to_data(&self, keys: impl Iterator<Item = u32>) {
self.id_to_data
.load_blocks_for_keys(keys.iter().map(|k| ("".to_string(), *k)))
.load_data_for_keys(keys.map(|k| ("".to_string(), k)))
.await
}

pub(crate) async fn prefetch_user_id_to_id(&self, keys: &[&str]) {
pub async fn load_user_id_to_id(&self, keys: impl Iterator<Item = &str>) {
self.user_id_to_id
.load_blocks_for_keys(keys.iter().map(|k| ("".to_string(), *k)))
.load_data_for_keys(keys.map(|k| ("".to_string(), k)))
.await
}

Expand Down
6 changes: 4 additions & 2 deletions rust/segment/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ pub async fn materialize_logs(
user_ids.sort_unstable();
user_ids.dedup();
async {
reader.prefetch_user_id_to_id(&user_ids).await;
reader.load_user_id_to_id(user_ids.iter().cloned()).await;

let mut existing_offset_ids = Vec::with_capacity(user_ids.len());
for user_id in user_ids {
Expand All @@ -610,7 +610,9 @@ pub async fn materialize_logs(
};
}

reader.prefetch_id_to_data(&existing_offset_ids).await;
reader
.load_id_to_data(existing_offset_ids.iter().cloned())
.await;
Ok::<_, LogMaterializerError>(())
}
.instrument(Span::current())
Expand Down
4 changes: 2 additions & 2 deletions rust/types/src/regex/literal_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub trait NgramLiteralProvider<E, const N: usize = 3> {
// Return the max branching factor during the search
fn maximum_branching_factor(&self) -> usize;

async fn prefetch_ngrams<'me, Ngrams>(&'me self, _ngrams: Ngrams)
async fn load_ngrams<'me, Ngrams>(&'me self, _ngrams: Ngrams)
where
Ngrams: IntoIterator<Item = &'me str> + Send + Sync,
{
Expand Down Expand Up @@ -180,7 +180,7 @@ pub trait NgramLiteralProvider<E, const N: usize = 3> {
return Ok(HashSet::new());
}

self.prefetch_ngrams(
self.load_ngrams(
ngram_vec
.iter()
.flat_map(|ngrams| ngrams.iter().map(|ngram| ngram.as_str())),
Expand Down
Loading