Skip to content

Commit

Permalink
Merge branch 'main' into docker
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Sep 8, 2024
2 parents ed41e1f + 17dc8c0 commit d3da80d
Show file tree
Hide file tree
Showing 23 changed files with 462 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Release Notes.
- Optimize query performance of series index.
- Add liaison, remote queue, storage(rotation), time-series tables, metadata cache and scheduler metrics.
- Add HTTP health check endpoint for the data node.
- Add slow query log for the distributed query and local query.

### Bugs

Expand Down
2 changes: 1 addition & 1 deletion api/proto/banyandb/database/v1/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ message FieldSpec {
// compression_method indicates how to compress data during writing
CompressionMethod compression_method = 4 [(validate.rules).enum.defined_only = true];
// aggregate_function indicates how to aggregate data
model.v1.AggregationFunction aggregate_function = 5;
model.v1.MeasureAggregate aggregate_function = 5;
}

// Measure intends to store data point
Expand Down
12 changes: 12 additions & 0 deletions banyand/dquery/dquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dquery
import (
"context"
"errors"
"time"

"go.uber.org/multierr"

Expand Down Expand Up @@ -60,6 +61,7 @@ type queryService struct {
tqp *topNQueryProcessor
closer *run.Closer
nodeID string
slowQuery time.Duration
}

// NewService return a new query service.
Expand Down Expand Up @@ -90,6 +92,16 @@ func (q *queryService) Name() string {
return moduleName
}

func (q *queryService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("distributed-query")
fs.DurationVar(&q.slowQuery, "dst-slow-query", 0, "distributed slow query threshold, 0 means no slow query log")
return fs
}

func (q *queryService) Validate() error {
return nil
}

func (q *queryService) PreRun(ctx context.Context) error {
val := ctx.Value(common.ContextNodeKey)
if val == nil {
Expand Down
8 changes: 7 additions & 1 deletion banyand/dquery/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
timeRange: queryCriteria.TimeRange,
}))
if err != nil {
ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
ml.Error().Err(err).Dur("latency", time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query")
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the query plan for measure %s: %v", meta.GetName(), err))
return
}
Expand Down Expand Up @@ -144,5 +144,11 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
}
resp = bus.NewMessage(bus.MessageID(now), qr)
if !queryCriteria.Trace && p.slowQuery > 0 {
latency := time.Since(n)
if latency > p.slowQuery {
p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(result)).Msg("measure slow query")
}
}
return
}
14 changes: 9 additions & 5 deletions banyand/dquery/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
Expand Down Expand Up @@ -80,9 +79,9 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
ctx := context.Background()
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
var tracer *query.Tracer
var span *query.Span
tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
span, ctx = tracer.StartSpan(ctx, "distributed-%s", p.queryService.nodeID)
span.Tag("plan", plan.String())
Expand All @@ -93,7 +92,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
d.Trace = tracer.ToProto()
case common.Error:
span.Error(errors.New(d.Msg()))
resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{Trace: tracer.ToProto()})
resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Trace: tracer.ToProto()})
default:
panic("unexpected data type")
}
Expand All @@ -113,6 +112,11 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
}

resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities})

if !queryCriteria.Trace && p.slowQuery > 0 {
latency := time.Since(n)
if latency > p.slowQuery {
p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(entities)).Msg("stream slow query")
}
}
return
}
34 changes: 32 additions & 2 deletions banyand/dquery/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package dquery

import (
"context"
"errors"
"time"

"go.uber.org/multierr"
Expand All @@ -30,7 +32,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
pkgquery "github.com/apache/skywalking-banyandb/pkg/query"
)

