Skip to content

Commit 41a49d6

Browse files
feat(core/translate): implement SQLite min/max special-case for bare columns
1 parent caa71ea commit 41a49d6

File tree

2 files changed

+400
-13
lines changed

2 files changed

+400
-13
lines changed

core/translate/group_by.rs

Lines changed: 320 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use crate::{
2929
};
3030
use crate::{translate::plan::ResultSetColumn, types::KeyInfo};
3131

32+
use crate::function::AggFunc;
33+
3234
/// Labels needed for various jumps in GROUP BY handling.
3335
#[derive(Debug)]
3436
pub struct GroupByLabels {
@@ -72,6 +74,12 @@ pub struct GroupByRegisters {
7274
/// The comparison result is used to determine if the current row belongs to the same group as the previous row
7375
/// Each group by expression has a corresponding register
7476
pub reg_group_exprs_cmp: usize,
77+
/// Register holding the current min/max value for comparison (used for MIN/MAX special case)
78+
/// Only allocated when there's exactly one MIN or MAX aggregate
79+
pub reg_current_min_max: Option<usize>,
80+
/// Register holding a flag indicating whether to update bare columns (used for MIN/MAX special case)
81+
/// Only allocated when there's exactly one MIN or MAX aggregate
82+
pub reg_should_update_bare_cols: Option<usize>,
7583
}
7684

7785
// Metadata for handling GROUP BY operations
@@ -115,6 +123,21 @@ pub fn init_group_by<'a>(
115123
let reg_abort_flag = program.alloc_register();
116124
let reg_group_exprs_cmp = program.alloc_registers(group_by.exprs.len());
117125

126+
// Check if there's exactly one MIN or MAX aggregate (MIN/MAX special case)
127+
let min_max_aggs_count = plan
128+
.aggregates
129+
.iter()
130+
.filter(|agg| matches!(agg.func, AggFunc::Min | AggFunc::Max))
131+
.count();
132+
let (reg_current_min_max, reg_should_update_bare_cols) = if min_max_aggs_count == 1 {
133+
(
134+
Some(program.alloc_register()),
135+
Some(program.alloc_register()),
136+
)
137+
} else {
138+
(None, None)
139+
};
140+
118141
// The following two blocks of registers should always be allocated contiguously,
119142
// because they are cleared in a contiguous block in the GROUP BYs clear accumulator subroutine.
120143
// START BLOCK
@@ -216,6 +239,8 @@ pub fn init_group_by<'a>(
216239
reg_group_exprs_cmp,
217240
reg_subrtn_acc_clear_return_offset,
218241
reg_group_by_source_cols_start,
242+
reg_current_min_max,
243+
reg_should_update_bare_cols,
219244
},
220245
});
221246
Ok(())
@@ -435,6 +460,202 @@ pub fn emit_group_by_sort_loop_end(
435460
program.preassign_label_to_next_insn(label_sort_loop_end);
436461
}
437462

