Skip to content

Commit bb2df13

Browse files
authored
Merge 'Kill unwrap in incremental module' from Pekka Enberg
As preparation for #3992, which prohibits the use of unwrap, remove uses of it in the incremental module. Closes #4001.
2 parents 06a01c4 + f1401ab commit bb2df13

File tree

4 files changed

+131
-54
lines changed

4 files changed

+131
-54
lines changed

core/incremental/aggregate_operator.rs

Lines changed: 28 additions & 8 deletions
Original file line number · Diff line number · Diff line change
@@ -1333,7 +1333,7 @@ impl AggregateOperator {
13331333
group_by: Vec<usize>,
13341334
aggregates: Vec<AggregateFunction>,
13351335
input_column_names: Vec<String>,
1336-
) -> Self {
1336+
) -> Result<Self> {
13371337
// Precompute flags for runtime efficiency
13381338
// Plain DISTINCT is indicated by empty aggregates vector
13391339
let is_distinct_only = aggregates.is_empty();
@@ -1361,7 +1361,11 @@ impl AggregateOperator {
13611361
for agg in &aggregates {
13621362
match agg {
13631363
AggregateFunction::Min(col_idx) => {
1364-
let storage_index = *storage_indices.get(col_idx).unwrap();
1364+
let storage_index = *storage_indices.get(col_idx).ok_or_else(|| {
1365+
LimboError::InternalError(
1366+
"storage index for MIN column should exist from first pass".to_string(),
1367+
)
1368+
})?;
13651369
let entry = column_min_max.entry(*col_idx).or_insert(AggColumnInfo {
13661370
index: storage_index,
13671371
has_min: false,
@@ -1370,7 +1374,11 @@ impl AggregateOperator {
13701374
entry.has_min = true;
13711375
}
13721376
AggregateFunction::Max(col_idx) => {
1373-
let storage_index = *storage_indices.get(col_idx).unwrap();
1377+
let storage_index = *storage_indices.get(col_idx).ok_or_else(|| {
1378+
LimboError::InternalError(
1379+
"storage index for MAX column should exist from first pass".to_string(),
1380+
)
1381+
})?;
13741382
let entry = column_min_max.entry(*col_idx).or_insert(AggColumnInfo {
13751383
index: storage_index,
13761384
has_min: false,
@@ -1395,7 +1403,7 @@ impl AggregateOperator {
13951403
}
13961404
}
13971405

1398-
Self {
1406+
Ok(Self {
13991407
operator_id,
14001408
group_by,
14011409
aggregates,
@@ -1405,7 +1413,7 @@ impl AggregateOperator {
14051413
tracker: None,
14061414
commit_state: AggregateCommitState::Idle,
14071415
is_distinct_only,
1408-
}
1416+
})
14091417
}
14101418

14111419
pub fn has_min_max(&self) -> bool {
@@ -2639,8 +2647,12 @@ impl FetchDistinctState {
26392647
for (col_idx, values) in cols_values {
26402648
for hashable_row in values {
26412649
// Extract the value from HashableRow
2642-
values_to_fetch
2643-
.push((*col_idx, hashable_row.values.first().unwrap().clone()));
2650+
let value = hashable_row.values.first().ok_or_else(|| {
2651+
LimboError::InternalError(
2652+
"hashable_row should have at least one value".to_string(),
2653+
)
2654+
})?;
2655+
values_to_fetch.push((*col_idx, value.clone()));
26442656
}
26452657
}
26462658
}
@@ -2900,7 +2912,15 @@ impl DistinctPersistState {
29002912
let (col_idx, hashable_row) = value_keys[*value_idx].clone();
29012913
let weight = distinct_deltas[&group_key][&(col_idx, hashable_row.clone())];
29022914
// Extract the value from HashableRow (it's the first element in values vector)
2903-
let value = hashable_row.values.first().unwrap().clone();
2915+
let value = hashable_row
2916+
.values
2917+
.first()
2918+
.ok_or_else(|| {
2919+
LimboError::InternalError(
2920+
"hashable_row should have at least one value".to_string(),
2921+
)
2922+
})?
2923+
.clone();
29042924

29052925
let distinct_deltas = std::mem::take(distinct_deltas);
29062926
let group_keys = std::mem::take(group_keys);

core/incremental/compiler.rs

Lines changed: 14 additions & 6 deletions
Original file line number · Diff line number · Diff line change
@@ -886,13 +886,21 @@ impl DbspCompiler {
886886
// Determine the correct pairing: one column must be from left, one from right
887887
if first_in_left.is_some() && second_in_right.is_some() {
888888
// first is from left, second is from right
889-
let (left_idx, _) = first_in_left.unwrap();
890-
let (right_idx, _) = second_in_right.unwrap();
889+
let (left_idx, _) = first_in_left.ok_or_else(|| {
890+
LimboError::InternalError("first_in_left should exist".to_string())
891+
})?;
892+
let (right_idx, _) = second_in_right.ok_or_else(|| {
893+
LimboError::InternalError("second_in_right should exist".to_string())
894+
})?;
891895
Ok((first_col.clone(), left_idx, second_col.clone(), right_idx))
892896
} else if first_in_right.is_some() && second_in_left.is_some() {
893897
// first is from right, second is from left
894-
let (left_idx, _) = second_in_left.unwrap();
895-
let (right_idx, _) = first_in_right.unwrap();
898+
let (left_idx, _) = second_in_left.ok_or_else(|| {
899+
LimboError::InternalError("second_in_left should exist".to_string())
900+
})?;
901+
let (right_idx, _) = first_in_right.ok_or_else(|| {
902+
LimboError::InternalError("first_in_right should exist".to_string())
903+
})?;
896904
Ok((second_col.clone(), left_idx, first_col.clone(), right_idx))
897905
} else {
898906
// Provide specific error messages for different failure cases
@@ -1276,7 +1284,7 @@ impl DbspCompiler {
12761284
group_by_indices.clone(),
12771285
aggregate_functions.clone(),
12781286
input_column_names.clone(),
1279-
));
1287+
)?);
12801288

12811289
let result_node_id = self.circuit.add_node(
12821290
DbspOperator::Aggregate {
@@ -1421,7 +1429,7 @@ impl DbspCompiler {
14211429
group_by,
14221430
vec![], // Empty aggregates indicates plain DISTINCT
14231431
input_column_names,
1424-
),
1432+
)?,
14251433
);
14261434

14271435
// Add the node to the circuit

0 commit comments

Comments (0)