Skip to content

Commit c3ad569

Browse files
sfc-gh-zlibinglihub
authored andcommitted
SNOW-121816 added additional flags to control the amount of metadata
changed to use snowflake metadata config object to pass config added more assert to config testting typo changed function call to constructor call construc from another construc
1 parent fab65e1 commit c3ad569

File tree

8 files changed

+266
-19
lines changed

8 files changed

+266
-19
lines changed

src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ public class SnowflakeSinkConnectorConfig
4343
public static final long BUFFER_SIZE_BYTES_MAX = 100000000;
4444
static final String TOPICS_TABLES_MAP = "snowflake.topic2table.map";
4545

46-
//in second
46+
// Time in seconds
4747
public static final long BUFFER_FLUSH_TIME_SEC_MIN = 10;
4848
public static final long BUFFER_FLUSH_TIME_SEC_DEFAULT = 30;
4949
static final String BUFFER_FLUSH_TIME_SEC = "buffer.flush.time";
5050

51-
// snowflake connection and database config
51+
// Snowflake connection and database config
5252
private static final String SNOWFLAKE_LOGIN_INFO = "Snowflake Login Info";
5353
static final String SNOWFLAKE_URL = Utils.SF_URL;
5454
static final String SNOWFLAKE_USER = Utils.SF_USER;
@@ -57,19 +57,29 @@ public class SnowflakeSinkConnectorConfig
5757
static final String SNOWFLAKE_SCHEMA = Utils.SF_SCHEMA;
5858
static final String SNOWFLAKE_PRIVATE_KEY_PASSPHRASE = Utils.PRIVATE_KEY_PASSPHRASE;
5959

60-
//Proxy Info
60+
// Proxy Info
6161
private static final String PROXY_INFO = "Proxy Info";
6262
static final String JVM_PROXY_HOST = "jvm.proxy.host";
6363
static final String JVM_PROXY_PORT = "jvm.proxy.port";
6464

65-
//Schema Registry Info
65+
// Schema Registry Info
6666
private static final String SCHEMA_REGISTRY_INFO = "Schema Registry Info";
6767
static final String SCHEMA_REGISTRY_AUTH_CREDENTIALS_SOURCE =
6868
"value.converter.basic.auth.credentials.source";
6969
static final String SCHEMA_REGISTRY_AUTH_USER_INFO =
7070
"value.converter.basic.auth.user.info";
7171
private static final String REGISTRY_URL = "value.converter.schema.registry.url";
7272

73+
// Snowflake Metadata Flags
74+
private static final String SNOWFLAKE_METADATA_FLAGS = "Snowflake Metadata Flags";
75+
public static final String SNOWFLAKE_METADATA_CREATETIME = "snowflake.metadata.createtime";
76+
public static final String SNOWFLAKE_METADATA_TOPIC = "snowflake.metadata.topic";
77+
public static final String SNOWFLAKE_METADATA_OFFSET_AND_PARTITION =
78+
"snowflake.metadata.offset.and.partition";
79+
public static final String SNOWFLAKE_METADATA_ALL = "snowflake.metadata.all";
80+
public static final String SNOWFLAKE_METADATA_DEFAULT = "true";
81+
82+
7383
static ConfigDef newConfigDef()
7484
{
7585
return new ConfigDef()
@@ -214,6 +224,42 @@ static ConfigDef newConfigDef()
214224
3,
215225
ConfigDef.Width.NONE,
216226
BUFFER_FLUSH_TIME_SEC)
227+
.define(SNOWFLAKE_METADATA_ALL,
228+
Type.STRING,
229+
SNOWFLAKE_METADATA_DEFAULT,
230+
Importance.LOW,
231+
"Flag to control whether there is metadata collected. If set to false, all metadata will be dropped",
232+
SNOWFLAKE_METADATA_FLAGS,
233+
0,
234+
ConfigDef.Width.NONE,
235+
SNOWFLAKE_METADATA_ALL)
236+
.define(SNOWFLAKE_METADATA_CREATETIME,
237+
Type.STRING,
238+
SNOWFLAKE_METADATA_DEFAULT,
239+
Importance.LOW,
240+
"Flag to control whether createtime is collected in snowflake metadata",
241+
SNOWFLAKE_METADATA_FLAGS,
242+
1,
243+
ConfigDef.Width.NONE,
244+
SNOWFLAKE_METADATA_CREATETIME)
245+
.define(SNOWFLAKE_METADATA_TOPIC,
246+
Type.STRING,
247+
SNOWFLAKE_METADATA_DEFAULT,
248+
Importance.LOW,
249+
"Flag to control whether kafka topic name is collected in snowflake metadata",
250+
SNOWFLAKE_METADATA_FLAGS,
251+
2,
252+
ConfigDef.Width.NONE,
253+
SNOWFLAKE_METADATA_TOPIC)
254+
.define(SNOWFLAKE_METADATA_OFFSET_AND_PARTITION,
255+
Type.STRING,
256+
SNOWFLAKE_METADATA_DEFAULT,
257+
Importance.LOW,
258+
"Flag to control whether kafka partition and offset are collected in snowflake metadata",
259+
SNOWFLAKE_METADATA_FLAGS,
260+
3,
261+
ConfigDef.Width.NONE,
262+
SNOWFLAKE_METADATA_OFFSET_AND_PARTITION)
217263
;
218264
}
219265
}

