Skip to content

Commit fcf8381

Browse files
authored
Optimize aggregate accumulators (#323)
* Optimize aggregate accumulators Scalar tables used for the hash aggregate with grouping labels are currently very wasteful because of the dynamic functions they create for each output group. Each function ends up as a new object on the heap, so for queries where output groups have high cardinality we end up using a lot of memory. This commit fixes that by converting accumulators to structs, so that functions between accumulator instances do not end up as heap-allocated objects. Signed-off-by: Filip Petkovski <[email protected]> * Fix test Signed-off-by: Filip Petkovski <[email protected]> --------- Signed-off-by: Filip Petkovski <[email protected]>
1 parent dfb4b4f commit fcf8381

File tree

2 files changed

+272
-226
lines changed

2 files changed

+272
-226
lines changed

execution/aggregate/accumulator.go

+258
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
package aggregate
2+
3+
import (
4+
"math"
5+
6+
"github.com/prometheus/prometheus/model/histogram"
7+
)
8+
9+
type accumulator interface {
10+
Add(v float64, h *histogram.FloatHistogram)
11+
Value() (float64, *histogram.FloatHistogram)
12+
HasValue() bool
13+
Reset(float64)
14+
}
15+
16+
// newAccumulatorFunc is the signature shared by the accumulator constructors
// in this file (newSumAcc, newMaxAcc, newAvgAcc, ...).
type newAccumulatorFunc func() accumulator
17+
18+
type sumAcc struct {
19+
value float64
20+
histSum *histogram.FloatHistogram
21+
hasFloatVal bool
22+
}
23+
24+
func newSumAcc() accumulator {
25+
return &sumAcc{}
26+
}
27+
28+
func (s *sumAcc) Add(v float64, h *histogram.FloatHistogram) {
29+
if h == nil {
30+
s.hasFloatVal = true
31+
s.value += v
32+
return
33+
}
34+
if s.histSum == nil {
35+
s.histSum = h.Copy()
36+
return
37+
}
38+
// The histogram being added must have an equal or larger schema.
39+
// https://github.com/prometheus/prometheus/blob/57bcbf18880f7554ae34c5b341d52fc53f059a97/promql/engine.go#L2448-L2456
40+
if h.Schema >= s.histSum.Schema {
41+
s.histSum = s.histSum.Add(h)
42+
} else {
43+
t := h.Copy()
44+
t.Add(s.histSum)
45+
s.histSum = t
46+
}
47+
}
48+
49+
func (s *sumAcc) Value() (float64, *histogram.FloatHistogram) {
50+
return s.value, s.histSum
51+
}
52+
53+
// HasValue for sum returns an empty result when floats are histograms are aggregated.
54+
func (s *sumAcc) HasValue() bool {
55+
return s.hasFloatVal != (s.histSum != nil)
56+
}
57+
58+
func (s *sumAcc) Reset(_ float64) {
59+
s.histSum = nil
60+
s.hasFloatVal = false
61+
s.value = 0
62+
}
63+
64+
// genericAcc is an accumulator whose combining step is supplied as a plain
// function of two floats; the max, min and group accumulators are built on it.
type genericAcc struct {
	// value is the running aggregate.
	value float64
	// hasValue is set once at least one sample has been added.
	hasValue bool
	// aggregate combines the running aggregate (first argument) with the new
	// sample (second argument) and returns the new running aggregate.
	aggregate func(float64, float64) float64
}
69+
70+
// maxAggregate returns the larger of a and b.
//
// A NaN in b is ignored: every comparison against NaN is false, so without the
// explicit check a NaN sample would replace a valid running maximum and make
// the result depend on the order in which samples arrive. NaN is returned only
// when both arguments are NaN.
func maxAggregate(a, b float64) float64 {
	if a > b || math.IsNaN(b) {
		return a
	}
	return b
}
76+
// minAggregate returns the smaller of a and b.
//
// A NaN in b is ignored: every comparison against NaN is false, so without the
// explicit check a NaN sample would replace a valid running minimum and make
// the result depend on the order in which samples arrive. NaN is returned only
// when both arguments are NaN.
func minAggregate(a, b float64) float64 {
	if a < b || math.IsNaN(b) {
		return a
	}
	return b
}
82+
// groupAggregate ignores both inputs and always yields 1.
func groupAggregate(_, _ float64) float64 {
	const groupValue = 1
	return groupValue
}
83+
84+
func newMaxAcc() accumulator {
85+
return &genericAcc{aggregate: maxAggregate}
86+
}
87+
88+
func newMinAcc() accumulator {
89+
return &genericAcc{aggregate: minAggregate}
90+
}
91+
92+
func newCountAcc() accumulator {
93+
return &countAcc{}
94+
}
95+
96+
func newGroupAcc() accumulator {
97+
return &genericAcc{aggregate: groupAggregate}
98+
}
99+
100+
func (g *genericAcc) Add(v float64, _ *histogram.FloatHistogram) {
101+
if !g.hasValue || math.IsNaN(g.value) {
102+
g.value = v
103+
}
104+
g.hasValue = true
105+
g.value = g.aggregate(g.value, v)
106+
}
107+
108+
func (g *genericAcc) Value() (float64, *histogram.FloatHistogram) {
109+
return g.value, nil
110+
}
111+
112+
func (g *genericAcc) HasValue() bool {
113+
return g.hasValue
114+
}
115+
116+
func (g *genericAcc) Reset(_ float64) {
117+
g.hasValue = false
118+
g.value = 0
119+
}
120+
121+
// countAcc counts the samples added to it.
type countAcc struct {
	// value is the number of samples added so far.
	value float64
	// hasValue is set once at least one sample has been added.
	hasValue bool
}
125+
126+
func (c *countAcc) Add(v float64, h *histogram.FloatHistogram) {
127+
c.hasValue = true
128+
c.value += 1
129+
}
130+
131+
func (c *countAcc) Value() (float64, *histogram.FloatHistogram) {
132+
return c.value, nil
133+
}
134+
135+
func (c *countAcc) HasValue() bool {
136+
return c.hasValue
137+
}
138+
139+
func (c *countAcc) Reset(_ float64) {
140+
c.hasValue = false
141+
c.value = 0
142+
}
143+
144+
// avgAcc computes the arithmetic mean of float samples from a running sum and
// a running count.
type avgAcc struct {
	// count is the number of samples added so far.
	count float64
	// sum is the total of all sample values added so far.
	sum float64
	// hasValue is set once at least one sample has been added.
	hasValue bool
}
149+
150+
func newAvgAcc() accumulator {
151+
return &avgAcc{}
152+
}
153+
154+
func (a *avgAcc) Add(v float64, h *histogram.FloatHistogram) {
155+
a.hasValue = true
156+
a.count += 1
157+
a.sum += v
158+
}
159+
160+
func (a *avgAcc) Value() (float64, *histogram.FloatHistogram) {
161+
return a.sum / a.count, nil
162+
}
163+
164+
func (a *avgAcc) HasValue() bool {
165+
return a.hasValue
166+
}
167+
168+
func (a *avgAcc) Reset(_ float64) {
169+
a.hasValue = false
170+
a.sum = 0
171+
a.count = 0
172+
}
173+
174+
// statAcc holds the running state shared by the standard-deviation and
// standard-variance accumulators, which embed it.
type statAcc struct {
	// count is the number of samples added so far.
	count float64
	// mean is the running mean of the samples.
	mean float64
	// value is the running sum of squared deviations from the mean.
	value float64
	// hasValue is set once at least one sample has been added.
	hasValue bool
}
180+
181+
func (s *statAcc) Add(v float64, h *histogram.FloatHistogram) {
182+
s.hasValue = true
183+
s.count++
184+
185+
delta := v - s.mean
186+
s.mean += delta / s.count
187+
s.value += delta * (v - s.mean)
188+
}
189+
190+
func (s *statAcc) HasValue() bool {
191+
return s.hasValue
192+
}
193+
194+
func (s *statAcc) Reset(_ float64) {
195+
s.hasValue = false
196+
s.count = 0
197+
s.mean = 0
198+
s.value = 0
199+
}
200+
201+
type stdDevAcc struct {
202+
statAcc
203+
}
204+
205+
func newStdDevAcc() accumulator {
206+
return &stdDevAcc{}
207+
}
208+
209+
func (s *stdDevAcc) Value() (float64, *histogram.FloatHistogram) {
210+
if s.count == 1 {
211+
return 0, nil
212+
}
213+
return math.Sqrt(s.value / s.count), nil
214+
}
215+
216+
type stdVarAcc struct {
217+
statAcc
218+
}
219+
220+
func newStdVarAcc() accumulator {
221+
return &stdVarAcc{}
222+
}
223+
224+
func (s *stdVarAcc) Value() (float64, *histogram.FloatHistogram) {
225+
if s.count == 1 {
226+
return 0, nil
227+
}
228+
return s.value / s.count, nil
229+
}
230+
231+
// quantileAcc buffers every float sample so a quantile can be computed over
// the full set when the value is requested.
type quantileAcc struct {
	// arg is the quantile parameter, set through Reset.
	arg float64
	// points holds the samples added since the last Reset.
	points []float64
	// hasValue is set once at least one sample has been added.
	hasValue bool
}
236+
237+
func newQuantileAcc() accumulator {
238+
return &quantileAcc{}
239+
}
240+
241+
func (q *quantileAcc) Add(v float64, h *histogram.FloatHistogram) {
242+
q.hasValue = true
243+
q.points = append(q.points, v)
244+
}
245+
246+
func (q *quantileAcc) Value() (float64, *histogram.FloatHistogram) {
247+
return quantile(q.arg, q.points), nil
248+
}
249+
250+
func (q *quantileAcc) HasValue() bool {
251+
return q.hasValue
252+
}
253+
254+
func (q *quantileAcc) Reset(f float64) {
255+
q.hasValue = false
256+
q.arg = f
257+
q.points = q.points[:0]
258+
}

0 commit comments

Comments
 (0)