Skip to content

Commit d96fc20

Browse files
committed
fix: Cannot convert NULL value to non-Nullable type issue
1 parent fab44a6 commit d96fc20

File tree

4 files changed

+66
-7
lines changed

4 files changed

+66
-7
lines changed

model/metric.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ type DimMetrics struct {
5353

5454
// ColumnWithType
5555
type ColumnWithType struct {
56-
Name string
57-
Type *TypeInfo
58-
SourceName string
56+
Name string
57+
Type *TypeInfo
58+
SourceName string
59+
NotNullable bool
5960
}
6061

6162
// struct for ingesting a clickhouse Map type value

output/clickhouse.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,25 @@ import (
3737
)
3838

3939
var (
40-
ErrTblNotExist = errors.Newf("table doesn't exist")
41-
selectSQLTemplate = `select name, type, default_kind from system.columns where database = '%s' and table = '%s'`
42-
40+
ErrTblNotExist = errors.Newf("table doesn't exist")
41+
selectSQLTemplate = `select name, type, default_kind from system.columns where database = '%s' and table = '%s'`
42+
referedSQLTemplate = `SELECT
43+
current_col.default_expression,
44+
referenced_col.type AS referenced_col_type,
45+
current_col.name,
46+
current_col.type
47+
FROM
48+
system.columns AS current_col
49+
JOIN
50+
system.columns AS referenced_col
51+
ON
52+
current_col.database = referenced_col.database
53+
AND current_col.table = referenced_col.table
54+
AND current_col.default_expression = referenced_col.name
55+
WHERE
56+
current_col.database = '%s'
57+
AND
58+
current_col.table = '%s';`
4359
wrSeriesQuota int = 16384
4460

4561
SeriesQuotas sync.Map

output/clickhouse_util.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package output
22

33
import (
44
"fmt"
5+
"strings"
56

67
"github.com/housepower/clickhouse_sinker/model"
78
"github.com/housepower/clickhouse_sinker/pool"
@@ -15,6 +16,23 @@ func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn *p
1516

1617
func getDims(database, table string, excludedColumns []string, parser string, conn *pool.Conn) (dims []*model.ColumnWithType, err error) {
1718
var rs *pool.Rows
19+
notNullable := make(map[string]bool)
20+
if rs, err = conn.Query(fmt.Sprintf(referedSQLTemplate, database, table)); err != nil {
21+
err = errors.Wrapf(err, "")
22+
return
23+
}
24+
var default_expression, referenced_col_type, col_name, ori_type string
25+
for rs.Next() {
26+
if err = rs.Scan(&default_expression, &referenced_col_type, &col_name, &ori_type); err != nil {
27+
err = errors.Wrapf(err, "")
28+
return
29+
}
30+
if strings.HasPrefix(referenced_col_type, "Nullable(") && !strings.HasSuffix(ori_type, "Nullable(") {
31+
notNullable[default_expression] = true
32+
}
33+
}
34+
35+
rs.Close()
1836
if rs, err = conn.Query(fmt.Sprintf(selectSQLTemplate, database, table)); err != nil {
1937
err = errors.Wrapf(err, "")
2038
return
@@ -29,7 +47,16 @@ func getDims(database, table string, excludedColumns []string, parser string, co
2947
return
3048
}
3149
if !util.StringContains(excludedColumns, name) && defaultKind != "MATERIALIZED" {
32-
dims = append(dims, &model.ColumnWithType{Name: name, Type: model.WhichType(typ), SourceName: util.GetSourceName(parser, name)})
50+
nnull, ok := notNullable[name]
51+
if !ok {
52+
nnull = false
53+
}
54+
dims = append(dims, &model.ColumnWithType{
55+
Name: name,
56+
Type: model.WhichType(typ),
57+
SourceName: util.GetSourceName(parser, name),
58+
NotNullable: nnull,
59+
})
3360
}
3461
}
3562
if len(dims) == 0 {

task/task.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ func (service *Service) Put(msg *model.InputMessage, traceId string, flushFn fun
172172
return nil
173173
} else {
174174
row = service.metric2Row(metric, msg)
175+
if row == nil {
176+
return nil
177+
}
175178
if taskCfg.DynamicSchema.Enable {
176179
foundNewKeys = metric.GetNewKeys(&service.knownKeys, &service.newKeys, &service.warnKeys, service.whiteList, service.blackList, msg.Partition, msg.Offset)
177180
}
@@ -277,6 +280,18 @@ func (service *Service) metric2Row(metric model.Metric, msg *model.InputMessage)
277280
}
278281
} else {
279282
val := model.GetValueByType(metric, dim)
283+
if dim.NotNullable && val == nil {
284+
// null 不能插入到非 nullbale字段中
285+
util.Logger.Warn("null value detected, throw this message",
286+
zap.String("dimension", dim.Name),
287+
zap.String("task", service.taskCfg.Name),
288+
zap.String("topic", msg.Topic),
289+
zap.Int("partition", msg.Partition),
290+
zap.Int64("offset", msg.Offset),
291+
zap.String("key", string(msg.Key)),
292+
zap.Time("timestamp", *msg.Timestamp))
293+
return nil
294+
}
280295
row = append(row, val)
281296
}
282297
}

0 commit comments

Comments
 (0)