Commit b9b0c61

sfc-gh-japatelkhsoneji authored and committed

SNOW-811265 Not clearing entire map of partitionToChannel, only remove (snowflakedb#687)

1 parent 5f7944a, commit b9b0c61

File tree: 2 files changed, +93 additions, -4 deletions


src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java (6 additions, 1 deletion)

@@ -331,8 +331,13 @@ public void close(Collection<TopicPartition> partitions) {
           topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(),
           topicPartition.topic(),
           topicPartition.partition());
+          partitionsToChannel.remove(partitionChannelKey);
         });
-    partitionsToChannel.clear();
+    LOGGER.info(
+        "Closing {} partitions and remaining partitions which are not closed are:{}, with size:{}",
+        partitions.size(),
+        partitionsToChannel.keySet().toString(),
+        partitionsToChannel.size());
   }
 
   @Override
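
Why this matters: close(Collection<TopicPartition>) is called with only the partitions being revoked during a rebalance, but the old code wiped the entire partitionsToChannel cache, discarding channels for partitions the task still owns. Below is a minimal, self-contained sketch of the behavioral difference. It does not use the connector's actual classes, and the "<topic>_<partition>" key format is an assumption modeled on the partitionChannelKey helper seen in the diff.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;

public class PartitionsToChannelSketch {
  // Stand-in for partitionsToChannel; real values are TopicPartitionChannel instances.
  private final Map<String, String> partitionsToChannel = new ConcurrentHashMap<>();

  static String partitionChannelKey(String topic, int partition) {
    return topic + "_" + partition; // assumed key format
  }

  // New behavior, as in the commit: remove only the partitions being closed.
  void close(List<TopicPartition> partitions) {
    partitions.forEach(
        tp -> partitionsToChannel.remove(partitionChannelKey(tp.topic(), tp.partition())));
    // Old behavior was `partitionsToChannel.clear();`, which also dropped channels for
    // partitions NOT in `partitions` that remain assigned to this task.
  }

  public static void main(String[] args) {
    PartitionsToChannelSketch sketch = new PartitionsToChannelSketch();
    sketch.partitionsToChannel.put(partitionChannelKey("test_table", 0), "channel-0");
    sketch.partitionsToChannel.put(partitionChannelKey("test_table", 1), "channel-1");

    // A rebalance closes only partition 0; partition 1 stays assigned.
    sketch.close(List.of(new TopicPartition("test_table", 0)));

    // channel-1 survives the partial rebalance instead of being wiped by clear().
    System.out.println(sketch.partitionsToChannel); // {test_table_1=channel-1}
  }
}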

src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java (87 additions, 3 deletions)
@@ -1,5 +1,6 @@
 package com.snowflake.kafka.connector.internal.streaming;
 
+import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;
 import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
 
 import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
@@ -25,7 +26,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -43,9 +43,10 @@ public class SnowflakeSinkServiceV2IT {
   private String table = TestUtils.randomTableName();
   private int partition = 0;
   private int partition2 = 1;
-  private String topic = "test";
+
+  // Topic name should be same as table name. (Only for testing, not necessarily in real deployment)
+  private String topic = table;
   private TopicPartition topicPartition = new TopicPartition(topic, partition);
-  private static ObjectMapper MAPPER = new ObjectMapper();
 
   @After
   public void afterEach() {
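
A note on the topic = table change above: throughout this test class the table name is passed wherever a topic is expected, so the TopicPartition objects, the generated records, and the row-count assertions all refer to one name. A tiny illustration of that wiring, with a hypothetical table name that is not part of the commit:

import org.apache.kafka.common.TopicPartition;

public class TopicEqualsTableSketch {
  public static void main(String[] args) {
    String table = "kafka_connector_test_table"; // hypothetical example name
    String topic = table; // mirrors `private String topic = table;` in the test
    TopicPartition tp = new TopicPartition(topic, 0);
    // Records created for `tp` land in `table`, so a row count on `table`
    // (e.g. TestUtils.tableSize(table)) measures exactly what was sent to `tp`.
    System.out.println(tp + " -> table " + table);
  }
}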
@@ -125,6 +126,89 @@ public void testChannelCloseIngestion() throws Exception {
     service.closeAll();
   }
 
+  // Two partitions, insert Record, one partition gets rebalanced (closed).
+  // just before rebalance, there is data in buffer for other partition,
+  // Send data again for both partitions.
+  // Successfully able to ingest all records
+  @Test
+  public void testStreamingIngest_multipleChannelPartitions_closeSubsetOfPartitionsAssigned()
+      throws Exception {
+    Map<String, String> config = TestUtils.getConfForStreaming();
+    SnowflakeSinkConnectorConfig.setDefaultValues(config);
+    conn.createTable(table);
+    TopicPartition tp1 = new TopicPartition(table, partition);
+    TopicPartition tp2 = new TopicPartition(table, partition2);
+
+    // opens a channel for partition 0, table and topic
+    SnowflakeSinkService service =
+        SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+            .setRecordNumber(5)
+            .setFlushTime(5)
+            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+            .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+            .addTask(table, tp1) // Internally calls startTask
+            .addTask(table, tp2) // Internally calls startTask
+            .build();
+
+    final int recordsInPartition1 = 2;
+    final int recordsInPartition2 = 2;
+    List<SinkRecord> recordsPartition1 =
+        TestUtils.createJsonStringSinkRecords(0, recordsInPartition1, table, partition);
+
+    List<SinkRecord> recordsPartition2 =
+        TestUtils.createJsonStringSinkRecords(0, recordsInPartition2, table, partition2);
+
+    List<SinkRecord> records = new ArrayList<>(recordsPartition1);
+    records.addAll(recordsPartition2);
+
+    service.insert(records);
+
+    TestUtils.assertWithRetry(
+        () -> {
+          // This is how we will trigger flush. (Mimicking poll API)
+          service.insert(new ArrayList<>()); // trigger time based flush
+          return TestUtils.tableSize(table) == recordsInPartition1 + recordsInPartition2;
+        },
+        10,
+        20);
+
+    TestUtils.assertWithRetry(() -> service.getOffset(tp1) == recordsInPartition1, 20, 5);
+    TestUtils.assertWithRetry(() -> service.getOffset(tp2) == recordsInPartition2, 20, 5);
+    // before you close partition 1, there should be some data in partition 2
+    List<SinkRecord> newRecordsPartition2 =
+        TestUtils.createJsonStringSinkRecords(2, recordsInPartition1, table, partition2);
+    service.insert(newRecordsPartition2);
+    // partitions to close = 1 out of 2
+    List<TopicPartition> partitionsToClose = Collections.singletonList(tp1);
+    service.close(partitionsToClose);
+
+    // remaining partition should be present in the map
+    SnowflakeSinkServiceV2 snowflakeSinkServiceV2 = (SnowflakeSinkServiceV2) service;
+
+    Assert.assertTrue(
+        snowflakeSinkServiceV2
+            .getTopicPartitionChannelFromCacheKey(partitionChannelKey(tp2.topic(), tp2.partition()))
+            .isPresent());
+
+    List<SinkRecord> newRecordsPartition1 =
+        TestUtils.createJsonStringSinkRecords(2, recordsInPartition1, table, partition);
+
+    List<SinkRecord> newRecords2Partition2 =
+        TestUtils.createJsonStringSinkRecords(4, recordsInPartition1, table, partition2);
+    List<SinkRecord> newrecords = new ArrayList<>(newRecordsPartition1);
+    newrecords.addAll(newRecords2Partition2);
+
+    service.insert(newrecords);
+    TestUtils.assertWithRetry(
+        () -> {
+          // This is how we will trigger flush. (Mimicking poll API)
+          service.insert(new ArrayList<>()); // trigger time based flush
+          return TestUtils.tableSize(table) == recordsInPartition1 * 5;
+        },
+        10,
+        20);
+  }
+
   @Test
   public void testRebalanceOpenCloseIngestion() throws Exception {
     Map<String, String> config = TestUtils.getConfForStreaming();
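
The new test leans on two TestUtils helpers. The sketch below gives hypothetical re-implementations to make the flow above readable; the real helpers live in the connector's test sources, and their exact signatures and argument order may differ.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public final class TestUtilsSketch {

  // Builds `count` SinkRecords with JSON string values for the given topic/partition,
  // with Kafka offsets starting at `startOffset`.
  public static List<SinkRecord> createJsonStringSinkRecords(
      long startOffset, int count, String topic, int partition) {
    List<SinkRecord> records = new ArrayList<>();
    for (long offset = startOffset; offset < startOffset + count; offset++) {
      String json = "{\"id\":" + offset + "}";
      records.add(
          new SinkRecord(
              topic, partition, Schema.STRING_SCHEMA, "key-" + offset,
              Schema.STRING_SCHEMA, json, offset));
    }
    return records;
  }

  // Polls `condition` up to `maxAttempts` times, sleeping `intervalSeconds` between tries,
  // and fails if the condition never holds. (The argument order here is an assumption.)
  public static void assertWithRetry(
      Callable<Boolean> condition, int maxAttempts, int intervalSeconds) throws Exception {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (Boolean.TRUE.equals(condition.call())) {
        return;
      }
      Thread.sleep(intervalSeconds * 1000L);
    }
    throw new AssertionError("Condition did not become true after " + maxAttempts + " attempts");
  }
}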
