From 88dd84ece216bf5d8673a7d1039472007bdbffb6 Mon Sep 17 00:00:00 2001 From: Jay Patel Date: Fri, 28 Jul 2023 16:19:31 -0700 Subject: [PATCH] Revert "SNOW-811265 Do not close channel on rebalance (#651)" (#678) --- .../streaming/SnowflakeSinkServiceV2.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index 09d4824f1..19bb7339a 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -304,15 +304,14 @@ public void closeAll() { /** * This function is called during rebalance. * - *

We don't close the channels. Upon rebalance, (inside {@link - * com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel - * anyways. [Check c'tor of {@link TopicPartitionChannel}] + *

All the channels are closed. The client is still active. Upon rebalance, (inside {@link + * com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel. * *

We will wipe the cache partitionsToChannel so that in {@link * com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we reinstantiate and fetch * offsetToken * - * @param partitions a list of topic partitions to close/shutdown + * @param partitions a list of topic partition */ @Override public void close(Collection partitions) { @@ -324,16 +323,15 @@ public void close(Collection partitions) { partitionsToChannel.get(partitionChannelKey); // Check for null since it's possible that the something goes wrong even before the // channels are created + if (topicPartitionChannel != null) { + topicPartitionChannel.closeChannel(); + } LOGGER.info( - "Removing partitionChannel:{}, partition:{}, topic:{} from map(partitionsToChannel)", + "Closing partitionChannel:{}, partition:{}, topic:{}", topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(), topicPartition.topic(), topicPartition.partition()); }); - LOGGER.info( - "Closing {} partitions and Clearing partitionsToChannel Map of size:{}", - partitions.size(), - partitionsToChannel.size()); partitionsToChannel.clear(); }