sfc-gh-aminyaylov commented Oct 15, 2025

Overview

SNOW-2193898 Transparent handling of failover with client redirect in streaming ingest

Problem

When the Streaming Ingest SDK returns a CLOSED_CLIENT error, the Kafka Connector attempts to reopen channels with the same closed client, which leads to repeated failures.

Customers currently have to restart their Kafka Connector (KC) manually after the primary deployment changes: https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-classic-kafka#failover-limitations

Solution

Implement automatic client recreation when a CLOSED_CLIENT error is detected. The Streaming Ingest SDK will be updated separately to close the client when it detects a change to the primary deployment.
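
As a rough illustration of the intended control flow (not the connector's actual code), the sketch below recreates the client once when an insert fails with CLOSED_CLIENT and then retries. `IngestException`, `FakeClient`, and `RecreateOnClosed` are hypothetical stand-ins invented for this example; the real SDK's exception type and error-code representation may differ.

```java
/** Hypothetical stand-in for an SDK exception that carries an error code. */
final class IngestException extends RuntimeException {
    final String code;

    IngestException(String code) {
        super(code);
        this.code = code;
    }
}

/** Hypothetical stand-in for a streaming ingest client. */
final class FakeClient {
    final int generation;
    private boolean closed;

    FakeClient(int generation) {
        this.generation = generation;
    }

    void close() {
        closed = true;
    }

    String insertRow(String row) {
        if (closed) {
            throw new IngestException("CLOSED_CLIENT");
        }
        return "gen" + generation + ":" + row;
    }
}

/** Recreates the client once on CLOSED_CLIENT, then retries the insert. */
final class RecreateOnClosed {
    private FakeClient client = new FakeClient(1);

    FakeClient client() {
        return client;
    }

    String insertWithRecovery(String row) {
        try {
            return client.insertRow(row);
        } catch (IngestException e) {
            if (!"CLOSED_CLIENT".equals(e.code)) {
                throw e; // unrelated errors are not handled here
            }
            // Reopening a channel on the closed client would fail again,
            // so replace the client itself before retrying.
            client = new FakeClient(client.generation + 1);
            return client.insertRow(row);
        }
    }
}
```

In this sketch a second failure on the fresh client propagates to the caller rather than looping.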

Changes

SSv1

  • DirectTopicPartitionChannel
    • Make streamingIngestClient field non-final to allow updates
    • Update insertRowFallbackSupplier() to call StreamingClientProvider.recreateClient() when client is closed
    public SnowflakeStreamingIngestClient recreateClient(Map<String, String> connectorConfig) {
        // Atomically: close old client, create new client, and update the cache entry
        // Return new client
    }
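
One way the "atomically close, create, and update the cache entry" step could look is sketched below. It assumes a plain `ConcurrentHashMap` in place of the provider's real cache, and `StubClient` and `StubClientProvider` are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the SSv1 client. */
final class StubClient implements AutoCloseable {
    final int id;
    private volatile boolean closed;

    StubClient(int id) {
        this.id = id;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Hypothetical provider that caches one client per connector config. */
final class StubClientProvider {
    private final ConcurrentHashMap<Map<String, String>, StubClient> cache =
        new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    StubClient getClient(Map<String, String> connectorConfig) {
        return cache.computeIfAbsent(
            connectorConfig, k -> new StubClient(nextId.incrementAndGet()));
    }

    StubClient recreateClient(Map<String, String> connectorConfig) {
        // compute() runs the remapping function atomically for this key, so
        // no caller can observe the entry between the close and the replace.
        return cache.compute(connectorConfig, (k, old) -> {
            if (old != null) {
                old.close();
            }
            return new StubClient(nextId.incrementAndGet());
        });
    }
}
```

With this shape, several channels that hit CLOSED_CLIENT at the same time would each trigger a recreation; skipping the replace when the cached client is already open is one possible guard, though whether the real provider needs it is not stated here.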

SSv2

  • SnowpipeStreamingV2PartitionChannel
    • Add CLIENT_CLOSED_ERROR_CODE constant with TODO (SSv2 SDK does not yet return structured error codes)
    • Update insertRowFallbackSupplier() to call StreamingIngestClientV2Provider.recreateClient()
    public void recreateClient(
        Map<String, String> connectorConfig,
        String pipeName,
        StreamingClientProperties streamingClientProperties) {
        // Close old client and create new client for the pipe
    }

Refactor

  • Remove cached field SnowflakeSinkServiceV2.streamingIngestClient
    • Rationale:
      • Service had no way to be notified of client recreation
      • Provider already maintains an optimized cache (Caffeine)
      • Cache lookup is negligible (~10-50ns) vs channel creation cost (milliseconds)
    • Changes:
      • Remove private final SnowflakeStreamingIngestClient streamingIngestClient field
      • All methods now call StreamingClientProvider.getStreamingClientProviderInstance().getClient()
      • Provider is the single source of truth for client lifecycle
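
The rationale above can be illustrated with a small sketch: a service that stores the client in a final field keeps pointing at the closed instance after recreation, while a service that asks the provider on each call sees the replacement. All class names here are hypothetical and the provider is reduced to a single `AtomicReference`.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a streaming ingest client. */
final class DemoClient {
    private volatile boolean closed;

    boolean isClosed() {
        return closed;
    }

    void close() {
        closed = true;
    }
}

/** Hypothetical provider that owns the client lifecycle. */
final class DemoProvider {
    private final AtomicReference<DemoClient> current =
        new AtomicReference<>(new DemoClient());

    DemoClient getClient() {
        return current.get();
    }

    DemoClient recreateClient() {
        DemoClient fresh = new DemoClient();
        DemoClient old = current.getAndSet(fresh);
        old.close();
        return fresh;
    }
}

/** Holds its own reference; it is never told about recreation. */
final class CachingService {
    private final DemoClient client;

    CachingService(DemoProvider provider) {
        this.client = provider.getClient();
    }

    boolean canInsert() {
        return !client.isClosed();
    }
}

/** Looks the client up on every call, so it always sees the current one. */
final class LookupService {
    private final DemoProvider provider;

    LookupService(DemoProvider provider) {
        this.provider = provider;
    }

    boolean canInsert() {
        return !provider.getClient().isClosed();
    }
}
```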

Testing

  • Added IT testClientRecreationOn_ClosedClientError() that validates:
    1. Client is closed (simulating CLOSED_CLIENT error)
    2. New client is created on next operation
    3. Data continues to be inserted successfully
    4. Works for both SSv1 and SSv2

Urgency

This review is high priority. A customer is requesting KC streaming ingest support for Client Redirect.

Risks

Client recreation is an expensive operation. However:

  1. The current response, reopening the channel without fixing the underlying issue, is wasteful.
  2. A well-behaved customer following the documentation will already be performing this action automatically when changing primary deployments.

Backward/forward compatible

[x] This change does not introduce any breaking API or data format changes.

Pre-review checklist

  • This change should be part of a Behavior Change Release. See go/behavior-change.
  • This change has passed Merge gate tests
  • Snowpipe Changes
  • Snowpipe Streaming Changes
  • This change is TEST-ONLY
  • This change is README/Javadocs only
  • This change is protected by a config parameter <PARAMETER_NAME> eg snowflake.ingestion.method.
    • Yes - Added end to end and Unit Tests.
    • No - Suggest why it is not param protected
  • Is this change protected by parameter <PARAMETER_NAME> on the server side?
    • The parameter/feature is not yet active in production (partial rollout or PrPr, see Changes for Unreleased Features and Fixes).
    • If there is an issue, it can be safely mitigated by turning the parameter off. This is also verified by a test (See go/ppp).

* @param e the exception to check
* @return true if the exception is a CLOSED_CLIENT error
*/
private boolean isClientInvalidError(Throwable e) {

I think we also need to handle this in streamingIngestClient.openChannel()
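
One possible way to cover both call sites is a shared wrapper that runs any client operation and recreates the client once on a closed-client failure. This is only a sketch of the idea; `ClosedClientException`, `SketchClient`, and `RecoveringClientHolder` are hypothetical and do not reflect the SDK's or the connector's real types.

```java
import java.util.function.Function;

/** Hypothetical exception standing in for a CLOSED_CLIENT error. */
final class ClosedClientException extends RuntimeException {
    ClosedClientException() {
        super("CLOSED_CLIENT");
    }
}

/** Hypothetical client exposing the two operations under discussion. */
final class SketchClient {
    private boolean closed;

    void close() {
        closed = true;
    }

    private void ensureOpen() {
        if (closed) {
            throw new ClosedClientException();
        }
    }

    String openChannel(String name) {
        ensureOpen();
        return "channel:" + name;
    }

    int insertRow(String row) {
        ensureOpen();
        return row.length();
    }
}

/** Applies the same recover-once policy to any client operation. */
final class RecoveringClientHolder {
    private SketchClient client = new SketchClient();
    private int recreations;

    SketchClient current() {
        return client;
    }

    int recreations() {
        return recreations;
    }

    <T> T call(Function<SketchClient, T> operation) {
        try {
            return operation.apply(client);
        } catch (ClosedClientException e) {
            // Same handling whether the failure came from openChannel or insertRow.
            client = new SketchClient();
            recreations++;
            return operation.apply(client);
        }
    }
}
```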
