Changes from 11 of 27 commits

Commits:
3dabfc0  SNOW-1571459 parse jbdc properties (#909)  (sfc-gh-bzabek, Aug 13, 2024)
b68d226  SNOW-1514185 Assign new channel when no offset is present in Snowflak…  (sfc-gh-lshcharbaty, Aug 20, 2024)
d55cd76  NO-SNOW Ignore major updates on Kafka dependencies (#915)  (sfc-gh-mbobowski, Aug 21, 2024)
56f96bb  SNOW-1623269 Fail sink task on authorization exception from Snowflake…  (sfc-gh-mbobowski, Aug 26, 2024)
0b8af94  SNOW-1649161 Fix NaN value handling in schematization (#920)  (sfc-gh-mbobowski, Sep 9, 2024)
b07c9e4  NO-SNOW Release 2.4.1 (#923)  (sfc-gh-xhuang, Sep 16, 2024)
493c23e  ref: ENG-970/SF Schema Issue + schema auto config  (wrehman-skap, Oct 23, 2024)
1996047  Merge branch 'upstream-v2.4.1' into snowflake-v2.4.1-upgrade  (wrehman-skap, Nov 19, 2024)
fc3154d  Fixing some upgrading issues  (wrehman-skap, Nov 19, 2024)
a29ccc3  Merge remote-tracking branch 'origin/ENG-970-Schema_creation_issue' i…  (wrehman-skap, Nov 19, 2024)
62709b7  upgrade to v2.4.1 - fixing SnowflakeStreamkapSinkIT unit test  (wrehman-skap, Nov 19, 2024)
f0d931f  ref STR-3777/snowflake-dt-creation-in-different-schema-findigs  (acristu, Feb 12, 2025)
1ac84e1  ref STR-3777/snowflake-dt-creation-in-different-schema-findigs  (acristu, Feb 12, 2025)
c6df2cd  ref STR-3777/snowflake-dt-creation-in-different-schema-findigs  (acristu, Feb 12, 2025)
defa561  ref STR-3777 - add support for TABLE_DATA  (acristu, Feb 14, 2025)
033bc8d  ref STR-3777 - add support for TABLE_DATA  (acristu, Feb 14, 2025)
0c36c2d  STR-3748/Null value handling  (wrehman-skap, Feb 19, 2025)
7fb8b50  Merge pull request #28 from streamkap-com/STR-3748-v2.4.1-null_value_…  (wrehman-skap, Mar 6, 2025)
e2c42e8  Snowflake same name table on different schema issue  (wrehman-skap, Mar 7, 2025)
e418d26  Snowflake same name table on different schema issue - Fixing some junit  (wrehman-skap, Mar 7, 2025)
6df3af4  Merge pull request #27 from streamkap-com/STR-3777/snowflake-dt-creat…  (wrehman-skap, Mar 7, 2025)
387799b  Merge pull request #29 from streamkap-com/SFV2.4-Schema_conflict_issue  (wrehman-skap, Mar 7, 2025)
9d1b434  ref: STR-3825/MultiSchema Support  (wrehman-skap, Mar 13, 2025)
fa7be75  ref: STR-3827/Decimal Support  (wrehman-skap, Mar 17, 2025)
ac81c54  Merge pull request #31 from streamkap-com/STR-3827-Use_Decimals_SF2.4  (wrehman-skap, Mar 17, 2025)
f146db8  ref: STR-3825/Multischema support - removing unused import  (wrehman-skap, Mar 19, 2025)
949daa5  Merge pull request #30 from streamkap-com/STR-3825-MultiSchema_suppor…  (wrehman-skap, Mar 19, 2025)
5 changes: 4 additions & 1 deletion .github/dependabot.yaml
@@ -6,4 +6,7 @@ updates:
   - package-ecosystem: "maven" # See documentation for possible values
     directory: "/" # Location of package manifests
     schedule:
-      interval: "weekly"
+      interval: "weekly"
+    ignore:
+      - dependency-name: "org.apache.kafka:*"
+        update-types: ["version-update:semver-major"]
13 changes: 10 additions & 3 deletions pom.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
 ~ /*
-~ * Copyright (c) 2019 - 2020 Snowflake Computing Inc. All rights reserved.
+~ * Copyright (c) 2019 - 2024 Snowflake Computing Inc. All rights reserved.
 ~ */
 -->
@@ -12,7 +12,7 @@
 
     <groupId>com.snowflake</groupId>
     <artifactId>snowflake-kafka-connector</artifactId>
-    <version>2.4.0</version>
+    <version>2.4.1</version>
     <packaging>jar</packaging>
     <name>Snowflake Kafka Connector</name>
     <description>Snowflake Kafka Connect Sink Connector</description>
@@ -363,7 +363,7 @@
         <dependency>
            <groupId>net.snowflake</groupId>
            <artifactId>snowflake-ingest-sdk</artifactId>
-           <version>2.2.0</version>
+           <version>2.2.2</version>
            <exclusions>
                <exclusion>
                    <groupId>net.snowflake</groupId>
@@ -572,6 +572,13 @@
         </dependency>
 
         <!--Kafka JSON converter for SMT unit test-->
+        <dependency>
+            <groupId>com.streamkap</groupId>
+            <artifactId>streamkap-kafka-connect-utilities</artifactId>
+            <version>0.0.1</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-json</artifactId>
6 changes: 3 additions & 3 deletions pom_confluent.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
 ~ /*
-~ * Copyright (c) 2019 - 2020 Snowflake Computing Inc. All rights reserved.
+~ * Copyright (c) 2019 - 2024 Snowflake Computing Inc. All rights reserved.
 ~ */
 -->
@@ -12,7 +12,7 @@
 
     <groupId>com.snowflake</groupId>
     <artifactId>snowflake-kafka-connector</artifactId>
-    <version>2.4.0</version>
+    <version>2.4.1</version>
     <packaging>jar</packaging>
     <name>Snowflake Kafka Connector</name>
     <description>Snowflake Kafka Connect Sink Connector</description>
@@ -510,7 +510,7 @@
         <dependency>
            <groupId>net.snowflake</groupId>
            <artifactId>snowflake-ingest-sdk</artifactId>
-           <version>2.2.0</version>
+           <version>2.2.2</version>
            <exclusions>
                <exclusion>
                    <groupId>net.snowflake</groupId>
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java
@@ -294,18 +294,20 @@ public Config validate(Map<String, String> connectorConfigs) {
       return result;
     }
 
-    // Disabling config validation due to ENG-789/Auto create schema feature for Append SF connector
-    /*try {
-      testConnection.schemaExists(connectorConfigs.get(Utils.SF_SCHEMA));
-    } catch (SnowflakeKafkaConnectorException e) {
-      LOGGER.error("Validate Error msg:{}, errorCode:{}", e.getMessage(), e.getCode());
-      if (e.getCode().equals("2001")) {
-        Utils.updateConfigErrorMessage(result, Utils.SF_SCHEMA, " schema does not exist");
-      } else {
-        throw e;
-      }
-      return result;
-    }*/
+    boolean createSchemaAuto = Boolean.parseBoolean(connectorConfigs.getOrDefault(Utils.CREATE_SCHEMA_AUTO, "false"));
+    if (!createSchemaAuto) {
+      try {
+        testConnection.schemaExists(connectorConfigs.get(Utils.SF_SCHEMA));
+      } catch (SnowflakeKafkaConnectorException e) {
+        LOGGER.error("Validate Error msg:{}, errorCode:{}", e.getMessage(), e.getCode());
+        if (e.getCode().equals("2001")) {
+          Utils.updateConfigErrorMessage(result, Utils.SF_SCHEMA, " schema does not exist");
+        } else {
+          throw e;
+        }
+        return result;
+      }
+    }
 
     LOGGER.info("Validated config with no error");
     return result;
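In short: when create.schema.auto is true, validate() skips the schema-existence probe, so a missing schema no longer blocks the connector at validation time. A minimal standalone sketch of the gating logic, with a hypothetical schemaExists stub standing in for the real connection service:

import java.util.HashMap;
import java.util.Map;

public class SchemaValidationSketch {

  // Hypothetical stand-in for testConnection.schemaExists(); throws when the schema is missing.
  static void schemaExists(String schemaName) {
    if (!"PUBLIC".equals(schemaName)) {
      throw new IllegalStateException("2001: schema does not exist");
    }
  }

  static String validate(Map<String, String> connectorConfigs) {
    boolean createSchemaAuto =
        Boolean.parseBoolean(connectorConfigs.getOrDefault("create.schema.auto", "false"));
    if (!createSchemaAuto) {
      try {
        schemaExists(connectorConfigs.get("snowflake.schema.name"));
      } catch (IllegalStateException e) {
        // Mirrors Utils.updateConfigErrorMessage: report a config error instead of failing hard.
        return "snowflake.schema.name: schema does not exist";
      }
    }
    // Auto-creation enabled: the schema will be created at task start instead.
    return "ok";
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("snowflake.schema.name", "NOT_THERE_YET");
    System.out.println(validate(config)); // schema does not exist

    config.put("create.schema.auto", "true");
    System.out.println(validate(config)); // ok
  }
}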
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -104,6 +104,9 @@ public class SnowflakeSinkConnectorConfig {
   // JDBC trace Info (environment variable)
   public static final String SNOWFLAKE_JDBC_TRACE = "JDBC_TRACE";
 
+  // JDBC properties map
+  public static final String SNOWFLAKE_JDBC_MAP = "snowflake.jdbc.map";
+
   // Snowflake Metadata Flags
   private static final String SNOWFLAKE_METADATA_FLAGS = "Snowflake Metadata Flags";
   public static final String SNOWFLAKE_METADATA_CREATETIME = "snowflake.metadata.createtime";
@@ -218,6 +221,10 @@ public class SnowflakeSinkConnectorConfig {
           + " format is deprecated and V1 will be used always, disabling this config could have"
           + " ramifications. Please consult Snowflake support before setting this to false.";
 
+  public static final String ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS =
+      "enable.task.fail.on.authorization.errors";
+  public static final boolean ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT = false;
+
   // MDC logging header
   public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
   public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
@@ -721,7 +728,14 @@ public static ConfigDef newConfigDef() {
             CONNECTOR_CONFIG,
             9,
             ConfigDef.Width.NONE,
-            ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY);
+            ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY)
+        .define(
+            ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS,
+            Type.BOOLEAN,
+            ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT,
+            Importance.LOW,
+            "If set to true the Connector will fail its tasks when authorization error from"
+                + " Snowflake occurred");
   }
 
   public static class TopicToTableValidator implements ConfigDef.Validator {
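The new flag is registered like any other Kafka Connect config key. A small sketch of the same .define() call on a standalone ConfigDef, showing the default kicking in when the key is absent (assumes kafka-clients on the classpath; the standalone AbstractConfig wrapper is illustrative, not how the connector parses config):

import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class AuthorizationFlagConfigSketch {
  public static void main(String[] args) {
    // Mirrors the .define() call above.
    ConfigDef def =
        new ConfigDef()
            .define(
                "enable.task.fail.on.authorization.errors",
                ConfigDef.Type.BOOLEAN,
                false,
                ConfigDef.Importance.LOW,
                "If set to true the Connector will fail its tasks when authorization error from"
                    + " Snowflake occurred");

    // An absent key falls back to the declared default (false).
    AbstractConfig parsed = new AbstractConfig(def, Map.of());
    System.out.println(parsed.getBoolean("enable.task.fail.on.authorization.errors")); // false
  }
}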
15 changes: 13 additions & 2 deletions src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
@@ -92,6 +92,9 @@ public class SnowflakeSinkTask extends SinkTask {
 
   private StreamkapQueryTemplate streamkapQueryTemplate = new StreamkapQueryTemplate();
 
+  private final SnowflakeSinkTaskAuthorizationExceptionTracker authorizationExceptionTracker =
+      new SnowflakeSinkTaskAuthorizationExceptionTracker();
+
   /** default constructor, invoked by kafka connect framework */
   public SnowflakeSinkTask() {
     DYNAMIC_LOGGER = new KCLogger(this.getClass().getName());
@@ -159,6 +162,8 @@ public void start(final Map<String, String> parsedConfig) {
     // generate topic to table map
     this.topic2table = getTopicToTableMap(parsedConfig);
 
+    this.authorizationExceptionTracker.updateStateOnTaskStart(parsedConfig);
+
     // generate metadataConfig table
     SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig);
 
@@ -229,8 +234,11 @@ public void start(final Map<String, String> parsedConfig) {
             .setErrorReporter(kafkaRecordErrorReporter)
             .setSinkTaskContext(this.context)
             .build();
-    createSchemaIfNotExists(getConnection(),
-        parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
+
+    if (Boolean.parseBoolean(parsedConfig.getOrDefault(Utils.CREATE_SCHEMA_AUTO, "false"))) {
+      createSchemaIfNotExists(getConnection(),
+          parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
+    }
     this.streamkapQueryTemplate = StreamkapQueryTemplate.buildStreamkapQueryTemplateFromConfig(parsedConfig);
 
     DYNAMIC_LOGGER.info(
@@ -306,6 +314,8 @@ public void close(final Collection<TopicPartition> partitions) {
    */
   @Override
   public void put(final Collection<SinkRecord> records) {
+    this.authorizationExceptionTracker.throwExceptionIfAuthorizationFailed();
+
     final long recordSize = records.size();
     if (enableRebalancing && recordSize > 0) {
       processRebalancingTest();
@@ -359,6 +369,7 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
             }
           });
     } catch (Exception e) {
+      this.authorizationExceptionTracker.reportPrecommitException(e);
       this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage());
     }
 
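The create.schema.auto gate pairs with the relaxed validation above: validation lets a missing schema through, and the task creates it on start. A rough sketch of what schema auto-creation amounts to, assuming plain JDBC; the URL, credentials, and helper below are illustrative stand-ins, not the connector's actual connection service:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class CreateSchemaSketch {

  // Illustrative helper; the connector's createSchemaIfNotExists goes through its own
  // connection service rather than raw JDBC.
  static void createSchemaIfNotExists(Connection conn, String schemaName) throws SQLException {
    try (Statement stmt = conn.createStatement()) {
      // IF NOT EXISTS keeps the call idempotent across task restarts. A real implementation
      // should validate or quote schemaName instead of concatenating it.
      stmt.execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
    }
  }

  public static void main(String[] args) throws SQLException {
    // Hypothetical account URL and credentials; requires the Snowflake JDBC driver on the classpath.
    try (Connection conn =
        DriverManager.getConnection(
            "jdbc:snowflake://example.snowflakecomputing.com", "user", "password")) {
      createSchemaIfNotExists(conn, "RAW_EVENTS");
    }
  }
}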
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java (new file)
@@ -0,0 +1,54 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT;
import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_1005;

import java.util.Map;

/**
 * When the user rotates the Snowflake key that is stored in an external file, the Connector hangs
 * and does not mark its tasks as failed. To fix this corner case we need to track the
 * authorization exception thrown during preCommit() and stop tasks during put().
 *
 * <p>Note that exceptions thrown during preCommit() are swallowed by Kafka Connect and will not
 * cause task failure.
 */
public class SnowflakeSinkTaskAuthorizationExceptionTracker {

  private static final String AUTHORIZATION_EXCEPTION_MESSAGE = "Authorization failed after retry";

  private boolean authorizationTaskFailureEnabled;
  private boolean authorizationErrorReported;

  public SnowflakeSinkTaskAuthorizationExceptionTracker() {
    this.authorizationTaskFailureEnabled = true;
    this.authorizationErrorReported = false;
  }

  public void updateStateOnTaskStart(Map<String, String> taskConfig) {
    authorizationTaskFailureEnabled =
        Boolean.parseBoolean(
            taskConfig.getOrDefault(
                ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS,
                Boolean.toString(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT)));
  }

  /**
   * Check if the thrown exception is related to authorization.
   *
   * @param ex - any exception that occurred during preCommit
   */
  public void reportPrecommitException(Exception ex) {
    if (ex.getMessage().contains(AUTHORIZATION_EXCEPTION_MESSAGE)) {
      authorizationErrorReported = true;
    }
  }

  /** Throw exception if authorization has failed before. */
  public void throwExceptionIfAuthorizationFailed() {
    if (authorizationTaskFailureEnabled && authorizationErrorReported) {
      throw ERROR_1005.getException();
    }
  }
}
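A short lifecycle sketch of the tracker as the task drives it, assuming the class above is on the classpath; the config key and message string come straight from its source:

import com.snowflake.kafka.connector.SnowflakeSinkTaskAuthorizationExceptionTracker;
import java.util.Map;

public class TrackerLifecycleSketch {
  public static void main(String[] args) {
    SnowflakeSinkTaskAuthorizationExceptionTracker tracker =
        new SnowflakeSinkTaskAuthorizationExceptionTracker();

    // start(): arm the tracker from the task config.
    tracker.updateStateOnTaskStart(Map.of("enable.task.fail.on.authorization.errors", "true"));

    // preCommit(): Kafka Connect swallows exceptions thrown there, so the error is only recorded.
    tracker.reportPrecommitException(new RuntimeException("Authorization failed after retry"));

    // put(): exceptions thrown here do fail the task, so this is where the error surfaces.
    tracker.throwExceptionIfAuthorizationFailed(); // throws ERROR_1005
  }
}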
5 changes: 3 additions & 2 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019 Snowflake Inc. All rights reserved.
+ * Copyright (c) 2024 Snowflake Inc. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
@@ -65,7 +65,7 @@
 public class Utils {
 
   // Connector version, change every release
-  public static final String VERSION = "2.4.0";
+  public static final String VERSION = "2.4.1";
 
   // connector parameter list
   public static final String NAME = "name";
@@ -136,6 +136,7 @@ public class Utils {
   public static final String TOPICS_MAP_CONF = "topics.config.map";
   public static final String SCHEMA_CHANGE_CHECK_MS = "schema.changes.check.interval.ms";
   public static final String APPLY_DYNAMIC_TABLE_SCRIPT_CONF = "apply.dynamic.table.script";
+  public static final String CREATE_SCHEMA_AUTO = "create.schema.auto";
 
   private static final KCLogger LOGGER = new KCLogger(Utils.class.getName());
 
src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java
@@ -60,7 +60,7 @@ static void assertNotEmpty(String name, Object value) {
     if (value == null || (value instanceof String && value.toString().isEmpty())) {
       switch (name.toLowerCase()) {
         case "schemaname":
-          throw SnowflakeErrors.ERROR_0031.getException();
+          throw SnowflakeErrors.ERROR_S0031.getException();
         case "tablename":
           throw SnowflakeErrors.ERROR_0005.getException();
         case "stagename":
@@ -308,6 +308,17 @@ protected static Properties generateProxyParametersIfRequired(Map<String, String
     return proxyProperties;
   }
 
+  protected static Properties parseJdbcPropertiesMap(Map<String, String> conf) {
+    String jdbcConfigMapInput = conf.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_JDBC_MAP);
+    if (jdbcConfigMapInput == null) {
+      return new Properties();
+    }
+    Map<String, String> jdbcMap = Utils.parseCommaSeparatedKeyValuePairs(jdbcConfigMapInput);
+    Properties properties = new Properties();
+    properties.putAll(jdbcMap);
+    return properties;
+  }
+
   /**
    * convert ingest status to ingested file status
    *
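A minimal sketch of the snowflake.jdbc.map parse path, assuming the comma-separated key:value format that Utils.parseCommaSeparatedKeyValuePairs appears to expect; the parser below is a simplified stand-in, not the connector's actual helper:

import java.util.Properties;

public class JdbcMapParseSketch {

  // Simplified stand-in for parseJdbcPropertiesMap + Utils.parseCommaSeparatedKeyValuePairs.
  static Properties parseJdbcPropertiesMap(String jdbcConfigMapInput) {
    Properties properties = new Properties();
    if (jdbcConfigMapInput == null) {
      return properties; // no snowflake.jdbc.map configured
    }
    for (String pair : jdbcConfigMapInput.split(",")) {
      String[] kv = pair.split(":", 2);
      properties.setProperty(kv[0].trim(), kv[1].trim());
    }
    return properties;
  }

  public static void main(String[] args) {
    Properties props = parseJdbcPropertiesMap("networkTimeout:300000,insecureMode:true");
    props.forEach((k, v) -> System.out.println(k + " = " + v)); // two JDBC properties
  }
}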
src/main/java/com/snowflake/kafka/connector/internal/JdbcProperties.java (new file)
@@ -0,0 +1,86 @@
package com.snowflake.kafka.connector.internal;

import java.util.Properties;

/** Wrapper class for all snowflake jdbc properties */
public class JdbcProperties {

  /** All jdbc properties including proxyProperties */
  private final Properties properties;
  /** Proxy related properties */
  private final Properties proxyProperties;

  private JdbcProperties(Properties combinedProperties, Properties proxyProperties) {
    this.properties = combinedProperties;
    this.proxyProperties = proxyProperties;
  }

  public Properties getProperties() {
    return properties;
  }

  public String getProperty(String key) {
    return properties.getProperty(key);
  }

  public Object get(String key) {
    return properties.get(key);
  }

  public Properties getProxyProperties() {
    return proxyProperties;
  }

  /**
   * Combine all jdbc related properties. Throws error if jdbcPropertiesMap overrides any property
   * defined in connectionProperties or proxyProperties.
   *
   * @param connectionProperties snowflake.database.name, snowflake.schema.name,
   *     snowflake.private.key etc.
   * @param proxyProperties jvm.proxy.xxx
   * @param jdbcPropertiesMap snowflake.jdbc.map
   */
  static JdbcProperties create(
      Properties connectionProperties, Properties proxyProperties, Properties jdbcPropertiesMap) {
    InternalUtils.assertNotEmpty("connectionProperties", connectionProperties);
    proxyProperties = setEmptyIfNull(proxyProperties);
    jdbcPropertiesMap = setEmptyIfNull(jdbcPropertiesMap);

    Properties proxyAndConnection = mergeProperties(connectionProperties, proxyProperties);
    detectOverrides(proxyAndConnection, jdbcPropertiesMap);

    Properties combinedProperties = mergeProperties(proxyAndConnection, jdbcPropertiesMap);

    return new JdbcProperties(combinedProperties, proxyProperties);
  }

  /** Test method */
  static JdbcProperties create(Properties connectionProperties) {
    return create(connectionProperties, new Properties(), new Properties());
  }

  private static void detectOverrides(Properties proxyAndConnection, Properties jdbcPropertiesMap) {
    jdbcPropertiesMap.forEach(
        (k, v) -> {
          if (proxyAndConnection.containsKey(k)) {
            throw SnowflakeErrors.ERROR_0031.getException("Duplicated property: " + k);
          }
        });
  }

  private static Properties mergeProperties(
      Properties connectionProperties, Properties proxyProperties) {
    Properties mergedProperties = new Properties();
    mergedProperties.putAll(connectionProperties);
    mergedProperties.putAll(proxyProperties);
    return mergedProperties;
  }

  /** Parsing methods do not return null. However, it's better to be perfectly sure. */
  private static Properties setEmptyIfNull(Properties properties) {
    if (properties != null) {
      return properties;
    }
    return new Properties();
  }
}
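A usage sketch of the factory and its override rule: a key supplied via snowflake.jdbc.map that collides with a connection or proxy property is rejected rather than silently overridden. This assumes the sketch lives in the same package, since create() is package-private; the property names are illustrative:

package com.snowflake.kafka.connector.internal;

import java.util.Properties;

public class JdbcPropertiesCreateSketch {
  public static void main(String[] args) {
    Properties connection = new Properties();
    connection.setProperty("db", "MY_DB"); // illustrative connection property

    Properties proxy = new Properties();
    proxy.setProperty("useProxy", "true"); // illustrative proxy property

    Properties jdbcMap = new Properties();
    jdbcMap.setProperty("networkTimeout", "300000");

    // Disjoint keys merge cleanly into one combined Properties object.
    JdbcProperties ok = JdbcProperties.create(connection, proxy, jdbcMap);
    System.out.println(ok.getProperty("networkTimeout")); // 300000

    // Redefining an existing key via snowflake.jdbc.map is rejected.
    jdbcMap.setProperty("useProxy", "false");
    JdbcProperties.create(connection, proxy, jdbcMap); // throws ERROR_0031: Duplicated property: useProxy
  }
}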