Skip to content

Commit 2ea2655

Browse files
authored
fix: fix EOF error when decoding columns with empty string or zero po… (#155)
* fix: fix EOF error when decoding columns with empty string or zero positionCount * Update column_decoder_test.go 增加license header * fix: return error when resp is nil after reconnect * fix: GetCurrentRowTime returns time.Time to avoid precision ambiguity
1 parent f00cf99 commit 2ea2655

File tree

4 files changed

+284
-21
lines changed

4 files changed

+284
-21
lines changed

client/column_decoder.go

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,18 @@ func (decoder *Int32ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataTyp
9595
// +---------------+-----------------+-------------+
9696
// | byte | list[byte] | list[int32] |
9797
// +---------------+-----------------+-------------+
98+
99+
if positionCount == 0 {
100+
switch dataType {
101+
case INT32, DATE:
102+
return NewIntColumn(0, 0, nil, []int32{})
103+
case FLOAT:
104+
return NewFloatColumn(0, 0, nil, []float32{})
105+
default:
106+
return nil, fmt.Errorf("invalid data type: %v", dataType)
107+
}
108+
}
109+
98110
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
99111
if err != nil {
100112
return nil, err
@@ -139,6 +151,18 @@ func (decoder *Int64ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataTyp
139151
// +---------------+-----------------+-------------+
140152
// | byte | list[byte] | list[int64] |
141153
// +---------------+-----------------+-------------+
154+
155+
if positionCount == 0 {
156+
switch dataType {
157+
case INT64, TIMESTAMP:
158+
return NewLongColumn(0, 0, nil, []int64{})
159+
case DOUBLE:
160+
return NewDoubleColumn(0, 0, nil, []float64{})
161+
default:
162+
return nil, fmt.Errorf("invalid data type: %v", dataType)
163+
}
164+
}
165+
142166
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
143167
if err != nil {
144168
return nil, err
@@ -185,6 +209,11 @@ func (decoder *ByteArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType
185209
if dataType != BOOLEAN {
186210
return nil, fmt.Errorf("invalid data type: %v", dataType)
187211
}
212+
213+
if positionCount == 0 {
214+
return NewBooleanColumn(0, 0, nil, []bool{})
215+
}
216+
188217
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
189218
if err != nil {
190219
return nil, err
@@ -218,6 +247,11 @@ func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataTy
218247
if TEXT != dataType {
219248
return nil, fmt.Errorf("invalid data type: %v", dataType)
220249
}
250+
251+
if positionCount == 0 {
252+
return NewBinaryColumn(0, 0, nil, []*Binary{})
253+
}
254+
221255
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
222256
if err != nil {
223257
return nil, err
@@ -232,12 +266,17 @@ func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataTy
232266
if err != nil {
233267
return nil, err
234268
}
235-
value := make([]byte, length)
236-
_, err = reader.Read(value)
237-
if err != nil {
238-
return nil, err
269+
270+
if length == 0 {
271+
values[i] = NewBinary([]byte{})
272+
} else {
273+
value := make([]byte, length)
274+
_, err = reader.Read(value)
275+
if err != nil {
276+
return nil, err
277+
}
278+
values[i] = NewBinary(value)
239279
}
240-
values[i] = NewBinary(value)
241280
}
242281
return NewBinaryColumn(0, positionCount, nullIndicators, values)
243282
}

client/column_decoder_test.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package client
21+
22+
import (
23+
"bytes"
24+
"encoding/binary"
25+
"testing"
26+
)
27+
28+
func buildNullIndicatorBytes(nulls []bool) []byte {
29+
var buf bytes.Buffer
30+
hasNull := false
31+
for _, n := range nulls {
32+
if n {
33+
hasNull = true
34+
break
35+
}
36+
}
37+
if !hasNull {
38+
buf.WriteByte(0)
39+
return buf.Bytes()
40+
}
41+
buf.WriteByte(1)
42+
packed := make([]byte, (len(nulls)+7)/8)
43+
for i, n := range nulls {
44+
if n {
45+
packed[i/8] |= 0b10000000 >> (uint(i) % 8)
46+
}
47+
}
48+
buf.Write(packed)
49+
return buf.Bytes()
50+
}
51+
52+
func TestBinaryArrayColumnDecoder_EmptyString(t *testing.T) {
53+
var buf bytes.Buffer
54+
buf.Write(buildNullIndicatorBytes([]bool{false}))
55+
_ = binary.Write(&buf, binary.BigEndian, int32(0))
56+
57+
col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 1)
58+
if err != nil {
59+
t.Fatalf("ReadColumn failed: %v", err)
60+
}
61+
if col.GetPositionCount() != 1 {
62+
t.Fatalf("expected positionCount=1, got %d", col.GetPositionCount())
63+
}
64+
if col.IsNull(0) {
65+
t.Fatal("row 0 should not be null")
66+
}
67+
val, err := col.GetBinary(0)
68+
if err != nil {
69+
t.Fatalf("GetBinary(0) failed: %v", err)
70+
}
71+
if len(val.values) != 0 {
72+
t.Fatalf("expected empty string, got %q", string(val.values))
73+
}
74+
}
75+
76+
func TestBinaryArrayColumnDecoder_NullThenEmptyString(t *testing.T) {
77+
var buf bytes.Buffer
78+
buf.Write(buildNullIndicatorBytes([]bool{true, false}))
79+
_ = binary.Write(&buf, binary.BigEndian, int32(0))
80+
81+
col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 2)
82+
if err != nil {
83+
t.Fatalf("ReadColumn failed: %v", err)
84+
}
85+
if !col.IsNull(0) {
86+
t.Error("row 0 should be null")
87+
}
88+
if col.IsNull(1) {
89+
t.Error("row 1 should not be null")
90+
}
91+
val, err := col.GetBinary(1)
92+
if err != nil {
93+
t.Fatalf("GetBinary(1) failed: %v", err)
94+
}
95+
if len(val.values) != 0 {
96+
t.Fatalf("expected empty string, got %q", string(val.values))
97+
}
98+
}
99+
100+
func TestBinaryArrayColumnDecoder_WithNull(t *testing.T) {
101+
var buf bytes.Buffer
102+
buf.Write(buildNullIndicatorBytes([]bool{false, true, false}))
103+
writeText := func(s string) {
104+
_ = binary.Write(&buf, binary.BigEndian, int32(len(s)))
105+
buf.WriteString(s)
106+
}
107+
writeText("hello")
108+
writeText("world")
109+
110+
col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 3)
111+
if err != nil {
112+
t.Fatalf("ReadColumn failed: %v", err)
113+
}
114+
if col.IsNull(0) {
115+
t.Error("row 0 should not be null")
116+
}
117+
if v, _ := col.GetBinary(0); string(v.values) != "hello" {
118+
t.Errorf("row 0: expected \"hello\", got %q", string(v.values))
119+
}
120+
if !col.IsNull(1) {
121+
t.Error("row 1 should be null")
122+
}
123+
if col.IsNull(2) {
124+
t.Error("row 2 should not be null")
125+
}
126+
if v, _ := col.GetBinary(2); string(v.values) != "world" {
127+
t.Errorf("row 2: expected \"world\", got %q", string(v.values))
128+
}
129+
}
130+
131+
func TestInt64ArrayColumnDecoder_WithNull(t *testing.T) {
132+
var buf bytes.Buffer
133+
buf.Write(buildNullIndicatorBytes([]bool{false, true, false}))
134+
_ = binary.Write(&buf, binary.BigEndian, int64(100))
135+
_ = binary.Write(&buf, binary.BigEndian, int64(200))
136+
137+
col, err := (&Int64ArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), INT64, 3)
138+
if err != nil {
139+
t.Fatalf("ReadColumn failed: %v", err)
140+
}
141+
if col.IsNull(0) {
142+
t.Error("row 0 should not be null")
143+
}
144+
if v, _ := col.GetLong(0); v != 100 {
145+
t.Errorf("row 0: expected 100, got %d", v)
146+
}
147+
if !col.IsNull(1) {
148+
t.Error("row 1 should be null")
149+
}
150+
if col.IsNull(2) {
151+
t.Error("row 2 should not be null")
152+
}
153+
if v, _ := col.GetLong(2); v != 200 {
154+
t.Errorf("row 2: expected 200, got %d", v)
155+
}
156+
}
157+
158+
func TestColumnDecoder_ZeroPositionCount(t *testing.T) {
159+
empty := func() *bytes.Reader { return bytes.NewReader([]byte{}) }
160+
161+
t.Run("Int32ArrayColumnDecoder", func(t *testing.T) {
162+
col, err := (&Int32ArrayColumnDecoder{}).ReadColumn(empty(), INT32, 0)
163+
if err != nil {
164+
t.Fatalf("ReadColumn failed: %v", err)
165+
}
166+
if col.GetPositionCount() != 0 {
167+
t.Errorf("expected positionCount=0, got %d", col.GetPositionCount())
168+
}
169+
})
170+
171+
t.Run("Int64ArrayColumnDecoder", func(t *testing.T) {
172+
col, err := (&Int64ArrayColumnDecoder{}).ReadColumn(empty(), INT64, 0)
173+
if err != nil {
174+
t.Fatalf("ReadColumn failed: %v", err)
175+
}
176+
if col.GetPositionCount() != 0 {
177+
t.Errorf("expected positionCount=0, got %d", col.GetPositionCount())
178+
}
179+
})
180+
181+
t.Run("ByteArrayColumnDecoder", func(t *testing.T) {
182+
col, err := (&ByteArrayColumnDecoder{}).ReadColumn(empty(), BOOLEAN, 0)
183+
if err != nil {
184+
t.Fatalf("ReadColumn failed: %v", err)
185+
}
186+
if col.GetPositionCount() != 0 {
187+
t.Errorf("expected positionCount=0, got %d", col.GetPositionCount())
188+
}
189+
})
190+
191+
t.Run("BinaryArrayColumnDecoder", func(t *testing.T) {
192+
col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(empty(), TEXT, 0)
193+
if err != nil {
194+
t.Fatalf("ReadColumn failed: %v", err)
195+
}
196+
if col.GetPositionCount() != 0 {
197+
t.Errorf("expected positionCount=0, got %d", col.GetPositionCount())
198+
}
199+
})
200+
}

client/session.go

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -569,10 +569,15 @@ func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionD
569569
request.SessionId = s.sessionId
570570
request.StatementId = s.requestStatementId
571571
resp, err = s.client.ExecuteQueryStatementV2(context.Background(), &request)
572-
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
573-
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
574-
} else {
575-
return nil, statusErr
572+
if err == nil {
573+
if resp == nil {
574+
return nil, fmt.Errorf("received nil response after reconnect")
575+
}
576+
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
577+
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
578+
} else {
579+
return nil, statusErr
580+
}
576581
}
577582
}
578583
return nil, err
@@ -597,10 +602,15 @@ func (s *Session) ExecuteAggregationQuery(paths []string, aggregations []common.
597602
if s.reconnect() {
598603
request.SessionId = s.sessionId
599604
resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request)
600-
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
601-
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
602-
} else {
603-
return nil, statusErr
605+
if err == nil {
606+
if resp == nil {
607+
return nil, fmt.Errorf("received nil response after reconnect")
608+
}
609+
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
610+
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
611+
} else {
612+
return nil, statusErr
613+
}
604614
}
605615
}
606616
return nil, err
@@ -626,10 +636,15 @@ func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, aggregat
626636
if s.reconnect() {
627637
request.SessionId = s.sessionId
628638
resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request)
629-
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
630-
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
631-
} else {
632-
return nil, statusErr
639+
if err == nil {
640+
if resp == nil {
641+
return nil, fmt.Errorf("received nil response after reconnect")
642+
}
643+
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
644+
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
645+
} else {
646+
return nil, statusErr
647+
}
633648
}
634649
}
635650
return nil, err
@@ -653,10 +668,15 @@ func (s *Session) ExecuteFastLastDataQueryForOnePrefixPath(prefixes []string, ti
653668
if s.reconnect() {
654669
request.SessionId = s.sessionId
655670
resp, err = s.client.ExecuteFastLastDataQueryForOnePrefixPath(context.Background(), &request)
656-
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
657-
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
658-
} else {
659-
return nil, statusErr
671+
if err == nil {
672+
if resp == nil {
673+
return nil, fmt.Errorf("received nil response after reconnect")
674+
}
675+
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
676+
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
677+
} else {
678+
return nil, statusErr
679+
}
660680
}
661681
}
662682
return nil, err

client/sessiondataset.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,7 @@ func (s *SessionDataSet) GetColumnNames() []string {
125125
func (s *SessionDataSet) GetColumnTypes() []string {
126126
return s.ioTDBRpcDataSet.columnTypeList
127127
}
128+
129+
func (s *SessionDataSet) GetCurrentRowTime() time.Time {
130+
return convertToTimestamp(s.ioTDBRpcDataSet.GetCurrentRowTime(), s.ioTDBRpcDataSet.timeFactor)
131+
}

0 commit comments

Comments
 (0)