Skip to content

Commit 85be471

Browse files
CONN-10463 Get rid of Jackson based serialization for SSv2 (#1141)
1 parent f2c1526 commit 85be471

12 files changed

+40
-127
lines changed

src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,7 @@ public Map<String, Object> processSnowflakeRecord(SnowflakeTableRow row, boolean
3131
}
3232
}
3333
if (includeMetadata) {
34-
Map<String, Object> mapForMetadata = getMapForMetadata(row.getMetadata());
35-
if (ssv2Enabled) {
36-
MetadataRecord metadata = metadataFromMap(mapForMetadata);
37-
streamingIngestRow.put(TABLE_COLUMN_METADATA, metadata);
38-
} else {
39-
streamingIngestRow.put(TABLE_COLUMN_METADATA, mapForMetadata);
40-
}
34+
streamingIngestRow.put(TABLE_COLUMN_METADATA, getMapForMetadata(row.getMetadata()));
4135
}
4236
return streamingIngestRow;
4337
}

src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,7 @@ private Map<String, Object> getMapFromJsonNodeForStreamingIngest(
126126

127127
private Object getMetadata(RecordService.SnowflakeTableRow row) throws JsonProcessingException {
128128
if (ssv2Enabled) {
129-
Map<String, Object> mapForMetadata = getMapForMetadata(row.getMetadata());
130-
return metadataFromMap(mapForMetadata);
129+
return getMapForMetadata(row.getMetadata());
131130
} else {
132131
return mapper.writeValueAsString(row.getMetadata());
133132
}

src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
11
package com.snowflake.kafka.connector.records;
22

33
import static com.snowflake.kafka.connector.records.RecordService.HEADERS;
4-
import static com.snowflake.kafka.connector.records.RecordService.KEY;
5-
import static com.snowflake.kafka.connector.records.RecordService.KEY_SCHEMA_ID;
6-
import static com.snowflake.kafka.connector.records.RecordService.OFFSET;
7-
import static com.snowflake.kafka.connector.records.RecordService.PARTITION;
8-
import static com.snowflake.kafka.connector.records.RecordService.SCHEMA_ID;
9-
import static com.snowflake.kafka.connector.records.RecordService.TOPIC;
104

115
import com.fasterxml.jackson.core.JsonProcessingException;
126
import com.fasterxml.jackson.core.type.TypeReference;
@@ -85,15 +79,6 @@ protected String writeValueAsStringOrNanOrInfinity(JsonNode columnNode)
8579
}
8680
}
8781

88-
protected static Long getNullSafeLong(Map<String, Object> mapForMetadata, String key) {
89-
return mapForMetadata.get(key) == null ? null : ((Number) mapForMetadata.get(key)).longValue();
90-
}
91-
92-
protected static String getNullSafeString(Map<String, Object> mapForMetadata, String key) {
93-
Object object = mapForMetadata.get(key);
94-
return object == null ? null : object.toString();
95-
}
96-
9782
protected Map<String, Object> getMapForMetadata(JsonNode metadataNode)
9883
throws JsonProcessingException {
9984
Map<String, Object> values = mapper.convertValue(metadataNode, OBJECTS_MAP_TYPE_REFERENCE);
@@ -121,18 +106,4 @@ protected Map<String, String> convertHeaders(JsonNode headersNode)
121106
}
122107
return headers;
123108
}
124-
125-
protected MetadataRecord metadataFromMap(Map<String, Object> mapForMetadata) {
126-
return new MetadataRecord(
127-
getNullSafeLong(mapForMetadata, OFFSET),
128-
(String) mapForMetadata.get(TOPIC),
129-
(Integer) mapForMetadata.get(PARTITION),
130-
getNullSafeString(mapForMetadata, KEY),
131-
(Integer) mapForMetadata.get(SCHEMA_ID),
132-
(Integer) mapForMetadata.get(KEY_SCHEMA_ID),
133-
getNullSafeLong(mapForMetadata, "CreateTime"),
134-
getNullSafeLong(mapForMetadata, "LogAppendTime"),
135-
getNullSafeLong(mapForMetadata, "SnowflakeConnectorPushTime"),
136-
(Map<String, String>) mapForMetadata.get(HEADERS));
137-
}
138109
}

