Skip to content

Commit

Permalink
Store Gateway: Fetch and cache expanded postings for individual posting
Browse files Browse the repository at this point in the history
group if it has more than 1 key

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 28, 2024
1 parent 6f03fcb commit 08d1160
Show file tree
Hide file tree
Showing 15 changed files with 611 additions and 142 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Changed

- [#8023](https://github.com/thanos-io/thanos/pull/8023) Store Gateway: Cache expanded postings for matchers with more than 1 key.

### Removed

## [v0.37.2](https://github.com/thanos-io/thanos/tree/release-0.37) - 11.12.2024
Expand Down
143 changes: 111 additions & 32 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []label
}

func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string) {}
func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher, tenant string) ([]byte, bool) {
return []byte{}, false
func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ [][]*labels.Matcher, tenant string) [][]byte {
return [][]byte{}
}

func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {}
Expand Down Expand Up @@ -2606,7 +2606,7 @@ func (r *bucketIndexReader) ExpandedPostings(
return nil, nil
}

hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant)
hit, postings, err := r.fetchAlignedExpandedPostingFromCacheAndExpand(ctx, ms, bytesLimiter, tenant)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2705,6 +2705,7 @@ type postingGroup struct {
cardinality int64
existentKeys int
lazy bool
postings index.Postings
}

func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup {
Expand Down Expand Up @@ -2955,48 +2956,81 @@ type postingPtr struct {
ptr index.Range
}

func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant)
if !hit {
return false, nil, nil
}
if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil {
return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
}

r.stats.add(PostingsTouched, 1, len(dataFromCache))
p, closeFns, err := r.decodeCachedPostings(dataFromCache)
func (r *bucketIndexReader) fetchAlignedExpandedPostingFromCacheAndExpand(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
postings, closers, err := r.fetchExpandedPostingsFromCache(ctx, [][]*labels.Matcher{ms}, bytesLimiter, tenant, true)
defer func() {
for _, closeFn := range closeFns {
closeFn()
for _, closer := range closers {
closer()
}
}()
// If failed to decode or expand cached postings, return and expand postings again.
if err != nil {
level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, err
}

// Cache miss.
if postings == nil || postings[0] == nil {
return false, nil, nil
}

ps, err := ExpandPostingsWithContext(ctx, p)
ps, err := ExpandPostingsWithContext(ctx, postings[0])
if err != nil {
level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, nil
}
return true, ps, nil
}

if len(ps) > 0 {
// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.block.indexHeaderReader.IndexVersion()
if err != nil {
return false, nil, errors.Wrap(err, "get index version")
func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms [][]*labels.Matcher, bytesLimiter BytesLimiter, tenant string, seriesByteAligned bool) ([]index.Postings, []func(), error) {
if len(ms) == 0 {
return nil, nil, nil
}

var closeFns []func()
res := make([]index.Postings, len(ms))

dataFromCache := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant)
size := 0
hits := 0
for _, v := range dataFromCache {
if v != nil {
size += len(v)
hits++
}
if version >= 2 {
for i, id := range ps {
ps[i] = id * 16
}
if err := bytesLimiter.ReserveWithType(uint64(size), PostingsTouched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
}

r.stats.add(PostingsTouched, hits, len(dataFromCache))
for i, v := range dataFromCache {
if v != nil {
p, closers, err := r.decodeCachedPostings(v)
closeFns = append(closeFns, closers...)

// If failed to decode cached postings, continue the loop so we try to fetch it again.
if err != nil {
level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
continue
}

res[i] = p
if seriesByteAligned {
// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.IndexVersion()
if err != nil {
return nil, closeFns, errors.Wrap(err, "get index version")
}

if version >= 2 {
// Index version 2 series are padded by 16 bytes.
res[i] = newSeriesByteAlignedPostings(p)
}
}
}
}
return true, ps, nil

return res, closeFns, nil
}

func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, ps index.Postings, length int, tenant string) {
Expand All @@ -3020,15 +3054,54 @@ var bufioReaderPool = sync.Pool{
// fetchPostings fill postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
func (r *bucketIndexReader) fetchPostings(ctx context.Context, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
var closeFns []func()

var (
expandedPostingMatchers [][]*labels.Matcher
expandedPostingMatchersIdx []int
)
keysLength := 0
expandedPostingGroupSet := map[string]struct{}{}
// Find out posting groups which fetch more than 1 key to fetch expanded posting cache.
for i := 0; i < len(postingGroups); i++ {
pg := postingGroups[i]
if !pg.lazy {
// If posting group has more than 1 key to fetch, fetch expanded postings first.
// This helps for matcher such as !="", =~".+" to avoid fetching too many keys from cache.
if len(pg.addKeys) > 1 || len(pg.removeKeys) > 1 {
expandedPostingMatchers = append(expandedPostingMatchers, pg.matchers)
expandedPostingMatchersIdx = append(expandedPostingMatchersIdx, i)
expandedPostingGroupSet[pg.name] = struct{}{}
continue
}

// A posting group has either add key or remove key and cannot have both the same time.
keysLength += len(pg.addKeys) + len(pg.removeKeys)
}
}

timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
defer timer.ObserveDuration()

var ptrs []postingPtr
expandedPostings, closers, err := r.fetchExpandedPostingsFromCache(ctx, expandedPostingMatchers, bytesLimiter, tenant, false)
closeFns = append(closeFns, closers...)
if err != nil {
return nil, closeFns, errors.Wrap(err, "fetch expanded postings from cache")
}
for i, ep := range expandedPostings {
pgIdx := expandedPostingMatchersIdx[i]
if ep != nil {
postingGroups[pgIdx].postings = ep
continue
}
// Cache miss, have additional keys to fetch.
keysLength += len(postingGroups[pgIdx].addKeys) + len(postingGroups[pgIdx].removeKeys)
}

output := make([]index.Postings, len(keys))
var ptrs []postingPtr
keys := keysToFetchFromPostingGroups(postingGroups, keysLength)
output := make([]index.Postings, keysLength)

// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant)
Expand Down Expand Up @@ -3131,6 +3204,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab

output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)

// If the corresponding posting group tries to fetch expanded postings, don't backfill each
// encoded postings in case a cache miss. Cache expanded postings for the whole group instead.
if _, ok := expandedPostingGroupSet[keys[keyID].Name]; ok {
continue
}

startCompression := time.Now()
dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*la
c.ptr.StoreExpandedPostings(blockID, matchers, v, tenant)
}

func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
return c.ptr.FetchExpandedPostings(ctx, blockID, matchers, tenant)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type IndexCache interface {
// StoreExpandedPostings stores expanded postings for a set of label matchers.
StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string)

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool)
// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. If cache miss, the corresponding index
// of the response slice will be nil.
FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte

// StoreSeries stores a single series.
StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string)
Expand Down
8 changes: 5 additions & 3 deletions pkg/store/cache/filter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [
}
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. If cache miss, the corresponding index
// of the response slice will be nil.
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
if c.expandedPostingsEnabled {
return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant)
}
return nil, false