src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
2323
import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
2424
import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
25+
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
2526
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2627
import org.apache.kafka.common.TopicPartition;
2728
import org.apache.kafka.connect.errors.RetriableException;
@@ -108,6 +109,9 @@ public void start(final Map<String, String> parsedConfig)
108109
//generate topic to table map
109110
this.topic2table = getTopicToTableMap(parsedConfig);
110111

112+
// generate metadataConfig table
113+
SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig);
114+
111115
//enable jvm proxy
112116
Utils.enableJVMProxy(parsedConfig);
113117

@@ -135,6 +139,7 @@ public void start(final Map<String, String> parsedConfig)
135139
.setRecordNumber(bufferCountRecords)
136140
.setFlushTime(bufferFlushTime)
137141
.setTopic2TableMap(topic2table)
142+
.setMetadataConfig(metadataConfig)
138143
.build();
139144
}
140145

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.snowflake.kafka.connector.internal;
22

3+
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
34
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
45
import org.apache.kafka.common.TopicPartition;
56
import org.apache.kafka.connect.sink.SinkRecord;
@@ -81,6 +82,12 @@ public interface SnowflakeSinkService
8182
*/
8283
void setFlushTime(long time);
8384

85+
/**
86+
* set the metadata config to let user control what metadata to be collected into SF db
87+
* @param configMap a String to String Map
88+
*/
89+
void setMetadataConfig(SnowflakeMetadataConfig configMap);
90+
8491
/**
8592
* @return current number of record limitation
8693
*/

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.snowflake.kafka.connector.internal;
22

3+
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
4+
35
import java.security.PrivateKey;
46
import java.util.Map;
57

