
Commit ac5a4c7

Merge pull request #70 from taosdata/fix/TD-32108
fix binary output, fix JSON array in JSON format, fix thread leak in sink mode
2 parents (1a60352 + fa5f984), commit ac5a4c7

File tree: 7 files changed (+59 -19 lines)


pom.xml (+2 -2)

@@ -6,7 +6,7 @@

     <groupId>com.taosdata</groupId>
     <artifactId>kafka-connect-tdengine</artifactId>
-    <version>1.1.5</version>
+    <version>1.1.6</version>
     <packaging>jar</packaging>
     <name>kafka-connect-tdengine</name>
     <description>Kafka Connect Source and Sink Connectors for TDengine</description>
@@ -49,7 +49,7 @@
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <taos.jdbcdriver.version>3.2.5</taos.jdbcdriver.version>
+    <taos.jdbcdriver.version>3.3.3</taos.jdbcdriver.version>
     <kafka.version>2.8.0</kafka.version>

     <!-- test-->

src/main/java/com/taosdata/kafka/connect/db/CacheProcessor.java (+7 -4)

@@ -1,13 +1,16 @@
 package com.taosdata.kafka.connect.db;

-import com.taosdata.jdbc.SchemalessWriter;
+import com.taosdata.jdbc.AbstractConnection;
 import com.taosdata.jdbc.enums.SchemalessProtocolType;
 import com.taosdata.jdbc.enums.SchemalessTimestampType;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.sql.*;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;

 /**
  * cache connection and provide write schemaless function
@@ -77,8 +80,8 @@ public boolean execute(String sql) throws SQLException {

     @Override
     public boolean schemalessInsert(String[] records, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
-        SchemalessWriter writer = new SchemalessWriter(this.getConnection());
-        writer.write(records, protocolType, timestampType);
+        AbstractConnection conn = this.getConnection().unwrap(AbstractConnection.class);
+        conn.write(records, protocolType, timestampType);
         return true;
     }
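
Note: the rewritten schemalessInsert() stops constructing a new SchemalessWriter for every batch and instead unwraps the cached JDBC connection to the driver's AbstractConnection, which exposes the same write() call; this is presumably the sink-mode thread leak the commit message refers to. A minimal standalone sketch of the same call pattern, assuming a reachable TDengine instance, taos-jdbcdriver 3.3.3 on the classpath, and a made-up JDBC URL and line-protocol record:

import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class SchemalessWriteSketch {
    public static void main(String[] args) throws SQLException {
        // Made-up connection details; adjust host, database and credentials for a real deployment.
        String url = "jdbc:TAOS-RS://localhost:6041/test?user=root&password=taosdata";
        try (Connection connection = DriverManager.getConnection(url)) {
            // Same pattern as the patched schemalessInsert(): unwrap the vendor connection
            // once instead of allocating a SchemalessWriter per call.
            AbstractConnection conn = connection.unwrap(AbstractConnection.class);
            String[] records = {"meters,location=sf current=10.3 1700000000000"};
            conn.write(records, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
        }
    }
}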

src/main/java/com/taosdata/kafka/connect/source/JsonMapper.java (+13 -9)

@@ -1,12 +1,9 @@
 package com.taosdata.kafka.connect.source;

-import com.alibaba.fastjson.JSON;
 import com.taosdata.jdbc.tmq.ConsumerRecord;
 import com.taosdata.jdbc.tmq.ConsumerRecords;
 import com.taosdata.kafka.connect.db.Processor;
 import com.taosdata.kafka.connect.enums.OutputFormatEnum;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,13 +14,15 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;

 public class JsonMapper extends TableMapper {
     private static final Logger log = LoggerFactory.getLogger(JsonMapper.class);
     public static int count;
-    public JsonMapper(String topic, String tableName, int batchMaxRows, Processor processor) throws SQLException {
+
+    private boolean outFormatJsonNoArray = true;
+    public JsonMapper(String topic, String tableName, int batchMaxRows, Processor processor, boolean outFormatJsonNoArray) throws SQLException {
         super(topic, tableName, batchMaxRows, processor, OutputFormatEnum.JSON);
+        this.outFormatJsonNoArray = outFormatJsonNoArray;
     }

     @Override
@@ -70,21 +69,26 @@ public List<SourceRecord> process(List<ConsumerRecords<Map<String, Object>>> rec

                 long ts = (Long) value.get(timestampColumn);
                 for (String tag : tags) {
-                    tagStruct.put(tag, value.get(tag));
+                    tagStruct.put(tag, getValue(value.get(tag), columnType.get(tag)));
                 }
                 TDStruct valueStruct = new TDStruct(valueSchema);
                 valueStruct.put(timestampColumn, ts);
                 for (String column : columns) {
-                    valueStruct.put(column, value.get(column));
+                    valueStruct.put(column, getValue(value.get(column), columnType.get(column)));
                 }
                 if (!tags.isEmpty()) {
                     valueStruct.put("tags", tagStruct);
                 }

                 structs.add(valueStruct);

-                pendingRecords.add(new SourceRecord(
-                        partition, offset.toMap(), topic, valueSchema, structs));
+                if (outFormatJsonNoArray){
+                    pendingRecords.add(new SourceRecord(
+                            partition, offset.toMap(), topic, valueSchema, structs.get(0)));
+                }else {
+                    pendingRecords.add(new SourceRecord(
+                            partition, offset.toMap(), topic, valueSchema, structs));
+                }
             }
         }
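
Note: with the new out.format.json.no.array flag enabled (the default), each SourceRecord now carries a single struct, whereas the old code always emitted the whole structs list, which is what surfaced downstream as a JSON array. An illustrative sketch using plain Kafka Connect types; the ts/current fields, the topic name and the partition/offset shapes are assumptions, and an ordinary Struct stands in for the connector's TDStruct:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class JsonOutputShapeSketch {
    public static void main(String[] args) {
        Schema valueSchema = SchemaBuilder.struct()
                .field("ts", Schema.INT64_SCHEMA)
                .field("current", Schema.FLOAT32_SCHEMA)
                .build();
        Struct row = new Struct(valueSchema)
                .put("ts", 1700000000000L)
                .put("current", 10.3f);

        Map<String, ?> partition = Collections.singletonMap("table", "meters"); // assumed shape
        Map<String, ?> offset = Collections.singletonMap("ts", 1700000000000L); // assumed shape

        // out.format.json.no.array = true (default): the record value is the struct itself.
        SourceRecord single = new SourceRecord(partition, offset, "meters", valueSchema, row);

        // out.format.json.no.array = false: the record value is the list of structs,
        // i.e. the pre-fix behaviour that produced array-shaped JSON output.
        List<Struct> structs = Collections.singletonList(row);
        SourceRecord asList = new SourceRecord(partition, offset, "meters", valueSchema, structs);

        System.out.println(single.value()); // Struct{ts=1700000000000,current=10.3}
        System.out.println(asList.value()); // [Struct{ts=1700000000000,current=10.3}]
    }
}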

src/main/java/com/taosdata/kafka/connect/source/LineMapper.java (+2 -2)

@@ -145,7 +145,7 @@ public List<SourceRecord> process(List<ConsumerRecords<Map<String, Object>>> rec
                     break;
                 case "BINARY":
                 case "VARCHAR":
-                    sb.append(",").append(tag).append("=\"").append(map.get(tag)).append("\"");
+                    sb.append(",").append(tag).append("=\"").append(getValue(map.get(tag), "BINARY")).append("\"");
                     break;
                 case "BOOL":
                     sb.append(",").append(tag).append("=").append(map.get(tag));
@@ -184,7 +184,7 @@ public List<SourceRecord> process(List<ConsumerRecords<Map<String, Object>>> rec
                     break;
                 case "BINARY":
                 case "VARCHAR":
-                    columnString.append(column).append("=\"").append(map.get(column)).append("\",");
+                    columnString.append(column).append("=\"").append(getValue(map.get(column), "BINARY")).append("\",");
                     break;
                 case "BOOL":
                     columnString.append(column).append("=").append(map.get(column)).append(",");
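
Note: map.get(tag) can arrive as a byte[] for BINARY/VARCHAR values, and appending it directly fell back to the array's default toString(), which is the broken binary output the commit message mentions; getValue(..., "BINARY") decodes it first. A standalone sketch of the difference, with a made-up tag and UTF-8 standing in for the driver charset:

import java.nio.charset.StandardCharsets;

public class LineProtocolTagSketch {
    public static void main(String[] args) {
        byte[] tagValue = "California.SanFrancisco".getBytes(StandardCharsets.UTF_8);

        // Pre-fix: the byte[] goes through StringBuilder.append(Object), i.e. its toString().
        StringBuilder broken = new StringBuilder("meters");
        broken.append(",").append("location").append("=\"").append(tagValue).append("\"");
        System.out.println(broken); // meters,location="[B@1b6d3586" (or similar)

        // Post-fix: decode the bytes first, as getValue(..., "BINARY") now does.
        StringBuilder fixed = new StringBuilder("meters");
        fixed.append(",").append("location").append("=\"")
                .append(new String(tagValue, StandardCharsets.UTF_8)).append("\"");
        System.out.println(fixed); // meters,location="California.SanFrancisco"
    }
}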

src/main/java/com/taosdata/kafka/connect/source/SourceConfig.java (+15 -1)

@@ -61,8 +61,10 @@ public class SourceConfig extends ConnectionConfig {
     private static final String FETCH_MAX_ROWS_DISPLAY = "Max Rows Per Batch";

     public static final String TABLES_CONFIG = "tables";
-    private static final String TABLES_DOC = "List of tables for this task to watch for changes.";

+    private static final String TABLES_DOC = "List of tables for this task to watch for changes.";
+    public static final String OUT_FORMAT_JSON_NO_ARRAY = "out.format.json.no.array";
+    private static final String OUT_FORMAT_JSON_NO_ARRAY_DOC = "out format json record without array";
     public static final String TOPIC_PER_SUPER_TABLE = "topic.per.stable";
     private static final boolean TOPIC_PER_SUPER_TABLE_DEFAULT = true;
     private static final String TOPIC_PER_SUPER_TABLE_DOC = "Whether to create a topic for each super table, default is true";
@@ -121,6 +123,7 @@ public class SourceConfig extends ConnectionConfig {
     private final boolean topicPerSuperTable;
     private final boolean topicNameIgnoreDb;
     private final String outFormat;
+    private final boolean outFormatJsonNoArray;
     private final String timestampType;
     private final String topicDelimiter;

@@ -147,6 +150,7 @@ public SourceConfig(Map<?, ?> props) {
         this.topicPerSuperTable = this.getBoolean(TOPIC_PER_SUPER_TABLE);
         this.topicNameIgnoreDb = this.getBoolean(TOPIC_NAME_IGNORE_DB);
         this.outFormat = this.getString(OUT_FORMAT_CONFIG).toLowerCase();
+        this.outFormatJsonNoArray = this.getBoolean(OUT_FORMAT_JSON_NO_ARRAY);
         this.timestampType = getString(DATA_PRECISION).trim();
         this.topicDelimiter = this.getString(TOPIC_DELIMITER);
         this.subscriptionGroupId = this.getString(SUBSCRIPTION_GROUP_ID);
@@ -281,6 +285,12 @@ public static ConfigDef config() {
                         ConfigDef.Width.SHORT,
                         OUT_FORMAT_CONFIG_DISPLAY
                 )
+                .define(
+                        OUT_FORMAT_JSON_NO_ARRAY,
+                        ConfigDef.Type.BOOLEAN,
+                        true,
+                        ConfigDef.Importance.LOW,
+                        OUT_FORMAT_JSON_NO_ARRAY_DOC)
                 .define(
                         TOPIC_DELIMITER,
                         ConfigDef.Type.STRING,
@@ -408,4 +418,8 @@ public String getSubscriptionAutoOffsetReset() {
     public String getTimestampType() {
         return timestampType;
     }
+
+    public Boolean getOutFormatJsonNoArray() {
+        return outFormatJsonNoArray;
+    }
 }
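
Note: the new option is registered as a BOOLEAN with a default of true and surfaced through getOutFormatJsonNoArray() for TableExecutor. A hypothetical connector-properties sketch; apart from tables and out.format.json.no.array, which appear in this diff, the keys and values below are assumptions for illustration:

import java.util.HashMap;
import java.util.Map;

public class SourceConnectorConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "com.taosdata.kafka.connect.source.TDengineSourceConnector"); // assumed
        props.put("connection.url", "jdbc:TAOS://localhost:6030");                                 // assumed
        props.put("connection.user", "root");                                                      // assumed
        props.put("connection.password", "taosdata");                                              // assumed
        props.put("tables", "meters");
        props.put("out.format", "json");               // assumed key; the JSON path is handled by JsonMapper
        props.put("out.format.json.no.array", "true"); // new in this commit; true is also the default
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}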

src/main/java/com/taosdata/kafka/connect/source/TableExecutor.java (+1 -1)

@@ -62,7 +62,7 @@ public TableExecutor(String tableName,
         if (config.getOutFormat().equalsIgnoreCase("line")) {
             mapper = new LineMapper(topic, tableName, config.getFetchMaxRows(), processor);
         } else {
-            mapper = new JsonMapper(topic, tableName, config.getFetchMaxRows(), processor);
+            mapper = new JsonMapper(topic, tableName, config.getFetchMaxRows(), processor, config.getOutFormatJsonNoArray());
         }

         this.readMethod = config.getReadMethod();

src/main/java/com/taosdata/kafka/connect/source/TableMapper.java (+19)

@@ -2,6 +2,7 @@

 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.taosdata.jdbc.TaosGlobalConfig;
 import com.taosdata.jdbc.tmq.ConsumerRecords;
 import com.taosdata.kafka.connect.db.Processor;
 import com.taosdata.kafka.connect.enums.OutputFormatEnum;
@@ -12,6 +13,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.UnsupportedEncodingException;
 import java.sql.*;
 import java.util.List;
 import java.util.Map;
@@ -169,4 +171,21 @@ private Schema convertType(String type) {
             return null;
         }
     }
+
+    protected Object getValue(Object value, String type) throws RuntimeException {
+        switch (type) {
+            case "BINARY":
+            case "VARCHAR":
+                if (value instanceof byte[]) {
+                    String charset = TaosGlobalConfig.getCharset();
+                    try {
+                        return new String((byte[]) value, charset);
+                    } catch (UnsupportedEncodingException e) {
+                        throw new RuntimeException(e.getMessage());
+                    }
+                }
+            default:
+                return value;
+        }
+    }
 }
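
Note: getValue() only touches BINARY/VARCHAR values that arrive as byte[], decoding them with the charset reported by TaosGlobalConfig.getCharset(); every other value, including strings that are already decoded, falls through to the default branch unchanged. A standalone re-implementation of that contract for illustration, with UTF-8 standing in for the configured charset:

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class GetValueSketch {
    // Mirrors the contract of the new TableMapper.getValue(): decode byte[] for
    // string-typed columns, pass every other value through unchanged.
    static Object getValue(Object value, String type) {
        if (("BINARY".equals(type) || "VARCHAR".equals(type)) && value instanceof byte[]) {
            try {
                return new String((byte[]) value, "UTF-8"); // stand-in for TaosGlobalConfig.getCharset()
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e.getMessage());
            }
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(getValue("voltage phase A".getBytes(StandardCharsets.UTF_8), "VARCHAR")); // decoded text
        System.out.println(getValue("already a String", "VARCHAR"));                                  // unchanged
        System.out.println(getValue(42, "INT"));                                                      // unchanged
    }
}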
