Skip to content

Commit cc784eb

Browse files
authored
Go Client support V2 read interface
1 parent 29ef3c0 commit cc784eb

16 files changed

+2663
-1471
lines changed

client/column.go

+821
Large diffs are not rendered by default.

client/column_decoder.go

+269
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package client
21+
22+
import (
23+
"bytes"
24+
"encoding/binary"
25+
"fmt"
26+
)
27+
28+
type ColumnDecoder interface {
29+
ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error)
30+
}
31+
32+
func deserializeNullIndicators(reader *bytes.Reader, positionCount int32) ([]bool, error) {
33+
b, err := reader.ReadByte()
34+
if err != nil {
35+
return nil, err
36+
}
37+
mayHaveNull := b != 0
38+
if !mayHaveNull {
39+
return nil, nil
40+
}
41+
return deserializeBooleanArray(reader, positionCount)
42+
}
43+
44+
func deserializeBooleanArray(reader *bytes.Reader, size int32) ([]bool, error) {
45+
packedSize := (size + 7) / 8
46+
packedBytes := make([]byte, packedSize)
47+
48+
_, err := reader.Read(packedBytes)
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
// read null bits 8 at a time
54+
output := make([]bool, size)
55+
currentByte := 0
56+
fullGroups := int(size) & ^0b111
57+
for pos := 0; pos < fullGroups; pos += 8 {
58+
b := packedBytes[currentByte]
59+
currentByte++
60+
61+
output[pos+0] = (b & 0b10000000) != 0
62+
output[pos+1] = (b & 0b01000000) != 0
63+
output[pos+2] = (b & 0b00100000) != 0
64+
output[pos+3] = (b & 0b00010000) != 0
65+
output[pos+4] = (b & 0b00001000) != 0
66+
output[pos+5] = (b & 0b00000100) != 0
67+
output[pos+6] = (b & 0b00000010) != 0
68+
output[pos+7] = (b & 0b00000001) != 0
69+
}
70+
71+
// read last null bits
72+
if remaining := int(size) % 8; remaining > 0 {
73+
b := packedBytes[len(packedBytes)-1]
74+
mask := uint8(0b10000000)
75+
76+
for pos := fullGroups; pos < int(size); pos++ {
77+
output[pos] = (b & mask) != 0
78+
mask >>= 1
79+
}
80+
}
81+
82+
return output, nil
83+
}
84+
85+
type baseColumnDecoder struct{}
86+
87+
type Int32ArrayColumnDecoder struct {
88+
baseColumnDecoder
89+
}
90+
91+
func (decoder *Int32ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
92+
// Serialized data layout:
93+
// +---------------+-----------------+-------------+
94+
// | may have null | null indicators | values |
95+
// +---------------+-----------------+-------------+
96+
// | byte | list[byte] | list[int32] |
97+
// +---------------+-----------------+-------------+
98+
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
99+
if err != nil {
100+
return nil, err
101+
}
102+
switch dataType {
103+
case INT32, DATE:
104+
intValues := make([]int32, positionCount)
105+
for i := int32(0); i < positionCount; i++ {
106+
if nullIndicators != nil && nullIndicators[i] {
107+
continue
108+
}
109+
err := binary.Read(reader, binary.BigEndian, &intValues[i])
110+
if err != nil {
111+
return nil, err
112+
}
113+
}
114+
return NewIntColumn(0, positionCount, nullIndicators, intValues)
115+
case FLOAT:
116+
floatValues := make([]float32, positionCount)
117+
for i := int32(0); i < positionCount; i++ {
118+
if nullIndicators != nil && nullIndicators[i] {
119+
continue
120+
}
121+
err := binary.Read(reader, binary.BigEndian, &floatValues[i])
122+
if err != nil {
123+
return nil, err
124+
}
125+
}
126+
return NewFloatColumn(0, positionCount, nullIndicators, floatValues)
127+
}
128+
return nil, fmt.Errorf("invalid data type: %v", dataType)
129+
}
130+
131+
type Int64ArrayColumnDecoder struct {
132+
baseColumnDecoder
133+
}
134+
135+
func (decoder *Int64ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
136+
// Serialized data layout:
137+
// +---------------+-----------------+-------------+
138+
// | may have null | null indicators | values |
139+
// +---------------+-----------------+-------------+
140+
// | byte | list[byte] | list[int64] |
141+
// +---------------+-----------------+-------------+
142+
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
143+
if err != nil {
144+
return nil, err
145+
}
146+
switch dataType {
147+
case INT64, TIMESTAMP:
148+
values := make([]int64, positionCount)
149+
for i := int32(0); i < positionCount; i++ {
150+
if nullIndicators != nil && nullIndicators[i] {
151+
continue
152+
}
153+
if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil {
154+
return nil, err
155+
}
156+
}
157+
return NewLongColumn(0, positionCount, nullIndicators, values)
158+
case DOUBLE:
159+
values := make([]float64, positionCount)
160+
for i := int32(0); i < positionCount; i++ {
161+
if nullIndicators != nil && nullIndicators[i] {
162+
continue
163+
}
164+
if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil {
165+
return nil, err
166+
}
167+
}
168+
return NewDoubleColumn(0, positionCount, nullIndicators, values)
169+
}
170+
return nil, fmt.Errorf("invalid data type: %v", dataType)
171+
}
172+
173+
type ByteArrayColumnDecoder struct {
174+
baseColumnDecoder
175+
}
176+
177+
func (decoder *ByteArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
178+
// Serialized data layout:
179+
// +---------------+-----------------+-------------+
180+
// | may have null | null indicators | values |
181+
// +---------------+-----------------+-------------+
182+
// | byte | list[byte] | list[byte] |
183+
// +---------------+-----------------+-------------+
184+
185+
if dataType != BOOLEAN {
186+
return nil, fmt.Errorf("invalid data type: %v", dataType)
187+
}
188+
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
189+
if err != nil {
190+
return nil, err
191+
}
192+
values, err := deserializeBooleanArray(reader, positionCount)
193+
if err != nil {
194+
return nil, err
195+
}
196+
return NewBooleanColumn(0, positionCount, nullIndicators, values)
197+
}
198+
199+
type BinaryArrayColumnDecoder struct {
200+
baseColumnDecoder
201+
}
202+
203+
func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
204+
// Serialized data layout:
205+
// +---------------+-----------------+-------------+
206+
// | may have null | null indicators | values |
207+
// +---------------+-----------------+-------------+
208+
// | byte | list[byte] | list[entry] |
209+
// +---------------+-----------------+-------------+
210+
//
211+
// Each entry is represented as:
212+
// +---------------+-------+
213+
// | value length | value |
214+
// +---------------+-------+
215+
// | int32 | bytes |
216+
// +---------------+-------+
217+
218+
if TEXT != dataType {
219+
return nil, fmt.Errorf("invalid data type: %v", dataType)
220+
}
221+
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
222+
if err != nil {
223+
return nil, err
224+
}
225+
values := make([]*Binary, positionCount)
226+
for i := int32(0); i < positionCount; i++ {
227+
if nullIndicators != nil && nullIndicators[i] {
228+
continue
229+
}
230+
var length int32
231+
err := binary.Read(reader, binary.BigEndian, &length)
232+
if err != nil {
233+
return nil, err
234+
}
235+
value := make([]byte, length)
236+
_, err = reader.Read(value)
237+
if err != nil {
238+
return nil, err
239+
}
240+
values[i] = NewBinary(value)
241+
}
242+
return NewBinaryColumn(0, positionCount, nullIndicators, values)
243+
}
244+
245+
type RunLengthColumnDecoder struct {
246+
baseColumnDecoder
247+
}
248+
249+
func (decoder *RunLengthColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
250+
// Serialized data layout:
251+
// +-----------+-------------------------+
252+
// | encoding | serialized inner column |
253+
// +-----------+-------------------------+
254+
// | byte | list[byte] |
255+
// +-----------+-------------------------+
256+
columnEncoding, err := deserializeColumnEncoding(reader)
257+
if err != nil {
258+
return nil, err
259+
}
260+
columnDecoder, err := getColumnDecoder(columnEncoding)
261+
if err != nil {
262+
return nil, err
263+
}
264+
column, err := columnDecoder.ReadColumn(reader, dataType, 1)
265+
if err != nil {
266+
return nil, err
267+
}
268+
return NewRunLengthEncodedColumn(column, positionCount)
269+
}

