This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit 06eee5b

add prefixCompressedPostings

Signed-off-by: naivewong <[email protected]>
1 parent d5b3f07 commit 06eee5b

3 files changed: +433 -24 lines

index/index.go

Lines changed: 28 additions & 24 deletions
@@ -44,6 +44,8 @@ const (
     FormatV1 = 1
     // FormatV2 represents 2 version of index.
     FormatV2 = 2
+    // FormatV3 represents 3 version of index (using PrefixCompressedPostings for postings).
+    FormatV3 = 3

     labelNameSeperator = "\xff"

@@ -121,7 +123,7 @@ type Writer struct {
     // Reusable memory.
     buf1 encoding.Encbuf
     buf2 encoding.Encbuf
-    uint32s []uint32
+    uint64s []uint64

     symbols map[string]uint32 // symbol offsets
     seriesOffsets map[uint64]uint64 // offsets of series
@@ -205,7 +207,7 @@ func NewWriter(fn string) (*Writer, error) {
         // Reusable memory.
         buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
         buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
-        uint32s: make([]uint32, 0, 1<<15),
+        uint64s: make([]uint64, 0, 1<<15),

         // Caches.
         symbols: make(map[string]uint32, 1<<13),
@@ -290,7 +292,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
 func (w *Writer) writeMeta() error {
     w.buf1.Reset()
     w.buf1.PutBE32(MagicIndex)
-    w.buf1.PutByte(FormatV2)
+    w.buf1.PutByte(FormatV3)

     return w.write(w.buf1.Get())
 }
@@ -522,30 +524,25 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
     // Order of the references in the postings list does not imply order
     // of the series references within the persisted block they are mapped to.
     // We have to sort the new references again.
-    refs := w.uint32s[:0]
+    refs := w.uint64s[:0]

     for it.Next() {
         offset, ok := w.seriesOffsets[it.At()]
         if !ok {
             return errors.Errorf("%p series for reference %d not found", w, it.At())
         }
-        if offset > (1<<32)-1 {
-            return errors.Errorf("series offset %d exceeds 4 bytes", offset)
-        }
-        refs = append(refs, uint32(offset))
+        refs = append(refs, offset)
     }
     if err := it.Err(); err != nil {
         return err
     }
-    sort.Sort(uint32slice(refs))
+    sort.Sort(uint64slice(refs))

     w.buf2.Reset()
     w.buf2.PutBE32int(len(refs))

-    for _, r := range refs {
-        w.buf2.PutBE32(r)
-    }
-    w.uint32s = refs
+    writePrefixCompressedPostings(&w.buf2, refs)
+    w.uint64s = refs

     w.buf1.Reset()
     w.buf1.PutBE32int(w.buf2.Len())
@@ -556,11 +553,11 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
     return errors.Wrap(err, "write postings")
 }

-type uint32slice []uint32
+type uint64slice []uint64

-func (s uint32slice) Len() int { return len(s) }
-func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
-func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
+func (s uint64slice) Len() int { return len(s) }
+func (s uint64slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s uint64slice) Less(i, j int) bool { return s[i] < s[j] }

 type labelIndexHashEntry struct {
     keys []string
@@ -678,7 +675,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
     }
     r.version = int(r.b.Range(4, 5)[0])

-    if r.version != FormatV1 && r.version != FormatV2 {
+    if r.version != FormatV1 && r.version != FormatV2 && r.version != FormatV3 {
         return nil, errors.Errorf("unknown index file version %d", r.version)
     }

@@ -782,14 +779,14 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
         symbolSlice []string
         symbols = map[uint32]string{}
     )
-    if version == FormatV2 {
+    if version == FormatV2 || version == FormatV3 {
         symbolSlice = make([]string, 0, cnt)
     }

     for d.Err() == nil && d.Len() > 0 && cnt > 0 {
         s := d.UvarintStr()

-        if version == FormatV2 {
+        if version == FormatV2 || version == FormatV3 {
             symbolSlice = append(symbolSlice, s)
         } else {
             symbols[nextPos] = s
@@ -911,7 +908,7 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
     offset := id
     // In version 2 series IDs are no longer exact references but series are 16-byte padded
     // and the ID is the multiple of 16 of the actual position.
-    if r.version == FormatV2 {
+    if r.version == FormatV2 || r.version == FormatV3 {
         offset = id * 16
     }
     d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
@@ -935,7 +932,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) {
     if d.Err() != nil {
         return nil, errors.Wrap(d.Err(), "get postings entry")
     }
-    _, p, err := r.dec.Postings(d.Get())
+    _, p, err := r.dec.Postings(d.Get(), r.version)
     if err != nil {
         return nil, errors.Wrap(err, "decode postings")
     }
@@ -1059,11 +1056,18 @@ type Decoder struct {
 }

 // Postings returns a postings list for b and its number of elements.
-func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
+func (dec *Decoder) Postings(b []byte, version int) (int, Postings, error) {
     d := encoding.Decbuf{B: b}
     n := d.Be32int()
+    if n == 0 {
+        return n, EmptyPostings(), d.Err()
+    }
     l := d.Get()
-    return n, newBigEndianPostings(l), d.Err()
+    if version == FormatV3 {
+        return n, newPrefixCompressedPostings(l), d.Err()
+    } else {
+        return n, newBigEndianPostings(l), d.Err()
+    }
 }

 // Series decodes a series entry from the given byte slice into lset and chks.
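For orientation, the following sketch (an editor's addition, not part of the commit) describes the bytes that WritePostings now emits for a single FormatV3 postings entry; the field widths are read directly off the writer hunks above and writePrefixCompressedPostings in index/postings.go below.

// Illustrative layout of one FormatV3 postings entry (all integers big-endian):
//
//   uint32  number of postings              (w.buf2.PutBE32int(len(refs)) above)
//
//   payload passed to newPrefixCompressedPostings:
//     uint32  footer offset                 (relative to the end of this 8-byte pair)
//     uint32  number of blocks
//     blocks, one per distinct high-48-bit prefix:
//       uint64  key = reference &^ 0xffff   (shared prefix, low 16 bits zero)
//       uint16  low 16 bits of each reference in the block, one per reference
//     footer:
//       uint32  starting offset of each block, in order, followed by
//       uint32  the starting offset of the footer itself
//               (all footer offsets relative to the end of the 8-byte pair above)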

index/postings.go

Lines changed: 170 additions & 0 deletions
@@ -21,6 +21,7 @@ import (
     "strings"
     "sync"

+    "github.com/prometheus/tsdb/encoding"
     "github.com/prometheus/tsdb/labels"
 )

@@ -689,3 +690,172 @@ func (it *bigEndianPostings) Seek(x uint64) bool {
 func (it *bigEndianPostings) Err() error {
     return nil
 }
+
+type prefixCompressedPostings struct {
+    bs []byte
+    cur uint64
+    inside bool
+    idx int // The current offset inside the bs.
+    footerAddr int
+    key uint64
+    numBlock int
+    blockIdx int // The current block idx.
+    nextBlock int // The starting offset of the next block.
+}
+
+func newPrefixCompressedPostings(bstream []byte) *prefixCompressedPostings {
+    return &prefixCompressedPostings{
+        bs: bstream[8:],
+        numBlock: int(binary.BigEndian.Uint32(bstream[4:])),
+        footerAddr: int(binary.BigEndian.Uint32(bstream)),
+    }
+}
+
+func (it *prefixCompressedPostings) At() uint64 {
+    return it.cur
+}
+
+func (it *prefixCompressedPostings) Next() bool {
+    if it.inside { // Already entered the block.
+        if it.idx < it.nextBlock {
+            it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
+            it.idx += 2
+            return true
+        }
+        it.blockIdx += 1 // Go to the next block.
+    }
+    // Currently not entered any block.
+    if it.idx < it.footerAddr {
+        it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
+        it.idx += 8
+        it.inside = true
+        it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
+        it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
+        it.idx += 2
+        return true
+    } else {
+        return false
+    }
+}
+
+func (it *prefixCompressedPostings) seekInBlock(x uint64) bool {
+    curVal := x & 0xffff
+    num := (it.nextBlock - it.idx) >> 1
+    j := sort.Search(num, func(i int) bool {
+        return uint64(binary.BigEndian.Uint16(it.bs[it.idx+(i<<1):])) >= curVal
+    })
+    if j == num {
+        // Fast-path to the next block.
+        // The first element in next block should be >= x.
+        it.idx = it.nextBlock
+        it.blockIdx += 1
+        if it.idx < it.footerAddr {
+            it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
+            it.idx += 8
+            it.inside = true
+            it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
+            it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
+            it.idx += 2
+            return true
+        } else {
+            return false
+        }
+    }
+    it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx+(j<<1):]))
+    it.idx += (j + 1) << 1
+    return true
+}
+
+func (it *prefixCompressedPostings) Seek(x uint64) bool {
+    if it.cur >= x {
+        return true
+    }
+    curKey := (x >> 16) << 16
+    if it.inside && it.key == curKey {
+        // Fast path for x in current block.
+        return it.seekInBlock(x)
+    } else {
+        i := sort.Search(it.numBlock-it.blockIdx, func(i int) bool {
+            off := int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+i)<<2):]))
+            k := binary.BigEndian.Uint64(it.bs[off:])
+            return k >= curKey
+        })
+        if i == it.numBlock-it.blockIdx {
+            return false
+        }
+        it.blockIdx += i
+        if i > 0 {
+            it.inside = false
+            it.idx = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx)<<2):]))
+        }
+    }
+    it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
+    it.idx += 8
+
+    it.inside = true
+
+    it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
+    return it.seekInBlock(x)
+}
+
+func (it *prefixCompressedPostings) Err() error {
+    return nil
+}
+
+// The size of values inside the block is 2 bytes.
+func writePrefixCompressedPostingsBlock(e *encoding.Encbuf, vals []uint16, key uint64, c []byte) {
+    e.PutBE64(key)
+    for _, val := range vals {
+        binary.BigEndian.PutUint16(c[:], val)
+        e.PutByte(c[0])
+        e.PutByte(c[1])
+    }
+}
+
+func writePrefixCompressedPostings(e *encoding.Encbuf, arr []uint64) {
+    if len(arr) == 0 {
+        return
+    }
+    key := uint64(0)
+    mask := uint64((1 << uint(16)) - 1) // Mask for the elements in the block.
+    invertedMask := ^mask
+    var (
+        curKey uint64
+        curVal uint64
+        idx int // Index of current element in arr.
+        startingOffs []uint32 // The starting offsets of each block.
+        vals []uint16 // The converted values in the current block.
+        startOff = len(e.Get())
+        c = make([]byte, 2)
+    )
+    e.PutBE32(0) // Footer starting offset.
+    e.PutBE32(0) // Number of blocks.
+    for idx < len(arr) {
+        curKey = arr[idx] & invertedMask // Key of block.
+        curVal = arr[idx] & mask // Value inside block.
+        if curKey != key {
+            // Move to next block.
+            if idx != 0 {
+                // We don't need to store the starting offset of the first block because it won't be used.
+                startingOffs = append(startingOffs, uint32(len(e.B)))
+                writePrefixCompressedPostingsBlock(e, vals, key, c)
+                vals = vals[:0]
+            }
+            key = curKey
+        }
+        vals = append(vals, uint16(curVal))
+        idx += 1
+    }
+    startingOffs = append(startingOffs, uint32(len(e.B)))
+    writePrefixCompressedPostingsBlock(e, vals, key, c)
+
+    // Store the ending offset can save the check of whether to read the next block address from the footer or
+    // just len(it.bs) each time before entering the next block.
+    startingOffs = append(startingOffs, uint32(len(e.B)))
+
+    binary.BigEndian.PutUint32(e.B[startOff:], uint32(len(e.B)-8-startOff)) // Put footer starting offset.
+    binary.BigEndian.PutUint32(e.B[startOff+4:], uint32(len(startingOffs)-1)) // Put number of blocks.
+    for _, off := range startingOffs {
+        e.PutBE32(off - 8 - uint32(startOff))
+    }
+}
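As a reading aid (again an editor's sketch, not part of the commit), a minimal round trip through the new encoder and iterator could look like the function below. It assumes the code is compiled inside package index (for example in a test file), so that encoding.Encbuf, writePrefixCompressedPostings, and newPrefixCompressedPostings are in scope, and that "fmt" is imported.

// Hypothetical round-trip example; it only uses identifiers introduced in this diff.
func examplePrefixCompressedPostings() {
    // Sorted series references. References that differ in their high 48 bits
    // (the block "key") are split into separate blocks by the writer.
    refs := []uint64{3, 14, 65530, 65536, 65600, 1 << 20}

    var buf encoding.Encbuf
    writePrefixCompressedPostings(&buf, refs)

    // Decode everything back with Next/At.
    it := newPrefixCompressedPostings(buf.Get())
    for it.Next() {
        fmt.Println(it.At()) // 3, 14, 65530, 65536, 65600, 1048576
    }

    // Seek positions the iterator at the first reference >= x,
    // crossing a block boundary here.
    it = newPrefixCompressedPostings(buf.Get())
    if it.Seek(65531) {
        fmt.Println(it.At()) // 65536
    }
}

With FormatV3 written to the index meta header, Reader.Postings reaches this iterator through the version check added to Decoder.Postings in index/index.go above.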
