Index measure_name and tags in entity to improve the query performance (
hanahmily authored Dec 21, 2024
1 parent 13c194f commit 3658400
Showing 28 changed files with 902 additions and 400 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
@@ -12,6 +12,8 @@ Release Notes.
- Measure: Introduce "index_mode" to save data exclusively in the series index, ideal for non-timeseries measures.
- Index: Use numeric index type to support Int and Float
- TopN: Group top-n pre-calculation results by the group key in the newly introduced `_top_n_result` measure, which stores the pre-calculation results.
- Index Mode: Index `measure_name` and `tags` in `entity` to improve query performance.
- Encoding: Improve the performance of encoding and decoding the variable-length int64.

### Bug Fixes

@@ -43,6 +45,7 @@ Release Notes.
### CVEs

- GO-2024-3321: Misuse of ServerConfig.PublicKeyCallback may cause authorization bypass in golang.org/x/crypto
- GO-2024-3333: Non-linear parsing of case-insensitive content in golang.org/x/net/html

## 0.7.0

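The `Encoding` entry above refers to the two-value `BytesToVarUint64` signature that the diffs below adopt (the error return is gone). As background, here is a minimal LEB128-style varint codec; it only illustrates what "variable-length int64" encoding means and is not the project's actual `encoding` package implementation.

```go
// Illustration only: a classic LEB128-style varint codec (7 data bits per
// byte, continuation bit on all but the last byte). The real implementation
// lives in the project's encoding package and may differ.
package main

import "fmt"

// varUint64ToBytes appends the varint form of u to dst.
func varUint64ToBytes(dst []byte, u uint64) []byte {
	for u >= 0x80 {
		dst = append(dst, byte(u)|0x80)
		u >>= 7
	}
	return append(dst, byte(u))
}

// bytesToVarUint64 decodes one varint and returns the remaining bytes and the
// value, mirroring the two-value signature adopted in this commit.
func bytesToVarUint64(src []byte) ([]byte, uint64) {
	var v uint64
	var shift uint
	for i, b := range src {
		v |= uint64(b&0x7f) << shift
		if b < 0x80 {
			return src[i+1:], v
		}
		shift += 7
	}
	return src, 0 // truncated input; the real codec may treat this differently
}

func main() {
	buf := varUint64ToBytes(nil, 300)
	rest, v := bytesToVarUint64(buf)
	fmt.Println(v, len(rest), buf) // 300 0 [172 2]
}
```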
82 changes: 77 additions & 5 deletions banyand/internal/storage/index.go
@@ -86,13 +86,17 @@ func (s *seriesIndex) Update(docs index.Documents) error {
func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
projection []index.FieldKey, secondaryQuery index.Query, timeRange *timestamp.TimeRange,
) (data SeriesData, err error) {
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return SeriesData{}, err
var seriesMatchers []index.SeriesMatcher
if len(series) > 0 {
seriesMatchers = make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return SeriesData{}, err
}
}
}

indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery, timeRange)
if err != nil {
return SeriesData{}, err
@@ -286,6 +290,74 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
return sd, sortedValues, err
}

func (s *seriesIndex) SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error) {
tracer := query.GetTracer(ctx)
if tracer != nil {
var span *query.Span
span, ctx = tracer.StartSpan(ctx, "seriesIndex.SearchWithoutSeries")
if opts.Query != nil {
span.Tagf("secondary_query", "%s", opts.Query.String())
}
defer func() {
if err != nil {
span.Error(err)
}
span.Stop()
}()
}

if opts.Order == nil || opts.Order.Index == nil {
sd, err = s.filter(ctx, nil, opts.Projection, opts.Query, opts.TimeRange)
if err != nil {
return sd, nil, err
}
return sd, nil, nil
}
var span *query.Span
if tracer != nil {
span, _ = tracer.StartSpan(ctx, "sort")
span.Tagf("preload", "%d", opts.PreloadSize)
defer func() {
if err != nil {
span.Error(err)
}
span.Stop()
}()
}

iter, err := s.store.SeriesSort(ctx, opts.Query, opts.Order,
opts.PreloadSize, opts.Projection)
if err != nil {
return sd, nil, err
}
defer func() {
err = multierr.Append(err, iter.Close())
}()

var r int
for iter.Next() {
r++
val := iter.Val()
var series pbv1.Series
if err = series.Unmarshal(val.EntityValues); err != nil {
return sd, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", val.EntityValues)
}
sd.SeriesList = append(sd.SeriesList, &series)
sd.Timestamps = append(sd.Timestamps, val.Timestamp)
sd.Versions = append(sd.Versions, val.Version)
if len(opts.Projection) > 0 {
sd.Fields = append(sd.Fields, maps.Clone(val.Values))
}
sortedValues = append(sortedValues, val.SortedValue)
}
if span != nil {
span.Tagf("query", "%s", iter.Query().String())
span.Tagf("rounds", "%d", r)
span.Tagf("size", "%d", len(sd.SeriesList))
}
return sd, sortedValues, err
}

func (s *seriesIndex) Close() error {
s.metrics.DeleteAll(s.p.SegLabelValues()...)
return s.store.Close()
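The new `SearchWithoutSeries` drains a `SeriesSort` iterator and folds the iterator's `Close` error into the returned error via a deferred `multierr.Append`. The self-contained sketch below reproduces that draining pattern with toy types (and `errors.Join` instead of `multierr`, to stay dependency-free); it is an illustration, not the BanyanDB code.

```go
// Self-contained sketch of the iterator-draining pattern used by
// SearchWithoutSeries: iterate, collect, and fold the iterator's Close error
// into the returned error in a defer. Types here are toys.
package main

import (
	"errors"
	"fmt"
)

type row struct {
	entityValues []byte
	timestamp    int64
}

type sortIterator struct {
	rows []row
	pos  int
}

func (it *sortIterator) Next() bool   { it.pos++; return it.pos <= len(it.rows) }
func (it *sortIterator) Val() row     { return it.rows[it.pos-1] }
func (it *sortIterator) Close() error { return nil }

func drain(it *sortIterator) (out []row, err error) {
	defer func() {
		// Mirrors multierr.Append(err, iter.Close()) in the diff above.
		err = errors.Join(err, it.Close())
	}()
	for it.Next() {
		out = append(out, it.Val())
	}
	return out, err
}

func main() {
	it := &sortIterator{rows: []row{{[]byte("svc-a"), 1}, {[]byte("svc-b"), 2}}}
	rows, err := drain(it)
	fmt.Println(len(rows), err) // 2 <nil>
}
```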
1 change: 1 addition & 0 deletions banyand/internal/storage/storage.go
@@ -91,6 +91,7 @@ type IndexDB interface {
Insert(docs index.Documents) error
Update(docs index.Documents) error
Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (SeriesData, [][]byte, error)
SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error)
}

// TSDB allows listing and getting shard details.
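A hypothetical fake of the extended `IndexDB` contract, of the kind a unit test might use now that `SearchWithoutSeries` is part of the interface; everything except the method shape is a simplified stand-in for the real `pbv1`/`storage` types.

```go
// Hypothetical test fake for the extended IndexDB contract.
package main

import (
	"context"
	"fmt"
)

type SeriesData struct{ Count int }

type IndexSearchOpts struct{ PreloadSize int }

// indexDB mirrors the method set from the diff above, with stand-in types.
type indexDB interface {
	Search(ctx context.Context, entities [][]string, opts IndexSearchOpts) (SeriesData, [][]byte, error)
	SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (SeriesData, [][]byte, error)
}

type fakeIndexDB struct{ data SeriesData }

func (f *fakeIndexDB) Search(context.Context, [][]string, IndexSearchOpts) (SeriesData, [][]byte, error) {
	return f.data, nil, nil
}

func (f *fakeIndexDB) SearchWithoutSeries(context.Context, IndexSearchOpts) (SeriesData, [][]byte, error) {
	return f.data, nil, nil
}

func main() {
	var db indexDB = &fakeIndexDB{data: SeriesData{Count: 3}}
	sd, _, err := db.SearchWithoutSeries(context.Background(), IndexSearchOpts{PreloadSize: 100})
	fmt.Println(sd.Count, err) // 3 <nil>
}
```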
55 changes: 15 additions & 40 deletions banyand/measure/block_metadata.go
@@ -50,19 +50,13 @@ func (b *dataBlock) marshal(dst []byte) []byte {
return dst
}

func (b *dataBlock) unmarshal(src []byte) ([]byte, error) {
src, n, err := encoding.BytesToVarUint64(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal offset: %w", err)
}
func (b *dataBlock) unmarshal(src []byte) []byte {
src, n := encoding.BytesToVarUint64(src)
b.offset = n

src, n, err = encoding.BytesToVarUint64(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal size: %w", err)
}
src, n = encoding.BytesToVarUint64(src)
b.size = n
return src, nil
return src
}

type blockMetadata struct {
@@ -141,43 +135,30 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) {
}
bm.seriesID = common.SeriesID(encoding.BytesToUint64(src))
src = src[8:]
src, n, err := encoding.BytesToVarUint64(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: %w", err)
}
src, n := encoding.BytesToVarUint64(src)
bm.uncompressedSizeBytes = n

src, n, err = encoding.BytesToVarUint64(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal count: %w", err)
}
src, n = encoding.BytesToVarUint64(src)
bm.count = n
src, err = bm.timestamps.unmarshal(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal timestampsMetadata: %w", err)
}
src, n, err = encoding.BytesToVarUint64(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tagFamilies count: %w", err)
}
src = bm.timestamps.unmarshal(src)
src, n = encoding.BytesToVarUint64(src)
if n > 0 {
if bm.tagFamilies == nil {
bm.tagFamilies = make(map[string]*dataBlock, n)
}
var nameBytes []byte
var err error
for i := uint64(0); i < n; i++ {
src, nameBytes, err = encoding.DecodeBytes(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err)
}
tf := &dataBlock{}
src, err = tf.unmarshal(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tagFamily dataBlock: %w", err)
}
src = tf.unmarshal(src)
bm.tagFamilies[string(nameBytes)] = tf
}
}
var err error
src, err = bm.field.unmarshal(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal columnFamilyMetadata: %w", err)
@@ -274,26 +255,20 @@ func (tm *timestampsMetadata) marshal(dst []byte) []byte {
return dst
}

func (tm *timestampsMetadata) unmarshal(src []byte) ([]byte, error) {
src, err := tm.dataBlock.unmarshal(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal ts dataBlock: %w", err)
}
func (tm *timestampsMetadata) unmarshal(src []byte) []byte {
src = tm.dataBlock.unmarshal(src)
tm.min = int64(encoding.BytesToUint64(src))
src = src[8:]
tm.max = int64(encoding.BytesToUint64(src))
src = src[8:]
tm.encodeType = encoding.EncodeType(src[0])
src = src[1:]
src, n, err := encoding.BytesToVarUint64(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal ts offset: %w", err)
}
src, n := encoding.BytesToVarUint64(src)
tm.versionOffset = n
tm.versionFirst = int64(encoding.BytesToUint64(src))
src = src[8:]
tm.versionEncodeType = encoding.EncodeType(src[0])
return src[1:], nil
return src[1:]
}

func unmarshalBlockMetadata(dst []blockMetadata, src []byte) ([]blockMetadata, error) {
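With the error return dropped from `dataBlock.unmarshal`, the round trip reduces to straight varint reads. The stand-in below mimics that shape with `encoding/binary` from the standard library; it is only an illustration of the simplified signature, and malformed-input handling is deliberately omitted.

```go
// Stand-in round trip for the dataBlock {offset, size} pair, using standard
// library varint helpers in place of the project's encoding package.
package main

import (
	"encoding/binary"
	"fmt"
)

type dataBlock struct {
	offset uint64
	size   uint64
}

func (b *dataBlock) marshal(dst []byte) []byte {
	dst = binary.AppendUvarint(dst, b.offset)
	dst = binary.AppendUvarint(dst, b.size)
	return dst
}

// unmarshal returns only the remaining bytes, like the new signature above.
func (b *dataBlock) unmarshal(src []byte) []byte {
	var n int
	b.offset, n = binary.Uvarint(src)
	src = src[n:]
	b.size, n = binary.Uvarint(src)
	return src[n:]
}

func main() {
	orig := dataBlock{offset: 42, size: 1 << 20}
	buf := orig.marshal(nil)
	var got dataBlock
	rest := got.unmarshal(buf)
	fmt.Println(got, len(rest)) // {42 1048576} 0
}
```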
6 changes: 2 additions & 4 deletions banyand/measure/block_metadata_test.go
@@ -67,8 +67,7 @@ func Test_dataBlock_marshal_unmarshal(t *testing.T) {

unmarshaled := &dataBlock{}

_, err := unmarshaled.unmarshal(marshaled)
require.NoError(t, err)
_ = unmarshaled.unmarshal(marshaled)

assert.Equal(t, original.offset, unmarshaled.offset)
assert.Equal(t, original.size, unmarshaled.size)
@@ -157,8 +156,7 @@ func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) {

unmarshaled := &timestampsMetadata{}

_, err := unmarshaled.unmarshal(marshaled)
require.NoError(t, err)
_ = unmarshaled.unmarshal(marshaled)

assert.Equal(t, original.dataBlock.offset, unmarshaled.dataBlock.offset)
assert.Equal(t, original.dataBlock.size, unmarshaled.dataBlock.size)
11 changes: 3 additions & 8 deletions banyand/measure/column_metadata.go
@@ -79,10 +79,7 @@ func (cm *columnMetadata) unmarshal(src []byte) ([]byte, error) {
}
cm.valueType = pbv1.ValueType(src[0])
src = src[1:]
src, err = cm.dataBlock.unmarshal(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal columnMetadata.dataBlock: %w", err)
}
src = cm.dataBlock.unmarshal(src)
return src, nil
}

@@ -126,14 +123,12 @@ func (cfm *columnFamilyMetadata) marshal(dst []byte) []byte {
}

func (cfm *columnFamilyMetadata) unmarshal(src []byte) ([]byte, error) {
src, columnMetadataLen, err := encoding.BytesToVarUint64(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal columnMetadataLen: %w", err)
}
src, columnMetadataLen := encoding.BytesToVarUint64(src)
if columnMetadataLen < 1 {
return src, nil
}
cms := cfm.resizeColumnMetadata(int(columnMetadataLen))
var err error
for i := range cms {
src, err = cms[i].unmarshal(src)
if err != nil {
30 changes: 17 additions & 13 deletions banyand/measure/query.go
@@ -71,8 +71,8 @@ type queryOptions struct {
}

func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr model.MeasureQueryResult, err error) {
if mqo.TimeRange == nil || len(mqo.Entities) < 1 {
return nil, errors.New("invalid query options: timeRange and series are required")
if mqo.TimeRange == nil {
return nil, errors.New("invalid query options: timeRange are required")
}
if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
return nil, errors.New("invalid query options: tagProjection or fieldProjection is required")
@@ -82,22 +82,26 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
return mqr, nil
}

series := make([]*pbv1.Series, len(mqo.Entities))
for i := range mqo.Entities {
series[i] = &pbv1.Series{
Subject: mqo.Name,
EntityValues: mqo.Entities[i],
}
}

tsdb := db.(storage.TSDB[*tsTable, option])
segments := tsdb.SelectSegments(*mqo.TimeRange)
if len(segments) < 1 {
return nilResult, nil
}

if s.schema.IndexMode {
return s.buildIndexQueryResult(ctx, series, mqo, segments)
return s.buildIndexQueryResult(ctx, mqo, segments)
}

if len(mqo.Entities) < 1 {
return nil, errors.New("invalid query options: series is required")
}

series := make([]*pbv1.Series, len(mqo.Entities))
for i := range mqo.Entities {
series[i] = &pbv1.Series{
Subject: mqo.Name,
EntityValues: mqo.Entities[i],
}
}

sids, tables, storedIndexValue, newTagProjection, err := s.searchSeriesList(ctx, series, mqo, segments)
@@ -256,7 +260,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
return sl, tables, storedIndexValue, newTagProjection, nil
}

func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions,
func (s *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQueryOptions,
segments []storage.Segment[*tsTable, option],
) (model.MeasureQueryResult, error) {
defer func() {
@@ -310,7 +314,7 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri
opts.TimeRange = mqo.TimeRange
}
sr := &segResult{}
sr.SeriesData, sr.sortedValues, err = segments[i].IndexDB().Search(ctx, series, opts)
sr.SeriesData, sr.sortedValues, err = segments[i].IndexDB().SearchWithoutSeries(ctx, opts)
if err != nil {
return nil, err
}
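The reshuffled validation above means `timeRange` is always required, while entities are only required outside index mode: index-mode measures go straight to `buildIndexQueryResult`, which now calls `SearchWithoutSeries`. A toy sketch of that dispatch order, with stand-in types rather than the real `model`/`storage` ones:

```go
// Toy sketch of the validation/dispatch order introduced above: timeRange is
// always required, entities only when the measure is not in index mode.
package main

import (
	"errors"
	"fmt"
)

type queryOptions struct {
	hasTimeRange bool
	entities     [][]string
}

type measure struct{ indexMode bool }

func (m *measure) plan(opts queryOptions) (string, error) {
	if !opts.hasTimeRange {
		return "", errors.New("invalid query options: timeRange is required")
	}
	if m.indexMode {
		// Index mode: the series index holds everything, so the query runs
		// without enumerating entities (SearchWithoutSeries).
		return "index query", nil
	}
	if len(opts.entities) < 1 {
		return "", errors.New("invalid query options: series is required")
	}
	return "series query", nil
}

func main() {
	m := &measure{indexMode: true}
	p, err := m.plan(queryOptions{hasTimeRange: true})
	fmt.Println(p, err) // index query <nil>
}
```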
15 changes: 3 additions & 12 deletions banyand/measure/topn.go
@@ -621,10 +621,7 @@ func (t *TopNValue) Unmarshal(src []byte, decoder *encoding.BytesBlockDecoder) e
t.valueName = convert.BytesToString(nameBytes)

var entityTagNamesCount uint64
src, entityTagNamesCount, err = encoding.BytesToVarUint64(src)
if err != nil {
return fmt.Errorf("cannot unmarshal topNValue.entityTagNamesCount: %w", err)
}
src, entityTagNamesCount = encoding.BytesToVarUint64(src)
t.entityTagNames = make([]string, 0, entityTagNamesCount)
var entityTagNameBytes []byte
for i := uint64(0); i < entityTagNamesCount; i++ {
@@ -636,10 +633,7 @@ }
}

var valuesCount uint64
src, valuesCount, err = encoding.BytesToVarUint64(src)
if err != nil {
return fmt.Errorf("cannot unmarshal topNValue.valuesCount: %w", err)
}
src, valuesCount = encoding.BytesToVarUint64(src)

if len(src) < 1 {
return fmt.Errorf("cannot unmarshal topNValue.encodeType: src is too short")
@@ -658,10 +652,7 @@ return fmt.Errorf("cannot unmarshal topNValue.encodeType: src is too short")
return fmt.Errorf("cannot unmarshal topNValue.valueLen: src is too short")
}
var valueLen uint64
src, valueLen, err = encoding.BytesToVarUint64(src)
if err != nil {
return fmt.Errorf("cannot unmarshal topNValue.valueLen: %w", err)
}
src, valueLen = encoding.BytesToVarUint64(src)

if uint64(len(src)) < valueLen {
return fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", valueLen, len(src))
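`TopNValue.Unmarshal` reads a varint count followed by length-prefixed byte strings, now without an error return from `BytesToVarUint64`. The sketch below shows the same count-prefixed pattern using standard-library varints as stand-ins for `encoding.BytesToVarUint64`/`DecodeBytes`; it is illustrative, not the project's code.

```go
// Stand-in for the count-prefixed decoding in TopNValue.Unmarshal: a varint
// count followed by length-prefixed strings.
package main

import (
	"encoding/binary"
	"fmt"
)

func encodeStrings(dst []byte, ss []string) []byte {
	dst = binary.AppendUvarint(dst, uint64(len(ss)))
	for _, s := range ss {
		dst = binary.AppendUvarint(dst, uint64(len(s)))
		dst = append(dst, s...)
	}
	return dst
}

func decodeStrings(src []byte) ([]string, []byte, error) {
	count, n := binary.Uvarint(src)
	if n <= 0 {
		return nil, nil, fmt.Errorf("cannot unmarshal string count")
	}
	src = src[n:]
	out := make([]string, 0, count)
	for i := uint64(0); i < count; i++ {
		l, n := binary.Uvarint(src)
		if n <= 0 || uint64(len(src)-n) < l {
			return nil, nil, fmt.Errorf("src is too short for string %d", i)
		}
		out = append(out, string(src[n:n+int(l)]))
		src = src[n+int(l):]
	}
	return out, src, nil
}

func main() {
	buf := encodeStrings(nil, []string{"entity_id", "service"})
	tags, rest, err := decodeStrings(buf)
	fmt.Println(tags, len(rest), err) // [entity_id service] 0 <nil>
}
```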