From 9a858bcd55241279b4727767c8cf17dd3939af1b Mon Sep 17 00:00:00 2001 From: naivewong <867245430@qq.com> Date: Sat, 3 Aug 2019 21:45:47 +0800 Subject: [PATCH 1/2] add prefixCompressedPostings Signed-off-by: naivewong <867245430@qq.com> --- index/index.go | 52 ++++----- index/postings.go | 168 +++++++++++++++++++++++++++++ index/postings_test.go | 235 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 431 insertions(+), 24 deletions(-) diff --git a/index/index.go b/index/index.go index 1a1e9bf3..ab9b26ee 100644 --- a/index/index.go +++ b/index/index.go @@ -44,6 +44,8 @@ const ( FormatV1 = 1 // FormatV2 represents 2 version of index. FormatV2 = 2 + // FormatV3 represents 3 version of index (using PrefixCompressedPostings for postings). + FormatV3 = 3 labelNameSeperator = "\xff" @@ -121,7 +123,7 @@ type Writer struct { // Reusable memory. buf1 encoding.Encbuf buf2 encoding.Encbuf - uint32s []uint32 + uint64s []uint64 symbols map[string]uint32 // symbol offsets seriesOffsets map[uint64]uint64 // offsets of series @@ -205,7 +207,7 @@ func NewWriter(fn string) (*Writer, error) { // Reusable memory. buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - uint32s: make([]uint32, 0, 1<<15), + uint64s: make([]uint64, 0, 1<<15), // Caches. symbols: make(map[string]uint32, 1<<13), @@ -290,7 +292,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error { func (w *Writer) writeMeta() error { w.buf1.Reset() w.buf1.PutBE32(MagicIndex) - w.buf1.PutByte(FormatV2) + w.buf1.PutByte(FormatV3) return w.write(w.buf1.Get()) } @@ -522,30 +524,25 @@ func (w *Writer) WritePostings(name, value string, it Postings) error { // Order of the references in the postings list does not imply order // of the series references within the persisted block they are mapped to. // We have to sort the new references again. - refs := w.uint32s[:0] + refs := w.uint64s[:0] for it.Next() { offset, ok := w.seriesOffsets[it.At()] if !ok { return errors.Errorf("%p series for reference %d not found", w, it.At()) } - if offset > (1<<32)-1 { - return errors.Errorf("series offset %d exceeds 4 bytes", offset) - } - refs = append(refs, uint32(offset)) + refs = append(refs, offset) } if err := it.Err(); err != nil { return err } - sort.Sort(uint32slice(refs)) + sort.Sort(uint64slice(refs)) w.buf2.Reset() w.buf2.PutBE32int(len(refs)) - for _, r := range refs { - w.buf2.PutBE32(r) - } - w.uint32s = refs + writePrefixCompressedPostings(&w.buf2, refs) + w.uint64s = refs w.buf1.Reset() w.buf1.PutBE32int(w.buf2.Len()) @@ -556,11 +553,11 @@ func (w *Writer) WritePostings(name, value string, it Postings) error { return errors.Wrap(err, "write postings") } -type uint32slice []uint32 +type uint64slice []uint64 -func (s uint32slice) Len() int { return len(s) } -func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } +func (s uint64slice) Len() int { return len(s) } +func (s uint64slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s uint64slice) Less(i, j int) bool { return s[i] < s[j] } type labelIndexHashEntry struct { keys []string @@ -678,7 +675,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { } r.version = int(r.b.Range(4, 5)[0]) - if r.version != FormatV1 && r.version != FormatV2 { + if r.version != FormatV1 && r.version != FormatV2 && r.version != FormatV3 { return nil, errors.Errorf("unknown index file version %d", r.version) } @@ -782,14 +779,14 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin symbolSlice []string symbols = map[uint32]string{} ) - if version == FormatV2 { + if version == FormatV2 || version == FormatV3 { symbolSlice = make([]string, 0, cnt) } for d.Err() == nil && d.Len() > 0 && cnt > 0 { s := d.UvarintStr() - if version == FormatV2 { + if version == FormatV2 || version == FormatV3 { symbolSlice = append(symbolSlice, s) } else { symbols[nextPos] = s @@ -911,7 +908,7 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err offset := id // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. - if r.version == FormatV2 { + if r.version == FormatV2 || r.version == FormatV3 { offset = id * 16 } d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable) @@ -935,7 +932,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) { if d.Err() != nil { return nil, errors.Wrap(d.Err(), "get postings entry") } - _, p, err := r.dec.Postings(d.Get()) + _, p, err := r.dec.Postings(d.Get(), r.version) if err != nil { return nil, errors.Wrap(err, "decode postings") } @@ -1059,11 +1056,18 @@ type Decoder struct { } // Postings returns a postings list for b and its number of elements. -func (dec *Decoder) Postings(b []byte) (int, Postings, error) { +func (dec *Decoder) Postings(b []byte, version int) (int, Postings, error) { d := encoding.Decbuf{B: b} n := d.Be32int() + if n == 0 { + return n, EmptyPostings(), d.Err() + } l := d.Get() - return n, newBigEndianPostings(l), d.Err() + if version == FormatV3 { + return n, newPrefixCompressedPostings(l), d.Err() + } else { + return n, newBigEndianPostings(l), d.Err() + } } // Series decodes a series entry from the given byte slice into lset and chks. diff --git a/index/postings.go b/index/postings.go index cef2d886..c1d2b930 100644 --- a/index/postings.go +++ b/index/postings.go @@ -21,6 +21,7 @@ import ( "strings" "sync" + "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/labels" ) @@ -689,3 +690,170 @@ func (it *bigEndianPostings) Seek(x uint64) bool { func (it *bigEndianPostings) Err() error { return nil } + +type prefixCompressedPostings struct { + bs []byte + cur uint64 + inside bool + idx int // The current offset inside the bs. + footerAddr int + key uint64 + numBlock int + blockIdx int // The current block idx. + nextBlock int // The starting offset of the next block. +} + +func newPrefixCompressedPostings(bstream []byte) *prefixCompressedPostings { + return &prefixCompressedPostings{ + bs: bstream[8:], + numBlock: int(binary.BigEndian.Uint32(bstream[4:])), + footerAddr: int(binary.BigEndian.Uint32(bstream)), + } +} + +func (it *prefixCompressedPostings) At() uint64 { + return it.cur +} + +func (it *prefixCompressedPostings) Next() bool { + if it.inside { // Already entered the block. + if it.idx < it.nextBlock { + it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:])) + it.idx += 2 + return true + } + it.blockIdx += 1 // Go to the next block. + } + // Currently not entered any block. + if it.idx < it.footerAddr { + it.key = binary.BigEndian.Uint64(it.bs[it.idx:]) + it.idx += 8 + it.inside = true + it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):])) + it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:])) + it.idx += 2 + return true + } else { + return false + } +} + +func (it *prefixCompressedPostings) seekInBlock(x uint64) bool { + curVal := uint16(x & 0xffff) + num := (it.nextBlock - it.idx) >> 1 + j := sort.Search(num, func(i int) bool { + return binary.BigEndian.Uint16(it.bs[it.idx+(i<<1):]) >= curVal + }) + if j == num { + // Fast-path to the next block. + // The first element in next block should be >= x. + it.idx = it.nextBlock + it.blockIdx += 1 + if it.idx >= it.footerAddr { + return false + } + it.key = binary.BigEndian.Uint64(it.bs[it.idx:]) + it.idx += 8 + it.inside = true + it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):])) + it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:])) + it.idx += 2 + return true + } + it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx+(j<<1):])) + it.idx += (j + 1) << 1 + return true +} + +func (it *prefixCompressedPostings) Seek(x uint64) bool { + if it.cur >= x { + return true + } + curKey := (x >> 16) << 16 + if it.inside && it.key == curKey { + // Fast path for x in current block. + return it.seekInBlock(x) + } else { + i := sort.Search(it.numBlock-it.blockIdx, func(i int) bool { + off := int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+i)<<2):])) + k := binary.BigEndian.Uint64(it.bs[off:]) + return k >= curKey + }) + if i == it.numBlock-it.blockIdx { + return false + } + it.blockIdx += i + if i > 0 { + it.idx = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx)<<2):])) + } + } + it.key = binary.BigEndian.Uint64(it.bs[it.idx:]) + it.idx += 8 + + it.inside = true + + it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):])) + return it.seekInBlock(x) +} + +func (it *prefixCompressedPostings) Err() error { + return nil +} + +// The size of values inside the block is 2 bytes. +func writePrefixCompressedPostingsBlock(e *encoding.Encbuf, vals []uint16, key uint64, c []byte) { + e.PutBE64(key) + for _, val := range vals { + binary.BigEndian.PutUint16(c[:], val) + e.PutByte(c[0]) + e.PutByte(c[1]) + } +} + +func writePrefixCompressedPostings(e *encoding.Encbuf, arr []uint64) { + if len(arr) == 0 { + return + } + key := uint64(0) + mask := uint64((1 << uint(16)) - 1) // Mask for the elements in the block. + invertedMask := ^mask + var ( + curKey uint64 + curVal uint64 + idx int // Index of current element in arr. + startingOffs []uint32 // The starting offsets of each block. + vals []uint16 // The converted values in the current block. + startOff = len(e.Get()) + c = make([]byte, 2) + ) + e.PutBE32(0) // Footer starting offset. + e.PutBE32(0) // Number of blocks. + for idx < len(arr) { + curKey = arr[idx] & invertedMask // Key of block. + curVal = arr[idx] & mask // Value inside block. + if curKey != key { + // Move to next block. + if idx != 0 { + // We don't need to store the starting offset of the first block because it won't be used. + startingOffs = append(startingOffs, uint32(len(e.B))) + writePrefixCompressedPostingsBlock(e, vals, key, c) + vals = vals[:0] + } + key = curKey + } + vals = append(vals, uint16(curVal)) + idx += 1 + } + startingOffs = append(startingOffs, uint32(len(e.B))) + writePrefixCompressedPostingsBlock(e, vals, key, c) + + // Store the ending offset can save the check of whether to read the next block address from the footer or + // just len(it.bs) each time before entering the next block. + startingOffs = append(startingOffs, uint32(len(e.B))) + + binary.BigEndian.PutUint32(e.B[startOff:], uint32(len(e.B)-8-startOff)) // Put footer starting offset. + binary.BigEndian.PutUint32(e.B[startOff+4:], uint32(len(startingOffs)-1)) // Put number of blocks. + for _, off := range startingOffs { + e.PutBE32(off - 8 - uint32(startOff)) + } +} diff --git a/index/postings_test.go b/index/postings_test.go index 1eed1dbf..e5a1dadb 100644 --- a/index/postings_test.go +++ b/index/postings_test.go @@ -20,6 +20,7 @@ import ( "sort" "testing" + "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/testutil" ) @@ -718,6 +719,240 @@ func TestBigEndian(t *testing.T) { }) } +func TestPrefixCompressedPostings(t *testing.T) { + num := 1000 + // mock a list as postings + ls := make([]uint64, num) + ls[0] = 2 + for i := 1; i < num; i++ { + ls[i] = ls[i-1] + uint64(rand.Int31n(25)) + 2 + } + + buf := encoding.Encbuf{} + writePrefixCompressedPostings(&buf, ls) + + t.Run("Iteration", func(t *testing.T) { + rbp := newPrefixCompressedPostings(buf.Get()) + for i := 0; i < num; i++ { + testutil.Assert(t, rbp.Next() == true, "") + if uint64(ls[i]) != rbp.At() { + t.Log("ls[i] =", ls[i], "rbp.At() =", rbp.At(), " i =", i) + } + testutil.Equals(t, uint64(ls[i]), rbp.At()) + } + + testutil.Assert(t, rbp.Next() == false, "") + testutil.Assert(t, rbp.Err() == nil, "") + }) + + t.Run("Seek", func(t *testing.T) { + table := []struct { + seek uint64 + val uint64 + found bool + }{ + { + ls[0] - 1, ls[0], true, + }, + { + ls[4], ls[4], true, + }, + { + ls[500] - 1, ls[500], true, + }, + { + ls[600] + 1, ls[601], true, + }, + { + ls[600] + 1, ls[601], true, + }, + { + ls[600] + 1, ls[601], true, + }, + { + ls[0], ls[601], true, + }, + { + ls[600], ls[601], true, + }, + { + ls[999], ls[999], true, + }, + { + ls[999] + 10, ls[999], false, + }, + } + + rbp := newPrefixCompressedPostings(buf.Get()) + + for _, v := range table { + testutil.Equals(t, v.found, rbp.Seek(uint64(v.seek))) + testutil.Equals(t, uint64(v.val), rbp.At()) + testutil.Assert(t, rbp.Err() == nil, "") + } + }) +} + +func BenchmarkPostings(b *testing.B) { + num := 100000 + // mock a list as postings + ls := make([]uint32, num) + ls[0] = 2 + for i := 1; i < num; i++ { + ls[i] = ls[i-1] + uint32(rand.Int31n(25)) + 2 + } + + // bigEndianPostings. + bufBE := make([]byte, num*4) + for i := 0; i < num; i++ { + b := bufBE[i*4 : i*4+4] + binary.BigEndian.PutUint32(b, ls[i]) + } + + // prefixCompressedPostings. + bufPCP := encoding.Encbuf{} + temp := make([]uint64, 0, len(ls)) + for _, x := range ls { + temp = append(temp, uint64(x)) + } + writePrefixCompressedPostings(&bufPCP, temp) + + table := []struct { + seek uint32 + val uint32 + found bool + }{ + { + ls[0] - 1, ls[0], true, + }, + { + ls[1000], ls[1000], true, + }, + { + ls[1001], ls[1001], true, + }, + { + ls[2000] + 1, ls[2001], true, + }, + { + ls[3000], ls[3000], true, + }, + { + ls[3001], ls[3001], true, + }, + { + ls[4000] + 1, ls[4001], true, + }, + { + ls[5000], ls[5000], true, + }, + { + ls[5001], ls[5001], true, + }, + { + ls[6000] + 1, ls[6001], true, + }, + { + ls[10000], ls[10000], true, + }, + { + ls[10001], ls[10001], true, + }, + { + ls[20000] + 1, ls[20001], true, + }, + { + ls[30000], ls[30000], true, + }, + { + ls[30001], ls[30001], true, + }, + { + ls[40000] + 1, ls[40001], true, + }, + { + ls[50000], ls[50000], true, + }, + { + ls[50001], ls[50001], true, + }, + { + ls[60000] + 1, ls[60001], true, + }, + { + ls[70000], ls[70000], true, + }, + { + ls[70001], ls[70001], true, + }, + { + ls[80000] + 1, ls[80001], true, + }, + { + ls[99999], ls[99999], true, + }, + { + ls[99999] + 10, ls[99999], false, + }, + } + + b.Run("bigEndianIteration", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE) + + for i := 0; i < num; i++ { + testutil.Assert(bench, bep.Next() == true, "") + testutil.Equals(bench, uint64(ls[i]), bep.At()) + } + testutil.Assert(bench, bep.Next() == false, "") + testutil.Assert(bench, bep.Err() == nil, "") + } + }) + b.Run("prefixCompressedPostingsIteration", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP.Get()) + + for i := 0; i < num; i++ { + testutil.Assert(bench, rbm.Next() == true, "") + testutil.Equals(bench, uint64(ls[i]), rbm.At()) + } + testutil.Assert(bench, rbm.Next() == false, "") + testutil.Assert(bench, rbm.Err() == nil, "") + } + }) + + b.Run("bigEndianSeek", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE) + + for _, v := range table { + testutil.Equals(bench, v.found, bep.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), bep.At()) + testutil.Assert(bench, bep.Err() == nil, "") + } + } + }) + b.Run("prefixCompressedPostingsSeek", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP.Get()) + + for _, v := range table { + testutil.Equals(bench, v.found, rbm.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), rbm.At()) + testutil.Assert(bench, rbm.Err() == nil, "") + } + } + }) +} + func TestIntersectWithMerge(t *testing.T) { // One of the reproducible cases for: // https://github.com/prometheus/prometheus/issues/2616 From dadf7ee87a0eda4b5c731333aa8cc441ede621fd Mon Sep 17 00:00:00 2001 From: naivewong <867245430@qq.com> Date: Tue, 6 Aug 2019 22:28:42 +0800 Subject: [PATCH 2/2] add two more benchmarks Signed-off-by: naivewong <867245430@qq.com> --- index/index.go | 4 +- index/postings.go | 102 +++---- index/postings_test.go | 613 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 665 insertions(+), 54 deletions(-) diff --git a/index/index.go b/index/index.go index ab9b26ee..96c8afdc 100644 --- a/index/index.go +++ b/index/index.go @@ -1065,8 +1065,10 @@ func (dec *Decoder) Postings(b []byte, version int) (int, Postings, error) { l := d.Get() if version == FormatV3 { return n, newPrefixCompressedPostings(l), d.Err() - } else { + } else if version == FormatV1 || version == FormatV2 { return n, newBigEndianPostings(l), d.Err() + } else { + return n, EmptyPostings(), d.Err() } } diff --git a/index/postings.go b/index/postings.go index c1d2b930..6295f784 100644 --- a/index/postings.go +++ b/index/postings.go @@ -692,15 +692,15 @@ func (it *bigEndianPostings) Err() error { } type prefixCompressedPostings struct { - bs []byte - cur uint64 - inside bool - idx int // The current offset inside the bs. - footerAddr int - key uint64 - numBlock int - blockIdx int // The current block idx. - nextBlock int // The starting offset of the next block. + bs []byte + cur uint64 + initialized bool + idx int // The current offset inside the bs. + footerAddr int + key uint64 + numBlock int + blockIdx int // The current block idx. + nextBlock int // The starting offset of the next block. } func newPrefixCompressedPostings(bstream []byte) *prefixCompressedPostings { @@ -716,19 +716,19 @@ func (it *prefixCompressedPostings) At() uint64 { } func (it *prefixCompressedPostings) Next() bool { - if it.inside { // Already entered the block. + if it.initialized { // Already entered the block. if it.idx < it.nextBlock { it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:])) it.idx += 2 return true } - it.blockIdx += 1 // Go to the next block. + it.blockIdx++ // Go to the next block. } // Currently not entered any block. if it.idx < it.footerAddr { it.key = binary.BigEndian.Uint64(it.bs[it.idx:]) it.idx += 8 - it.inside = true + it.initialized = true it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):])) it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:])) it.idx += 2 @@ -744,24 +744,24 @@ func (it *prefixCompressedPostings) seekInBlock(x uint64) bool { j := sort.Search(num, func(i int) bool { return binary.BigEndian.Uint16(it.bs[it.idx+(i<<1):]) >= curVal }) - if j == num { - // Fast-path to the next block. - // The first element in next block should be >= x. - it.idx = it.nextBlock - it.blockIdx += 1 - if it.idx >= it.footerAddr { - return false - } - it.key = binary.BigEndian.Uint64(it.bs[it.idx:]) - it.idx += 8 - it.inside = true - it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):])) - it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:])) - it.idx += 2 + if j < num { + it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx+(j<<1):])) + it.idx += (j + 1) << 1 return true } - it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx+(j<<1):])) - it.idx += (j + 1) << 1 + // Fast-path to the next block. + // The first element in next block should be >= x. + it.idx = it.nextBlock + it.blockIdx++ + if it.idx >= it.footerAddr { + return false + } + it.key = binary.BigEndian.Uint64(it.bs[it.idx:]) + it.idx += 8 + it.initialized = true + it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):])) + it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:])) + it.idx += 2 return true } @@ -769,30 +769,35 @@ func (it *prefixCompressedPostings) Seek(x uint64) bool { if it.cur >= x { return true } - curKey := (x >> 16) << 16 - if it.inside && it.key == curKey { + curKey := x & 0xffffffffffff0000 + if it.initialized && it.key == curKey { // Fast path for x in current block. return it.seekInBlock(x) - } else { - i := sort.Search(it.numBlock-it.blockIdx, func(i int) bool { - off := int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+i)<<2):])) - k := binary.BigEndian.Uint64(it.bs[off:]) - return k >= curKey - }) - if i == it.numBlock-it.blockIdx { - return false - } - it.blockIdx += i - if i > 0 { - it.idx = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx)<<2):])) - } + } + i := sort.Search(it.numBlock-it.blockIdx, func(i int) bool { + off := int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+i)<<2):])) + k := binary.BigEndian.Uint64(it.bs[off:]) + return k >= curKey + }) + if i == it.numBlock-it.blockIdx { + it.idx = it.footerAddr + return false + } + it.blockIdx += i + if i > 0 { + it.idx = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx)<<2):])) } it.key = binary.BigEndian.Uint64(it.bs[it.idx:]) it.idx += 8 - it.inside = true + it.initialized = true it.nextBlock = int(binary.BigEndian.Uint32(it.bs[it.footerAddr+((it.blockIdx+1)<<2):])) + if it.key != curKey { + it.cur = it.key | uint64(binary.BigEndian.Uint16(it.bs[it.idx:])) + it.idx += 2 + return true + } return it.seekInBlock(x) } @@ -815,12 +820,11 @@ func writePrefixCompressedPostings(e *encoding.Encbuf, arr []uint64) { return } key := uint64(0) - mask := uint64((1 << uint(16)) - 1) // Mask for the elements in the block. - invertedMask := ^mask + mask := uint64(0xFFFF) // Mask for the elements in the block. + invertedMask := uint64(0xFFFFFFFFFFFF0000) var ( curKey uint64 curVal uint64 - idx int // Index of current element in arr. startingOffs []uint32 // The starting offsets of each block. vals []uint16 // The converted values in the current block. startOff = len(e.Get()) @@ -828,13 +832,12 @@ func writePrefixCompressedPostings(e *encoding.Encbuf, arr []uint64) { ) e.PutBE32(0) // Footer starting offset. e.PutBE32(0) // Number of blocks. - for idx < len(arr) { + for idx := range arr { curKey = arr[idx] & invertedMask // Key of block. curVal = arr[idx] & mask // Value inside block. if curKey != key { // Move to next block. if idx != 0 { - // We don't need to store the starting offset of the first block because it won't be used. startingOffs = append(startingOffs, uint32(len(e.B))) writePrefixCompressedPostingsBlock(e, vals, key, c) vals = vals[:0] @@ -842,7 +845,6 @@ func writePrefixCompressedPostings(e *encoding.Encbuf, arr []uint64) { key = curKey } vals = append(vals, uint16(curVal)) - idx += 1 } startingOffs = append(startingOffs, uint32(len(e.B))) writePrefixCompressedPostingsBlock(e, vals, key, c) diff --git a/index/postings_test.go b/index/postings_test.go index e5a1dadb..6156f0c2 100644 --- a/index/postings_test.go +++ b/index/postings_test.go @@ -735,9 +735,6 @@ func TestPrefixCompressedPostings(t *testing.T) { rbp := newPrefixCompressedPostings(buf.Get()) for i := 0; i < num; i++ { testutil.Assert(t, rbp.Next() == true, "") - if uint64(ls[i]) != rbp.At() { - t.Log("ls[i] =", ls[i], "rbp.At() =", rbp.At(), " i =", i) - } testutil.Equals(t, uint64(ls[i]), rbp.At()) } @@ -793,6 +790,616 @@ func TestPrefixCompressedPostings(t *testing.T) { }) } +func BenchmarkPostingsOneBlock(b *testing.B) { + num := 1000 + ls1 := make([]uint32, num) // Block with key 0. + ls2 := make([]uint32, num) // Block with key > 0. + ls1[0] = 2 + ls2[0] = 655360 + for i := 1; i < num; i++ { + ls1[i] = ls1[i-1] + uint32(rand.Int31n(10)) + 2 + ls2[i] = ls2[i-1] + uint32(rand.Int31n(10)) + 2 + } + + // bigEndianPostings for ls1. + bufBE1 := make([]byte, num*4) + for i := 0; i < num; i++ { + b := bufBE1[i*4 : i*4+4] + binary.BigEndian.PutUint32(b, ls1[i]) + } + + // prefixCompressedPostings for ls1. + bufPCP1 := encoding.Encbuf{} + temp := make([]uint64, len(ls1)) + for i, x := range ls1 { + temp[i] = uint64(x) + } + writePrefixCompressedPostings(&bufPCP1, temp) + + // bigEndianPostings for ls2. + bufBE2 := make([]byte, num*4) + for i := 0; i < num; i++ { + b := bufBE2[i*4 : i*4+4] + binary.BigEndian.PutUint32(b, ls2[i]) + } + + // prefixCompressedPostings for ls2. + bufPCP2 := encoding.Encbuf{} + for i, x := range ls2 { + temp[i] = uint64(x) + } + writePrefixCompressedPostings(&bufPCP2, temp) + + table1 := []struct { + seek uint32 + val uint32 + found bool + }{ + { + ls1[0] - 1, ls1[0], true, + }, + { + ls1[50], ls1[50], true, + }, + { + ls1[100], ls1[100], true, + }, + { + ls1[150] + 1, ls1[151], true, + }, + { + ls1[200], ls1[200], true, + }, + { + ls1[250], ls1[250], true, + }, + { + ls1[300] + 1, ls1[301], true, + }, + { + ls1[350], ls1[350], true, + }, + { + ls1[400], ls1[400], true, + }, + { + ls1[450] + 1, ls1[451], true, + }, + { + ls1[500], ls1[500], true, + }, + { + ls1[550], ls1[550], true, + }, + { + ls1[600] + 1, ls1[601], true, + }, + { + ls1[650], ls1[650], true, + }, + { + ls1[700], ls1[700], true, + }, + { + ls1[750] + 1, ls1[751], true, + }, + { + ls1[800], ls1[800], true, + }, + { + ls1[850], ls1[850], true, + }, + { + ls1[900] + 1, ls1[901], true, + }, + { + ls1[950], ls1[950], true, + }, + { + ls1[999], ls1[999], true, + }, + { + ls1[999] + 10, ls1[999], false, + }, + } + + table2 := []struct { + seek uint32 + val uint32 + found bool + }{ + { + ls2[0] - 1, ls2[0], true, + }, + { + ls2[50], ls2[50], true, + }, + { + ls2[100], ls2[100], true, + }, + { + ls2[150] + 1, ls2[151], true, + }, + { + ls2[200], ls2[200], true, + }, + { + ls2[250], ls2[250], true, + }, + { + ls2[300] + 1, ls2[301], true, + }, + { + ls2[350], ls2[350], true, + }, + { + ls2[400], ls2[400], true, + }, + { + ls2[450] + 1, ls2[451], true, + }, + { + ls2[500], ls2[500], true, + }, + { + ls2[550], ls2[550], true, + }, + { + ls2[600] + 1, ls2[601], true, + }, + { + ls2[650], ls2[650], true, + }, + { + ls2[700], ls2[700], true, + }, + { + ls2[750] + 1, ls2[751], true, + }, + { + ls2[800], ls2[800], true, + }, + { + ls2[850], ls2[850], true, + }, + { + ls2[900] + 1, ls2[901], true, + }, + { + ls2[950], ls2[950], true, + }, + { + ls2[999], ls2[999], true, + }, + { + ls2[999] + 10, ls2[999], false, + }, + } + + b.Run("bigEndianIteration_one_block_key=0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE1) + + for i := 0; i < num; i++ { + testutil.Assert(bench, bep.Next() == true, "") + testutil.Equals(bench, uint64(ls1[i]), bep.At()) + } + testutil.Assert(bench, bep.Next() == false, "") + testutil.Assert(bench, bep.Err() == nil, "") + } + }) + b.Run("prefixCompressedPostingsIteration_one_block_key=0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP1.Get()) + + for i := 0; i < num; i++ { + testutil.Assert(bench, rbm.Next() == true, "") + testutil.Equals(bench, uint64(ls1[i]), rbm.At()) + } + testutil.Assert(bench, rbm.Next() == false, "") + testutil.Assert(bench, rbm.Err() == nil, "") + } + }) + + b.Run("bigEndianSeek_one_block_key=0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE1) + + for _, v := range table1 { + testutil.Equals(bench, v.found, bep.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), bep.At()) + testutil.Assert(bench, bep.Err() == nil, "") + } + } + }) + b.Run("prefixCompressedPostingsSeek_one_block_key=0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP1.Get()) + + for _, v := range table1 { + testutil.Equals(bench, v.found, rbm.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), rbm.At()) + testutil.Assert(bench, rbm.Err() == nil, "") + } + } + }) + + b.Run("bigEndianIteration_one_block_key>0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE2) + + for i := 0; i < num; i++ { + testutil.Assert(bench, bep.Next() == true, "") + testutil.Equals(bench, uint64(ls2[i]), bep.At()) + } + testutil.Assert(bench, bep.Next() == false, "") + testutil.Assert(bench, bep.Err() == nil, "") + } + }) + b.Run("prefixCompressedPostingsIteration_one_block_key>0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP2.Get()) + + for i := 0; i < num; i++ { + testutil.Assert(bench, rbm.Next() == true, "") + testutil.Equals(bench, uint64(ls2[i]), rbm.At()) + } + testutil.Assert(bench, rbm.Next() == false, "") + testutil.Assert(bench, rbm.Err() == nil, "") + } + }) + + b.Run("bigEndianSeek_one_block_key>0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE2) + + for _, v := range table2 { + testutil.Equals(bench, v.found, bep.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), bep.At()) + testutil.Assert(bench, bep.Err() == nil, "") + } + } + }) + b.Run("prefixCompressedPostingsSeek_one_block_key>0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP2.Get()) + + for _, v := range table2 { + testutil.Equals(bench, v.found, rbm.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), rbm.At()) + testutil.Assert(bench, rbm.Err() == nil, "") + } + } + }) +} + +func BenchmarkPostingsManyBlocks(b *testing.B) { + num := 100000 + ls1 := make([]uint32, num) // Block with key 0. + ls2 := make([]uint32, num) // Block with key > 0. + ls1[0] = 2 + ls2[0] = 655360 + for i := 1; i < num; i++ { + ls1[i] = ls1[i-1] + uint32(rand.Int31n(25)) + 2 + ls2[i] = ls2[i-1] + uint32(rand.Int31n(25)) + 2 + } + + // bigEndianPostings for ls1. + bufBE1 := make([]byte, num*4) + for i := 0; i < num; i++ { + b := bufBE1[i*4 : i*4+4] + binary.BigEndian.PutUint32(b, ls1[i]) + } + + // prefixCompressedPostings for ls1. + bufPCP1 := encoding.Encbuf{} + temp := make([]uint64, len(ls1)) + for i, x := range ls1 { + temp[i] = uint64(x) + } + writePrefixCompressedPostings(&bufPCP1, temp) + + // bigEndianPostings for ls2. + bufBE2 := make([]byte, num*4) + for i := 0; i < num; i++ { + b := bufBE2[i*4 : i*4+4] + binary.BigEndian.PutUint32(b, ls2[i]) + } + + // prefixCompressedPostings for ls2. + bufPCP2 := encoding.Encbuf{} + for i, x := range ls2 { + temp[i] = uint64(x) + } + writePrefixCompressedPostings(&bufPCP2, temp) + + table1 := []struct { + seek uint32 + val uint32 + found bool + }{ + { + ls1[0] - 1, ls1[0], true, + }, + { + ls1[1000], ls1[1000], true, + }, + { + ls1[1001], ls1[1001], true, + }, + { + ls1[2000] + 1, ls1[2001], true, + }, + { + ls1[3000], ls1[3000], true, + }, + { + ls1[3001], ls1[3001], true, + }, + { + ls1[4000] + 1, ls1[4001], true, + }, + { + ls1[5000], ls1[5000], true, + }, + { + ls1[5001], ls1[5001], true, + }, + { + ls1[6000] + 1, ls1[6001], true, + }, + { + ls1[10000], ls1[10000], true, + }, + { + ls1[10001], ls1[10001], true, + }, + { + ls1[20000] + 1, ls1[20001], true, + }, + { + ls1[30000], ls1[30000], true, + }, + { + ls1[30001], ls1[30001], true, + }, + { + ls1[40000] + 1, ls1[40001], true, + }, + { + ls1[50000], ls1[50000], true, + }, + { + ls1[50001], ls1[50001], true, + }, + { + ls1[60000] + 1, ls1[60001], true, + }, + { + ls1[70000], ls1[70000], true, + }, + { + ls1[70001], ls1[70001], true, + }, + { + ls1[80000] + 1, ls1[80001], true, + }, + { + ls1[99999], ls1[99999], true, + }, + { + ls1[99999] + 10, ls1[99999], false, + }, + } + + table2 := []struct { + seek uint32 + val uint32 + found bool + }{ + { + ls2[0] - 1, ls2[0], true, + }, + { + ls2[1000], ls2[1000], true, + }, + { + ls2[1001], ls2[1001], true, + }, + { + ls2[2000] + 1, ls2[2001], true, + }, + { + ls2[3000], ls2[3000], true, + }, + { + ls2[3001], ls2[3001], true, + }, + { + ls2[4000] + 1, ls2[4001], true, + }, + { + ls2[5000], ls2[5000], true, + }, + { + ls2[5001], ls2[5001], true, + }, + { + ls2[6000] + 1, ls2[6001], true, + }, + { + ls2[10000], ls2[10000], true, + }, + { + ls2[10001], ls2[10001], true, + }, + { + ls2[20000] + 1, ls2[20001], true, + }, + { + ls2[30000], ls2[30000], true, + }, + { + ls2[30001], ls2[30001], true, + }, + { + ls2[40000] + 1, ls2[40001], true, + }, + { + ls2[50000], ls2[50000], true, + }, + { + ls2[50001], ls2[50001], true, + }, + { + ls2[60000] + 1, ls2[60001], true, + }, + { + ls2[70000], ls2[70000], true, + }, + { + ls2[70001], ls2[70001], true, + }, + { + ls2[80000] + 1, ls2[80001], true, + }, + { + ls2[99999], ls2[99999], true, + }, + { + ls2[99999] + 10, ls2[99999], false, + }, + } + + b.Run("bigEndianIteration_many_blocks_key=0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE1) + + for i := 0; i < num; i++ { + testutil.Assert(bench, bep.Next() == true, "") + testutil.Equals(bench, uint64(ls1[i]), bep.At()) + } + testutil.Assert(bench, bep.Next() == false, "") + testutil.Assert(bench, bep.Err() == nil, "") + } + }) + b.Run("prefixCompressedPostingsIteration_many_blocks_key=0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP1.Get()) + + for i := 0; i < num; i++ { + testutil.Assert(bench, rbm.Next() == true, "") + testutil.Equals(bench, uint64(ls1[i]), rbm.At()) + } + testutil.Assert(bench, rbm.Next() == false, "") + testutil.Assert(bench, rbm.Err() == nil, "") + } + }) + + b.Run("bigEndianSeek_many_blocks_key=0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE1) + + for _, v := range table1 { + testutil.Equals(bench, v.found, bep.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), bep.At()) + testutil.Assert(bench, bep.Err() == nil, "") + } + } + }) + b.Run("prefixCompressedPostingsSeek_many_blocks_key=0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP1.Get()) + + for _, v := range table1 { + testutil.Equals(bench, v.found, rbm.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), rbm.At()) + testutil.Assert(bench, rbm.Err() == nil, "") + } + } + }) + + b.Run("bigEndianIteration_many_blocks_key>0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE2) + + for i := 0; i < num; i++ { + testutil.Assert(bench, bep.Next() == true, "") + testutil.Equals(bench, uint64(ls2[i]), bep.At()) + } + testutil.Assert(bench, bep.Next() == false, "") + testutil.Assert(bench, bep.Err() == nil, "") + } + }) + b.Run("prefixCompressedPostingsIteration_many_blocks_key>0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP2.Get()) + + for i := 0; i < num; i++ { + testutil.Assert(bench, rbm.Next() == true, "") + testutil.Equals(bench, uint64(ls2[i]), rbm.At()) + } + testutil.Assert(bench, rbm.Next() == false, "") + testutil.Assert(bench, rbm.Err() == nil, "") + } + }) + + b.Run("bigEndianSeek_many_blocks_key>0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + bep := newBigEndianPostings(bufBE2) + + for _, v := range table2 { + testutil.Equals(bench, v.found, bep.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), bep.At()) + testutil.Assert(bench, bep.Err() == nil, "") + } + } + }) + b.Run("prefixCompressedPostingsSeek_many_blocks_key>0", func(bench *testing.B) { + bench.ResetTimer() + bench.ReportAllocs() + for j := 0; j < bench.N; j++ { + rbm := newPrefixCompressedPostings(bufPCP2.Get()) + + for _, v := range table2 { + testutil.Equals(bench, v.found, rbm.Seek(uint64(v.seek))) + testutil.Equals(bench, uint64(v.val), rbm.At()) + testutil.Assert(bench, rbm.Err() == nil, "") + } + } + }) +} + func BenchmarkPostings(b *testing.B) { num := 100000 // mock a list as postings