Skip to content

Commit

Permalink
Support nil in Tablet (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
HTHou authored Apr 30, 2024
1 parent c8eaf12 commit 1fcceb5
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ jobs:
matrix:
os: [macos-latest, ubuntu-latest, windows-latest]
go: ['1.13', 'stable']
exclude:
- os: macos-latest
go: '1.13'
steps:

- name: Set up Go ${{ matrix.go }}
Expand Down
75 changes: 75 additions & 0 deletions client/bitmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package client

type BitMap struct {
size int
bits []byte
}

var BitUtil = []byte{1, 2, 4, 8, 16, 32, 64, 128}
var UnmarkBitUtil = []byte{
0xFE, // 11111110
0xFD, // 11111101
0xFB, // 11111011
0xF7, // 11110111
0xEF, // 11101111
0xDF, // 11011111
0xBF, // 10111111
0x7F, // 01111111
}

func NewBitMap(size int) *BitMap {
bitMap := &BitMap{
size: size,
bits: make([]byte, (size+7)/8),
}
return bitMap
}

func (b *BitMap) Mark(position int) {
b.bits[position/8] |= BitUtil[position%8]
}

func (b *BitMap) UnMark(position int) {
b.bits[position/8] &= UnmarkBitUtil[position%8]
}

func (b *BitMap) IsMarked(position int) bool {
return (b.bits[position/8] & BitUtil[position%8]) != 0
}

func (b *BitMap) IsAllUnmarked() bool {
for i := 0; i < b.size/8; i++ {
if b.bits[i] != 0 {
return false
}
}
for i := 0; i < b.size%8; i++ {
if (b.bits[b.size/8] & BitUtil[i]) != 0 {
return false
}
}
return true
}

func (b *BitMap) GetBits() []byte {
return b.bits
}
53 changes: 47 additions & 6 deletions client/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package client
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"reflect"
"sort"
Expand All @@ -38,6 +37,7 @@ type Tablet struct {
measurementSchemas []*MeasurementSchema
timestamps []int64
values []interface{}
bitMaps []*BitMap
maxRowNumber int
RowSize int
}
Expand Down Expand Up @@ -69,6 +69,24 @@ func (t *Tablet) Swap(i, j int) {
sortedSlice[i], sortedSlice[j] = sortedSlice[j], sortedSlice[i]
}
}
if t.bitMaps != nil {
for _, bitMap := range t.bitMaps {
if bitMap != nil {
isNilI := bitMap.IsMarked(i)
isNilJ := bitMap.IsMarked(j)
if isNilI {
bitMap.Mark(j)
} else {
bitMap.UnMark(j)
}
if isNilJ {
bitMap.Mark(i)
} else {
bitMap.UnMark(i)
}
}
}
}
t.timestamps[i], t.timestamps[j] = t.timestamps[j], t.timestamps[i]
}

Expand All @@ -81,18 +99,27 @@ func (t *Tablet) SetTimestamp(timestamp int64, rowIndex int) {
}

func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error {
if value == nil {
return errors.New("illegal argument value can't be nil")
}

if columnIndex < 0 || columnIndex > len(t.measurementSchemas) {
return fmt.Errorf("illegal argument columnIndex %d", columnIndex)
}

if rowIndex < 0 || rowIndex > int(t.maxRowNumber) {
if rowIndex < 0 || rowIndex > t.maxRowNumber {
return fmt.Errorf("illegal argument rowIndex %d", rowIndex)
}

if value == nil {
// Init the bitMap to mark nil value
if t.bitMaps == nil {
t.bitMaps = make([]*BitMap, len(t.values))
}
if t.bitMaps[columnIndex] == nil {
t.bitMaps[columnIndex] = NewBitMap(t.maxRowNumber)
}
// Mark the nil value position
t.bitMaps[columnIndex].Mark(rowIndex)
}

switch t.measurementSchemas[columnIndex].DataType {
case BOOLEAN:
values := t.values[columnIndex].([]bool)
Expand Down Expand Up @@ -167,11 +194,15 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) {
return nil, fmt.Errorf("illegal argument columnIndex %d", columnIndex)
}

if rowIndex < 0 || rowIndex > int(t.maxRowNumber) {
if rowIndex < 0 || rowIndex > t.maxRowNumber {
return nil, fmt.Errorf("illegal argument rowIndex %d", rowIndex)
}

schema := t.measurementSchemas[columnIndex]

if t.bitMaps != nil && t.bitMaps[columnIndex] != nil && t.bitMaps[columnIndex].IsMarked(rowIndex) {
return nil, nil
}
switch schema.DataType {
case BOOLEAN:
return t.values[columnIndex].([]bool)[rowIndex], nil
Expand Down Expand Up @@ -235,6 +266,15 @@ func (t *Tablet) getValuesBytes() ([]byte, error) {
return nil, fmt.Errorf("illegal datatype %v", schema.DataType)
}
}
if t.bitMaps != nil {
for _, bitMap := range t.bitMaps {
columnHasNil := bitMap != nil && !bitMap.IsAllUnmarked()
binary.Write(buff, binary.BigEndian, columnHasNil)
if columnHasNil {
binary.Write(buff, binary.BigEndian, bitMap.GetBits()[0:t.RowSize/8+1])
}
}
}
return buff.Bytes(), nil
}

Expand All @@ -245,6 +285,7 @@ func (t *Tablet) Sort() error {

func (t *Tablet) Reset() {
t.RowSize = 0
t.bitMaps = nil
}

func NewTablet(deviceId string, measurementSchemas []*MeasurementSchema, maxRowNumber int) (*Tablet, error) {
Expand Down
83 changes: 83 additions & 0 deletions client/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,89 @@ func TestTablet_GetValueAt(t *testing.T) {
}
}

func TestTablet_GetNilValueAt(t *testing.T) {
type args struct {
columnIndex int
rowIndex int
}
tests := []struct {
name string
args args
want interface{}
wantErr bool
}{
{
name: "INT32",
args: args{
columnIndex: 0,
rowIndex: 0,
},
want: int32(256),
wantErr: false,
}, {
name: "FLOAT64",
args: args{
columnIndex: 1,
rowIndex: 0,
},
want: nil,
wantErr: false,
}, {
name: "INT64",
args: args{
columnIndex: 2,
rowIndex: 0,
},
want: int64(65535),
wantErr: false,
}, {
name: "FLOAT32",
args: args{
columnIndex: 3,
rowIndex: 0,
},
want: float32(36.5),
wantErr: false,
}, {
name: "STRING",
args: args{
columnIndex: 4,
rowIndex: 0,
},
want: "Hello World!",
wantErr: false,
}, {
name: "BOOLEAN",
args: args{
columnIndex: 5,
rowIndex: 0,
},
want: true,
wantErr: false,
},
}
if tablet, err := createTablet(1); err == nil {
tablet.SetValueAt(int32(256), 0, 0)
tablet.SetValueAt(nil, 1, 0)
tablet.SetValueAt(int64(65535), 2, 0)
tablet.SetValueAt(float32(36.5), 3, 0)
tablet.SetValueAt("Hello World!", 4, 0)
tablet.SetValueAt(true, 5, 0)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tablet.GetValueAt(tt.args.columnIndex, tt.args.rowIndex)
if (err != nil) != tt.wantErr {
t.Errorf("Tablet.GetValueAt() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Tablet.GetValueAt() = %v, want %v", got, tt.want)
}
})
}
}
}

