This repository was archived by the owner on Aug 13, 2019. It is now read-only.
54 changes: 30 additions & 24 deletions index/index.go
@@ -44,6 +44,8 @@ const (
FormatV1 = 1
// FormatV2 represents 2 version of index.
FormatV2 = 2
// FormatV3 represents 3 version of index (using PrefixCompressedPostings for postings).
FormatV3 = 3

labelNameSeperator = "\xff"

@@ -121,7 +123,7 @@ type Writer struct {
// Reusable memory.
buf1 encoding.Encbuf
buf2 encoding.Encbuf
uint32s []uint32
uint64s []uint64

symbols map[string]uint32 // symbol offsets
seriesOffsets map[uint64]uint64 // offsets of series
@@ -205,7 +207,7 @@ func NewWriter(fn string) (*Writer, error) {
// Reusable memory.
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
uint32s: make([]uint32, 0, 1<<15),
uint64s: make([]uint64, 0, 1<<15),

// Caches.
symbols: make(map[string]uint32, 1<<13),
@@ -290,7 +292,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
func (w *Writer) writeMeta() error {
w.buf1.Reset()
w.buf1.PutBE32(MagicIndex)
w.buf1.PutByte(FormatV2)
w.buf1.PutByte(FormatV3)

return w.write(w.buf1.Get())
}
@@ -522,30 +524,25 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
// Order of the references in the postings list does not imply order
// of the series references within the persisted block they are mapped to.
// We have to sort the new references again.
refs := w.uint32s[:0]
refs := w.uint64s[:0]

for it.Next() {
offset, ok := w.seriesOffsets[it.At()]
if !ok {
return errors.Errorf("%p series for reference %d not found", w, it.At())
}
if offset > (1<<32)-1 {
return errors.Errorf("series offset %d exceeds 4 bytes", offset)
}
refs = append(refs, uint32(offset))
refs = append(refs, offset)
}
if err := it.Err(); err != nil {
return err
}
sort.Sort(uint32slice(refs))
sort.Sort(uint64slice(refs))

w.buf2.Reset()
w.buf2.PutBE32int(len(refs))

for _, r := range refs {
w.buf2.PutBE32(r)
}
w.uint32s = refs
writePrefixCompressedPostings(&w.buf2, refs)
w.uint64s = refs

w.buf1.Reset()
w.buf1.PutBE32int(w.buf2.Len())
@@ -556,11 +553,11 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
return errors.Wrap(err, "write postings")
}

type uint32slice []uint32
type uint64slice []uint64

func (s uint32slice) Len() int { return len(s) }
func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
func (s uint64slice) Len() int { return len(s) }
func (s uint64slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s uint64slice) Less(i, j int) bool { return s[i] < s[j] }

type labelIndexHashEntry struct {
keys []string
@@ -678,7 +675,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
}
r.version = int(r.b.Range(4, 5)[0])

if r.version != FormatV1 && r.version != FormatV2 {
if r.version != FormatV1 && r.version != FormatV2 && r.version != FormatV3 {
return nil, errors.Errorf("unknown index file version %d", r.version)
}

@@ -782,14 +779,14 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
symbolSlice []string
symbols = map[uint32]string{}
)
if version == FormatV2 {
if version == FormatV2 || version == FormatV3 {
symbolSlice = make([]string, 0, cnt)
}

for d.Err() == nil && d.Len() > 0 && cnt > 0 {
s := d.UvarintStr()

if version == FormatV2 {
if version == FormatV2 || version == FormatV3 {
symbolSlice = append(symbolSlice, s)
} else {
symbols[nextPos] = s
@@ -911,7 +908,7 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == FormatV2 {
if r.version == FormatV2 || r.version == FormatV3 {
offset = id * 16
}
d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
@@ -935,7 +932,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) {
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "get postings entry")
}
_, p, err := r.dec.Postings(d.Get())
_, p, err := r.dec.Postings(d.Get(), r.version)
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
@@ -1059,11 +1056,20 @@ type Decoder struct {
}

// Postings returns a postings list for b and its number of elements.
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
func (dec *Decoder) Postings(b []byte, version int) (int, Postings, error) {
d := encoding.Decbuf{B: b}
n := d.Be32int()
if n == 0 {
return n, EmptyPostings(), d.Err()
}
l := d.Get()
return n, newBigEndianPostings(l), d.Err()
if version == FormatV3 {
return n, newPrefixCompressedPostings(l), d.Err()
} else if version == FormatV1 || version == FormatV2 {
return n, newBigEndianPostings(l), d.Err()
} else {
return n, EmptyPostings(), d.Err()
}
}

// Series decodes a series entry from the given byte slice into lset and chks.
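For reference, the V3 postings payload that Decoder.Postings now hands to newPrefixCompressedPostings begins with a 4-byte footer offset and a 4-byte block count, followed by the blocks (an 8-byte shared key per block plus 2-byte low-order values) and a footer of 4-byte block starting offsets. Below is a minimal sketch of that layout, assuming the format written by writePrefixCompressedPostings in index/postings.go further down; the describeV3Postings helper and the hand-encoded bytes are illustrative only, not part of this change.

package main

import (
	"encoding/binary"
	"fmt"
)

// describeV3Postings prints the header fields of a V3 postings payload,
// i.e. the bytes that follow the 4-byte series count written by WritePostings.
// Hypothetical helper for illustration.
func describeV3Postings(payload []byte) {
	footerOff := binary.BigEndian.Uint32(payload[0:4]) // footer start, relative to payload[8:]
	numBlocks := binary.BigEndian.Uint32(payload[4:8]) // number of prefix blocks
	fmt.Printf("footer at +%d, %d blocks, %d footer entries\n", footerOff, numBlocks, numBlocks+1)
}

func main() {
	// Hand-encoded payload for the postings list {3, 5, 0x10002}:
	// two blocks with keys 0 and 0x10000.
	payload := []byte{
		0, 0, 0, 22, // footer offset (relative to the end of this 8-byte header)
		0, 0, 0, 2, // number of blocks
		0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 5, // block 0: key 0, values 3 and 5
		0, 0, 0, 0, 0, 1, 0, 0, 0, 2, // block 1: key 0x10000, value 2
		0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 22, // footer: block starts 0 and 12, end offset 22
	}
	describeV3Postings(payload)
}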
170 changes: 170 additions & 0 deletions index/postings.go
@@ -21,6 +21,7 @@ import (
"strings"
"sync"

"github.com/prometheus/tsdb/encoding"
"github.com/prometheus/tsdb/labels"
)

@@ -689,3 +690,172 @@ func (it *bigEndianPostings) Seek(x uint64) bool {
func (it *bigEndianPostings) Err() error {
return nil
}

type prefixCompressedPostings struct {
bs []byte
cur uint64
initialized bool
idx int // The current offset inside the bs.
footerAddr int
key uint64
numBlock int
blockIdx int // The current block idx.
nextBlock int // The starting offset of the next block.
}

func newPrefixCompressedPostings(bstream []byte) *prefixCompressedPostings {
return &prefixCompressedPostings{
bs: bstream[8:],
numBlock: int(binary.BigEndian.Uint32(bstream[4:])),
footerAddr: int(binary.BigEndian.Uint32(bstream)),
}
}

func (it *prefixCompressedPostings) At() uint64 {
return it.cur
}

func (it *prefixCompressedPostings) Next() bool {
if it.initialized { // Already entered the block.
if it.idx < it.nextBlock {
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
it.idx += 2
return true
}
it.blockIdx++ // Go to the next block.
}
// Not currently inside any block; enter the next one.
if it.idx < it.footerAddr {
it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
it.idx += 8
it.initialized = true
it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
it.idx += 2
return true
} else {
return false
}
}

func (it *prefixCompressedPostings) seekInBlock(x uint64) bool {
curVal := uint16(x & 0xffff)
num := (it.nextBlock - it.idx) >> 1
j := sort.Search(num, func(i int) bool {
return binary.BigEndian.Uint16(it.bs[it.idx+(i<<1):]) >= curVal
})
if j < num {
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx+(j<<1):]))
it.idx += (j + 1) << 1
return true
}
// Fast-path to the next block.
// The first element in next block should be >= x.
it.idx = it.nextBlock
it.blockIdx++
if it.idx >= it.footerAddr {
return false
}
it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
it.idx += 8
it.initialized = true
it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
it.idx += 2
return true
}

func (it *prefixCompressedPostings) Seek(x uint64) bool {
if it.cur >= x {
return true
}
curKey := x & 0xffffffffffff0000
if it.initialized && it.key == curKey {
// Fast path for x in current block.
return it.seekInBlock(x)
}
i := sort.Search(it.numBlock-it.blockIdx, func(i int) bool {
off := int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+i)<<2):]))
k := binary.BigEndian.Uint64(it.bs[off:])
return k >= curKey
})
if i == it.numBlock-it.blockIdx {
it.idx = it.footerAddr
return false
}
it.blockIdx += i
if i > 0 {
it.idx = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx)<<2):]))
}
it.key = binary.BigEndian.Uint64(it.bs[it.idx:])
it.idx += 8

it.initialized = true

it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):]))
if it.key != curKey {
it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:]))
it.idx += 2
return true
}
return it.seekInBlock(x)
}

func (it *prefixCompressedPostings) Err() error {
return nil
}

// Each value inside a block is stored as 2 bytes.
func writePrefixCompressedPostingsBlock(e *encoding.Encbuf, vals []uint16, key uint64, c []byte) {
e.PutBE64(key)
for _, val := range vals {
binary.BigEndian.PutUint16(c[:], val)
e.PutByte(c[0])
e.PutByte(c[1])
}
}

func writePrefixCompressedPostings(e *encoding.Encbuf, arr []uint64) {
if len(arr) == 0 {
return
}
key := uint64(0)
mask := uint64(0xFFFF) // Mask for the elements in the block.
invertedMask := uint64(0xFFFFFFFFFFFF0000)
var (
curKey uint64
curVal uint64
startingOffs []uint32 // The starting offsets of each block.
vals []uint16 // The converted values in the current block.
startOff = len(e.Get())
c = make([]byte, 2)
)
e.PutBE32(0) // Footer starting offset.
e.PutBE32(0) // Number of blocks.
for idx := range arr {
curKey = arr[idx] & invertedMask // Key of block.
curVal = arr[idx] & mask // Value inside block.
if curKey != key {
// Move to next block.
if idx != 0 {
startingOffs = append(startingOffs, uint32(len(e.B)))
writePrefixCompressedPostingsBlock(e, vals, key, c)
vals = vals[:0]
}
key = curKey
}
vals = append(vals, uint16(curVal))
}
startingOffs = append(startingOffs, uint32(len(e.B)))
writePrefixCompressedPostingsBlock(e, vals, key, c)

// Storing the ending offset saves the check of whether to read the next block's address from the footer
// or to use len(it.bs) each time before entering the next block.
startingOffs = append(startingOffs, uint32(len(e.B)))

binary.BigEndian.PutUint32(e.B[startOff:], uint32(len(e.B)-8-startOff)) // Put footer starting offset.
binary.BigEndian.PutUint32(e.B[startOff+4:], uint32(len(startingOffs)-1)) // Put number of blocks.
for _, off := range startingOffs {
e.PutBE32(off - 8 - uint32(startOff))
}
}
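To see the encoder and iterator working together, here is a minimal round-trip sketch. It is written as if it lived in package index, since both functions added above are unexported; the test name and the sample references are assumptions for illustration, and it relies only on the encoding package this file already imports.

package index

import (
	"testing"

	"github.com/prometheus/tsdb/encoding"
)

// TestPrefixCompressedPostingsRoundTrip is a hypothetical test: encode a sorted
// postings list and read it back through the new iterator.
func TestPrefixCompressedPostingsRoundTrip(t *testing.T) {
	// References sharing the same upper 48 bits end up in the same block.
	refs := []uint64{3, 5, 1024, 1 << 16, (1 << 16) + 7, 1 << 32}

	var e encoding.Encbuf
	writePrefixCompressedPostings(&e, refs)

	it := newPrefixCompressedPostings(e.Get())
	for _, want := range refs {
		if !it.Next() {
			t.Fatalf("iterator ended early, want %d", want)
		}
		if got := it.At(); got != want {
			t.Fatalf("got %d, want %d", got, want)
		}
	}
	if it.Next() {
		t.Fatal("iterator should be exhausted")
	}
}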