Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[yugabyte/yugabyte-db#20410] Add annotation MinimumYBVersion for tests #315

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.debezium.connector.yugabytedb.connection;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Class to denote YugabyteDB version.
*
* @author Vaibhav Kushwaha ([email protected])
*/
public class YBVersion implements Comparable<YBVersion> {
public static final String DEFAULT_YB_VERSION = "2.21.0.0";
public Integer major;
public Integer minor;
public Integer patch = 0;
public Integer revision = 0;

public YBVersion(int major, int minor, int patch, int revision) {
this.major = major;
this.minor = minor;
this.patch = patch;
this.revision = revision;
}

public YBVersion(String versionString) {
assert !versionString.isEmpty();
String[] version = versionString.split("\\.");

this.major = Integer.parseInt(version[0]);
this.minor = Integer.parseInt(version[1]);

if (version.length >= 3) {
this.patch = Integer.parseInt(version[2]);
}

if (version.length == 4) {
this.revision = Integer.parseInt(version[3]);
}
}

@Override
public int compareTo(YBVersion o) {
if (!this.major.equals(o.major)) {
return this.major.compareTo(o.major);
} else if (!this.minor.equals(o.minor)) {
return this.minor.compareTo(o.minor);
} else if (!this.patch.equals(o.patch)) {
return this.patch.compareTo(o.patch);
} else if (!this.revision.equals(o.revision)) {
return this.revision.compareTo(o.revision);
}

// Both the versions are equal.
return 0;
}

@Override
public String toString() {
return String.format("%s.%s.%s.%s", major, minor, patch, revision);
}

public static YBVersion getCurrentYBVersionEnv() {
String imageName = System.getenv("YB_DOCKER_IMAGE");

// If no environment variable is specified, it will be assumed that the current YugabyteDB
// version is the default version i.e. latest. This needs to be updated every time YugabyteDB's
// latest version changes.
if (imageName.isEmpty()) {
return new YBVersion(DEFAULT_YB_VERSION);
}

String regexPattern = "yugabyte:(.*?)-b*";
Pattern pattern = Pattern.compile(regexPattern);
Matcher matcher = pattern.matcher(imageName);

if (matcher.find()) {
return new YBVersion(matcher.group(1));
}

return new YBVersion(DEFAULT_YB_VERSION);
}

public static YBVersion getCurrentYBVersion(Connection conn) {
try (Statement st = conn.createStatement()) {
ResultSet rs = st.executeQuery("select version();");

if (rs.next()) {
String fullVersionString = rs.getString("version");

String regexPattern = "YB-(.*?)-b*";
Pattern pattern = Pattern.compile(regexPattern);
Matcher matcher = pattern.matcher(fullVersionString);

if (matcher.find()) {
return new YBVersion(matcher.group(1));
}
}

return new YBVersion(DEFAULT_YB_VERSION);
} catch (SQLException sqle) {
throw new RuntimeException("Exception while trying to get current YB version", sqle);
}
}
}
31 changes: 25 additions & 6 deletions src/test/java/io/debezium/connector/yugabytedb/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import javax.sql.DataSource;

import io.debezium.connector.yugabytedb.connection.YBVersion;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -91,6 +92,8 @@ public final class TestHelper {
private static String DEFAULT_DATABASE_NAME = "yugabyte";
private static String DEFAULT_CASSANDRA_USER = "cassandra";

public static final YBVersion MIN_YB_VERSION_CONSISTENT_SNAPSHOT = new YBVersion("2.20.2.0");

/**
* Key for schema parameter used to store DECIMAL/NUMERIC columns' precision.
*/
Expand Down Expand Up @@ -761,14 +764,30 @@ private static YugabyteDBValueConverterBuilder getPostgresValueConverterBuilder(
}

public static Stream<Arguments> streamTypeProviderForStreaming() {
return Stream.of(
Arguments.of(false, false), // Older stream
Arguments.of(true, false)); // NO_EXPORT stream
List<Arguments> result = new ArrayList<>(List.of(Arguments.of(false, false))); // Old stream

// If current YB version is less than the required version for feature, then do not pass the
// parameter for running the tests against it.
if (YBVersion.getCurrentYBVersionEnv().compareTo(MIN_YB_VERSION_CONSISTENT_SNAPSHOT) >= 0) {
result.add(Arguments.of(true, false)); // NO_EXPORT stream.
} else {
LOGGER.info("Skipping to run the test as minimum YB version required is {}", MIN_YB_VERSION_CONSISTENT_SNAPSHOT);
}

return result.stream();
}

public static Stream<Arguments> streamTypeProviderForSnapshot() {
return Stream.of(
Arguments.of(false, false), // Older stream
Arguments.of(true, true)); // USE_SNAPSHOT stream
List<Arguments> result = new ArrayList<>(List.of(Arguments.of(false, false))); // Old stream

// If current YB version is less than the required version for feature, then do not pass the
// parameter for running the tests against it.
if (YBVersion.getCurrentYBVersionEnv().compareTo(MIN_YB_VERSION_CONSISTENT_SNAPSHOT) >= 0) {
result.add(Arguments.of(true, true)); // USE_SNAPSHOT stream.
} else {
LOGGER.info("Skipping consistent snapshot arguments as minimum YB version required is {}", MIN_YB_VERSION_CONSISTENT_SNAPSHOT);
}

return result.stream();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.datastax.oss.driver.api.core.cql.Row;
import io.debezium.config.Configuration;
import io.debezium.connector.yugabytedb.HelperBeforeImageModes.BeforeImageMode;
import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion;
import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;
import io.debezium.connector.yugabytedb.common.YugabytedTestBase;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -31,6 +32,12 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Tests to verify the streaming of changes from YCQL tables.
*
* @author Sumukh Phalgaonkar ([email protected])
*/
@MinimumYBVersion(value = "2.20", reason = "Feature was introduced in the given version only")
public class YugabyteDBCQLTest extends YugabyteDBContainerTestBase {
CqlSession session;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion;
import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;
import io.debezium.connector.yugabytedb.common.YugabytedTestBase;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.debezium.connector.yugabytedb;

import io.debezium.config.Configuration;
import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion;
import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;

import io.debezium.connector.yugabytedb.common.YugabytedTestBase;
Expand Down Expand Up @@ -38,6 +39,7 @@
*
* @author Vaibhav Kushwaha ([email protected])
*/
@MinimumYBVersion("2.18.2")
public class YugabyteDBExplicitCheckpointingTest extends YugabyteDBContainerTestBase {
private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBExplicitCheckpointingTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion;
import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;
import io.debezium.connector.yugabytedb.common.YugabytedTestBase;
import io.debezium.connector.yugabytedb.connection.YugabyteDBConnection;
Expand All @@ -37,6 +38,7 @@
*
* @author Sumukh Phalgaonkar ([email protected])
*/
@MinimumYBVersion(value = "2.21", reason = "Feature was introduced in the given version only")
public class YugabyteDBPublicationReplicationTest extends YugabyteDBContainerTestBase {

public static String insertStatementFormatfort2 = "INSERT INTO t2 values (%d);";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.debezium.connector.yugabytedb;

import io.debezium.config.Configuration;
import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion;
import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;
import io.debezium.connector.yugabytedb.common.YugabytedTestBase;

import io.debezium.connector.yugabytedb.connection.YBVersion;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -388,6 +390,7 @@ public void snapshotColocatedNonColocatedThenStream(boolean consistentSnapshot,
// This test should not be run with consistent snapshot stream since it verifies
// the behaviour on failure after snapshot bootstrap call. For consistent
// snapshot streams, the very first getChanges call starts the snapshot consumption
@MinimumYBVersion("2.18.4")
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldSnapshotWithFailureAfterBootstrapSnapshotCall(boolean colocation)
Expand Down Expand Up @@ -445,6 +448,7 @@ public void shouldSnapshotWithFailureAfterBootstrapSnapshotCall(boolean colocati
assertEquals(recordsCount, recordsForTest2.size());
}

@MinimumYBVersion("2.18.4")
@ParameterizedTest
@MethodSource("streamTypeProviderForSnapshotWithColocation")
public void shouldSnapshotWithFailureAfterSettingInitialCheckpoint(boolean consistentSnapshot, boolean useSnapshot, boolean colocation)
Expand Down Expand Up @@ -502,6 +506,7 @@ public void shouldSnapshotWithFailureAfterSettingInitialCheckpoint(boolean consi
assertEquals(recordsCount, recordsForTest2.size());
}

@MinimumYBVersion("2.18.4")
@ParameterizedTest
@MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForSnapshot")
public void shouldNotSnapshotAgainIfSnapshotCompletedOnce(boolean consistentSnapshot, boolean useSnapshot) throws Exception {
Expand Down Expand Up @@ -563,6 +568,7 @@ public void shouldNotSnapshotAgainIfSnapshotCompletedOnce(boolean consistentSnap
}
}

@MinimumYBVersion("2.18.4")
@ParameterizedTest
@MethodSource("streamTypeProviderForSnapshotWithColocation")
public void shouldContinueStreamingInNeverAfterSnapshotCompleteInInitialOnly(boolean consistentSnapshot, boolean useSnapshot, boolean colocation)
Expand Down Expand Up @@ -644,6 +650,7 @@ public void shouldContinueStreamingInNeverAfterSnapshotCompleteInInitialOnly(boo
}
}

@MinimumYBVersion("2.18.4")
@ParameterizedTest
@MethodSource("io.debezium.connector.yugabytedb.TestHelper#streamTypeProviderForSnapshot")
public void snapshotShouldNotBeAffectedByDroppingUnrelatedTables(boolean consistentSnapshot, boolean useSnapshot) throws Exception {
Expand Down Expand Up @@ -835,6 +842,7 @@ public void snapshotTwoColocatedAndEmptyNonEmptyNonColocatedThenStream(boolean e
}
}

@MinimumYBVersion(value = "2.20.2", reason = "Test meant to run with consistent snapshot streams")
@ParameterizedTest
@MethodSource("argumentProviderForEmptyNonEmptyNonColocatedTables")
public void snapshotTwoColocatedAndEmptyNonEmptyNonColocatedThenStreamWithConsistentSnapshot(boolean emptyNonColocated, boolean colocation) throws Exception {
Expand Down Expand Up @@ -946,6 +954,7 @@ public void verifyConnectorFailsIfMarkSnapshotDoneFails(boolean consistentSnapsh
YugabyteDBSnapshotChangeEventSource.FAIL_WHEN_MARKING_SNAPSHOT_DONE = false;
}

@MinimumYBVersion(value = "2.20.2", reason = "Test meant to be run with consistent snapshot stream")
@Test
public void snapshotShouldBeCompletedOnParentIfSplitHappenedAfterStreamCreation() throws Exception {
/*
Expand Down Expand Up @@ -1020,11 +1029,14 @@ private void verifyRecordCount(long recordsCount) {
}

static Stream<Arguments> streamTypeProviderForSnapshotWithColocation() {
return Stream.of(
Arguments.of(false, false, true), // Older stream with colocation
Arguments.of(false, false, false), // Older stream without colocation
Arguments.of(true, true, true), // USE_SNAPSHOT stream with colocation
Arguments.of(true, true, false)); // USE_SNAPSHOT stream without colocation
List<Arguments> result = new ArrayList<>(List.of(Arguments.of(false, false, true), Arguments.of(false, false, false))); // Older stream arguments.

if (YBVersion.getCurrentYBVersionEnv().compareTo(TestHelper.MIN_YB_VERSION_CONSISTENT_SNAPSHOT) >= 0) {
result.add(Arguments.of(true, true, true));
result.add(Arguments.of(true, true, false));
}

return result.stream();
}

static Stream<Arguments> argumentProviderForEmptyNonEmptyNonColocatedTables() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.debezium.connector.yugabytedb.annotations;

import io.debezium.connector.yugabytedb.annotations.conditions.RunWithMinimumYBVersion;
import org.junit.jupiter.api.extension.ExtendWith;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation to specify what is the minimum YB version a test should run against. <br><br>
*
* <strong>Usage:</strong><br>
* {@code @MinimumYBVersion("2.21")}
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@ExtendWith(RunWithMinimumYBVersion.class)
public @interface MinimumYBVersion {
String value() default "2.18.2.0";
String reason() default "";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.debezium.connector.yugabytedb.annotations.conditions;

import io.debezium.connector.yugabytedb.annotations.MinimumYBVersion;
import io.debezium.connector.yugabytedb.connection.YBVersion;
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
import org.junit.jupiter.api.extension.ExecutionCondition;
import org.junit.jupiter.api.extension.ExtensionContext;

import java.lang.reflect.AnnotatedElement;
import java.util.Optional;

/**
* This class is to define the condition with which certain test will be executed if the
* annotation {@code @MinimumYBVersion} is present.
*
* @author Vaibhav Kushwaha ([email protected])
*/
public class RunWithMinimumYBVersion implements ExecutionCondition {
@Override
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
final Optional<AnnotatedElement> element = context.getElement();
if (element.isPresent()) {
YBVersion testYBVersion = new YBVersion(element.get().getAnnotation(MinimumYBVersion.class).value());

if (YBVersion.getCurrentYBVersionEnv().compareTo(testYBVersion) >= 0) {
return ConditionEvaluationResult.enabled("Test enabled");
} else {
return ConditionEvaluationResult.disabled("Test disabled as minimum YB version required to run this test is " + testYBVersion);
}
}

// The possibility of hitting this code block is not even there. If this is encountered, you've
// unleashed the devil now - run as fast as you can.
return ConditionEvaluationResult.disabled("No test element found");
}

}
Loading
Loading