src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -66,24 +66,6 @@ class IcebergTableStreamingRecordMapperTest extends StreamingRecordMapperTest {
6666
"header3",
6767
"3.5"));
6868

69-
private static final MetadataRecord emptyRecordMetadata =
70-
new MetadataRecord(null, null, null, null, null, null, null, null, null, new HashMap<>());
71-
private static final MetadataRecord nullHeaderRecordMetadata =
72-
new MetadataRecord(
73-
null,
74-
null,
75-
null,
76-
null,
77-
null,
78-
null,
79-
null,
80-
null,
81-
null,
82-
(Map<String, String>) mapWithNullableValuesOf("key", null));
83-
private static final MetadataRecord nestedHeaderRecordMetadata =
84-
new MetadataRecord(
85-
null, null, null, null, null, null, null, null, null, Map.of("key", "{\"key2\":null}"));
86-
8769
@ParameterizedTest(name = "{0}")
8870
@MethodSource("prepareSchematizationData")
8971
void shouldMapRecord_schematizationEnabled(
@@ -100,11 +82,7 @@ void shouldMapRecord_schematizationEnabled(
10082

10183
@ParameterizedTest(name = "{0}")
10284
@MethodSource("prepareMetadataData")
103-
void shouldMapMetadata(
104-
String description,
105-
SnowflakeTableRow row,
106-
Map<String, Object> expected,
107-
MetadataRecord ssv2Expected)
85+
void shouldMapMetadata(String description, SnowflakeTableRow row, Map<String, Object> expected)
10886
throws JsonProcessingException {
10987
// When
11088
IcebergTableStreamingRecordMapper mapper =
@@ -120,7 +98,6 @@ void shouldMapMetadata(
12098
// Then
12199
assertThat(result.get(Utils.TABLE_COLUMN_METADATA)).isEqualTo(expected);
122100
assertThat(resultSchematized.get(Utils.TABLE_COLUMN_METADATA)).isEqualTo(expected);
123-
assertThat(resultSSv2.get(Utils.TABLE_COLUMN_METADATA)).isEqualTo(ssv2Expected);
124101
}
125102

126103
@Test
@@ -262,41 +239,29 @@ private static Stream<Arguments> prepareSchematizationData() throws JsonProcessi
262239

263240
private static Stream<Arguments> prepareMetadataData() throws JsonProcessingException {
264241
return Stream.of(
242+
Arguments.of("Full metadata", buildRowWithDefaultMetadata("{}"), fullMetadataJsonAsMap),
265243
Arguments.of(
266-
"Full metadata",
267-
buildRowWithDefaultMetadata("{}"),
268-
fullMetadataJsonAsMap,
269-
fullRecordMetadata),
270-
Arguments.of(
271-
"Empty metadata",
272-
buildRow("{}", "{}"),
273-
ImmutableMap.of("headers", ImmutableMap.of()),
274-
emptyRecordMetadata),
244+
"Empty metadata", buildRow("{}", "{}"), ImmutableMap.of("headers", ImmutableMap.of())),
275245
Arguments.of(
276246
"Metadata with null headers",
277247
buildRow("{}", "{\"headers\": null}"),
278-
ImmutableMap.of("headers", ImmutableMap.of()),
279-
emptyRecordMetadata),
248+
ImmutableMap.of("headers", ImmutableMap.of())),
280249
Arguments.of(
281250
"Metadata with empty headers",
282251
buildRow("{}", "{\"headers\": {}}"),
283-
ImmutableMap.of("headers", ImmutableMap.of()),
284-
emptyRecordMetadata),
252+
ImmutableMap.of("headers", ImmutableMap.of())),
285253
Arguments.of(
286254
"Metadata with headers with null keys",
287255
buildRow("{}", "{\"headers\": {\"key\": null}}"),
288-
ImmutableMap.of("headers", mapWithNullableValuesOf("key", null)),
289-
nullHeaderRecordMetadata),
256+
ImmutableMap.of("headers", mapWithNullableValuesOf("key", null))),
290257
Arguments.of(
291258
"Metadata with headers with nested null keys",
292259
buildRow("{}", "{\"headers\": {\"key\": {\"key2\": null }}}"),
293-
ImmutableMap.of("headers", ImmutableMap.of("key", "{\"key2\":null}")),
294-
nestedHeaderRecordMetadata),
260+
ImmutableMap.of("headers", ImmutableMap.of("key", "{\"key2\":null}"))),
295261
Arguments.of(
296262
"Metadata with null field value",
297263
buildRow("{}", "{\"offset\": null}"),
298-
mapWithNullableValuesOf("offset", null, "headers", ImmutableMap.of()),
299-
emptyRecordMetadata));
264+
mapWithNullableValuesOf("offset", null, "headers", ImmutableMap.of())));
300265
}
301266