client/protocol.go

+48
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package client
2121

22+
import "fmt"
23+
2224
type TSDataType int8
2325

2426
type TSEncoding uint8
@@ -39,6 +41,48 @@ const (
3941
STRING TSDataType = 11
4042
)
4143

44+
var tsTypeMap = map[string]TSDataType{
45+
"BOOLEAN": BOOLEAN,
46+
"INT32": INT32,
47+
"INT64": INT64,
48+
"FLOAT": FLOAT,
49+
"DOUBLE": DOUBLE,
50+
"TEXT": TEXT,
51+
"TIMESTAMP": TIMESTAMP,
52+
"DATE": DATE,
53+
"BLOB": BLOB,
54+
"STRING": STRING,
55+
}
56+
57+
var byteToTsDataType = map[byte]TSDataType{
58+
0: BOOLEAN,
59+
1: INT32,
60+
2: INT64,
61+
3: FLOAT,
62+
4: DOUBLE,
63+
5: TEXT,
64+
8: TIMESTAMP,
65+
9: DATE,
66+
10: BLOB,
67+
11: STRING,
68+
}
69+
70+
func GetDataTypeByStr(name string) (TSDataType, error) {
71+
dataType, exists := tsTypeMap[name]
72+
if !exists {
73+
return UNKNOWN, fmt.Errorf("invalid input: %v", name)
74+
}
75+
return dataType, nil
76+
}
77+
78+
func getDataTypeByByte(b byte) (TSDataType, error) {
79+
dataType, exists := byteToTsDataType[b]
80+
if !exists {
81+
return UNKNOWN, fmt.Errorf("invalid input: %v", b)
82+
}
83+
return dataType, nil
84+
}
85+
4286
const (
4387
PLAIN TSEncoding = 0
4488
DICTIONARY TSEncoding = 1
@@ -202,3 +246,7 @@ const (
202246
CqAlreadyExist int32 = 1402
203247
CqUpdateLastExecTimeError int32 = 1403
204248
)
249+
250+
const (
251+
TimestampColumnName = "Time"
252+
)

0 commit comments

Comments
 (0)