Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit b504c5d

Browse files
committed
add prefixCompressedPostings
Signed-off-by: naivewong <[email protected]>
1 parent d5b3f07 commit b504c5d

File tree

3 files changed

+426
-24
lines changed

3 files changed

+426
-24
lines changed

index/index.go

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ const (
4444
FormatV1 = 1
4545
// FormatV2 represents 2 version of index.
4646
FormatV2 = 2
47+
// FormatV3 represents 3 version of index (using PrefixCompressedPostings for postings).
48+
FormatV3 = 3
4749

4850
labelNameSeperator = "\xff"
4951

@@ -121,7 +123,7 @@ type Writer struct {
121123
// Reusable memory.
122124
buf1 encoding.Encbuf
123125
buf2 encoding.Encbuf
124-
uint32s []uint32
126+
uint64s []uint64
125127

126128
symbols map[string]uint32 // symbol offsets
127129
seriesOffsets map[uint64]uint64 // offsets of series
@@ -205,7 +207,7 @@ func NewWriter(fn string) (*Writer, error) {
205207
// Reusable memory.
206208
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
207209
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
208-
uint32s: make([]uint32, 0, 1<<15),
210+
uint64s: make([]uint64, 0, 1<<15),
209211

210212
// Caches.
211213
symbols: make(map[string]uint32, 1<<13),
@@ -290,7 +292,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
290292
func (w *Writer) writeMeta() error {
291293
w.buf1.Reset()
292294
w.buf1.PutBE32(MagicIndex)
293-
w.buf1.PutByte(FormatV2)
295+
w.buf1.PutByte(FormatV3)
294296

295297
return w.write(w.buf1.Get())
296298
}
@@ -522,30 +524,25 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
522524
// Order of the references in the postings list does not imply order
523525
// of the series references within the persisted block they are mapped to.
524526
// We have to sort the new references again.
525-
refs := w.uint32s[:0]
527+
refs := w.uint64s[:0]
526528

527529
for it.Next() {
528530
offset, ok := w.seriesOffsets[it.At()]
529531
if !ok {
530532
return errors.Errorf("%p series for reference %d not found", w, it.At())
531533
}
532-
if offset > (1<<32)-1 {
533-
return errors.Errorf("series offset %d exceeds 4 bytes", offset)
534-
}
535-
refs = append(refs, uint32(offset))
534+
refs = append(refs, offset)
536535
}
537536
if err := it.Err(); err != nil {
538537
return err
539538
}
540-
sort.Sort(uint32slice(refs))
539+
sort.Sort(uint64slice(refs))
541540

542541
w.buf2.Reset()
543542
w.buf2.PutBE32int(len(refs))
544543

545-
for _, r := range refs {
546-
w.buf2.PutBE32(r)
547-
}
548-
w.uint32s = refs
544+
writePrefixCompressedPostings(&w.buf2, refs)
545+
w.uint64s = refs
549546

550547
w.buf1.Reset()
551548
w.buf1.PutBE32int(w.buf2.Len())
@@ -556,11 +553,11 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
556553
return errors.Wrap(err, "write postings")
557554
}
558555

559-
type uint32slice []uint32
556+
type uint64slice []uint64
560557

561-
func (s uint32slice) Len() int { return len(s) }
562-
func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
563-
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
558+
func (s uint64slice) Len() int { return len(s) }
559+
func (s uint64slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
560+
func (s uint64slice) Less(i, j int) bool { return s[i] < s[j] }
564561

565562
type labelIndexHashEntry struct {
566563
keys []string
@@ -678,7 +675,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
678675
}
679676
r.version = int(r.b.Range(4, 5)[0])
680677

681-
if r.version != FormatV1 && r.version != FormatV2 {
678+
if r.version != FormatV1 && r.version != FormatV2 && r.version != FormatV3 {
682679
return nil, errors.Errorf("unknown index file version %d", r.version)
683680
}
684681

@@ -782,14 +779,14 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
782779
symbolSlice []string
783780
symbols = map[uint32]string{}
784781
)
785-
if version == FormatV2 {
782+
if version == FormatV2 || version == FormatV3 {
786783
symbolSlice = make([]string, 0, cnt)
787784
}
788785

789786
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
790787
s := d.UvarintStr()
791788

792-
if version == FormatV2 {
789+
if version == FormatV2 || version == FormatV3 {
793790
symbolSlice = append(symbolSlice, s)
794791
} else {
795792
symbols[nextPos] = s
@@ -911,7 +908,7 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
911908
offset := id
912909
// In version 2 series IDs are no longer exact references but series are 16-byte padded
913910
// and the ID is the multiple of 16 of the actual position.
914-
if r.version == FormatV2 {
911+
if r.version == FormatV2 || r.version == FormatV3 {
915912
offset = id * 16
916913
}
917914
d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
@@ -935,7 +932,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) {
935932
if d.Err() != nil {
936933
return nil, errors.Wrap(d.Err(), "get postings entry")
937934
}
938-
_, p, err := r.dec.Postings(d.Get())
935+
_, p, err := r.dec.Postings(d.Get(), r.version)
939936
if err != nil {
940937
return nil, errors.Wrap(err, "decode postings")
941938
}
@@ -1059,11 +1056,18 @@ type Decoder struct {
10591056
}
10601057

10611058
// Postings returns a postings list for b and its number of elements.
1062-
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
1059+
func (dec *Decoder) Postings(b []byte, version int) (int, Postings, error) {
10631060
d := encoding.Decbuf{B: b}
10641061
n := d.Be32int()
1062+
if n == 0 {
1063+
return n, EmptyPostings(), d.Err()
1064+
}
10651065
l := d.Get()
1066-
return n, newBigEndianPostings(l), d.Err()
1066+
if version == FormatV3 {
1067+
return n, newPrefixCompressedPostings(l), d.Err()
1068+
} else {
1069+
return n, newBigEndianPostings(l), d.Err()
1070+
}
10671071
}
10681072

10691073
// Series decodes a series entry from the given byte slice into lset and chks.

index/postings.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"strings"
2222
"sync"
2323

24+
"github.com/prometheus/tsdb/encoding"
2425
"github.com/prometheus/tsdb/labels"
2526
)
2627

@@ -689,3 +690,165 @@ func (it *bigEndianPostings) Seek(x uint64) bool {
689690
func (it *bigEndianPostings) Err() error {
690691
return nil
691692
}
693+
694+
type prefixCompressedPostings struct {
695+
bs []byte
696+
cur uint64
697+
inside bool
698+
idx int // The current offset inside the bs.
699+
footerAddr int
700+
key uint64
701+
numBlock int
702+
blockIdx int // The current block idx.
703+
nextBlock int // The starting offset of the next block.
704+
}
705+
706+
func newPrefixCompressedPostings(bstream []byte) *prefixCompressedPostings {
707+
x := binary.BigEndian.Uint32(bstream) // Read the footer address.
708+
return &prefixCompressedPostings{bs: bstream[8:], numBlock: int(binary.BigEndian.Uint32(bstream[4:])), footerAddr: int(x)}
709+
}
710+
711+
func (it *prefixCompressedPostings) At() uint64 {
712+
return it.cur
713+
}
714+
715+
func (it *prefixCompressedPostings) Next() bool {
716+
if it.inside { // Already entered the block.
717+
if it.idx < it.nextBlock {
718+
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
719+
it.idx += 2
720+
return true
721+
}
722+
it.blockIdx += 1 // Go to the next block.
723+
}
724+
// Currently not entered any block.
725+
if it.idx < it.footerAddr {
726+
it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
727+
it.idx += 8
728+
it.inside = true
729+
it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
730+
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
731+
it.idx += 2
732+
return true
733+
} else {
734+
return false
735+
}
736+
}
737+
738+
func (it *prefixCompressedPostings) seekInBlock(x uint64) bool {
739+
curVal := x & 0xffff
740+
num := (it.nextBlock - it.idx) >> 1
741+
j := sort.Search(num, func(i int) bool {
742+
return uint64(binary.BigEndian.Uint16(it.bs[it.idx+(i<<1):])) >= curVal
743+
})
744+
if j == num {
745+
// Fast-path to the next block.
746+
// The first element in next block should be >= x.
747+
it.idx = it.nextBlock
748+
it.blockIdx += 1
749+
if it.idx < it.footerAddr {
750+
it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
751+
it.idx += 8
752+
it.inside = true
753+
it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
754+
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
755+
it.idx += 2
756+
return true
757+
} else {
758+
return false
759+
}
760+
}
761+
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx+(j<<1):]))
762+
it.idx += (j + 1) << 1
763+
return true
764+
}
765+
766+
func (it *prefixCompressedPostings) Seek(x uint64) bool {
767+
if it.cur >= x {
768+
return true
769+
}
770+
curKey := (x >> 16) << 16
771+
if it.inside && it.key == curKey {
772+
// Fast path for x in current block.
773+
return it.seekInBlock(x)
774+
} else {
775+
i := sort.Search(it.numBlock-it.blockIdx, func(i int) bool {
776+
off := int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+i)<<2):]))
777+
k := binary.BigEndian.Uint64(it.bs[off:])
778+
return k >= curKey
779+
})
780+
if i == it.numBlock-it.blockIdx {
781+
return false
782+
}
783+
it.blockIdx += i
784+
if i != 0 { // i > 0.
785+
it.inside = false
786+
it.idx = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx)<<2):]))
787+
}
788+
}
789+
it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
790+
it.idx += 8
791+
792+
it.inside = true
793+
794+
it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
795+
return it.seekInBlock(x)
796+
}
797+
798+
func (it *prefixCompressedPostings) Err() error {
799+
return nil
800+
}
801+
802+
// The size of values inside the block is 2 bytes.
803+
func writePrefixCompressedPostingsBlock(e *encoding.Encbuf, vals []uint16, key uint64) {
804+
e.PutBE64(key)
805+
c := make([]byte, 2)
806+
for _, val := range vals {
807+
binary.BigEndian.PutUint16(c[:], val)
808+
e.PutByte(c[0])
809+
e.PutByte(c[1])
810+
}
811+
}
812+
813+
func writePrefixCompressedPostings(e *encoding.Encbuf, arr []uint64) {
814+
if len(arr) == 0 {
815+
return
816+
}
817+
key := uint64(0xffffffff) // The initial key should be unique.
818+
mask := uint64((1 << uint(16)) - 1) // Mask for the elements in the block.
819+
invertedMask := ^mask
820+
var (
821+
curKey uint64
822+
curVal uint64
823+
idx int // Index of current element in arr.
824+
startingOffs []uint32 // The starting offsets of each block.
825+
vals []uint16 // The converted values in the current block.
826+
startOff = len(e.Get())
827+
)
828+
e.PutBE32(0) // Footer starting offset.
829+
e.PutBE32(0) // Number of blocks.
830+
for idx < len(arr) {
831+
curKey = arr[idx] & invertedMask // Key of block.
832+
curVal = arr[idx] & mask // Value inside block.
833+
if curKey != key {
834+
// Move to next block.
835+
if idx != 0 {
836+
startingOffs = append(startingOffs, uint32(len(e.B)))
837+
writePrefixCompressedPostingsBlock(e, vals, key)
838+
vals = vals[:0]
839+
}
840+
key = curKey
841+
}
842+
vals = append(vals, uint16(curVal))
843+
idx += 1
844+
}
845+
startingOffs = append(startingOffs, uint32(len(e.B)))
846+
writePrefixCompressedPostingsBlock(e, vals, key)
847+
startingOffs = append(startingOffs, uint32(len(e.B)))
848+
849+
binary.BigEndian.PutUint32(e.B[startOff:], uint32(len(e.B)-8-startOff)) // Put footer starting offset.
850+
binary.BigEndian.PutUint32(e.B[startOff+4:], uint32(len(startingOffs)-1)) // Put number of blocks.
851+
for _, off := range startingOffs {
852+
e.PutBE32(off-8-uint32(startOff))
853+
}
854+
}

0 commit comments

Comments
 (0)