463+
/// Finds the single MIN or MAX aggregate if exactly one exists.
464+
/// Returns (index, aggregate) if found, None otherwise.
465+
fn find_single_min_max_aggregate(aggregates: &[Aggregate]) -> Option<(usize, &Aggregate)> {
466+
let min_max_aggs: Vec<_> = aggregates
467+
.iter()
468+
.enumerate()
469+
.filter(|(_, agg)| matches!(agg.func, AggFunc::Min | AggFunc::Max))
470+
.collect();
471+
472+
if min_max_aggs.len() == 1 {
473+
Some((min_max_aggs[0].0, min_max_aggs[0].1))
474+
} else {
475+
None
476+
}
477+
}
478+
479+
/// Gets the register containing the aggregate argument value.
480+
/// This handles both Sorter and MainLoop row sources.
481+
fn get_aggregate_argument_register(
482+
program: &mut ProgramBuilder,
483+
row_source: &GroupByRowSource,
484+
cursor_index: usize,
485+
offset: usize,
486+
non_aggregate_exprs_len: usize,
487+
) -> usize {
488+
match row_source {
489+
GroupByRowSource::Sorter { pseudo_cursor, .. } => {
490+
let arg_reg = program.alloc_register();
491+
program.emit_column_or_rowid(*pseudo_cursor, cursor_index + offset, arg_reg);
492+
arg_reg
493+
}
494+
GroupByRowSource::MainLoop { start_reg_src, .. } => {
495+
let src_reg = start_reg_src + non_aggregate_exprs_len + offset;
496+
let arg_reg = program.alloc_register();
497+
program.emit_insn(Insn::Copy {
498+
src_reg,
499+
dst_reg: arg_reg,
500+
extra_amount: 0,
501+
});
502+
arg_reg
503+
}
504+
}
505+
}
506+
507+
/// Emits bytecode to compare the aggregate argument with the current min/max value
508+
/// and set the update flag if this row should update the bare columns.
509+
fn emit_min_max_comparison(
510+
program: &mut ProgramBuilder,
511+
agg: &Aggregate,
512+
agg_arg_reg: usize,
513+
current_min_max_reg: usize,
514+
update_flag_reg: usize,
515+
plan: &SelectPlan,
516+
) -> Result<()> {
517+
// Initialize update flag to 0 (don't update by default)
518+
program.emit_insn(Insn::Integer {
519+
value: 0,
520+
dest: update_flag_reg,
521+
});
522+
523+
// Skip comparison if aggregate argument is NULL (NULLs are ignored in MIN/MAX)
524+
let after_comparison_label = program.allocate_label();
525+
let check_first_row_label = program.allocate_label();
526+
527+
program.emit_insn(Insn::NotNull {
528+
reg: agg_arg_reg,
529+
target_pc: check_first_row_label,
530+
});
531+
program.emit_insn(Insn::Goto {
532+
target_pc: after_comparison_label,
533+
});
534+
535+
// Aggregate argument is not NULL: proceed with comparison
536+
program.preassign_label_to_next_insn(check_first_row_label);
537+
538+
// Check if this is the first row in the group (current_min_max_reg is NULL)
539+
let compare_label = program.allocate_label();
540+
program.emit_insn(Insn::NotNull {
541+
reg: current_min_max_reg,
542+
target_pc: compare_label,
543+
});
544+
// First row: always update bare columns
545+
program.emit_insn(Insn::Integer {
546+
value: 1,
547+
dest: update_flag_reg,
548+
});
549+
program.emit_insn(Insn::Goto {
550+
target_pc: after_comparison_label,
551+
});
552+
553+
// Not first row: compare aggregate argument with current min/max
554+
program.preassign_label_to_next_insn(compare_label);
555+
556+
// Get collation for comparison
557+
let collation = agg
558+
.args
559+
.first()
560+
.and_then(|expr| get_collseq_from_expr(expr, &plan.table_references).ok())
561+
.flatten()
562+
.unwrap_or_default();
563+
564+
// Ensure register ordering for Compare instruction (requires start_reg_a < start_reg_b)
565+
let (compare_reg_a, compare_reg_b) = if agg_arg_reg < current_min_max_reg {
566+
(agg_arg_reg, current_min_max_reg)
567+
} else {
568+
// Copy to a temp register that's guaranteed to be after agg_arg_reg
569+
let temp_reg = program.alloc_register();
570+
program.emit_insn(Insn::Copy {
571+
src_reg: current_min_max_reg,
572+
dst_reg: temp_reg,
573+
extra_amount: 0,
574+
});
575+
(agg_arg_reg, temp_reg)
576+
};
577+
578+
// Compare values
579+
program.emit_insn(Insn::Compare {
580+
start_reg_a: compare_reg_a,
581+
start_reg_b: compare_reg_b,
582+
count: 1,
583+
key_info: vec![KeyInfo {
584+
sort_order: SortOrder::Asc,
585+
collation,
586+
}],
587+
});
588+
589+
// Set flag based on comparison: MIN updates if new < old, MAX updates if new > old
590+
let set_flag_label = program.allocate_label();
591+
let skip_flag_label = program.allocate_label();
592+
593+
match agg.func {
594+
AggFunc::Min => {
595+
program.emit_insn(Insn::Jump {
596+
target_pc_lt: set_flag_label,
597+
target_pc_eq: skip_flag_label,
598+
target_pc_gt: skip_flag_label,
599+
});
600+
}
601+
AggFunc::Max => {
602+
program.emit_insn(Insn::Jump {
603+
target_pc_lt: skip_flag_label,
604+
target_pc_eq: skip_flag_label,
605+
target_pc_gt: set_flag_label,
606+
});
607+
}
608+
_ => unreachable!(),
609+
}
610+
611+
program.preassign_label_to_next_insn(set_flag_label);
612+
program.emit_insn(Insn::Integer {
613+
value: 1,
614+
dest: update_flag_reg,
615+
});
616+
617+
program.preassign_label_to_next_insn(skip_flag_label);
618+
program.preassign_label_to_next_insn(after_comparison_label);
619+
620+
Ok(())
621+
}
622+
623+
/// Updates the current min/max tracking register with the new value.
624+
fn update_min_max_tracking_register(
625+
program: &mut ProgramBuilder,
626+
row_source: &GroupByRowSource,
627+
current_min_max_reg: usize,
628+
cursor_index: usize,
629+
aggregate_offset: usize,
630+
non_aggregate_exprs_len: usize,
631+
) {
632+
let agg_arg_reg = match row_source {
633+
GroupByRowSource::Sorter { pseudo_cursor, .. } => {
634+
let arg_reg = program.alloc_register();
635+
program.emit_column_or_rowid(*pseudo_cursor, cursor_index + aggregate_offset, arg_reg);
636+
arg_reg
637+
}
638+
GroupByRowSource::MainLoop { start_reg_src, .. } => {
639+
*start_reg_src + non_aggregate_exprs_len + aggregate_offset
640+
}
641+
};
642+
643+
program.emit_insn(Insn::Copy {
644+
src_reg: agg_arg_reg,
645+
dst_reg: current_min_max_reg,
646+
extra_amount: 0,
647+
});
648+
}
649+
650+
/// Calculates the offset to the aggregate argument for a given aggregate index.
651+
fn calculate_aggregate_offset(aggregates: &[Aggregate], target_index: usize) -> usize {
652+
aggregates
653+
.iter()
654+
.take(target_index)
655+
.map(|agg| agg.args.len())
656+
.sum()
657+
}
658+
438659
/// Enum representing the source for the rows processed during a GROUP BY.
439660
/// In case sorting is needed (which is most of the time), the variant
440661
/// [GroupByRowSource::Sorter] encodes the necessary information about that
@@ -582,15 +803,47 @@ pub fn group_by_process_single_group(
582803
return_reg: registers.reg_subrtn_acc_clear_return_offset,
583804
});
584805

