Skip to content

Commit

Permalink
[ospp] Supports measure aggregate function avg and min, and test case…
Browse files Browse the repository at this point in the history
…s. (#521)

* [API] MeasureAggregateFunctionService Support

* Supports measure aggregate function avg and min.

---------

Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
  • Loading branch information
3 people authored Sep 4, 2024
1 parent 2bad32a commit 17dc8c0
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 2 deletions.
2 changes: 1 addition & 1 deletion api/proto/banyandb/database/v1/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ message FieldSpec {
// compression_method indicates how to compress data during writing
CompressionMethod compression_method = 4 [(validate.rules).enum.defined_only = true];
// aggregate_function indicates how to aggregate data
model.v1.AggregationFunction aggregate_function = 5;
model.v1.MeasureAggregate aggregate_function = 5;
}

// Measure intends to store data point
Expand Down
91 changes: 91 additions & 0 deletions banyand/measure/aggregate/aggregate_function.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package aggregate for measure aggregate function.
package aggregate

import (
"fmt"
"math"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)

// Void type contains nothing. It works as a placeholder for type parameters of `Arguments`.
type Void struct{}

// Input covers possible types of Function's arguments. It synchronizes with `FieldType` in schema.
type Input interface {
Void | ~int64 | ~float64
}

// Output covers possible types of Function's return value.
type Output interface {
~int64 | ~float64
}

var errFieldValueType = fmt.Errorf("unsupported input value type on this field")

// Arguments represents the argument array, with one argument or two arguments.
type Arguments[A, B Input] struct {
arg0 []A
arg1 []B
}

// Function describes two stages of aggregation.
type Function[A, B Input, R Output] interface {
// Combine takes elements to do the aggregation.
// It uses a two-dimensional array to represent the argument array.
Combine(arguments Arguments[A, B]) error

// Result gives the result for the aggregation.
Result() R
}

// NewFunction constructs the aggregate function with given kind and parameter types.
func NewFunction[A, B Input, R Output](kind modelv1.MeasureAggregate) (Function[A, B, R], error) {
var function Function[A, B, R]
switch kind {
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN:
function = &Min[A, B, R]{minimum: maxValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG:
function = &Avg[A, B, R]{summation: zeroValue[R](), count: 0}
default:
return nil, fmt.Errorf("MeasureAggregate unknown")
}

return function, nil
}

func zeroValue[R Output]() R {
var r R
return r
}

func maxValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
*a = math.MaxInt64
case *float64:
*a = math.MaxFloat64
case *string:
*a = ""
default:
panic("unreachable")
}
return
}
69 changes: 69 additions & 0 deletions banyand/measure/aggregate/avg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Avg calculates the average value of elements.
type Avg[A, B Input, R Output] struct {
summation R
count int64
}

// Combine takes elements to do the aggregation.
// Avg uses type parameter A and B.
func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
a.summation += R(arg0)
case float64:
a.summation += R(arg0)
default:
return errFieldValueType
}
}

for _, arg1 := range arguments.arg1 {
switch arg1 := any(arg1).(type) {
case int64:
a.count += arg1
default:
return errFieldValueType
}
}

return nil
}

// Result gives the result for the aggregation.
func (a *Avg[A, B, R]) Result() R {
// In unusual situations it returns the zero value.
if a.count == 0 {
return zeroValue[R]()
}
// According to the semantics of GoLang, the division of one int by another int
// returns an int, instead of a float.
return a.summation / R(a.count)
}

// NewAvgArguments constructs arguments.
func NewAvgArguments[A Input](a []A, b []int64) Arguments[A, int64] {
return Arguments[A, int64]{
arg0: a,
arg1: b,
}
}
49 changes: 49 additions & 0 deletions banyand/measure/aggregate/avg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestAvg(t *testing.T) {
var err error

// case1: input int64 values
avgInt64, _ := aggregate.NewFunction[int64, int64, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG)
err = avgInt64.Combine(aggregate.NewAvgArguments[int64](
[]int64{1, 2, 3}, // mock the "summation" column
[]int64{1, 1, 1}, // mock the "count" column
))
assert.NoError(t, err)
assert.Equal(t, int64(2), avgInt64.Result()) // note that 7/3 becomes 2 as int

// case2: input float64 elements
avgFloat64, _ := aggregate.NewFunction[float64, int64, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG)
err = avgFloat64.Combine(aggregate.NewAvgArguments[float64](
[]float64{1.0, 3.0, 3.0}, // mock the "summation" column
[]int64{1, 1, 1}, // mock the "count" column
))
assert.NoError(t, err)
assert.Equal(t, 7.0/3, avgFloat64.Result())
}
56 changes: 56 additions & 0 deletions banyand/measure/aggregate/min.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Min calculates the minimum value of elements.
type Min[A, B Input, R Output] struct {
minimum R
}

// Combine takes elements to do the aggregation.
// Min uses type parameter A.
func (m *Min[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
if R(arg0) < m.minimum {
m.minimum = R(arg0)
}
case float64:
if R(arg0) < m.minimum {
m.minimum = R(arg0)
}
default:
return errFieldValueType
}
}
return nil
}

// Result gives the result for the aggregation.
func (m *Min[A, B, R]) Result() R {
return m.minimum
}

// NewMinArguments constructs arguments.
func NewMinArguments[A Input](a []A) Arguments[A, Void] {
return Arguments[A, Void]{
arg0: a,
arg1: nil,
}
}
47 changes: 47 additions & 0 deletions banyand/measure/aggregate/min_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestMin(t *testing.T) {
var err error

// case1: input int64 values
minInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN)
err = minInt64.Combine(aggregate.NewMinArguments[int64](
[]int64{1, 2, 3}, // mock the "minimum" column
))
assert.NoError(t, err)
assert.Equal(t, int64(1), minInt64.Result())

// case2: input float64 values
minFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN)
err = minFloat64.Combine(aggregate.NewMinArguments[float64](
[]float64{1.0, 2.0, 3.0}, // mock the "minimum" column
))
assert.NoError(t, err)
assert.Equal(t, 1.0, minFloat64.Result())
}
2 changes: 1 addition & 1 deletion docs/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ FieldSpec is the specification of field
| field_type | [FieldType](#banyandb-database-v1-FieldType) | | field_type denotes the type of field value |
| encoding_method | [EncodingMethod](#banyandb-database-v1-EncodingMethod) | | encoding_method indicates how to encode data during writing |
| compression_method | [CompressionMethod](#banyandb-database-v1-CompressionMethod) | | compression_method indicates how to compress data during writing |
| aggregate_function | [banyandb.model.v1.AggregationFunction](#banyandb-model-v1-AggregationFunction) | | aggregate_function indicates how to aggregate data |
| aggregate_function | [banyandb.model.v1.MeasureAggregate](#banyandb-model-v1-MeasureAggregate) | | aggregate_function indicates how to aggregate data |



Expand Down

0 comments on commit 17dc8c0

Please sign in to comment.