Skip to content

Commit 7e2f0a1

Browse files
authored
Distribute subqueries (#370)
* Distribute subqueries Subqueries can be distributed in the same manner as regular matrix selectors. The only difference is that we need to make sure a remote engine has sufficient scope to calculate both the inner and outer query range. To do this, we simply add the two ranges together when calculating the offset for the remote query, and based on this we can decide when it is safe to distribute. Signed-off-by: Filip Petkovski <[email protected]> * Add logical plan tests Signed-off-by: Filip Petkovski <[email protected]> * Add test for multiple subqueries Signed-off-by: Filip Petkovski <[email protected]> * Fix failing test Signed-off-by: Filip Petkovski <[email protected]> --------- Signed-off-by: Filip Petkovski <[email protected]>
1 parent 9cc5f1c commit 7e2f0a1

File tree

5 files changed

+72
-10
lines changed

5 files changed

+72
-10
lines changed

engine/distributed_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (p partition) mint() int64 {
4242
mint := p.series[0].timestamps[0]
4343
for _, s := range p.series {
4444
ts := s.timestamps[0]
45-
if ts < mint {
45+
if ts > mint {
4646
mint = ts
4747
}
4848
}
@@ -234,7 +234,8 @@ func TestDistributedAggregations(t *testing.T) {
234234
{name: "absent for existing metric with aggregation", query: `sum(absent(foo))`},
235235
{name: "absent for existing metric", query: `absent(bar{pod="nginx-1"})`},
236236
{name: "absent for existing metric with aggregation", query: `sum(absent(bar{pod="nginx-1"}))`},
237-
{name: "subquery", query: `max_over_time(sum_over_time(bar[1m])[10m:1m])`, expectFallback: true},
237+
{name: "subquery with window within engine range", query: `max_over_time(sum_over_time(bar[30s])[30s:15s])`, expectFallback: true},
238+
{name: "subquery with window outside of engine range", query: `max_over_time(sum_over_time(bar[1m])[10m:1m])`, expectFallback: true},
238239
}
239240

240241
lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute}

execution/remote/operator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (e *Execution) GetPool() *model.VectorPool {
7272
}
7373

7474
func (e *Execution) Explain() (me string, next []model.VectorOperator) {
75-
return fmt.Sprintf("[*remoteExec] %s (%d, %d)", e.query, e.opts.Start.Unix(), e.opts.End.Unix()), nil
75+
return fmt.Sprintf("[*remoteExec] %s (%d, %d)", e.query, e.queryRangeStart.Unix(), e.opts.End.Unix()), nil
7676
}
7777

7878
type storageAdapter struct {

logicalplan/distribute.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -410,11 +410,13 @@ func calculateStartOffset(expr *parser.Expr, lookbackDelta time.Duration) time.D
410410
var selectRange time.Duration
411411
var offset time.Duration
412412
traverse(expr, func(node *parser.Expr) {
413-
if matrixSelector, ok := (*node).(*parser.MatrixSelector); ok {
414-
selectRange = matrixSelector.Range
415-
}
416-
if vectorSelector, ok := (*node).(*parser.VectorSelector); ok {
417-
offset = vectorSelector.Offset
413+
switch n := (*node).(type) {
414+
case *parser.SubqueryExpr:
415+
selectRange += n.Range
416+
case *parser.MatrixSelector:
417+
selectRange += n.Range
418+
case *parser.VectorSelector:
419+
offset = n.Offset
418420
}
419421
})
420422
return maxDuration(offset+selectRange, lookbackDelta)
@@ -430,8 +432,6 @@ func isDistributive(expr *parser.Expr, skipBinaryPushdown bool) bool {
430432
}
431433

432434
switch aggr := (*expr).(type) {
433-
case *parser.SubqueryExpr:
434-
return false
435435
case *parser.BinaryExpr:
436436
return isBinaryExpressionWithOneConstantSide(aggr) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(aggr))
437437
case *parser.AggregateExpr:

logicalplan/distribute_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,21 @@ histogram_quantile(0.5, sum by (le) (dedup(
262262
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60)),
263263
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
264264
},
265+
{
266+
name: "subquery",
267+
expr: `sum_over_time(http_requests_total[5m:1m])`,
268+
expected: `dedup(remote(sum_over_time(http_requests_total[5m:1m])), remote(sum_over_time(http_requests_total[5m:1m])))`,
269+
},
270+
{
271+
name: "subquery over range function",
272+
expr: `sum_over_time(rate(http_requests_total[5m])[5m:1m])`,
273+
expected: `dedup(remote(sum_over_time(rate(http_requests_total[5m])[5m:1m])), remote(sum_over_time(rate(http_requests_total[5m])[5m:1m])))`,
274+
},
275+
{
276+
name: "subquery over range aggregation",
277+
expr: `sum_over_time(max(http_requests_total)[5m:1m])`,
278+
expected: `sum_over_time(max(dedup(remote(max by (region) (http_requests_total)), remote(max by (region) (http_requests_total))))[5m:1m])`,
279+
},
265280
{
266281
name: "label based pruning matches one engine",
267282
expr: `sum by (pod) (rate(http_requests_total{region="west"}[2m]))`,
@@ -385,6 +400,51 @@ dedup(
385400
remote(sum_over_time(metric[2h])) [1970-01-01 08:00:00 +0000 UTC]
386401
)`,
387402
},
403+
{
404+
name: "subquery with a total 2h range is distributed with proper offsets",
405+
firstEngineOpts: engineOpts{
406+
minTime: queryStart,
407+
maxTime: time.Unix(0, 0).Add(eightHours),
408+
},
409+
secondEngineOpts: engineOpts{
410+
minTime: time.Unix(0, 0).Add(sixHours),
411+
maxTime: queryEnd,
412+
},
413+
expr: `sum_over_time(sum_over_time(metric[1h])[1h:30m])`,
414+
expected: `
415+
dedup(
416+
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
417+
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC]
418+
)`,
419+
},
420+
{
421+
name: "multiple subqueries with a total 90m range get distributed with proper offsets",
422+
firstEngineOpts: engineOpts{
423+
minTime: queryStart,
424+
maxTime: time.Unix(0, 0).Add(eightHours),
425+
},
426+
secondEngineOpts: engineOpts{
427+
minTime: time.Unix(0, 0).Add(sixHours),
428+
maxTime: queryEnd,
429+
},
430+
expr: `max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])`,
431+
expected: `dedup(
432+
remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])),
433+
remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])) [1970-01-01 07:05:00 +0000 UTC])`,
434+
},
435+
{
436+
name: "subquery with a total 4h range is cannot be distributed",
437+
firstEngineOpts: engineOpts{
438+
minTime: queryStart,
439+
maxTime: time.Unix(0, 0).Add(eightHours),
440+
},
441+
secondEngineOpts: engineOpts{
442+
minTime: time.Unix(0, 0).Add(sixHours),
443+
maxTime: queryEnd,
444+
},
445+
expr: `sum_over_time(sum_over_time(metric[2h])[2h:30m])`,
446+
expected: `sum_over_time(sum_over_time(metric[2h])[2h:30m])`,
447+
},
388448
{
389449
name: "sum over 3h does not distribute the query due to insufficient engine overlap",
390450
firstEngineOpts: engineOpts{

logicalplan/plan.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ func traverse(expr *parser.Expr, transform func(*parser.Expr)) {
101101
case *parser.ParenExpr:
102102
traverse(&node.Expr, transform)
103103
case *parser.SubqueryExpr:
104+
transform(expr)
104105
traverse(&node.Expr, transform)
105106
}
106107
}

0 commit comments

Comments
 (0)