From f62a23a7ee3047aaa42643ac2a7305f112d8ced1 Mon Sep 17 00:00:00 2001 From: aaryanpunia Date: Wed, 24 Jul 2024 12:58:29 -0700 Subject: [PATCH] fix: don't ignore upsert chunks --- server/src/handlers/chunk_handler.rs | 33 ++++++++++++---------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/server/src/handlers/chunk_handler.rs b/server/src/handlers/chunk_handler.rs index a114a498e..d6aadfbc0 100644 --- a/server/src/handlers/chunk_handler.rs +++ b/server/src/handlers/chunk_handler.rs @@ -332,25 +332,19 @@ pub async fn create_chunk( let dataset_config = DatasetConfiguration::from_json(dataset_org_plan_sub.dataset.server_configuration.clone()); - let (upsert_chunks, non_upsert_chunks) = chunks - .iter() - .filter_map(|chunk| { - if !chunk.upsert_by_tracking_id.unwrap_or(false) { - let non_empty_tracking_id = chunk - .tracking_id - .clone() - .filter(|tracking_id| !tracking_id.is_empty()); - let new_chunk = ChunkReqPayload { - tracking_id: non_empty_tracking_id, - ..chunk.clone() - }; - - Some(new_chunk) - } else { - None - } - }) - .partition(|chunk| chunk.upsert_by_tracking_id.unwrap_or(false)); + let chunks = chunks.into_iter().map(|chunk| { + let non_empty_tracking_id = chunk + .tracking_id + .clone() + .filter(|tracking_id| !tracking_id.is_empty()); + ChunkReqPayload { + tracking_id: non_empty_tracking_id, + ..chunk.clone() + } + }); + + let (upsert_chunks, non_upsert_chunks): (Vec, Vec) = + chunks.partition(|chunk| chunk.upsert_by_tracking_id.unwrap_or(false)); let (non_upsert_chunk_ingestion_message, non_upsert_chunk_metadatas) = create_chunk_metadata( non_upsert_chunks, @@ -359,6 +353,7 @@ pub async fn create_chunk( pool.clone(), ) .await?; + let (upsert_chunk_ingestion_message, upsert_chunk_metadatas) = create_chunk_metadata( upsert_chunks, dataset_org_plan_sub.dataset.id,