4 changes: 0 additions & 4 deletions rust/segment/src/types.rs
@@ -609,10 +609,6 @@ pub async fn materialize_logs(
             );
         };
     }
-
-    reader
-        .load_id_to_data(existing_offset_ids.iter().cloned())
-        .await;
     Ok::<_, LogMaterializerError>(())
 }
 .instrument(Span::current())
14 changes: 13 additions & 1 deletion rust/worker/src/execution/operators/materialize_logs.rs
@@ -3,7 +3,7 @@ use chroma_error::ChromaError;
 use chroma_segment::blockfile_record::{RecordSegmentReader, RecordSegmentReaderCreationError};
 use chroma_segment::types::{materialize_logs, LogMaterializerError, MaterializeLogsResult};
 use chroma_system::Operator;
-use chroma_types::{Chunk, LogRecord};
+use chroma_types::{Chunk, LogRecord, MaterializedLogOperation};
 use futures::TryFutureExt;
 use std::sync::atomic::AtomicU32;
 use std::sync::Arc;
@@ -77,6 +77,18 @@ impl Operator<MaterializeLogInput, MaterializeLogOutput> for MaterializeLogOpera
             .map_err(MaterializeLogOperatorError::LogMaterializationFailed)
             .await?;

+        if let Some(reader) = input.record_reader.as_ref() {
+            reader
+                .load_id_to_data(result.iter().filter_map(|log| {
+                    matches!(
+                        log.get_operation(),
+                        MaterializedLogOperation::UpdateExisting
+                    )
+                    .then_some(log.get_offset_id())
Comment on lines +83 to +87 (Contributor):

Important
[Performance] The current filtering for pre-loading data is too restrictive. It only considers UpdateExisting operations. However, records with DeleteExisting and OverwriteExisting operations also require their original data from the record segment to correctly calculate the logical size delta and for segment writers to process them.

Without pre-loading, the data for these records will be fetched individually within the hydrate call inside the loop, which negates some of the performance benefit of this optimization.

To fix this, we should pre-load data for all operations on existing records. Since Initial operations are already filtered out by materialize_logs, this means we should load for all operations except AddNew.

+                }))
+                .await;
+        }
+
         let mut collection_logical_size_delta = 0;
         for record in &result {
             collection_logical_size_delta += record
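The reviewer's suggested fix can be sketched in isolation. The types below (`MaterializedLogOperation` variants, `Log`, `offsets_to_preload`) are simplified stand-ins for the PR's actual API, not the real chroma types: the point is only the filter condition, which keeps every offset whose operation touches an existing record, i.e. everything except `AddNew`.

```rust
// Hypothetical stand-in for the PR's MaterializedLogOperation enum.
#[derive(Clone, Copy, Debug)]
enum MaterializedLogOperation {
    AddNew,
    UpdateExisting,
    DeleteExisting,
    OverwriteExisting,
}

// Hypothetical stand-in for a materialized log record.
struct Log {
    op: MaterializedLogOperation,
    offset_id: u32,
}

// Reviewer's suggestion: pre-load data for every record that already
// exists in the record segment, i.e. skip only AddNew (Initial is
// already filtered out upstream by materialize_logs).
fn offsets_to_preload(logs: &[Log]) -> Vec<u32> {
    logs.iter()
        .filter_map(|log| {
            (!matches!(log.op, MaterializedLogOperation::AddNew))
                .then_some(log.offset_id)
        })
        .collect()
}

fn main() {
    use MaterializedLogOperation::*;
    let logs = [
        Log { op: AddNew, offset_id: 1 },
        Log { op: UpdateExisting, offset_id: 2 },
        Log { op: DeleteExisting, offset_id: 3 },
        Log { op: OverwriteExisting, offset_id: 4 },
    ];
    // Unlike the UpdateExisting-only filter in the diff, this also
    // pre-loads offsets 3 and 4.
    println!("{:?}", offsets_to_preload(&logs)); // [2, 3, 4]
}
```

Compared with the `matches!(…, UpdateExisting)` filter in the diff above, the negated `AddNew` check also covers `DeleteExisting` and `OverwriteExisting`, so their original records are fetched in one batch rather than one at a time inside the later `hydrate` loop.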