return make([][]byte, len(matchers))
}

// StoreSeries sets the series identified by the ulid and id to the value v,
Expand Down
28 changes: 15 additions & 13 deletions pkg/store/cache/filter_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, testExpandedPostingsData, ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
Expand All @@ -77,9 +77,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Assert(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, testExpandedPostingsData, ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
Expand All @@ -98,8 +98,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, false, hit)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, []byte(nil), ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(misses))
Expand All @@ -118,9 +119,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, testExpandedPostingsData, ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(misses))
Expand All @@ -139,8 +140,9 @@ func TestFilterCache(t *testing.T) {
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
testutil.Equals(t, false, hit)
ep := c.FetchExpandedPostings(ctx, blockID, [][]*labels.Matcher{expandedPostingsMatchers}, tenancy.DefaultTenant)
testutil.Equals(t, 1, len(ep))
testutil.Equals(t, []byte(nil), ep[0])

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
testutil.Equals(t, 0, len(misses))
Expand Down
35 changes: 25 additions & 10 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,20 +336,35 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [
c.set(CacheTypeExpandedPostings, CacheKey{Block: blockID.String(), Key: CacheKeyExpandedPostings(LabelMatchersToString(matchers))}, v)
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
// FetchExpandedPostings fetches expanded postings and returns a slice of bytes. If cache miss, the corresponding index
// of the response slice will be nil.
func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers [][]*labels.Matcher, tenant string) [][]byte {
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(CacheTypeExpandedPostings, tenant))
defer timer.ObserveDuration()

if ctx.Err() != nil {
return nil, false
}
c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Inc()
if b, ok := c.get(CacheKey{blockID.String(), CacheKeyExpandedPostings(LabelMatchersToString(matchers)), ""}); ok {
c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Inc()
return b, true
blockIDKey := blockID.String()
requests := 0
hits := 0
data := make([][]byte, len(matchers))
for i, matcher := range matchers {
if (i+1)%checkContextEveryNIterations == 0 {
if ctx.Err() != nil {
c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(hits))
c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(requests))
return data
}
}

requests++
if b, ok := c.get(CacheKey{blockIDKey, CacheKeyExpandedPostings(LabelMatchersToString(matcher)), ""}); ok {
hits++
data[i] = b
}
}
return nil, false
c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(hits))
c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(float64(requests))

return data
}

// StoreSeries sets the series identified by the ulid and id to the value v,
Expand Down
3 changes: 2 additions & 1 deletion pkg/store/cache/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b, tenancy.DefaultTenant)
},
get: func(id storage.SeriesRef) ([]byte, bool) {
return cache.FetchExpandedPostings(ctx, uid(id), []*labels.Matcher{matcher}, tenancy.DefaultTenant)
res := cache.FetchExpandedPostings(ctx, uid(id), [][]*labels.Matcher{{matcher}}, tenancy.DefaultTenant)
return res[0], res[0] != nil
},
},
} {
Expand Down
Loading

0 comments on commit 08d1160

Please sign in to comment.