@@ -62,6 +64,7 @@ public SnowflakeSinkServiceBuilder setFlushTime(long time)
6264
logInfo("flush time is limited to {}", time);
6365
return this;
6466
}
67+
6568
public SnowflakeSinkServiceBuilder setTopic2TableMap(Map<String, String> topic2TableMap)
6669
{
6770
this.service.setTopic2TableMap(topic2TableMap);
@@ -74,6 +77,13 @@ public SnowflakeSinkServiceBuilder setTopic2TableMap(Map<String, String> topic2T
7477
return this;
7578
}
7679

80+
public SnowflakeSinkServiceBuilder setMetadataConfig(SnowflakeMetadataConfig configMap)
81+
{
82+
this.service.setMetadataConfig(configMap);
83+
logInfo("metadata config map is {}", configMap.toString());
84+
return this;
85+
}
86+
7787
public SnowflakeSinkService build()
7888
{
7989
logInfo("{} created", SnowflakeSinkService.class.getName());

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
44
import com.snowflake.kafka.connector.Utils;
55
import com.snowflake.kafka.connector.records.RecordService;
6+
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
67
import com.snowflake.kafka.connector.records.SnowflakeRecordContent;
78
import org.apache.kafka.common.TopicPartition;
89
import org.apache.kafka.connect.sink.SinkRecord;
@@ -206,6 +207,12 @@ public void setTopic2TableMap(Map<String, String> topic2TableMap)
206207
this.topic2TableMap = topic2TableMap;
207208
}
208209

210+
@Override
211+
public void setMetadataConfig(SnowflakeMetadataConfig configMap)
212+
{
213+
this.recordService.setMetadataConfig(configMap);
214+
}
215+
209216
@Override
210217
public long getRecordNumber()
211218
{

src/main/java/com/snowflake/kafka/connector/records/RecordService.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package com.snowflake.kafka.connector.records;
1818

19+
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
1920
import com.snowflake.kafka.connector.internal.Logging;
2021
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
2122
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
@@ -32,21 +33,26 @@
3233
import org.apache.kafka.connect.header.Headers;
3334
import org.apache.kafka.connect.sink.SinkRecord;
3435

36+
import java.util.HashMap;
3537
import java.util.List;
3638
import java.util.Map;
3739

3840
public class RecordService extends Logging
3941
{
4042
private static final ObjectMapper MAPPER = new ObjectMapper();
4143

42-
private static final String OFFSET = "offset";
43-
private static final String TOPIC = "topic";
44-
private static final String PARTITION = "partition";
45-
private static final String KEY = "key";
46-
private static final String CONTENT = "content";
47-
private static final String META = "meta";
48-
private static final String SCHEMA_ID = "schema_id";
49-
private static final String HEADERS = "headers";
44+
// deleted private to use these values in test
45+
static final String OFFSET = "offset";
46+
static final String TOPIC = "topic";
47+
static final String PARTITION = "partition";
48+
static final String KEY = "key";
49+
static final String CONTENT = "content";
50+
static final String META = "meta";
51+
static final String SCHEMA_ID = "schema_id";
52+
static final String HEADERS = "headers";
53+
54+
// This class is designed to work with empty metadata config map
55+
private SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig();
5056

5157
/**
5258
* process records
@@ -68,6 +74,9 @@ public RecordService()
6874
{
6975
}
7076

77+
public void setMetadataConfig(SnowflakeMetadataConfig metadataConfigIn) {
78+
metadataConfig = metadataConfigIn;
79+
}
7180

7281
/**
7382
* process given SinkRecord,
@@ -91,12 +100,19 @@ public String processRecord(SinkRecord record)
91100
SnowflakeRecordContent content = (SnowflakeRecordContent) record.value();
92101

93102
ObjectNode meta = MAPPER.createObjectNode();
94-
meta.put(OFFSET, record.kafkaOffset());
95-
meta.put(TOPIC, record.topic());
96-
meta.put(PARTITION, record.kafkaPartition());
103+
if (metadataConfig.topicFlag)
104+
{
105+
meta.put(TOPIC, record.topic());
106+
}
107+
if (metadataConfig.offsetAndPartitionFlag)
108+
{
109+
meta.put(OFFSET, record.kafkaOffset());
110+
meta.put(PARTITION, record.kafkaPartition());
111+
}
97112

98113
//ignore if no timestamp
99-
if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE)
114+
if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE &&
115+
metadataConfig.createtimeFlag)
100116
{
101117
meta.put(record.timestampType().name, record.timestamp());
102118
}
@@ -125,7 +141,10 @@ public String processRecord(SinkRecord record)
125141
{
126142
ObjectNode data = MAPPER.createObjectNode();
127143
data.set(CONTENT, node);
128-
data.set(META, meta);
144+
if (metadataConfig.allFlag)
145+
{
146+
data.set(META, meta);
147+
}
129148
buffer.append(data.toString());
130149
}
131150
return buffer.toString();
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.snowflake.kafka.connector.records;
2+
3+
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
4+
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
8+
public class SnowflakeMetadataConfig {
9+
final boolean createtimeFlag;
10+
final boolean topicFlag;
11+
final boolean offsetAndPartitionFlag;
12+
final boolean allFlag;
13+
14+
/**
15+
* initialize with default config
16+
*/
17+
public SnowflakeMetadataConfig()
18+
{
19+
this(new HashMap<String, String>());
20+
}
21+
22+
/**
23+
* set flag to false only if metadata config is set to false in config
24+
* @param config a String to String map of configs
25+
*/
26+
public SnowflakeMetadataConfig(Map<String, String> config)
27+
{
28+
// have those local variable to avoid assigning to final values multiple times
29+
// these values are the default values of the configuration
30+
boolean createtime = true;
31+
boolean topic = true;
32+
boolean offsetAndPartition = true;
33+
boolean all = true;
34+
if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_CREATETIME) &&
35+
!config.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_CREATETIME)
36+
.equals(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_DEFAULT))
37+
{
38+
createtime = false;
39+
}
40+
if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_TOPIC) &&
41+
!config.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_TOPIC)
42+
.equals(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_DEFAULT))
43+
{
44+
topic = false;
45+
}
46+
if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_OFFSET_AND_PARTITION) &&
47+
!config.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_OFFSET_AND_PARTITION)
48+
.equals(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_DEFAULT))
49+
{
50+
offsetAndPartition = false;
51+
}
52+
if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_ALL) &&
53+
!config.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_ALL)
54+
.equals(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_DEFAULT))
55+
{
56+
all = false;
57+
}
58+
59+
createtimeFlag = createtime;
60+
topicFlag = topic;
61+
offsetAndPartitionFlag = offsetAndPartition;
62+
allFlag = all;
63+
64+
}
65+
66+
public String toString()
67+
{
68+
return
69+
"{createtimeFlag: " + createtimeFlag + ", " +
70+
"topicFlag: " + topicFlag + ", " +
71+
"offsetAndPartitionFlag: " + offsetAndPartitionFlag + ", " +
72+
"allFlag: " + allFlag + "}";
73+
}
74+
}

0 commit comments

Comments
 (0)