Skip to content

Commit

Permalink
Refactor vector.go to merge annotations
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Jan 16, 2025
1 parent 85b9e94 commit 5d75bd3
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 38 deletions.
71 changes: 33 additions & 38 deletions execution/binary/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@ func (o *vectorOperator) execBinaryUnless(lhs, rhs model.StepVector) (model.Step
return step, nil
}

func (o *vectorOperator) computeBinaryPairing(ctx context.Context, hval, lval float64, hlhs, hrhs *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error) {
func (o *vectorOperator) computeBinaryPairing(hval, lval float64, hlhs, hrhs *histogram.FloatHistogram, annos *annotations.Annotations) (float64, *histogram.FloatHistogram, bool, error) {
// operand is not commutative so we need to address potential swapping
if o.matching.Card == parser.CardOneToMany {
v, h, keep, err := vectorElemBinop(ctx, o.opType, lval, hval, hlhs, hrhs)
v, h, keep, err := vectorElemBinop(o.opType, lval, hval, hlhs, hrhs, annos)
return v, h, keep, err
}
v, h, keep, err := vectorElemBinop(ctx, o.opType, hval, lval, hlhs, hrhs)
v, h, keep, err := vectorElemBinop(o.opType, hval, lval, hlhs, hrhs, annos)
return v, h, keep, err
}

Expand All @@ -298,6 +298,10 @@ func (o *vectorOperator) execBinaryArithmetic(ctx context.Context, lhs, rhs mode

var (
hcs, lcs model.StepVector
annos annotations.Annotations
h *histogram.FloatHistogram
keep bool
err error
)

switch o.matching.Card {
Expand Down Expand Up @@ -349,23 +353,17 @@ func (o *vectorOperator) execBinaryArithmetic(ctx context.Context, lhs, rhs mode
}
jp.bts = ts

var h *histogram.FloatHistogram
var keep bool
var err error

if jp.histogramVal != nil {
_, h, keep, err = o.computeBinaryPairing(ctx, 0, 0, hcs.Histograms[i], jp.histogramVal)
_, h, keep, err = o.computeBinaryPairing(0, 0, hcs.Histograms[i], jp.histogramVal, &annos)
} else {
_, h, keep, err = o.computeBinaryPairing(ctx, 0, jp.val, hcs.Histograms[i], nil)
_, h, keep, err = o.computeBinaryPairing(0, jp.val, hcs.Histograms[i], nil, &annos)
}
if countWarnings, countInfo := annos.CountWarningsAndInfo(); countWarnings > 0 || countInfo > 0 {
warnings.MergeToContext(annos, ctx)
continue
}
if err != nil {
if errors.Is(err, annotations.PromQLInfo) || errors.Is(err, annotations.PromQLWarning) {
// just continue when the errors are info and warn
continue
} else {
lastErr = err
continue
}
return model.StepVector{}, err
}

switch {
Expand Down Expand Up @@ -397,24 +395,23 @@ func (o *vectorOperator) execBinaryArithmetic(ctx context.Context, lhs, rhs mode
}
jp.bts = ts
var val float64
var h *histogram.FloatHistogram
var keep bool
var err error

if jp.histogramVal != nil {
_, h, _, err = o.computeBinaryPairing(ctx, hcs.Samples[i], 0, nil, jp.histogramVal)
_, h, _, err = o.computeBinaryPairing(hcs.Samples[i], 0, nil, jp.histogramVal, &annos)
if countWarnings, countInfo := annos.CountWarningsAndInfo(); countWarnings > 0 || countInfo > 0 {
warnings.MergeToContext(annos, ctx)
continue
}
if err != nil {
if errors.Is(err, annotations.PromQLInfo) || errors.Is(err, annotations.PromQLWarning) {
// just continue when the errors are info and warn
continue
} else {
lastErr = err
continue
}
return model.StepVector{}, err
}
step.AppendHistogram(o.pool, jp.sid, h)
} else {
val, _, keep, err = o.computeBinaryPairing(ctx, hcs.Samples[i], jp.val, nil, nil)
val, _, keep, err = o.computeBinaryPairing(hcs.Samples[i], jp.val, nil, nil, &annos)
if countWarnings, countInfo := annos.CountWarningsAndInfo(); countWarnings > 0 || countInfo > 0 {
warnings.MergeToContext(annos, ctx)
continue
}
if err != nil {
return model.StepVector{}, err
}
Expand Down Expand Up @@ -603,8 +600,9 @@ func signatureFunc(on bool, names ...string) func(labels.Labels) uint64 {
// Lifted from: https://github.com/prometheus/prometheus/blob/v3.1.0/promql/engine.go#L2797.
// nolint: unparam
// vectorElemBinop evaluates a binary operation between two Vector elements.
func vectorElemBinop(ctx context.Context, op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error) {
func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram, annos *annotations.Annotations) (float64, *histogram.FloatHistogram, bool, error) {
opName := parser.ItemTypeStr[op]

switch {
case hlhs == nil && hrhs == nil:
{
Expand Down Expand Up @@ -643,9 +641,8 @@ func vectorElemBinop(ctx context.Context, op parser.ItemType, lhs, rhs float64,
case parser.MUL:
return 0, hrhs.Copy().Mul(lhs).Compact(0), true, nil
case parser.ADD, parser.SUB, parser.DIV, parser.POW, parser.MOD, parser.EQLC, parser.NEQ, parser.GTR, parser.LSS, parser.GTE, parser.LTE, parser.ATAN2:
err := annotations.NewIncompatibleTypesInBinOpInfo("float", opName, "histogram", posrange.PositionRange{})
warnings.AddToContext(err, ctx)
return 0, nil, false, err
annos.Add(annotations.NewIncompatibleTypesInBinOpInfo("float", opName, "histogram", posrange.PositionRange{}))
return 0, nil, false, nil
}
}
case hlhs != nil && hrhs == nil:
Expand All @@ -656,9 +653,8 @@ func vectorElemBinop(ctx context.Context, op parser.ItemType, lhs, rhs float64,
case parser.DIV:
return 0, hlhs.Copy().Div(rhs).Compact(0), true, nil
case parser.ADD, parser.SUB, parser.POW, parser.MOD, parser.EQLC, parser.NEQ, parser.GTR, parser.LSS, parser.GTE, parser.LTE, parser.ATAN2:
err := annotations.NewIncompatibleTypesInBinOpInfo("histogram", opName, "float", posrange.PositionRange{})
warnings.AddToContext(err, ctx)
return 0, nil, false, err
annos.Add(annotations.NewIncompatibleTypesInBinOpInfo("histogram", opName, "float", posrange.PositionRange{}))
return 0, nil, false, nil
}
}
case hlhs != nil && hrhs != nil:
Expand All @@ -683,9 +679,8 @@ func vectorElemBinop(ctx context.Context, op parser.ItemType, lhs, rhs float64,
// This operation expects that both histograms are compacted.
return 0, hlhs, !hlhs.Equals(hrhs), nil
case parser.MUL, parser.DIV, parser.POW, parser.MOD, parser.GTR, parser.LSS, parser.GTE, parser.LTE, parser.ATAN2:
err := annotations.NewIncompatibleTypesInBinOpInfo("histogram", opName, "histogram", posrange.PositionRange{})
warnings.AddToContext(err, ctx)
return 0, nil, false, err
annos.Add(annotations.NewIncompatibleTypesInBinOpInfo("histogram", opName, "histogram", posrange.PositionRange{}))
return 0, nil, false, nil
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions execution/warnings/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func (w *warnings) get() annotations.Annotations {
return w.warns
}

func (w *warnings) merge(anno annotations.Annotations) {
w.mu.Lock()
defer w.mu.Unlock()
w.warns = w.warns.Merge(anno)
}

func NewContext(ctx context.Context) context.Context {
return context.WithValue(ctx, key, newWarnings())
}
Expand All @@ -47,6 +53,14 @@ func AddToContext(warn error, ctx context.Context) {
w.add(warn)
}

func MergeToContext(annos annotations.Annotations, ctx context.Context) {
w, ok := ctx.Value(key).(*warnings)
if !ok {
return
}
w.merge(annos)
}

func FromContext(ctx context.Context) annotations.Annotations {
return ctx.Value(key).(*warnings).get()
}

0 comments on commit 5d75bd3

Please sign in to comment.