Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit 30eda7b

Browse files
author
Ganesh Vernekar
committed
Faster way to merge remapped postings
Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent c252f96 commit 30eda7b

File tree

1 file changed

+71
-37
lines changed

1 file changed

+71
-37
lines changed

compact.go

Lines changed: 71 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
738738
return c.ctx.Err()
739739
default:
740740
}
741-
742741
ref, lset, chks, dranges := set.At() // The chunks here are not fully deleted.
743742

744743
if overlapping {
@@ -885,7 +884,7 @@ func (c *LeveledCompactor) writePostings(indexw IndexWriter, totalSeries int, va
885884
idxw.HintPostingsWriteCount(numLabelValues)
886885
}
887886

888-
remapPostings := newRemappedPostings(seriesMap, totalSeries)
887+
remapPostings := newRemappedPostings(seriesMap, indexReaders, totalSeries)
889888
var postBuf index.Postings
890889
for _, n := range names {
891890
labelValuesBuf = labelValuesBuf[:0]
@@ -895,16 +894,11 @@ func (c *LeveledCompactor) writePostings(indexw IndexWriter, totalSeries int, va
895894
sort.Strings(labelValuesBuf)
896895

897896
for _, v := range labelValuesBuf {
898-
remapPostings.clearPostings()
899-
for i, ir := range indexReaders {
900-
postBuf, err = ir.Postings(n, v, postBuf)
901-
if err != nil {
902-
return errors.Wrap(err, "read postings")
903-
}
904-
remapPostings.add(i, postBuf)
897+
postBuf, err = remapPostings.get(n, v)
898+
if err != nil {
899+
return errors.Wrap(err, "remapping postings")
905900
}
906-
907-
if err := indexw.WritePostings(n, v, remapPostings.get()); err != nil {
901+
if err := indexw.WritePostings(n, v, postBuf); err != nil {
908902
return errors.Wrap(err, "write postings")
909903
}
910904
}
@@ -917,18 +911,30 @@ func (c *LeveledCompactor) writePostings(indexw IndexWriter, totalSeries int, va
917911
// from given set of maps.
918912
type remappedPostings struct {
919913
postingsMap []map[uint64]uint64
920-
postingBuf []uint64
921914
listPost *index.ListPostings
915+
irs []IndexReader
916+
917+
postObj index.Postings
918+
postingLists [3][]uint64
919+
currBufIdx, appendBufIdx, mergeBufIdx int
922920
}
923921

924922
// newRemappedPostings returns remappedPostings.
925923
// 'postingsMap' is the slice of maps used for remapping.
926924
// 'postingSizeHint' is for preallocation of memory to reduce allocs later.
927-
func newRemappedPostings(postingsMap []map[uint64]uint64, postingSizeHint int) *remappedPostings {
925+
func newRemappedPostings(postingsMap []map[uint64]uint64, irs []IndexReader, postingSizeHint int) *remappedPostings {
928926
return &remappedPostings{
929927
postingsMap: postingsMap,
930-
postingBuf: make([]uint64, 0, postingSizeHint),
931-
listPost: index.NewListPostings(),
928+
irs: irs,
929+
postingLists: [3][]uint64{
930+
make([]uint64, 0, postingSizeHint),
931+
make([]uint64, 0, postingSizeHint),
932+
make([]uint64, 0, postingSizeHint),
933+
},
934+
listPost: index.NewListPostings(),
935+
currBufIdx: 0,
936+
appendBufIdx: 1,
937+
mergeBufIdx: 2,
932938
}
933939
}
934940

@@ -937,44 +943,72 @@ func newRemappedPostings(postingsMap []map[uint64]uint64, postingSizeHint int) *
937943
// are added to the result buffer.
938944
func (rp *remappedPostings) add(mapIdx int, p index.Postings) {
939945
pMap := rp.postingsMap[mapIdx]
940-
idx, lastIdx := -1, -1
946+
947+
currBuf := rp.postingLists[rp.currBufIdx]
948+
appendBuf := rp.postingLists[rp.appendBufIdx][:0]
949+
mergeBuf := rp.postingLists[rp.mergeBufIdx][:0]
950+
941951
for p.Next() {
942952
newVal, ok := pMap[p.At()]
943953
if !ok {
944954
continue
945955
}
946-
// idx is the index at which newVal exists or index at which we need to insert.
947-
// 'p' consists postings in sorted order w.r.t. the series labels.
948-
// Hence the mapped series will also be in ascending order including the postings.
949-
// So we need not look at/before 'lastIdx' in 'postingBuf'.
950-
for idx = lastIdx + 1; idx < len(rp.postingBuf); idx++ {
951-
if rp.postingBuf[idx] >= newVal {
952-
break
956+
appendBuf = append(appendBuf, newVal)
957+
}
958+
959+
if mapIdx == 0 {
960+
rp.postingLists[rp.currBufIdx] = appendBuf
961+
rp.postingLists[rp.appendBufIdx] = currBuf
962+
} else {
963+
i, j := 0, 0
964+
for i < len(currBuf) && j < len(appendBuf) {
965+
if currBuf[i] < appendBuf[j] {
966+
mergeBuf = append(mergeBuf, currBuf[i])
967+
i++
968+
} else if appendBuf[j] < currBuf[i] {
969+
mergeBuf = append(mergeBuf, appendBuf[i])
970+
j++
971+
} else {
972+
mergeBuf = append(mergeBuf, currBuf[i])
973+
i++
974+
j++
953975
}
954976
}
955-
lastIdx = idx
956-
if idx == len(rp.postingBuf) {
957-
rp.postingBuf = append(rp.postingBuf, newVal)
958-
} else if rp.postingBuf[idx] != newVal {
959-
rp.postingBuf = append(rp.postingBuf[:idx], append([]uint64{newVal}, rp.postingBuf[idx:]...)...)
977+
for i < len(currBuf) {
978+
mergeBuf = append(mergeBuf, currBuf[i])
979+
i++
980+
}
981+
for j < len(appendBuf) {
982+
mergeBuf = append(mergeBuf, appendBuf[i])
983+
j++
960984
}
985+
986+
rp.postingLists[rp.currBufIdx] = mergeBuf
987+
rp.postingLists[rp.mergeBufIdx] = currBuf
961988
}
962989
}
963990

964-
// get returns the remapped postings.
965-
// The returned postings becomes invalid after calling any other exposed
966-
// methods of RemappedPostings (Add, Get, ClearPostings). Hence postings
967-
// should be used right after calling 'Get'.
968-
// This is because of the shared buffer for memory optimizations.
969-
func (rp *remappedPostings) get() index.Postings {
970-
rp.listPost.Reset(rp.postingBuf)
971-
return rp.listPost
991+
// get returns the remapped postings for the given label name.
992+
func (rp *remappedPostings) get(name, value string) (index.Postings, error) {
993+
rp.clearPostings()
994+
var err error
995+
for i, ir := range rp.irs {
996+
rp.postObj, err = ir.Postings(name, value, rp.postObj)
997+
if err != nil {
998+
return nil, errors.Wrap(err, "read postings")
999+
}
1000+
rp.add(i, rp.postObj)
1001+
}
1002+
rp.listPost.Reset(rp.postingLists[rp.currBufIdx])
1003+
return rp.listPost, nil
9721004
}
9731005

9741006
// clearPostings only clears the result postings
9751007
// buffer and not the map.
9761008
func (rp *remappedPostings) clearPostings() {
977-
rp.postingBuf = rp.postingBuf[:0]
1009+
rp.postingLists[0] = rp.postingLists[0][:0]
1010+
rp.postingLists[1] = rp.postingLists[1][:0]
1011+
rp.postingLists[2] = rp.postingLists[2][:0]
9781012
}
9791013

9801014
type compactionSeriesSet struct {

0 commit comments

Comments
 (0)