Skip to content

Commit fd650fc

Browse files
committed
tsdb: add Appender GetRefFunc interface
So we can look up series references from a Builder. Signed-off-by: Bryan Boreham <[email protected]>
1 parent dfe8e64 commit fd650fc

File tree

4 files changed

+60
-1
lines changed

4 files changed

+60
-1
lines changed

storage/interface.go

+8
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,14 @@ type GetRef interface {
271271
GetRef(lset labels.Labels, hash uint64) (SeriesRef, labels.Labels)
272272
}
273273

274+
type GetRefFunc interface {
275+
// Returns reference number that can be used to pass to Appender.Append(),
276+
// and a set of labels that will not cause another copy when passed to Appender.Append().
277+
// 0 means the appender does not have a reference to this series.
278+
// hash should be a hash of lset. cmp should return true if labels match.
279+
GetRefFunc(hash uint64, cmp func(labels.Labels) bool) (SeriesRef, labels.Labels)
280+
}
281+
274282
// ExemplarAppender provides an interface for adding samples to exemplar storage, which
275283
// within Prometheus is in-memory only.
276284
type ExemplarAppender interface {

tsdb/db.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -1232,7 +1232,10 @@ type dbAppender struct {
12321232
db *DB
12331233
}
12341234

1235-
var _ storage.GetRef = dbAppender{}
1235+
var (
1236+
_ storage.GetRef = dbAppender{}
1237+
_ storage.GetRefFunc = dbAppender{}
1238+
)
12361239

12371240
func (a dbAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
12381241
if g, ok := a.Appender.(storage.GetRef); ok {
@@ -1241,6 +1244,13 @@ func (a dbAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef,
12411244
return 0, labels.EmptyLabels()
12421245
}
12431246

1247+
func (a dbAppender) GetRefFunc(hash uint64, cmp func(labels.Labels) bool) (storage.SeriesRef, labels.Labels) {
1248+
if g, ok := a.Appender.(storage.GetRefFunc); ok {
1249+
return g.GetRefFunc(hash, cmp)
1250+
}
1251+
return 0, labels.EmptyLabels()
1252+
}
1253+
12441254
func (a dbAppender) Commit() error {
12451255
err := a.Appender.Commit()
12461256

tsdb/head.go

+25
Original file line numberDiff line numberDiff line change
@@ -1822,6 +1822,21 @@ func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
18221822
return nil
18231823
}
18241824

1825+
// Fetch a series from the map, given a function which says whether it's the right Labels.
1826+
func (m *seriesHashmap) getByFunc(hash uint64, cmp func(labels.Labels) bool) *memSeries {
1827+
if s, found := m.unique[hash]; found {
1828+
if cmp(s.lset) {
1829+
return s
1830+
}
1831+
}
1832+
for _, s := range m.conflicts[hash] {
1833+
if cmp(s.lset) {
1834+
return s
1835+
}
1836+
}
1837+
return nil
1838+
}
1839+
18251840
func (m *seriesHashmap) set(hash uint64, s *memSeries) {
18261841
if existing, found := m.unique[hash]; !found || labels.Equal(existing.labels(), s.labels()) {
18271842
m.unique[hash] = s
@@ -2043,6 +2058,16 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
20432058
return series
20442059
}
20452060

2061+
func (s *stripeSeries) getByHashFunc(hash uint64, cmp func(labels.Labels) bool) *memSeries {
2062+
i := hash & uint64(s.size-1)
2063+
2064+
s.locks[i].RLock()
2065+
series := s.hashes[i].getByFunc(hash, cmp)
2066+
s.locks[i].RUnlock()
2067+
2068+
return series
2069+
}
2070+
20462071
func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) {
20472072
// PreCreation is called here to avoid calling it inside the lock.
20482073
// It is not necessary to call it just before creating a series,

tsdb/head_append.go

+16
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ func (a *initAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
116116
return 0, labels.EmptyLabels()
117117
}
118118

119+
func (a *initAppender) GetRefFunc(hash uint64, cmp func(labels.Labels) bool) (storage.SeriesRef, labels.Labels) {
120+
if g, ok := a.app.(storage.GetRefFunc); ok {
121+
return g.GetRefFunc(hash, cmp)
122+
}
123+
return 0, labels.EmptyLabels()
124+
}
125+
119126
func (a *initAppender) Commit() error {
120127
if a.app == nil {
121128
a.head.metrics.activeAppenders.Dec()
@@ -711,6 +718,15 @@ func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
711718
return storage.SeriesRef(s.ref), s.labels()
712719
}
713720

721+
func (a *headAppender) GetRefFunc(hash uint64, cmp func(labels.Labels) bool) (storage.SeriesRef, labels.Labels) {
722+
s := a.head.series.getByHashFunc(hash, cmp)
723+
if s == nil {
724+
return 0, labels.EmptyLabels()
725+
}
726+
// returned labels must be suitable to pass to Append()
727+
return storage.SeriesRef(s.ref), s.lset
728+
}
729+
714730
// log writes all headAppender's data to the WAL.
715731
func (a *headAppender) log() error {
716732
if a.head.wal == nil {

0 commit comments

Comments
 (0)