11package com .snowflake .kafka .connector .internal .streaming ;
22
3+ import static com .snowflake .kafka .connector .internal .streaming .SnowflakeSinkServiceV2 .partitionChannelKey ;
34import static com .snowflake .kafka .connector .internal .streaming .TopicPartitionChannel .NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE ;
45
56import com .snowflake .kafka .connector .SnowflakeSinkConnectorConfig ;
2526import java .util .HashMap ;
2627import java .util .List ;
2728import java .util .Map ;
28- import net .snowflake .client .jdbc .internal .fasterxml .jackson .databind .ObjectMapper ;
2929import org .apache .kafka .common .TopicPartition ;
3030import org .apache .kafka .connect .data .Schema ;
3131import org .apache .kafka .connect .data .SchemaAndValue ;
@@ -43,9 +43,10 @@ public class SnowflakeSinkServiceV2IT {
4343 private String table = TestUtils .randomTableName ();
4444 private int partition = 0 ;
4545 private int partition2 = 1 ;
46- private String topic = "test" ;
46+
47+ // The topic name is set to the table name for these tests only; real deployments need not match.
48+ private String topic = table ;
4749 private TopicPartition topicPartition = new TopicPartition (topic , partition );
48- private static ObjectMapper MAPPER = new ObjectMapper ();
4950
5051 @ After
5152 public void afterEach () {
@@ -125,6 +126,89 @@ public void testChannelCloseIngestion() throws Exception {
125126 service .closeAll ();
126127 }
127128
129+ // Two partitions, insert Record, one partition gets rebalanced (closed).
130+ // just before rebalance, there is data in buffer for other partition,
131+ // Send data again for both partitions.
132+ // Successfully able to ingest all records
133+ @ Test
134+ public void testStreamingIngest_multipleChannelPartitions_closeSubsetOfPartitionsAssigned ()
135+ throws Exception {
136+ Map <String , String > config = TestUtils .getConfForStreaming ();
137+ SnowflakeSinkConnectorConfig .setDefaultValues (config );
138+ conn .createTable (table );
139+ TopicPartition tp1 = new TopicPartition (table , partition );
140+ TopicPartition tp2 = new TopicPartition (table , partition2 );
141+
142+ // opens a channel for partition 0, table and topic
143+ SnowflakeSinkService service =
144+ SnowflakeSinkServiceFactory .builder (conn , IngestionMethodConfig .SNOWPIPE_STREAMING , config )
145+ .setRecordNumber (5 )
146+ .setFlushTime (5 )
147+ .setErrorReporter (new InMemoryKafkaRecordErrorReporter ())
148+ .setSinkTaskContext (new InMemorySinkTaskContext (Collections .singleton (topicPartition )))
149+ .addTask (table , tp1 ) // Internally calls startTask
150+ .addTask (table , tp2 ) // Internally calls startTask
151+ .build ();
152+
153+ final int recordsInPartition1 = 2 ;
154+ final int recordsInPartition2 = 2 ;
155+ List <SinkRecord > recordsPartition1 =
156+ TestUtils .createJsonStringSinkRecords (0 , recordsInPartition1 , table , partition );
157+
158+ List <SinkRecord > recordsPartition2 =
159+ TestUtils .createJsonStringSinkRecords (0 , recordsInPartition2 , table , partition2 );
160+
161+ List <SinkRecord > records = new ArrayList <>(recordsPartition1 );
162+ records .addAll (recordsPartition2 );
163+
164+ service .insert (records );
165+
166+ TestUtils .assertWithRetry (
167+ () -> {
168+ // This is how we will trigger flush. (Mimicking poll API)
169+ service .insert (new ArrayList <>()); // trigger time based flush
170+ return TestUtils .tableSize (table ) == recordsInPartition1 + recordsInPartition2 ;
171+ },
172+ 10 ,
173+ 20 );
174+
175+ TestUtils .assertWithRetry (() -> service .getOffset (tp1 ) == recordsInPartition1 , 20 , 5 );
176+ TestUtils .assertWithRetry (() -> service .getOffset (tp2 ) == recordsInPartition2 , 20 , 5 );
177+ // before you close partition 1, there should be some data in partition 2
178+ List <SinkRecord > newRecordsPartition2 =
179+ TestUtils .createJsonStringSinkRecords (2 , recordsInPartition1 , table , partition2 );
180+ service .insert (newRecordsPartition2 );
181+ // partitions to close = 1 out of 2
182+ List <TopicPartition > partitionsToClose = Collections .singletonList (tp1 );
183+ service .close (partitionsToClose );
184+
185+ // remaining partition should be present in the map
186+ SnowflakeSinkServiceV2 snowflakeSinkServiceV2 = (SnowflakeSinkServiceV2 ) service ;
187+
188+ Assert .assertTrue (
189+ snowflakeSinkServiceV2
190+ .getTopicPartitionChannelFromCacheKey (partitionChannelKey (tp2 .topic (), tp2 .partition ()))
191+ .isPresent ());
192+
193+ List <SinkRecord > newRecordsPartition1 =
194+ TestUtils .createJsonStringSinkRecords (2 , recordsInPartition1 , table , partition );
195+
196+ List <SinkRecord > newRecords2Partition2 =
197+ TestUtils .createJsonStringSinkRecords (4 , recordsInPartition1 , table , partition2 );
198+ List <SinkRecord > newrecords = new ArrayList <>(newRecordsPartition1 );
199+ newrecords .addAll (newRecords2Partition2 );
200+
201+ service .insert (newrecords );
202+ TestUtils .assertWithRetry (
203+ () -> {
204+ // This is how we will trigger flush. (Mimicking poll API)
205+ service .insert (new ArrayList <>()); // trigger time based flush
206+ return TestUtils .tableSize (table ) == recordsInPartition1 * 5 ;
207+ },
208+ 10 ,
209+ 20 );
210+ }
211+
128212 @ Test
129213 public void testRebalanceOpenCloseIngestion () throws Exception {
130214 Map <String , String > config = TestUtils .getConfForStreaming ();
0 commit comments