806+
// Check if there's exactly one MIN or MAX aggregate (SQLite special case for bare columns)
807+
// When there's exactly one MIN or MAX, bare columns should come from the row with min/max value
808+
let min_max_agg_info = find_single_min_max_aggregate(&plan.aggregates);
809+
585810
// Process each aggregate function for the current row
586811
program.preassign_label_to_next_insn(labels.label_grouping_agg_step);
587-
let cursor_index = t_ctx.non_aggregate_expressions.len(); // Skipping all columns in sorter that not an aggregation arguments
812+
let cursor_index = t_ctx.non_aggregate_expressions.len();
588813
let mut offset = 0;
589814
for (i, agg) in plan.aggregates.iter().enumerate() {
590815
let start_reg = t_ctx
591816
.reg_agg_start
592817
.expect("aggregate registers must be initialized");
593818
let agg_result_reg = start_reg + i;
819+
820+
// Special handling for single MIN/MAX aggregate: check if this row updates min/max
821+
if min_max_agg_info.map(|(idx, _)| idx == i).unwrap_or(false) {
822+
let agg_arg_reg = get_aggregate_argument_register(
823+
program,
824+
row_source,
825+
cursor_index,
826+
offset,
827+
t_ctx.non_aggregate_expressions.len(),
828+
);
829+
830+
let current_min_max_reg = registers.reg_current_min_max.expect(
831+
"reg_current_min_max should be allocated when there's a single MIN/MAX aggregate",
832+
);
833+
let update_flag_reg = registers.reg_should_update_bare_cols.expect(
834+
"reg_should_update_bare_cols should be allocated when there's a single MIN/MAX aggregate",
835+
);
836+
837+
emit_min_max_comparison(
838+
program,
839+
agg,
840+
agg_arg_reg,
841+
current_min_max_reg,
842+
update_flag_reg,
843+
plan,
844+
)?;
845+
}
846+
594847
let agg_arg_source = match &row_source {
595848
GroupByRowSource::Sorter { pseudo_cursor, .. } => AggArgumentSource::new_from_cursor(
596849
program,
@@ -620,17 +873,56 @@ pub fn group_by_process_single_group(
620873
offset += agg.args.len();
621874
}
622875

623-
// We only need to store non-aggregate columns once per group
624-
// Skip if we've already stored them for this group
625-
program.add_comment(
626-
program.offset(),
627-
"don't emit group columns if continuing existing group",
628-
);
629-
program.emit_insn(Insn::If {
630-
target_pc: labels.label_acc_indicator_set_flag_true,
631-
reg: registers.reg_data_in_acc_flag,
632-
jump_if_null: false,
633-
});
876+
// Handle bare column storage
877+
if let Some((min_max_idx, _)) = min_max_agg_info {
878+
// MIN/MAX case: store bare columns only when flag is set (new min/max found)
879+
let update_flag_reg = registers
880+
.reg_should_update_bare_cols
881+
.expect("reg_should_update_bare_cols should be allocated");
882+
let store_bare_cols_label = program.allocate_label();
883+
884+
// Check flag: if > 0, store bare columns
885+
program.emit_insn(Insn::IfPos {
886+
reg: update_flag_reg,
887+
target_pc: store_bare_cols_label,
888+
decrement_by: 0,
889+
});
890+
// Flag is 0, skip storing bare columns
891+
program.emit_insn(Insn::Goto {
892+
target_pc: labels.label_acc_indicator_set_flag_true,
893+
});
894+
895+
// Flag is 1: update current_min_max and store bare columns
896+
program.preassign_label_to_next_insn(store_bare_cols_label);
897+
898+
// Update the current min/max tracking register
899+
let aggregate_offset = calculate_aggregate_offset(&plan.aggregates, min_max_idx);
900+
let current_min_max_reg = registers
901+
.reg_current_min_max
902+
.expect("reg_current_min_max should be allocated");
903+
904+
update_min_max_tracking_register(
905+
program,
906+
row_source,
907+
current_min_max_reg,
908+
cursor_index,
909+
aggregate_offset,
910+
t_ctx.non_aggregate_expressions.len(),
911+
);
912+
913+
// Now store bare columns from current row (fall through to bare column storage code below)
914+
} else {
915+
// Normal case: store once per group
916+
program.add_comment(
917+
program.offset(),
918+
"don't emit group columns if continuing existing group",
919+
);
920+
program.emit_insn(Insn::If {
921+
target_pc: labels.label_acc_indicator_set_flag_true,
922+
reg: registers.reg_data_in_acc_flag,
923+
jump_if_null: false,
924+
});
925+
}
634926

635927
// Read non-aggregate columns from the current row
636928
match row_source {
@@ -863,6 +1155,22 @@ pub fn group_by_emit_row_phase<'a>(
8631155
),
8641156
});
8651157

1158+
// Reset min/max tracking register to NULL (for bare column special case)
1159+
if let Some(reg_current_min_max) = registers.reg_current_min_max {
1160+
program.emit_insn(Insn::Null {
1161+
dest: reg_current_min_max,
1162+
dest_end: None,
1163+
});
1164+
}
1165+
1166+
// Reset update flag to 0 (for bare column special case)
1167+
if let Some(reg_should_update) = registers.reg_should_update_bare_cols {
1168+
program.emit_insn(Insn::Integer {
1169+
value: 0,
1170+
dest: reg_should_update,
1171+
});
1172+
}
1173+
8661174
// Reopen ephemeral indexes for distinct aggregates (effectively clearing them).
8671175
plan.aggregates
8681176
.iter()

0 commit comments

Comments
 (0)