From 442aaf53c063255b630b6ae91baa55838b6624ce Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 2 Jan 2024 15:05:17 +0530 Subject: [PATCH 1/5] added annotation to run against minimum YB builds --- .../yugabytedb/connection/YBVersion.java | 104 ++++++++++++++++++ .../connector/yugabytedb/TestHelper.java | 31 +++++- .../yugabytedb/YugabyteDBCQLTest.java | 7 ++ .../yugabytedb/YugabyteDBDatatypesTest.java | 1 + .../YugabyteDBExplicitCheckpointingTest.java | 2 + .../YugabyteDBPublicationReplicationTest.java | 2 + .../yugabytedb/YugabyteDBSnapshotTest.java | 22 +++- .../annotations/MinimumYBVersion.java | 23 ++++ .../conditions/RunWithMinimumYBVersion.java | 37 +++++++ .../yugabytedb/common/TestBaseClass.java | 9 ++ .../yugabytedb/consistent/MergerTest.java | 2 + .../yugabytedb/consistent/MessageTest.java | 2 + ...ugabyteDBConsistencyWithColocatedTest.java | 2 + .../YugabyteDBStreamConsistencyTest.java | 2 + 14 files changed, 235 insertions(+), 11 deletions(-) create mode 100644 src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java create mode 100644 src/test/java/io/debezium/connector/yugabytedb/annotations/MinimumYBVersion.java create mode 100644 src/test/java/io/debezium/connector/yugabytedb/annotations/conditions/RunWithMinimumYBVersion.java diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java b/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java new file mode 100644 index 00000000..8c6ee9cb --- /dev/null +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java @@ -0,0 +1,104 @@ +package io.debezium.connector.yugabytedb.connection; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Class to denote YugabyteDB version. + * + * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) + */ +public class YBVersion implements Comparable { + public Integer major; + public Integer minor; + public Integer patch = 0; + public Integer revision = 0; + + public YBVersion(int major, int minor, int patch, int revision) { + this.major = major; + this.minor = minor; + this.patch = patch; + this.revision = revision; + } + + public YBVersion(String versionString) { + assert !versionString.isEmpty(); + String[] version = versionString.split("\\."); + + this.major = Integer.parseInt(version[0]); + this.minor = Integer.parseInt(version[1]); + + if (version.length >= 3) { + this.patch = Integer.parseInt(version[2]); + } + + if (version.length == 4) { + this.revision = Integer.parseInt(version[3]); + } + } + + @Override + public int compareTo(YBVersion o) { + if (!this.major.equals(o.major)) { + return this.major.compareTo(o.major); + } else if (!this.minor.equals(o.minor)) { + return this.minor.compareTo(o.minor); + } else if (!this.patch.equals(o.patch)) { + return this.patch.compareTo(o.patch); + } else if (!this.revision.equals(o.revision)) { + return this.revision.compareTo(o.revision); + } + + // Both the versions are equal. + return 0; + } + + @Override + public String toString() { + return String.format("%s.%s.%s.%s", major, minor, patch, revision); + } + + public static YBVersion getCurrentYBVersionEnv() { + String imageName = System.getenv("YB_DOCKER_IMAGE"); + + if (imageName.isEmpty()) { + return new YBVersion("2.18.2.0"); + } + + String regexPattern = "yugabyte:(.*?)-b*"; + Pattern pattern = Pattern.compile(regexPattern); + Matcher matcher = pattern.matcher(imageName); + + if (matcher.find()) { + return new YBVersion(matcher.group(1)); + } + + return new YBVersion("2.18.2.0"); + } + + public static YBVersion getCurrentYBVersion(Connection conn) { + try (Statement st = conn.createStatement()) { + ResultSet rs = st.executeQuery("select version();"); + + if (rs.next()) { + String fullVersionString = rs.getString("version"); + + String regexPattern = "YB-(.*?)-b*"; + Pattern pattern = Pattern.compile(regexPattern); + Matcher matcher = pattern.matcher(fullVersionString); + + if (matcher.find()) { + return new YBVersion(matcher.group(1)); + } + } + + return new YBVersion("2.18.2.0"); + } catch (SQLException sqle) { + throw new RuntimeException("Exception while trying to get current YB version", sqle); + } + } +} \ No newline at end of file diff --git a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java index 35b36a74..ff1f2010 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java +++ b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java @@ -26,6 +26,7 @@ import javax.sql.DataSource; +import io.debezium.connector.yugabytedb.connection.YBVersion; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.awaitility.Awaitility; @@ -91,6 +92,8 @@ public final class TestHelper { private static String DEFAULT_DATABASE_NAME = "yugabyte"; private static String DEFAULT_CASSANDRA_USER = "cassandra"; + public static final YBVersion MIN_YB_VERSION_CONSISTENT_SNAPSHOT = new YBVersion("2.23.0.0"); + /** * Key for schema parameter used to store DECIMAL/NUMERIC columns' precision. */ @@ -761,14 +764,30 @@ private static YugabyteDBValueConverterBuilder getPostgresValueConverterBuilder( } public static Stream streamTypeProviderForStreaming() { - return Stream.of( - Arguments.of(false, false), // Older stream - Arguments.of(true, false)); // NO_EXPORT stream + List result = new ArrayList<>(List.of(Arguments.of(false, false))); // Old stream + + // If current YB version is less than the required version for feature, then do not pass the + // parameter for running the tests against it. + if (YBVersion.getCurrentYBVersionEnv().compareTo(MIN_YB_VERSION_CONSISTENT_SNAPSHOT) >= 0) { + result.add(Arguments.of(true, false)); // NO_EXPORT stream. + } else { + LOGGER.info("Skipping to run the test as minimum YB version required is {}", MIN_YB_VERSION_CONSISTENT_SNAPSHOT); + } + + return result.stream(); } public static Stream streamTypeProviderForSnapshot() { - return Stream.of( - Arguments.of(false, false), // Older stream - Arguments.of(true, true)); // USE_SNAPSHOT stream + List result = new ArrayList<>(List.of(Arguments.of(false, false))); // Old stream + + // If current YB version is less than the required version for feature, then do not pass the + // parameter for running the tests against it. + if (YBVersion.getCurrentYBVersionEnv().compareTo(MIN_YB_VERSION_CONSISTENT_SNAPSHOT) >= 0) { + result.add(Arguments.of(true, true)); // USE_SNAPSHOT stream. + } else { + LOGGER.info("Skipping consistent snapshot arguments as minimum YB version required is {}", MIN_YB_VERSION_CONSISTENT_SNAPSHOT); + } + + return result.stream(); } } diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCQLTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCQLTest.java index e2eff3f4..2513f1f1 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCQLTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCQLTest.java @@ -6,6 +6,7 @@ import com.datastax.oss.driver.api.core.cql.Row; import io.debezium.config.Configuration; import io.debezium.connector.yugabytedb.HelperBeforeImageModes.BeforeImageMode; +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; import org.apache.kafka.connect.source.SourceRecord; @@ -31,6 +32,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; +/** + * Tests to verify the streaming of changes from YCQL tables. + * + * @author Sumukh Phalgaonkar (sumukh.phalgaonkar@yugabyte.com) + */ +@MinimumYBVersion(value = "2.20", reason = "Feature was introduced in the given version only") public class YugabyteDBCQLTest extends YugabyteDBContainerTestBase { CqlSession session; diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java index 5a453eba..44a03540 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java @@ -6,6 +6,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBExplicitCheckpointingTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBExplicitCheckpointingTest.java index 2e2f88ac..d4ba459d 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBExplicitCheckpointingTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBExplicitCheckpointingTest.java @@ -1,6 +1,7 @@ package io.debezium.connector.yugabytedb; import io.debezium.config.Configuration; +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; @@ -38,6 +39,7 @@ * * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) */ +@MinimumYBVersion("2.18.2") public class YugabyteDBExplicitCheckpointingTest extends YugabyteDBContainerTestBase { private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBExplicitCheckpointingTest.class); diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBPublicationReplicationTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBPublicationReplicationTest.java index 4f3601b0..aba54839 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBPublicationReplicationTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBPublicationReplicationTest.java @@ -12,6 +12,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; @@ -37,6 +38,7 @@ * * @author Sumukh Phalgaonkar (sumukh.phalgaonkar@yugabyte.com) */ +@MinimumYBVersion(value = "2.21", reason = "Feature was introduced in the given version only") public class YugabyteDBPublicationReplicationTest extends YugabyteDBContainerTestBase { public static String insertStatementFormatfort2 = "INSERT INTO t2 values (%d);"; diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotTest.java index 54b0350d..c78f86f8 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotTest.java @@ -1,9 +1,11 @@ package io.debezium.connector.yugabytedb; import io.debezium.config.Configuration; +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; +import io.debezium.connector.yugabytedb.connection.YBVersion; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.awaitility.Awaitility; @@ -388,6 +390,7 @@ public void snapshotColocatedNonColocatedThenStream(boolean consistentSnapshot, // This test should not be run with consistent snapshot stream since it verifies // the behaviour on failure after snapshot bootstrap call. For consistent // snapshot streams, the very first getChanges call starts the snapshot consumption + @MinimumYBVersion("2.18.4") @ParameterizedTest @ValueSource(booleans = {true, false}) public void shouldSnapshotWithFailureAfterBootstrapSnapshotCall(boolean colocation) @@ -445,6 +448,7 @@ public void shouldSnapshotWithFailureAfterBootstrapSnapshotCall(boolean colocati assertEquals(recordsCount, recordsForTest2.size()); } + @MinimumYBVersion("2.18.4") @ParameterizedTest @MethodSource("streamTypeProviderForSnapshotWithColocation") public void shouldSnapshotWithFailureAfterSettingInitialCheckpoint(boolean consistentSnapshot, boolean useSnapshot, boolean colocation) @@ -502,6 +506,7 @@ public void shouldSnapshotWithFailureAfterSettingInitialCheckpoint(boolean consi assertEquals(recordsCount, recordsForTest2.size()); } + @MinimumYBVersion("2.18.4") @ParameterizedTest @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForSnapshot") public void shouldNotSnapshotAgainIfSnapshotCompletedOnce(boolean consistentSnapshot, boolean useSnapshot) throws Exception { @@ -563,6 +568,7 @@ public void shouldNotSnapshotAgainIfSnapshotCompletedOnce(boolean consistentSnap } } + @MinimumYBVersion("2.18.4") @ParameterizedTest @MethodSource("streamTypeProviderForSnapshotWithColocation") public void shouldContinueStreamingInNeverAfterSnapshotCompleteInInitialOnly(boolean consistentSnapshot, boolean useSnapshot, boolean colocation) @@ -644,6 +650,7 @@ public void shouldContinueStreamingInNeverAfterSnapshotCompleteInInitialOnly(boo } } + @MinimumYBVersion("2.18.4") @ParameterizedTest @MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForSnapshot") public void snapshotShouldNotBeAffectedByDroppingUnrelatedTables(boolean consistentSnapshot, boolean useSnapshot) throws Exception { @@ -835,6 +842,7 @@ public void snapshotTwoColocatedAndEmptyNonEmptyNonColocatedThenStream(boolean e } } + // TODO: Confirm version from Siddharth @ParameterizedTest @MethodSource("argumentProviderForEmptyNonEmptyNonColocatedTables") public void snapshotTwoColocatedAndEmptyNonEmptyNonColocatedThenStreamWithConsistentSnapshot(boolean emptyNonColocated, boolean colocation) throws Exception { @@ -946,6 +954,7 @@ public void verifyConnectorFailsIfMarkSnapshotDoneFails(boolean consistentSnapsh YugabyteDBSnapshotChangeEventSource.FAIL_WHEN_MARKING_SNAPSHOT_DONE = false; } + // TODO: Confirm version from Siddharth @Test public void snapshotShouldBeCompletedOnParentIfSplitHappenedAfterStreamCreation() throws Exception { /* @@ -1020,11 +1029,14 @@ private void verifyRecordCount(long recordsCount) { } static Stream streamTypeProviderForSnapshotWithColocation() { - return Stream.of( - Arguments.of(false, false, true), // Older stream with colocation - Arguments.of(false, false, false), // Older stream without colocation - Arguments.of(true, true, true), // USE_SNAPSHOT stream with colocation - Arguments.of(true, true, false)); // USE_SNAPSHOT stream without colocation + List result = new ArrayList<>(List.of(Arguments.of(false, false, true), Arguments.of(false, false, false))); // Older stream arguments. + + if (YBVersion.getCurrentYBVersionEnv().compareTo(TestHelper.MIN_YB_VERSION_CONSISTENT_SNAPSHOT) >= 0) { + result.add(Arguments.of(true, true, true)); + result.add(Arguments.of(true, true, false)); + } + + return result.stream(); } static Stream argumentProviderForEmptyNonEmptyNonColocatedTables() { diff --git a/src/test/java/io/debezium/connector/yugabytedb/annotations/MinimumYBVersion.java b/src/test/java/io/debezium/connector/yugabytedb/annotations/MinimumYBVersion.java new file mode 100644 index 00000000..239f1022 --- /dev/null +++ b/src/test/java/io/debezium/connector/yugabytedb/annotations/MinimumYBVersion.java @@ -0,0 +1,23 @@ +package io.debezium.connector.yugabytedb.annotations; + +import io.debezium.connector.yugabytedb.annotations.conditions.RunWithMinimumYBVersion; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to specify what is the minimum YB version a test should run against.

+ * + * Usage:
+ * {@code @MinimumYBVersion("2.21")} + */ +@Target({ElementType.TYPE, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@ExtendWith(RunWithMinimumYBVersion.class) +public @interface MinimumYBVersion { + String value() default "2.18.2.0"; + String reason() default ""; +} diff --git a/src/test/java/io/debezium/connector/yugabytedb/annotations/conditions/RunWithMinimumYBVersion.java b/src/test/java/io/debezium/connector/yugabytedb/annotations/conditions/RunWithMinimumYBVersion.java new file mode 100644 index 00000000..7b6d554a --- /dev/null +++ b/src/test/java/io/debezium/connector/yugabytedb/annotations/conditions/RunWithMinimumYBVersion.java @@ -0,0 +1,37 @@ +package io.debezium.connector.yugabytedb.annotations.conditions; + +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; +import io.debezium.connector.yugabytedb.connection.YBVersion; +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.lang.reflect.AnnotatedElement; +import java.util.Optional; + +/** + * This class is to define the condition with which certain test will be executed if the + * annotation {@code @MinimumYBVersion} is present. + * + * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) + */ +public class RunWithMinimumYBVersion implements ExecutionCondition { + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + final Optional element = context.getElement(); + if (element.isPresent()) { + YBVersion testYBVersion = new YBVersion(element.get().getAnnotation(MinimumYBVersion.class).value()); + + if (YBVersion.getCurrentYBVersionEnv().compareTo(testYBVersion) >= 0) { + return ConditionEvaluationResult.enabled("Test enabled"); + } else { + return ConditionEvaluationResult.disabled("Test disabled as minimum YB version required to run this test is " + testYBVersion); + } + } + + // The possibility of hitting this code block is not even there. If this is encountered, you've + // unleashed the devil now - run as fast as you can. + return ConditionEvaluationResult.disabled("No test element found"); + } + +} diff --git a/src/test/java/io/debezium/connector/yugabytedb/common/TestBaseClass.java b/src/test/java/io/debezium/connector/yugabytedb/common/TestBaseClass.java index 1e1db749..e3674c66 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/common/TestBaseClass.java +++ b/src/test/java/io/debezium/connector/yugabytedb/common/TestBaseClass.java @@ -3,7 +3,9 @@ import io.debezium.config.Configuration; import io.debezium.connector.yugabytedb.TestHelper; import io.debezium.connector.yugabytedb.YugabyteDBConnector; +import io.debezium.connector.yugabytedb.annotations.conditions.RunWithMinimumYBVersion; import io.debezium.connector.yugabytedb.connection.OpId; +import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; import io.debezium.connector.yugabytedb.container.YugabyteCustomContainer; import io.debezium.connector.yugabytedb.rules.YugabyteDBLogTestName; import io.debezium.embedded.AbstractConnectorTest; @@ -18,17 +20,24 @@ import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; import org.eclipse.jetty.util.BlockingArrayQueue; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.ExtendWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.YugabyteYSQLContainer; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.*; diff --git a/src/test/java/io/debezium/connector/yugabytedb/consistent/MergerTest.java b/src/test/java/io/debezium/connector/yugabytedb/consistent/MergerTest.java index 16256495..3b5f8abe 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/consistent/MergerTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/consistent/MergerTest.java @@ -1,5 +1,6 @@ package io.debezium.connector.yugabytedb.consistent; +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -12,6 +13,7 @@ import static org.junit.jupiter.api.Assertions.*; +@MinimumYBVersion("2.18.2") class MergerTest { private static final Logger LOGGER = LoggerFactory.getLogger(MergerTest.class); private final String DUMMY_TABLE_ID = "dummy_table_id"; diff --git a/src/test/java/io/debezium/connector/yugabytedb/consistent/MessageTest.java b/src/test/java/io/debezium/connector/yugabytedb/consistent/MessageTest.java index f7cea64c..1de70941 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/consistent/MessageTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/consistent/MessageTest.java @@ -1,6 +1,7 @@ package io.debezium.connector.yugabytedb.consistent; import com.google.protobuf.ByteString; +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.yb.cdc.CdcService; @@ -19,6 +20,7 @@ /** * @author Rajat Venkatesh, Vaibhav Kushwaha */ +@MinimumYBVersion("2.18.2") public class MessageTest { private static final Logger LOGGER = LoggerFactory.getLogger(MessageTest.class); private final String DUMMY_TABLE_ID = "dummy_table_id"; diff --git a/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBConsistencyWithColocatedTest.java b/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBConsistencyWithColocatedTest.java index c45435e4..91188d55 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBConsistencyWithColocatedTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBConsistencyWithColocatedTest.java @@ -13,6 +13,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.awaitility.Awaitility; @@ -30,6 +31,7 @@ import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; +@MinimumYBVersion("2.18.2") public class YugabyteDBConsistencyWithColocatedTest extends YugabyteDBContainerTestBase { @BeforeAll public static void beforeClass() throws SQLException { diff --git a/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBStreamConsistencyTest.java b/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBStreamConsistencyTest.java index 456e5cc0..cc324681 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBStreamConsistencyTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/consistent/YugabyteDBStreamConsistencyTest.java @@ -18,6 +18,7 @@ import io.debezium.connector.yugabytedb.TestHelper; import io.debezium.connector.yugabytedb.YugabyteDBConnector; import io.debezium.connector.yugabytedb.YugabyteDBConnectorConfig; +import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion; import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase; import io.debezium.connector.yugabytedb.common.YugabytedTestBase; import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection; @@ -39,6 +40,7 @@ * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) */ +@MinimumYBVersion("2.18.2") public class YugabyteDBStreamConsistencyTest extends YugabyteDBContainerTestBase { private final static Logger LOGGER = LoggerFactory.getLogger(YugabyteDBStreamConsistencyTest.class); From 6057ed10cd70ede28625ea76e360b1635d8dcc4a Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 2 Jan 2024 15:41:21 +0530 Subject: [PATCH 2/5] updated default YB version --- .../debezium/connector/yugabytedb/connection/YBVersion.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java b/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java index 8c6ee9cb..173424ee 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java @@ -66,7 +66,7 @@ public static YBVersion getCurrentYBVersionEnv() { String imageName = System.getenv("YB_DOCKER_IMAGE"); if (imageName.isEmpty()) { - return new YBVersion("2.18.2.0"); + return new YBVersion("2.21.0.0"); } String regexPattern = "yugabyte:(.*?)-b*"; @@ -77,7 +77,7 @@ public static YBVersion getCurrentYBVersionEnv() { return new YBVersion(matcher.group(1)); } - return new YBVersion("2.18.2.0"); + return new YBVersion("2.21.0.0"); } public static YBVersion getCurrentYBVersion(Connection conn) { @@ -96,7 +96,7 @@ public static YBVersion getCurrentYBVersion(Connection conn) { } } - return new YBVersion("2.18.2.0"); + return new YBVersion("2.21.0.0"); } catch (SQLException sqle) { throw new RuntimeException("Exception while trying to get current YB version", sqle); } From 5176990de44122ebb2a443d582437c3fe7701b8c Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 2 Jan 2024 15:42:09 +0530 Subject: [PATCH 3/5] updated default YB version --- .../connector/yugabytedb/connection/YBVersion.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java b/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java index 173424ee..c23620e0 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java @@ -13,6 +13,7 @@ * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) */ public class YBVersion implements Comparable { + public static final String DEFAULT_YB_VERSION = "2.21.0.0"; public Integer major; public Integer minor; public Integer patch = 0; @@ -66,7 +67,7 @@ public static YBVersion getCurrentYBVersionEnv() { String imageName = System.getenv("YB_DOCKER_IMAGE"); if (imageName.isEmpty()) { - return new YBVersion("2.21.0.0"); + return new YBVersion(DEFAULT_YB_VERSION); } String regexPattern = "yugabyte:(.*?)-b*"; @@ -77,7 +78,7 @@ public static YBVersion getCurrentYBVersionEnv() { return new YBVersion(matcher.group(1)); } - return new YBVersion("2.21.0.0"); + return new YBVersion(DEFAULT_YB_VERSION); } public static YBVersion getCurrentYBVersion(Connection conn) { @@ -96,7 +97,7 @@ public static YBVersion getCurrentYBVersion(Connection conn) { } } - return new YBVersion("2.21.0.0"); + return new YBVersion(DEFAULT_YB_VERSION); } catch (SQLException sqle) { throw new RuntimeException("Exception while trying to get current YB version", sqle); } From eaf1a6f08378403b6bdae61c1ea900309702b870 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 2 Jan 2024 16:41:13 +0530 Subject: [PATCH 4/5] updated consistent snapshot YB version --- src/test/java/io/debezium/connector/yugabytedb/TestHelper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java index ff1f2010..53701742 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java +++ b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java @@ -92,7 +92,7 @@ public final class TestHelper { private static String DEFAULT_DATABASE_NAME = "yugabyte"; private static String DEFAULT_CASSANDRA_USER = "cassandra"; - public static final YBVersion MIN_YB_VERSION_CONSISTENT_SNAPSHOT = new YBVersion("2.23.0.0"); + public static final YBVersion MIN_YB_VERSION_CONSISTENT_SNAPSHOT = new YBVersion("2.21.0.0"); /** * Key for schema parameter used to store DECIMAL/NUMERIC columns' precision. From 216d28bcba59b5a67c8ba9f155021dde6bf98e8d Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 2 Jan 2024 17:34:06 +0530 Subject: [PATCH 5/5] addressed todos --- .../debezium/connector/yugabytedb/connection/YBVersion.java | 3 +++ .../java/io/debezium/connector/yugabytedb/TestHelper.java | 2 +- .../debezium/connector/yugabytedb/YugabyteDBSnapshotTest.java | 4 ++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java b/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java index c23620e0..329c263c 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java +++ b/src/main/java/io/debezium/connector/yugabytedb/connection/YBVersion.java @@ -66,6 +66,9 @@ public String toString() { public static YBVersion getCurrentYBVersionEnv() { String imageName = System.getenv("YB_DOCKER_IMAGE"); + // If no environment variable is specified, it will be assumed that the current YugabyteDB + // version is the default version i.e. latest. This needs to be updated every time YugabyteDB's + // latest version changes. if (imageName.isEmpty()) { return new YBVersion(DEFAULT_YB_VERSION); } diff --git a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java index 53701742..779e1ab6 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java +++ b/src/test/java/io/debezium/connector/yugabytedb/TestHelper.java @@ -92,7 +92,7 @@ public final class TestHelper { private static String DEFAULT_DATABASE_NAME = "yugabyte"; private static String DEFAULT_CASSANDRA_USER = "cassandra"; - public static final YBVersion MIN_YB_VERSION_CONSISTENT_SNAPSHOT = new YBVersion("2.21.0.0"); + public static final YBVersion MIN_YB_VERSION_CONSISTENT_SNAPSHOT = new YBVersion("2.20.2.0"); /** * Key for schema parameter used to store DECIMAL/NUMERIC columns' precision. diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotTest.java index c78f86f8..e1675c91 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotTest.java @@ -842,7 +842,7 @@ public void snapshotTwoColocatedAndEmptyNonEmptyNonColocatedThenStream(boolean e } } - // TODO: Confirm version from Siddharth + @MinimumYBVersion(value = "2.20.2", reason = "Test meant to run with consistent snapshot streams") @ParameterizedTest @MethodSource("argumentProviderForEmptyNonEmptyNonColocatedTables") public void snapshotTwoColocatedAndEmptyNonEmptyNonColocatedThenStreamWithConsistentSnapshot(boolean emptyNonColocated, boolean colocation) throws Exception { @@ -954,7 +954,7 @@ public void verifyConnectorFailsIfMarkSnapshotDoneFails(boolean consistentSnapsh YugabyteDBSnapshotChangeEventSource.FAIL_WHEN_MARKING_SNAPSHOT_DONE = false; } - // TODO: Confirm version from Siddharth + @MinimumYBVersion(value = "2.20.2", reason = "Test meant to be run with consistent snapshot stream") @Test public void snapshotShouldBeCompletedOnParentIfSplitHappenedAfterStreamCreation() throws Exception { /*