Skip to content

Commit ad06e9f

Browse files
committed
core/incremental: Improve error handling in aggregate_operator.rs
1 parent: fdf61e2 · commit: ad06e9f

File tree

1 file changed

+28
-8
lines changed

1 file changed

+28
-8
lines changed

core/incremental/aggregate_operator.rs

Lines changed: 28 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1333,7 +1333,7 @@ impl AggregateOperator {
13331333
group_by: Vec<usize>,
13341334
aggregates: Vec<AggregateFunction>,
13351335
input_column_names: Vec<String>,
1336-
) -> Self {
1336+
) -> Result<Self> {
13371337
// Precompute flags for runtime efficiency
13381338
// Plain DISTINCT is indicated by empty aggregates vector
13391339
let is_distinct_only = aggregates.is_empty();
@@ -1361,7 +1361,11 @@ impl AggregateOperator {
13611361
for agg in &aggregates {
13621362
match agg {
13631363
AggregateFunction::Min(col_idx) => {
1364-
let storage_index = *storage_indices.get(col_idx).unwrap();
1364+
let storage_index = *storage_indices.get(col_idx).ok_or_else(|| {
1365+
LimboError::InternalError(
1366+
"storage index for MIN column should exist from first pass".to_string(),
1367+
)
1368+
})?;
13651369
let entry = column_min_max.entry(*col_idx).or_insert(AggColumnInfo {
13661370
index: storage_index,
13671371
has_min: false,
@@ -1370,7 +1374,11 @@ impl AggregateOperator {
13701374
entry.has_min = true;
13711375
}
13721376
AggregateFunction::Max(col_idx) => {
1373-
let storage_index = *storage_indices.get(col_idx).unwrap();
1377+
let storage_index = *storage_indices.get(col_idx).ok_or_else(|| {
1378+
LimboError::InternalError(
1379+
"storage index for MAX column should exist from first pass".to_string(),
1380+
)
1381+
})?;
13741382
let entry = column_min_max.entry(*col_idx).or_insert(AggColumnInfo {
13751383
index: storage_index,
13761384
has_min: false,
@@ -1395,7 +1403,7 @@ impl AggregateOperator {
13951403
}
13961404
}
13971405

1398-
Self {
1406+
Ok(Self {
13991407
operator_id,
14001408
group_by,
14011409
aggregates,
@@ -1405,7 +1413,7 @@ impl AggregateOperator {
14051413
tracker: None,
14061414
commit_state: AggregateCommitState::Idle,
14071415
is_distinct_only,
1408-
}
1416+
})
14091417
}
14101418

14111419
pub fn has_min_max(&self) -> bool {
@@ -2639,8 +2647,12 @@ impl FetchDistinctState {
26392647
for (col_idx, values) in cols_values {
26402648
for hashable_row in values {
26412649
// Extract the value from HashableRow
2642-
values_to_fetch
2643-
.push((*col_idx, hashable_row.values.first().unwrap().clone()));
2650+
let value = hashable_row.values.first().ok_or_else(|| {
2651+
LimboError::InternalError(
2652+
"hashable_row should have at least one value".to_string(),
2653+
)
2654+
})?;
2655+
values_to_fetch.push((*col_idx, value.clone()));
26442656
}
26452657
}
26462658
}
@@ -2900,7 +2912,15 @@ impl DistinctPersistState {
29002912
let (col_idx, hashable_row) = value_keys[*value_idx].clone();
29012913
let weight = distinct_deltas[&group_key][&(col_idx, hashable_row.clone())];
29022914
// Extract the value from HashableRow (it's the first element in values vector)
2903-
let value = hashable_row.values.first().unwrap().clone();
2915+
let value = hashable_row
2916+
.values
2917+
.first()
2918+
.ok_or_else(|| {
2919+
LimboError::InternalError(
2920+
"hashable_row should have at least one value".to_string(),
2921+
)
2922+
})?
2923+
.clone();
29042924

29052925
let distinct_deltas = std::mem::take(distinct_deltas);
29062926
let group_keys = std::mem::take(group_keys);

0 commit comments

Comments (0)