From 4c53a5bbd011331c867e9662dacb7372eaf1b6a9 Mon Sep 17 00:00:00 2001 From: Kartikay Date: Tue, 2 Jan 2024 18:23:46 +0530 Subject: [PATCH] merged range and instant query tests Signed-off-by: Kartikay --- engine/enginefuzz_test.go | 304 +++++++++++++------------------------- 1 file changed, 104 insertions(+), 200 deletions(-) diff --git a/engine/enginefuzz_test.go b/engine/enginefuzz_test.go index 13511338..ff451030 100644 --- a/engine/enginefuzz_test.go +++ b/engine/enginefuzz_test.go @@ -31,12 +31,13 @@ import ( const testRuns = 100 type testCase struct { - query string - load string - oldRes, newRes *promql.Result + query string + load1 string + load2 string + oldRes1, oldRes2, newRes1, newRes2 *promql.Result } -func FuzzEnginePromQLSmithRangeQuery(f *testing.F) { +func FuzzEnginePromQLSmithQuery(f *testing.F) { f.Add(int64(0), uint32(0), uint32(120), uint32(30), 1.0, 1.0, 1.0, 2.0, 30) f.Fuzz(func(t *testing.T, seed int64, startTS, endTS, intervalSeconds uint32, initialVal1, initialVal2, inc1, inc2 float64, stepRange int) { @@ -51,10 +52,14 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) { } rnd := rand.New(rand.NewSource(seed)) - load := fmt.Sprintf(`load 30s + load1 := fmt.Sprintf(`load 30s http_requests_total{pod="nginx-1"} %.2f+%.2fx15 http_requests_total{pod="nginx-2"} %2.f+%.2fx21`, initialVal1, inc1, initialVal2, inc2) + load2 := fmt.Sprintf(`load 30s + http_requests_total{pod="nginx-1", route="/"} %.2f+%.2fx40 + http_requests_total{pod="nginx-2", route="/"} %2.f+%.2fx40`, initialVal1, inc1, initialVal2, inc2) + opts := promql.EngineOpts{ Timeout: 1 * time.Hour, MaxSamples: 1e10, @@ -62,14 +67,20 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) { EnableAtModifier: true, } - storage := promql.LoadedStorage(t, load) - defer storage.Close() + storageForRangeQuery := promql.LoadedStorage(t, load1) + storageForInstantQuery := promql.LoadedStorage(t, load2) + defer storageForRangeQuery.Close() + defer storageForInstantQuery.Close() + + queryTime := time.Unix(int64(startTS), 0) start := time.Unix(int64(startTS), 0) end := time.Unix(int64(endTS), 0) interval := time.Duration(intervalSeconds) * time.Second - seriesSet, err := getSeries(context.Background(), storage) + seriesSetForRangeQuery, err := getSeries(context.Background(), storageForRangeQuery) + seriesSetForInstantQuery, err := getSeries(context.Background(), storageForInstantQuery) + require.NoError(t, err) psOpts := []promqlsmith.Option{ promqlsmith.WithEnableOffset(true), @@ -77,14 +88,26 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) { // bottomk and topk sometimes lead to random failures since their result on equal values is essentially random promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.QUANTILE}), } - ps := promqlsmith.New(rnd, seriesSet, psOpts...) + psOptsForInstantQuery := []promqlsmith.Option{ + promqlsmith.WithEnableOffset(true), + promqlsmith.WithEnableAtModifier(true), + promqlsmith.WithAtModifierMaxTimestamp(180 * 1000), + promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.QUANTILE}), + } + psForRangeQuery := promqlsmith.New(rnd, seriesSetForRangeQuery, psOpts...) + psOpts = append(psOpts, promqlsmith.WithAtModifierMaxTimestamp(180*1000)) + psForInstantQuery := promqlsmith.New(rnd, seriesSetForInstantQuery, psOptsForInstantQuery...) + + newEngineForRangeQuery := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: true}) + newEngineForInstantQuery := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: true, LogicalOptimizers: logicalplan.AllOptimizers}) - newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: true}) oldEngine := promql.NewEngine(opts) var ( - q1 promql.Query - query string + q1 promql.Query + q2 promql.Query + rangeQuery string + instantQuery string ) cases := make([]*testCase, testRuns) for i := 0; i < testRuns; i++ { @@ -92,9 +115,10 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) { // that can be natively executed by the engine. // Parsing experimental function, like mad_over_time, will lead to a parser.ParseErrors, so we also ignore those. for { - expr := ps.WalkRangeQuery() - query = expr.Pretty(0) - q1, err = newEngine.NewRangeQuery(context.Background(), storage, nil, query, start, end, interval) + exprForRangeQuery := psForRangeQuery.WalkRangeQuery() + rangeQuery = exprForRangeQuery.Pretty(0) + q1, err = newEngineForRangeQuery.NewRangeQuery(context.Background(), storageForRangeQuery, nil, rangeQuery, start, end, interval) + if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) || errors.As(err, &parser.ParseErrors{}) { continue } else { @@ -103,106 +127,42 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) { } testutil.Ok(t, err) - newResult := q1.Exec(context.Background()) - - q2, err := oldEngine.NewRangeQuery(context.Background(), storage, nil, query, start, end, interval) + newResultForRangeQuery := q1.Exec(context.Background()) + q3, err := oldEngine.NewRangeQuery(context.Background(), storageForRangeQuery, nil, rangeQuery, start, end, interval) + testutil.Ok(t, err) testutil.Ok(t, err) - oldResult := q2.Exec(context.Background()) - - cases[i] = &testCase{ - query: query, - newRes: newResult, - oldRes: oldResult, - load: load, - } - } - validateTestCases(t, cases) - }) -} - -func FuzzEnginePromQLSmithInstantQuery(f *testing.F) { - f.Add(int64(0), uint32(0), 1.0, 1.0, 1.0, 2.0) - - f.Fuzz(func(t *testing.T, seed int64, ts uint32, initialVal1, initialVal2, inc1, inc2 float64) { - if inc1 < 0 || inc2 < 0 { - return - } - rnd := rand.New(rand.NewSource(seed)) - - load := fmt.Sprintf(`load 30s - http_requests_total{pod="nginx-1", route="/"} %.2f+%.2fx40 - http_requests_total{pod="nginx-2", route="/"} %2.f+%.2fx40`, initialVal1, inc1, initialVal2, inc2) - - opts := promql.EngineOpts{ - Timeout: 1 * time.Hour, - MaxSamples: 1e10, - EnableNegativeOffset: true, - EnableAtModifier: true, - } - - storage := promql.LoadedStorage(t, load) - defer storage.Close() - - queryTime := time.Unix(int64(ts), 0) - newEngine := engine.New(engine.Opts{ - EngineOpts: opts, - DisableFallback: true, - LogicalOptimizers: logicalplan.AllOptimizers, - }) - oldEngine := promql.NewEngine(opts) - - seriesSet, err := getSeries(context.Background(), storage) - require.NoError(t, err) - psOpts := []promqlsmith.Option{ - promqlsmith.WithEnableOffset(true), - promqlsmith.WithEnableAtModifier(true), - promqlsmith.WithAtModifierMaxTimestamp(180 * 1000), - // bottomk and topk sometimes lead to random failures since their result on equal values is essentially random - promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.QUANTILE}), - } - ps := promqlsmith.New(rnd, seriesSet, psOpts...) + oldResultForRangeQuery := q3.Exec(context.Background()) - var ( - q1 promql.Query - query string - ) - cases := make([]*testCase, testRuns) - for i := 0; i < testRuns; i++ { - // Since we disabled fallback, keep trying until we find a query - // that can be natively execute by the engine. - // Parsing experimental function, like mad_over_time, will lead to a parser.ParseErrors, so we also ignore those. for { - expr := ps.WalkInstantQuery() - query = expr.Pretty(0) - q1, err = newEngine.NewInstantQuery(context.Background(), storage, nil, query, queryTime) + exprForInstantQuery := psForInstantQuery.WalkInstantQuery() + instantQuery = exprForInstantQuery.Pretty(0) + q2, err = newEngineForInstantQuery.NewInstantQuery(context.Background(), storageForInstantQuery, nil, instantQuery, queryTime) if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) || errors.As(err, &parser.ParseErrors{}) { continue } else { break } } - - testutil.Ok(t, err) - newResult := q1.Exec(context.Background()) - - q2, err := oldEngine.NewInstantQuery(context.Background(), storage, nil, query, queryTime) - testutil.Ok(t, err) - - oldResult := q2.Exec(context.Background()) + newResultForInstantQuery := q2.Exec(context.Background()) + q4, err := oldEngine.NewInstantQuery(context.Background(), storageForInstantQuery, nil, instantQuery, queryTime) + oldResultForInstantQuery := q4.Exec(context.Background()) cases[i] = &testCase{ - query: query, - newRes: newResult, - oldRes: oldResult, - load: load, + query: rangeQuery, + newRes1: newResultForRangeQuery, + newRes2: newResultForInstantQuery, + oldRes1: oldResultForRangeQuery, + oldRes2: oldResultForInstantQuery, + load1: load1, + load2: load2, } } validateTestCases(t, cases) }) } -func FuzzDistributedEnginePromQLSmithRangeQuery(f *testing.F) { +func FuzzDistributedEnginePromQLSmithQuery(f *testing.F) { f.Skip("Skip from CI to repair later") f.Add(uint32(0), uint32(120), uint32(30), 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 30) @@ -230,12 +190,14 @@ func FuzzDistributedEnginePromQLSmithRangeQuery(f *testing.F) { EnableNegativeOffset: true, EnableAtModifier: true, } - engineOpts := engine.Opts{ + engineOptsForRangeQuery := engine.Opts{ EngineOpts: opts, DisableFallback: true, LogicalOptimizers: logicalplan.AllOptimizers, } + engineOptsForInstantQuery := engine.Opts{EngineOpts: opts, DisableFallback: true} + queryables := []*teststorage.TestStorage{} storage1 := promql.LoadedStorage(t, load) defer storage1.Close() @@ -253,18 +215,32 @@ func FuzzDistributedEnginePromQLSmithRangeQuery(f *testing.F) { {labels.FromStrings("zone", "west-1")}, {labels.FromStrings("zone", "west-2")}, } - remoteEngines := make([]api.RemoteEngine, 0, 2) + + queryTime := time.Unix(int64(startTS), 0) + + remoteEnginesForRangeQuery, remoteEnginesForInstantQuery := make([]api.RemoteEngine, 0, 2), make([]api.RemoteEngine, 0, 2) for i := 0; i < 2; i++ { e := engine.NewRemoteEngine( - engineOpts, + engineOptsForRangeQuery, + queryables[i], + queryables[i].DB.Head().MinTime(), + queryables[i].DB.Head().MaxTime(), + partitionLabels[i], + ) + e2 := engine.NewRemoteEngine( + engineOptsForInstantQuery, queryables[i], queryables[i].DB.Head().MinTime(), queryables[i].DB.Head().MaxTime(), partitionLabels[i], ) - remoteEngines = append(remoteEngines, e) + remoteEnginesForRangeQuery = append(remoteEnginesForRangeQuery, e) + remoteEnginesForInstantQuery = append(remoteEnginesForInstantQuery, e2) } - distEngine := engine.NewDistributedEngine(engineOpts, api.NewStaticEndpoints(remoteEngines)) + + distEngineForRangeQuery := engine.NewDistributedEngine(engineOptsForRangeQuery, api.NewStaticEndpoints(remoteEnginesForRangeQuery)) + distEngineForInstantQuery := engine.NewDistributedEngine(engineOptsForInstantQuery, api.NewStaticEndpoints(remoteEnginesForInstantQuery)) + oldEngine := promql.NewEngine(opts) mergeStore := storage.NewFanout(nil, storage1, storage2) @@ -282,6 +258,7 @@ func FuzzDistributedEnginePromQLSmithRangeQuery(f *testing.F) { var ( q1 promql.Query query string + q3 promql.Query ) cases := make([]*testCase, testRuns) ctx := context.Background() @@ -291,7 +268,7 @@ func FuzzDistributedEnginePromQLSmithRangeQuery(f *testing.F) { for { expr := ps.WalkRangeQuery() query = expr.Pretty(0) - q1, err = distEngine.NewRangeQuery(ctx, mergeStore, nil, query, start, end, interval) + q1, err = distEngineForRangeQuery.NewRangeQuery(ctx, mergeStore, nil, query, start, end, interval) if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) { continue } else { @@ -300,101 +277,17 @@ func FuzzDistributedEnginePromQLSmithRangeQuery(f *testing.F) { } testutil.Ok(t, err) - newResult := q1.Exec(ctx) + newResultForRangeQuery := q1.Exec(ctx) q2, err := oldEngine.NewRangeQuery(ctx, mergeStore, nil, query, start, end, interval) testutil.Ok(t, err) - oldResult := q2.Exec(ctx) - - cases[i] = &testCase{ - query: query, - newRes: newResult, - oldRes: oldResult, - load: load, - } - } - validateTestCases(t, cases) - }) -} - -func FuzzDistributedEnginePromQLSmithInstantQuery(f *testing.F) { - f.Skip("Skip from CI to repair later") - - f.Add(uint32(0), 1.0, 1.0, 1.0, 1.0, 1.0, 2.0) - - f.Fuzz(func(t *testing.T, ts uint32, initialVal1, initialVal2, initialVal3, initialVal4, inc1, inc2 float64) { - if inc1 < 0 || inc2 < 0 { - return - } - load := fmt.Sprintf(`load 30s - http_requests_total{pod="nginx-1", route="/"} %.2f+%.2fx4 - http_requests_total{pod="nginx-2", route="/"} %2.f+%.2fx4`, initialVal1, inc1, initialVal2, inc2) - load2 := fmt.Sprintf(`load 30s - http_requests_total{pod="nginx-1", route="/"} %.2f+%.2fx4 - http_requests_total{pod="nginx-2", route="/"} %2.f+%.2fx4`, initialVal3, inc1, initialVal4, inc2) - - opts := promql.EngineOpts{ - Timeout: 1 * time.Hour, - MaxSamples: 1e10, - EnableNegativeOffset: true, - EnableAtModifier: true, - } - engineOpts := engine.Opts{EngineOpts: opts, DisableFallback: true} - - queryables := []*teststorage.TestStorage{} - storage1 := promql.LoadedStorage(t, load) - defer storage1.Close() - queryables = append(queryables, storage1) - - storage2 := promql.LoadedStorage(t, load2) - defer storage2.Close() - queryables = append(queryables, storage1) - - partitionLabels := [][]labels.Labels{ - {labels.FromStrings("zone", "west-1")}, - {labels.FromStrings("zone", "west-2")}, - } - queryTime := time.Unix(int64(ts), 0) - remoteEngines := make([]api.RemoteEngine, 0, 2) - for i := 0; i < 2; i++ { - e := engine.NewRemoteEngine( - engineOpts, - queryables[i], - queryables[i].DB.Head().MinTime(), - queryables[i].DB.Head().MaxTime(), - partitionLabels[i], - ) - remoteEngines = append(remoteEngines, e) - } - distEngine := engine.NewDistributedEngine(engineOpts, api.NewStaticEndpoints(remoteEngines)) - oldEngine := promql.NewEngine(opts) + oldResultForRangeQuery := q2.Exec(ctx) - mergeStore := storage.NewFanout(nil, storage1, storage2) - seriesSet, err := getSeries(context.Background(), mergeStore) - require.NoError(t, err) - rnd := rand.New(rand.NewSource(time.Now().Unix())) - psOpts := []promqlsmith.Option{ - promqlsmith.WithEnableOffset(true), - promqlsmith.WithEnableAtModifier(true), - promqlsmith.WithAtModifierMaxTimestamp(180 * 1000), - promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.GROUP, parser.COUNT, parser.BOTTOMK, parser.TOPK}), - } - ps := promqlsmith.New(rnd, seriesSet, psOpts...) - ctx := context.Background() - - var ( - q1 promql.Query - query string - ) - cases := make([]*testCase, testRuns) - for i := 0; i < testRuns; i++ { - // Since we disabled fallback, keep trying until we find a query - // that can be natively execute by the engine. for { expr := ps.Walk(parser.ValueTypeVector, parser.ValueTypeMatrix) query = expr.Pretty(0) - q1, err = distEngine.NewInstantQuery(ctx, mergeStore, nil, query, queryTime) + q3, err = distEngineForInstantQuery.NewInstantQuery(ctx, mergeStore, nil, query, queryTime) if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) { continue } else { @@ -403,18 +296,22 @@ func FuzzDistributedEnginePromQLSmithInstantQuery(f *testing.F) { } testutil.Ok(t, err) - newResult := q1.Exec(ctx) + newResultForInstantQuery := q3.Exec(ctx) - q2, err := oldEngine.NewInstantQuery(ctx, mergeStore, nil, query, queryTime) + q3, err := oldEngine.NewInstantQuery(ctx, mergeStore, nil, query, queryTime) testutil.Ok(t, err) - oldResult := q2.Exec(ctx) + oldResultForInstantQuery := q3.Exec(ctx) + + fmt.Println(newResultForInstantQuery, oldResultForInstantQuery) cases[i] = &testCase{ - query: query, - newRes: newResult, - oldRes: oldResult, - load: load, + query: query, + newRes1: newResultForRangeQuery, + oldRes1: oldResultForRangeQuery, + newRes2: newResultForRangeQuery, + oldRes2: newResultForInstantQuery, + load1: load, } } validateTestCases(t, cases) @@ -441,11 +338,18 @@ func getSeries(ctx context.Context, q storage.Queryable) ([]labels.Labels, error func validateTestCases(t *testing.T, cases []*testCase) { failures := 0 for i, c := range cases { - if !cmp.Equal(c.oldRes, c.newRes, comparer) { - t.Logf(c.load) + if !cmp.Equal(c.oldRes1, c.newRes1, comparer) { + t.Logf(c.load1) + t.Logf(c.query) + + t.Logf("case %d error mismatch.\nnew result: %s\nold result: %s\n", i, c.newRes1.String(), c.oldRes1.String()) + failures++ + } + if !cmp.Equal(c.oldRes2, c.newRes2, comparer) { + t.Logf(c.load2) t.Logf(c.query) - t.Logf("case %d error mismatch.\nnew result: %s\nold result: %s\n", i, c.newRes.String(), c.oldRes.String()) + t.Logf("case %d error mismatch.\nnew result: %s\nold result: %s\n", i, c.newRes2.String(), c.oldRes2.String()) failures++ } }