Skip to content

Commit

Permalink
fix: don't ignore upsert chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
aaryanpunia authored and skeptrunedev committed Jul 24, 2024
1 parent 20ae143 commit f62a23a
Showing 1 changed file with 14 additions and 19 deletions.
33 changes: 14 additions & 19 deletions server/src/handlers/chunk_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,25 +332,19 @@ pub async fn create_chunk(
let dataset_config =
DatasetConfiguration::from_json(dataset_org_plan_sub.dataset.server_configuration.clone());

let (upsert_chunks, non_upsert_chunks) = chunks
.iter()
.filter_map(|chunk| {
if !chunk.upsert_by_tracking_id.unwrap_or(false) {
let non_empty_tracking_id = chunk
.tracking_id
.clone()
.filter(|tracking_id| !tracking_id.is_empty());
let new_chunk = ChunkReqPayload {
tracking_id: non_empty_tracking_id,
..chunk.clone()
};

Some(new_chunk)
} else {
None
}
})
.partition(|chunk| chunk.upsert_by_tracking_id.unwrap_or(false));
let chunks = chunks.into_iter().map(|chunk| {
let non_empty_tracking_id = chunk
.tracking_id
.clone()
.filter(|tracking_id| !tracking_id.is_empty());
ChunkReqPayload {
tracking_id: non_empty_tracking_id,
..chunk.clone()
}
});

let (upsert_chunks, non_upsert_chunks): (Vec<ChunkReqPayload>, Vec<ChunkReqPayload>) =
chunks.partition(|chunk| chunk.upsert_by_tracking_id.unwrap_or(false));

let (non_upsert_chunk_ingestion_message, non_upsert_chunk_metadatas) = create_chunk_metadata(
non_upsert_chunks,
Expand All @@ -359,6 +353,7 @@ pub async fn create_chunk(
pool.clone(),
)
.await?;

let (upsert_chunk_ingestion_message, upsert_chunk_metadatas) = create_chunk_metadata(
upsert_chunks,
dataset_org_plan_sub.dataset.id,
Expand Down

0 comments on commit f62a23a

Please sign in to comment.