Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions core/incremental/aggregate_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,7 @@ impl AggregateOperator {
group_by: Vec<usize>,
aggregates: Vec<AggregateFunction>,
input_column_names: Vec<String>,
) -> Self {
) -> Result<Self> {
// Precompute flags for runtime efficiency
// Plain DISTINCT is indicated by empty aggregates vector
let is_distinct_only = aggregates.is_empty();
Expand Down Expand Up @@ -1361,7 +1361,11 @@ impl AggregateOperator {
for agg in &aggregates {
match agg {
AggregateFunction::Min(col_idx) => {
let storage_index = *storage_indices.get(col_idx).unwrap();
let storage_index = *storage_indices.get(col_idx).ok_or_else(|| {
LimboError::InternalError(
"storage index for MIN column should exist from first pass".to_string(),
)
})?;
let entry = column_min_max.entry(*col_idx).or_insert(AggColumnInfo {
index: storage_index,
has_min: false,
Expand All @@ -1370,7 +1374,11 @@ impl AggregateOperator {
entry.has_min = true;
}
AggregateFunction::Max(col_idx) => {
let storage_index = *storage_indices.get(col_idx).unwrap();
let storage_index = *storage_indices.get(col_idx).ok_or_else(|| {
LimboError::InternalError(
"storage index for MAX column should exist from first pass".to_string(),
)
})?;
let entry = column_min_max.entry(*col_idx).or_insert(AggColumnInfo {
index: storage_index,
has_min: false,
Expand All @@ -1395,7 +1403,7 @@ impl AggregateOperator {
}
}

Self {
Ok(Self {
operator_id,
group_by,
aggregates,
Expand All @@ -1405,7 +1413,7 @@ impl AggregateOperator {
tracker: None,
commit_state: AggregateCommitState::Idle,
is_distinct_only,
}
})
}

pub fn has_min_max(&self) -> bool {
Expand Down Expand Up @@ -2639,8 +2647,12 @@ impl FetchDistinctState {
for (col_idx, values) in cols_values {
for hashable_row in values {
// Extract the value from HashableRow
values_to_fetch
.push((*col_idx, hashable_row.values.first().unwrap().clone()));
let value = hashable_row.values.first().ok_or_else(|| {
LimboError::InternalError(
"hashable_row should have at least one value".to_string(),
)
})?;
values_to_fetch.push((*col_idx, value.clone()));
}
}
}
Expand Down Expand Up @@ -2900,7 +2912,15 @@ impl DistinctPersistState {
let (col_idx, hashable_row) = value_keys[*value_idx].clone();
let weight = distinct_deltas[&group_key][&(col_idx, hashable_row.clone())];
// Extract the value from HashableRow (it's the first element in values vector)
let value = hashable_row.values.first().unwrap().clone();
let value = hashable_row
.values
.first()
.ok_or_else(|| {
LimboError::InternalError(
"hashable_row should have at least one value".to_string(),
)
})?
.clone();

let distinct_deltas = std::mem::take(distinct_deltas);
let group_keys = std::mem::take(group_keys);
Expand Down
20 changes: 14 additions & 6 deletions core/incremental/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,13 +886,21 @@ impl DbspCompiler {
// Determine the correct pairing: one column must be from left, one from right
if first_in_left.is_some() && second_in_right.is_some() {
// first is from left, second is from right
let (left_idx, _) = first_in_left.unwrap();
let (right_idx, _) = second_in_right.unwrap();
let (left_idx, _) = first_in_left.ok_or_else(|| {
LimboError::InternalError("first_in_left should exist".to_string())
})?;
let (right_idx, _) = second_in_right.ok_or_else(|| {
LimboError::InternalError("second_in_right should exist".to_string())
})?;
Ok((first_col.clone(), left_idx, second_col.clone(), right_idx))
} else if first_in_right.is_some() && second_in_left.is_some() {
// first is from right, second is from left
let (left_idx, _) = second_in_left.unwrap();
let (right_idx, _) = first_in_right.unwrap();
let (left_idx, _) = second_in_left.ok_or_else(|| {
LimboError::InternalError("second_in_left should exist".to_string())
})?;
let (right_idx, _) = first_in_right.ok_or_else(|| {
LimboError::InternalError("first_in_right should exist".to_string())
})?;
Ok((second_col.clone(), left_idx, first_col.clone(), right_idx))
} else {
// Provide specific error messages for different failure cases
Expand Down Expand Up @@ -1276,7 +1284,7 @@ impl DbspCompiler {
group_by_indices.clone(),
aggregate_functions.clone(),
input_column_names.clone(),
));
)?);

let result_node_id = self.circuit.add_node(
DbspOperator::Aggregate {
Expand Down Expand Up @@ -1421,7 +1429,7 @@ impl DbspCompiler {
group_by,
vec![], // Empty aggregates indicates plain DISTINCT
input_column_names,
),
)?,
);

// Add the node to the circuit
Expand Down
Loading
Loading