Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ private long streamingApiFallbackSupplier(
"{} Channel recovery initiated for channel: {}",
streamingApiFallbackInvoker,
this.getChannelNameFormatV1());

// close old channel before reopening a new one
if (!channel.isClosed()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this check is redundant as we do this in openChannelForTable() method, right?

channel.close();
}
SnowflakeStreamingIngestChannel newChannel = openChannelForTable(channelName);

LOGGER.warn(
Expand Down Expand Up @@ -499,6 +504,13 @@ private SnowflakeStreamingIngestChannel openChannelForTable(final String channel
final SnowflakeStreamingIngestClient streamingIngestClient =
StreamingClientManager.getClient(
connectorName, taskId, pipeName, connectorConfig, streamingClientProperties);
// always close the channel first before attempting to open/reopen it. This is to prevent the
// "Channel xxx has already been opened on the client. Please close the channel first before
// re-opening"
// adding additional safety
if (channelIsOpen()) {
this.channel.close();
}
final OpenChannelResult result = streamingIngestClient.openChannel(channelName, null);
final ChannelStatus channelStatus = result.getChannelStatus();
if (channelStatus.getStatusCode().equals("SUCCESS")) {
Expand Down Expand Up @@ -540,6 +552,10 @@ private CompletableFuture<Void> closeChannelWrapped() {
}
}

private boolean channelIsOpen() {
return this.channel != null && !this.channel.isClosed();
}

private void onCloseChannelSuccess() {
LOGGER.info("Successfully closed streaming channel: {}", this.getChannelNameFormatV1());
this.telemetryService.reportKafkaPartitionUsage(this.snowflakeTelemetryChannelStatus, true);
Expand Down
Loading
Loading