Skip to content

Commit 01cec9e

Browse files
FLOW-7873 Autocreate table (#1241)
1 parent aab0fee commit 01cec9e

File tree

5 files changed

+4
-17
lines changed

5 files changed

+4
-17
lines changed

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,6 @@ public enum SnowflakeErrors {
229229
"5028",
230230
"Failed to open Snowpipe Streaming v2 channel",
231231
"Failed to open Snowpipe Streaming v2 channel"),
232-
ERROR_5029(
233-
"5029",
234-
"Destination table does not exist",
235-
"Destination table does not exist. Please ensure the destination table exists in Snowflake"
236-
+ " before starting the connector."),
237232
ERROR_5030(
238233
"5030",
239234
"Channel error count threshold exceeded",

src/main/java/com/snowflake/kafka/connector/internal/StandardSnowflakeConnectionService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
8686
InternalUtils.assertNotEmpty("tableName", tableName);
8787
String createTableQuery =
8888
"create table if not exists identifier(?) (record_metadata variant comment 'created by"
89-
+ " automatic table creation from Snowflake Kafka Connector')";
89+
+ " automatic table creation from Snowflake Kafka Connector High Performance')"
90+
+ " enable_schema_evolution = true";
9091

9192
try {
9293
PreparedStatement stmt = conn.prepareStatement(createTableQuery);

src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,7 @@ public void startPartitions(Collection<TopicPartition> partitions) {
173173
for (String topic : topics) {
174174
final String tableName = getTableName(topic, this.topicToTableMap);
175175

176-
final boolean tableExists = this.conn.tableExist(tableName);
177-
if (!tableExists) {
178-
throw SnowflakeErrors.ERROR_5029.getException(
179-
"Table name: " + tableName, this.conn.getTelemetryClient());
180-
}
176+
createTableIfNotExists(tableName);
181177

182178
// not an error, by convention we're looking for the same name as table
183179
final boolean pipeExists = this.conn.pipeExist(tableName);
@@ -465,7 +461,6 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) {
465461
private void createTableIfNotExists(final String tableName) {
466462
if (this.conn.tableExist(tableName)) {
467463
LOGGER.info("Using existing table {}.", tableName);
468-
this.conn.appendMetaColIfNotExist(tableName);
469464
} else {
470465
LOGGER.info("Creating new table {}.", tableName);
471466
this.conn.createTableWithOnlyMetadataColumn(tableName);

src/test/java/com/snowflake/kafka/connector/internal/streaming/CloseTopicPartitionChannelIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ void setUp() throws JsonProcessingException {
2626
topicName = TestUtils.randomTableName();
2727
connectorName = topicName + "_connector";
2828
connectCluster.kafka().createTopic(topicName, PARTITIONS_NUMBER);
29-
TestUtils.getConnectionService().createTableWithMetadataColumn(topicName);
3029
// JVM scoped Ingest client mock
3130
StreamingClientManager.setIngestClientSupplier(fakeClientSupplier);
3231
generateKafkaMessages();

src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public class SnowflakeSinkServiceV2IT extends SnowflakeSinkServiceV2BaseIT {
4848
@BeforeEach
4949
public void setup() {
5050
config = TestUtils.getConnectorConfigurationForStreaming(true);
51-
conn.createTableWithMetadataColumn(table);
5251
pipe = PipeNameProvider.buildPipeName(table);
5352
}
5453

@@ -202,9 +201,7 @@ public void testStreamingIngestion()
202201
public void testStreamingIngest_multipleChannelPartitions_withMetrics()
203202
throws Exception { // set up telemetry service spy
204203
SnowflakeConnectionService connectionService = Mockito.spy(this.conn);
205-
connectionService.createTableWithMetadataColumn(table);
206-
SnowflakeTelemetryService telemetryService =
207-
Mockito.spy((SnowflakeTelemetryService) this.conn.getTelemetryClient());
204+
SnowflakeTelemetryService telemetryService = Mockito.spy(this.conn.getTelemetryClient());
208205
Mockito.when(connectionService.getTelemetryClient()).thenReturn(telemetryService);
209206

210207
// opens a channel for partition 0, table and topic

0 commit comments

Comments
 (0)