Skip to content

Commit 3e95af2

Browse files
CONN-10496 Creating pipe for SSv2 (#1115)
1 parent 41171b7 commit 3e95af2

File tree

3 files changed

+61
-0
lines changed

3 files changed

+61
-0
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.snowflake.kafka.connector.internal.streaming.v2;
2+
3+
/** Default implementation does not perform any transformations on pipe level */
4+
public class DefaultPipeDefinitionProvider implements PipeDefinitionProvider {
5+
6+
private static final String CREATE_PIPE_IF_NOT_EXISTS_STATEMENT =
7+
"CREATE PIPE IF NOT EXISTS identifier(?) AS COPY INTO %s FROM TABLE (DATA_SOURCE(TYPE =>"
8+
+ " 'STREAMING')) MATCH_BY_COLUMN_NAME=CASE_SENSITIVE";
9+
10+
private static final String CREATE_OR_REPLACE_PIPE_STATEMENT =
11+
"CREATE OR REPLACE PIPE identifier(?) AS COPY INTO %s FROM TABLE (DATA_SOURCE(TYPE =>"
12+
+ " 'STREAMING')) MATCH_BY_COLUMN_NAME=CASE_SENSITIVE";
13+
14+
@Override
15+
public String getPipeDefinition(String tableName, boolean recreate) {
16+
String sqlTemplate =
17+
recreate ? CREATE_OR_REPLACE_PIPE_STATEMENT : CREATE_PIPE_IF_NOT_EXISTS_STATEMENT;
18+
return String.format(sqlTemplate, tableName);
19+
}
20+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.snowflake.kafka.connector.internal.streaming.v2;
2+
3+
/** Construct CREATE PIPE sql statement */
4+
public interface PipeDefinitionProvider {
5+
String getPipeDefinition(String tableName, boolean recreate);
6+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.snowflake.kafka.connector.internal.streaming.v2;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import java.util.stream.Stream;
6+
import org.junit.jupiter.params.ParameterizedTest;
7+
import org.junit.jupiter.params.provider.Arguments;
8+
import org.junit.jupiter.params.provider.MethodSource;
9+
10+
class DefaultPipeDefinitionProviderTest {
11+
12+
private static final PipeDefinitionProvider provider = new DefaultPipeDefinitionProvider();
13+
14+
@ParameterizedTest
15+
@MethodSource("testParams")
16+
void shouldCreatePipeDefinition(boolean recreate, String expected) {
17+
// when
18+
String result = provider.getPipeDefinition("foo", recreate);
19+
20+
// then
21+
assertThat(result).isEqualTo(expected);
22+
}
23+
24+
public static Stream<Arguments> testParams() {
25+
return Stream.of(
26+
Arguments.of(
27+
true,
28+
"CREATE OR REPLACE PIPE identifier(?) AS COPY INTO foo FROM TABLE (DATA_SOURCE(TYPE =>"
29+
+ " 'STREAMING')) MATCH_BY_COLUMN_NAME=CASE_SENSITIVE"),
30+
Arguments.of(
31+
false,
32+
"CREATE PIPE IF NOT EXISTS identifier(?) AS COPY INTO foo FROM TABLE (DATA_SOURCE(TYPE"
33+
+ " => 'STREAMING')) MATCH_BY_COLUMN_NAME=CASE_SENSITIVE"));
34+
}
35+
}

0 commit comments

Comments
 (0)