From dbf8aa8d24eb59661829e6d1366b751656f50889 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Tue, 6 Dec 2022 20:25:30 +0530 Subject: [PATCH 1/4] add E2ETestcase for tombstone record --- .../kusto/kafka/connect/sink/E2ETest.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java index 8630a087..184b2b44 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java @@ -27,7 +27,7 @@ import java.util.*; import java.util.logging.Logger; -@Disabled("We don't want these tests running as part of the build or CI. Comment this line to test manually.") +//@Disabled("We don't want these tests running as part of the build or CI. Comment this line to test manually.") public class E2ETest { private static final String testPrefix = "tmpKafkaE2ETest"; private static final String appId = System.getProperty("appId"); @@ -62,7 +62,7 @@ public void testE2ECsv() throws URISyntaxException, DataClientException, DataSer String[] messages = new String[] {"first field a,11", "first field b,22"}; List messagesBytes = new ArrayList<>(); messagesBytes.add(messages[0].getBytes()); - messagesBytes.add(messages[1].getBytes()); + messagesBytes.add(null); long flushInterval = 100; if (!executeTest(dataFormat, ingestionMappingKind, mapping, messagesBytes, flushInterval, false)) { @@ -87,6 +87,23 @@ public void testE2EJson() throws URISyntaxException, DataClientException, DataSe } } + @Test + public void testE2EJsonTombstone() throws URISyntaxException, DataClientException, DataServiceException { + String dataFormat = "json"; + IngestionMapping.IngestionMappingKind ingestionMappingKind = IngestionMapping.IngestionMappingKind.JSON; + String mapping = "{\"column\":\"ColA\", \"DataType\":\"string\", \"Properties\":{\"Path\":\"$.ColA\"}}," + + "{\"column\":\"ColB\", \"DataType\":\"int\", \"Properties\":{\"Path\":\"$.ColB\"}},"; + String[] messages = new String[] {"{'ColA': 'first field a', 'ColB': '11'}","{'ColA': 'first field a', 'ColB': '11'}", null}; + List messagesBytes = new ArrayList<>(); + messagesBytes.add(messages[0].getBytes()); + messagesBytes.add(null); + long flushInterval = 100; + + if (!executeTest(dataFormat, ingestionMappingKind, mapping, messagesBytes, flushInterval, false)) { + Assertions.fail("Test failed"); + } + } + @Test public void testE2EAvro() throws URISyntaxException, DataClientException, DataServiceException { String dataFormat = "avro"; From 115c77c35e92995bf0fa7e259ee3a1ff2a70c750 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Tue, 6 Dec 2022 20:27:34 +0530 Subject: [PATCH 2/4] fixed --- .../com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java index 184b2b44..e318ee1b 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java @@ -62,7 +62,7 @@ public void testE2ECsv() throws URISyntaxException, DataClientException, DataSer String[] messages = new String[] {"first field a,11", "first field b,22"}; List messagesBytes = new ArrayList<>(); messagesBytes.add(messages[0].getBytes()); - messagesBytes.add(null); + messagesBytes.add(messages[1].getBytes()); long flushInterval = 100; if (!executeTest(dataFormat, ingestionMappingKind, mapping, messagesBytes, flushInterval, false)) { From 10dbf7f4d6a98c2c3835ab6ad7ae3ad14eb5dd84 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Tue, 6 Dec 2022 20:29:14 +0530 Subject: [PATCH 3/4] uncomment disabled --- .../com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java index e318ee1b..96c9bd28 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java @@ -27,7 +27,7 @@ import java.util.*; import java.util.logging.Logger; -//@Disabled("We don't want these tests running as part of the build or CI. Comment this line to test manually.") +@Disabled("We don't want these tests running as part of the build or CI. Comment this line to test manually.") public class E2ETest { private static final String testPrefix = "tmpKafkaE2ETest"; private static final String appId = System.getProperty("appId"); From cd9dc070e910abcf556aacf07d314a58a01682c2 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Tue, 6 Dec 2022 21:06:30 +0530 Subject: [PATCH 4/4] added test case for jsonWriter --- .../JsonRecordWriterProviderTest.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProviderTest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProviderTest.java index acda6634..5fada32b 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProviderTest.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatWriter/JsonRecordWriterProviderTest.java @@ -11,6 +11,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; // TODO: Significant duplication among these 4 classes public class JsonRecordWriterProviderTest { @@ -39,4 +40,33 @@ public void testJsonData() throws IOException { } file.delete(); } + + @Test + public void testJsonDataNull() throws IOException { + List records = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Map map = new HashMap<>(); + map.put("hello", i); + records.add(new SinkRecord("mytopic", 0, null, null, null, map, i)); + } + records.add(new SinkRecord("mytopic", 0, null, null, null, null, 11)); + File file = new File("abc.json"); + JsonRecordWriterProvider jsonWriter = new JsonRecordWriterProvider(); + OutputStream out = new FileOutputStream(file); + RecordWriter rd = jsonWriter.getRecordWriter(file.getPath(), out); + for (SinkRecord record : records) { + rd.write(record); + } + rd.commit(); + BufferedReader br = new BufferedReader(new FileReader(file)); + for(int i = 0; i <= 10; i++){ + String st = br.readLine(); + if(i == 10){ + assertEquals(st,"null"); + }else { + assertEquals(st, String.format("{\"hello\":%s}", i)); + } + } + file.delete(); + } }