const defaultTopNQueryTimeout = 10 * time.Second
Expand All @@ -46,6 +50,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
t.log.Warn().Msg("invalid event data type")
return
}
n := time.Now()
now := bus.MessageID(request.TimeRange.Begin.Nanos)
if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
resp = bus.NewMessage(now, common.NewError("unspecified requested sort direction"))
Expand All @@ -56,7 +61,25 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}
if e := t.log.Debug(); e.Enabled() {
e.Stringer("req", request).Msg("received a topN query event")
e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event")
}
if request.Trace {
tracer, ctx := pkgquery.NewTracer(context.TODO(), n.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "distributed-client")
span.Tag("request", convert.BytesToString(logger.Proto(request)))
defer func() {
data := resp.Data()
switch d := data.(type) {
case *measurev1.TopNResponse:
d.Trace = tracer.ToProto()
case common.Error:
span.Error(errors.New(d.Msg()))
resp = bus.NewMessage(now, &measurev1.TopNResponse{Trace: tracer.ToProto()})
default:
panic("unexpected data type")
}
span.Stop()
}()
}
agg := request.Agg
request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
Expand Down Expand Up @@ -103,9 +126,16 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
resp = bus.NewMessage(now, &measurev1.TopNResponse{})
return
}
lists := aggregator.Val(tags)
resp = bus.NewMessage(now, &measurev1.TopNResponse{
Lists: aggregator.Val(tags),
Lists: lists,
})
if !request.Trace && t.slowQuery > 0 {
latency := time.Since(n)
if latency > t.slowQuery {
t.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(request)).Int("resp_count", len(lists)).Msg("top_n slow query")
}
}
return
}

Expand Down
2 changes: 1 addition & 1 deletion banyand/liaison/grpc/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
}
ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, "measure", "write")
if errResp := measure.Send(&measurev1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Debug().Err(errResp).Msg("failed to send response")
logger.Debug().Err(errResp).Msg("failed to send measure write response")
ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "measure", "write")
}
}
Expand Down
2 changes: 1 addition & 1 deletion banyand/liaison/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
}
s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, "stream", "write")
if errResp := stream.Send(&streamv1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Debug().Err(errResp).Msg("failed to send response")
logger.Debug().Err(errResp).Msg("failed to send stream write response")
s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "stream", "write")
}
}
Expand Down
91 changes: 91 additions & 0 deletions banyand/measure/aggregate/aggregate_function.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package aggregate for measure aggregate function.
package aggregate

import (
"fmt"
"math"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)

// Void type contains nothing. It works as a placeholder for type parameters of `Arguments`.
type Void struct{}

// Input covers possible types of Function's arguments. It synchronizes with `FieldType` in schema.
type Input interface {
Void | ~int64 | ~float64
}

// Output covers possible types of Function's return value.
type Output interface {
~int64 | ~float64
}

var errFieldValueType = fmt.Errorf("unsupported input value type on this field")

// Arguments represents the argument array, with one argument or two arguments.
type Arguments[A, B Input] struct {
arg0 []A
arg1 []B
}

// Function describes two stages of aggregation.
type Function[A, B Input, R Output] interface {
// Combine takes elements to do the aggregation.
// It uses a two-dimensional array to represent the argument array.
Combine(arguments Arguments[A, B]) error

// Result gives the result for the aggregation.
Result() R
}

// NewFunction constructs the aggregate function with given kind and parameter types.
func NewFunction[A, B Input, R Output](kind modelv1.MeasureAggregate) (Function[A, B, R], error) {
var function Function[A, B, R]
switch kind {
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN:
function = &Min[A, B, R]{minimum: maxValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG:
function = &Avg[A, B, R]{summation: zeroValue[R](), count: 0}
default:
return nil, fmt.Errorf("MeasureAggregate unknown")
}

return function, nil
}

func zeroValue[R Output]() R {
var r R
return r
}

func maxValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
*a = math.MaxInt64
case *float64:
*a = math.MaxFloat64
case *string:
*a = ""
default:
panic("unreachable")
}
return
}
69 changes: 69 additions & 0 deletions banyand/measure/aggregate/avg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Avg calculates the average value of elements.
type Avg[A, B Input, R Output] struct {
summation R
count int64
}

// Combine takes elements to do the aggregation.
// Avg uses type parameter A and B.
func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
a.summation += R(arg0)
case float64:
a.summation += R(arg0)
default:
return errFieldValueType
}
}

for _, arg1 := range arguments.arg1 {
switch arg1 := any(arg1).(type) {
case int64:
a.count += arg1
default:
return errFieldValueType
}
}

return nil
}

// Result gives the result for the aggregation.
func (a *Avg[A, B, R]) Result() R {
// In unusual situations it returns the zero value.
if a.count == 0 {
return zeroValue[R]()
}
// According to the semantics of GoLang, the division of one int by another int
// returns an int, instead of a float.
return a.summation / R(a.count)
}

// NewAvgArguments constructs arguments.
func NewAvgArguments[A Input](a []A, b []int64) Arguments[A, int64] {
return Arguments[A, int64]{
arg0: a,
arg1: b,
}
}
Loading

0 comments on commit d3da80d

Please sign in to comment.