302267
private static Stream<Arguments> prepareNoSchematizationData() throws JsonProcessingException {

src/test/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapperTest.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,26 @@
1717
public class SnowflakeTableStreamingRecordMapperTest extends StreamingRecordMapperTest {
1818
private static final ObjectMapper objectMapper = new ObjectMapper();
1919

20+
// Helper constants for metadata structures
21+
private static final Map<String, Object> EMPTY_SSV2_METADATA = Map.of("headers", Map.of());
22+
23+
private static final Map<String, Object> FULL_SSV2_METADATA =
24+
Map.of(
25+
"offset", 10,
26+
"topic", "topic",
27+
"partition", 0,
28+
"key", "key",
29+
"schema_id", 1,
30+
"key_schema_id", 2,
31+
"CreateTime", 3,
32+
"LogAppendTime", 4,
33+
"SnowflakeConnectorPushTime", 5,
34+
"headers",
35+
Map.of(
36+
"header2", "testheaderstring",
37+
"header3", "3.5",
38+
"objectAsJsonStringHeader", "{\"key1\":\"value1\",\"key2\":\"value2\"}"));
39+
2040
@ParameterizedTest
2141
@MethodSource("ssv1NoSchematizationData")
2242
public void shouldMapDataForSsv1(
@@ -117,38 +137,30 @@ public static Stream<Arguments> ssv2NoSchematizationData() throws JsonProcessing
117137
Arguments.of(
118138
buildRow("{}", "{}"),
119139
Map.of(
120-
TABLE_COLUMN_METADATA,
121-
new MetadataRecord(null, null, null, null, null, null, null, null, null, Map.of()),
122-
TABLE_COLUMN_CONTENT,
123-
new HashMap<>())),
140+
TABLE_COLUMN_METADATA, EMPTY_SSV2_METADATA, TABLE_COLUMN_CONTENT, new HashMap<>())),
124141
Arguments.of(
125142
buildRowWithDefaultMetadata("{}"),
126143
Map.of(
127-
TABLE_COLUMN_METADATA, fullRecordMetadata, TABLE_COLUMN_CONTENT, new HashMap<>())),
144+
TABLE_COLUMN_METADATA, FULL_SSV2_METADATA, TABLE_COLUMN_CONTENT, new HashMap<>())),
128145
Arguments.of(
129146
buildRowWithDefaultMetadata("{\"key\": \"value\"}"),
130147
Map.of(
131148
TABLE_COLUMN_METADATA,
132-
fullRecordMetadata,
149+
FULL_SSV2_METADATA,
133150
TABLE_COLUMN_CONTENT,
134151
Map.of("key", "value"))));
135152
}
136153