func TestTablet_Sort(t *testing.T) {

tests := []struct {
Expand Down
6 changes: 5 additions & 1 deletion example/session_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,11 @@ func createTablet(rowCount int) (*client.Tablet, error) {
ts++
tablet.SetTimestamp(ts, row)
tablet.SetValueAt(rand.Int31(), 0, row)
tablet.SetValueAt(rand.Float64(), 1, row)
if row%2 == 1 {
tablet.SetValueAt(rand.Float64(), 1, row)
} else {
tablet.SetValueAt(nil, 1, row)
}
tablet.SetValueAt(rand.Int63(), 2, row)
tablet.SetValueAt(rand.Float32(), 3, row)
tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
Expand Down
73 changes: 73 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,79 @@ func (s *e2eTestSuite) Test_InsertAlignedTablet() {
assert.Equal(status, "12")
s.session.DeleteStorageGroup("root.ln.**")
}

func (s *e2eTestSuite) Test_InsertAlignedTabletWithNilValue() {
var timeseries = []string{"root.ln.device1.**"}
s.session.DeleteTimeseries(timeseries)
if tablet, err := createTabletWithNil(12); err == nil {
status, err := s.session.InsertAlignedTablet(tablet, false)
s.checkError(status, err)
tablet.Reset()
} else {
log.Fatal(err)
}
var timeout int64 = 1000
ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", &timeout)
assert := s.Require()
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
var status string
assert.NoError(ds.Scan(&status))
assert.Equal(status, "12")
s.session.DeleteStorageGroup("root.ln.**")
}

func createTabletWithNil(rowCount int) (*client.Tablet, error) {
tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
{
Measurement: "restart_count",
DataType: client.INT32,
}, {
Measurement: "price",
DataType: client.DOUBLE,
}, {
Measurement: "tick_count",
DataType: client.INT64,
}, {
Measurement: "temperature",
DataType: client.FLOAT,
}, {
Measurement: "description",
DataType: client.TEXT,
},
{
Measurement: "status",
DataType: client.BOOLEAN,
},
}, rowCount)

if err != nil {
return nil, err
}
ts := time.Now().UTC().UnixNano() / 1000000
for row := 0; row < int(rowCount); row++ {
ts++
tablet.SetTimestamp(ts, row)
tablet.SetValueAt(rand.Int31(), 0, row)
if row%2 == 1 {
tablet.SetValueAt(rand.Float64(), 1, row)
} else {
tablet.SetValueAt(nil, 1, row)
}
tablet.SetValueAt(rand.Int63(), 2, row)
if row%3 == 1 {
tablet.SetValueAt(rand.Float32(), 3, row)
} else {
tablet.SetValueAt(nil, 3, row)
}
tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
tablet.SetValueAt(bool(ts%2 == 0), 5, row)
tablet.RowSize++
}
return tablet, nil
}

func createTablet(rowCount int) (*client.Tablet, error) {
tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
{
Expand Down

0 comments on commit 1fcceb5

Please sign in to comment.