137154
public static Stream<Arguments> ssv2SchematizationData() throws JsonProcessingException {
138155
return Stream.of(
156+
Arguments.of(buildRow("{}", "{}"), Map.of(TABLE_COLUMN_METADATA, EMPTY_SSV2_METADATA)),
139157
Arguments.of(
140-
buildRow("{}", "{}"),
141-
Map.of(
142-
TABLE_COLUMN_METADATA,
143-
new MetadataRecord(
144-
null, null, null, null, null, null, null, null, null, Map.of()))),
145-
Arguments.of(
146-
buildRowWithDefaultMetadata("{}"), Map.of(TABLE_COLUMN_METADATA, fullRecordMetadata)),
158+
buildRowWithDefaultMetadata("{}"), Map.of(TABLE_COLUMN_METADATA, FULL_SSV2_METADATA)),
147159
Arguments.of(
148160
buildRowWithDefaultMetadata("{\"key\": \"value\"}"),
149-
Map.of(TABLE_COLUMN_METADATA, fullRecordMetadata, "\"KEY\"", "value")),
161+
Map.of(TABLE_COLUMN_METADATA, FULL_SSV2_METADATA, "\"KEY\"", "value")),
150162
Arguments.of(
151163
buildRowWithDefaultMetadata("{\"key\": []}"),
152-
Map.of(TABLE_COLUMN_METADATA, fullRecordMetadata, "\"KEY\"", List.of())));
164+
Map.of(TABLE_COLUMN_METADATA, FULL_SSV2_METADATA, "\"KEY\"", List.of())));
153165
}
154166
}

src/test/java/com/snowflake/kafka/connector/records/StreamingRecordMapperTest.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.ObjectMapper;
5-
import java.util.Map;
65

76
abstract class StreamingRecordMapperTest {
87

@@ -30,25 +29,6 @@ abstract class StreamingRecordMapperTest {
3029
protected static final String fullMetadataWithoutWhitespace =
3130
fullMetadataJsonExample.replaceAll("\\s+", "");
3231

33-
protected static final MetadataRecord fullRecordMetadata =
34-
new MetadataRecord(
35-
10L,
36-
"topic",
37-
0,
38-
"key",
39-
1,
40-
2,
41-
3L,
42-
4L,
43-
5L,
44-
Map.of(
45-
"header3",
46-
"3.5",
47-
"header2",
48-
"testheaderstring",
49-
"objectAsJsonStringHeader",
50-
"{\"key1\":\"value1\",\"key2\":\"value2\"}"));
51-
5232
protected static RecordService.SnowflakeTableRow buildRowWithDefaultMetadata(String content)
5333
throws JsonProcessingException {
5434
return buildRow(content, fullMetadataJsonExample);

src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
import com.snowflake.kafka.connector.Utils;
99
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
10-
import com.snowflake.kafka.connector.records.MetadataRecord;
1110
import com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord;
11+
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
1212
import com.snowflake.kafka.connector.streaming.iceberg.sql.RecordWithMetadata;
1313
import java.util.Arrays;
1414
import java.util.Collections;

src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import com.snowflake.kafka.connector.Utils;
1010
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
1111
import com.snowflake.kafka.connector.internal.DescribeTableRow;
12-
import com.snowflake.kafka.connector.records.MetadataRecord;
12+
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
1313
import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord;
1414
import com.snowflake.kafka.connector.streaming.iceberg.sql.RecordWithMetadata;
1515
import java.util.Arrays;

src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import com.fasterxml.jackson.annotation.JsonProperty;
77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import com.snowflake.kafka.connector.Utils;
9-
import com.snowflake.kafka.connector.records.MetadataRecord;
109
import java.io.IOException;
1110
import java.sql.ResultSet;
1211
import java.sql.SQLException;

src/main/java/com/snowflake/kafka/connector/records/MetadataRecord.java renamed to src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1-
package com.snowflake.kafka.connector.records;
1+
package com.snowflake.kafka.connector.streaming.iceberg.sql;
22

33
import com.fasterxml.jackson.annotation.JsonCreator;
44
import com.fasterxml.jackson.annotation.JsonProperty;
55
import java.util.Map;
66
import java.util.Objects;
77

8-
/**
9-
* POJO for writing metadata with SSv2. Passing this class to sdk instead of Map makes pipe
10-
* definition simpler.
11-
*/
128
public class MetadataRecord {
139
private final Long offset;
1410
private final String topic;

0 commit comments

Comments (0)