diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 303c4e9c..863d3288 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -6,18 +6,32 @@ jobs:
   build:
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
-        clickhouse: [ "23.7", "24.3", "latest", "cloud" ]
+        clickhouse: ["23.7", "24.3", "latest", "cloud"]
     name: ClickHouse ${{ matrix.clickhouse }} tests
     steps:
+      - name: Check for Cloud Credentials
+        id: check-cloud-credentials
+        run: |
+          if [[ "${{ matrix.clickhouse }}" == "cloud" && (-z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}" || -z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}") ]]; then
+            echo "SKIP_STEP=true" >> $GITHUB_ENV
+          else
+            echo "SKIP_STEP=false" >> $GITHUB_ENV
+          fi
+        shell: bash
       - uses: actions/checkout@v3
+        if: env.SKIP_STEP != 'true'
      - name: Set up JDK 17
+        if: env.SKIP_STEP != 'true'
        uses: actions/setup-java@v3
        with:
          java-version: '17'
          distribution: 'adopt'
          architecture: x64
      - name: Setup and execute Gradle 'test' task
+        if: env.SKIP_STEP != 'true'
        uses: gradle/gradle-build-action@v2
        env:
          CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b47058e1..0d1ade43 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 1.1.0
+* Added support for Tuple type
+* Added support for Variant type
+* Added support for Nested type
+* Refactored Column class to use the Builder pattern via Lombok
+* Refactored recursive Map type parsing to an iterative approach using describe_include_subcolumns=1
+
 ## 1.0.17
 * Added support for ClickHouse Enum type #370
 * Added extra break down of time measurement for insert operations
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index cb17fea9..ac30ea1a 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -123,10 +123,10 @@ A sample REST call you could use to create the connector (POST to `localhost:808
 
 ## Proposing code changes
 
-This is a relatively straightfoward process:
+This is a relatively straightforward process:
 * Ensure there's unit test coverage for the changes (and that prior tests work still, of course).
 * Update VERSION to the next logical version number
-* Add changes to CHANGELOG in a human readable way
+* Add changes to CHANGELOG in a human-readable way
 * Submit a PR
 
 ## Releasing a new version
diff --git a/VERSION b/VERSION
index 3ec7c5c4..795460fc 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-v1.0.17
+v1.1.0
diff --git a/build.gradle.kts b/build.gradle.kts
index 2d39999a..5d278e11 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -79,6 +79,17 @@ dependencies {
     implementation("com.google.code.gson:gson:2.10.1")
     // https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5
     implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1")
+    // https://mvnrepository.com/artifact/com.google.guava/guava
+    implementation("com.google.guava:guava:33.1.0-jre")
+
+    // Avoid the telescoping constructors problem with the builder pattern using Lombok
+    compileOnly("org.projectlombok:lombok:1.18.32")
+    annotationProcessor("org.projectlombok:lombok:1.18.32")
+
+    // To parse JSON responses from ClickHouse so that complex data types are handled correctly
+    implementation("com.fasterxml.jackson.core:jackson-databind:2.17.0")
+
     // TODO: need to remove ???
     implementation("org.slf4j:slf4j-reload4j:2.0.11")
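Note on the new Lombok dependency above: the "telescoping constructors" problem is what the old `Column` class below suffers from, one private constructor per field combination. A minimal sketch of the idea (the class and fields here are hypothetical stand-ins, not the actual refactored `Column`):

```java
import lombok.Builder;
import lombok.Getter;

// Before: Column(name, type), Column(name, type, nullable),
// Column(name, type, nullable, precision, scale), ... one overload per combination.
// With Lombok, a single annotated class replaces all of them:
@Builder
@Getter
public class ColumnSketch { // hypothetical stand-in for the refactored Column
    private String name;
    private String type;
    private boolean nullable;
    private int precision;
    private int scale;
}
```

Callers then name only the fields they care about, e.g. `ColumnSketch.builder().name("price").type("Decimal(14, 2)").build()`; everything else defaults to `0`/`false`/`null`.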
implementation("org.slf4j:slf4j-reload4j:2.0.11") diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/Data.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/Data.java index 94990d45..f9b38f73 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/data/Data.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/Data.java @@ -1,7 +1,10 @@ package com.clickhouse.kafka.connect.sink.data; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; +import java.util.List; + public class Data { private Schema schema; private Object object; @@ -11,6 +14,10 @@ public Data(Schema schema, Object object) { this.object = object; } + public List getFields() { + return schema.fields(); + } + public Schema.Type getFieldType() { return schema.type(); } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/StructToJsonMap.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/StructToJsonMap.java index ac2a1a70..9dc51df0 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/data/StructToJsonMap.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/StructToJsonMap.java @@ -79,9 +79,10 @@ public static Map toJsonMap(Struct struct) { jsonMap.put(fieldName, new Data(field.schema(), toJsonMap(struct.getStruct(fieldName)))); break; case MAP: - Map fieldMap = struct.getMap(fieldName); - if (fieldMap != null && !fieldMap.isEmpty() && fieldMap.values().iterator().next() instanceof Struct) { + Map fieldMap = new HashMap<>(struct.getMap(fieldName)); + if (!fieldMap.isEmpty() && fieldMap.values().iterator().next() instanceof Struct) { // Map values are `Struct` + for (Map.Entry entry : fieldMap.entrySet()) { entry.setValue(toJsonMap((Struct) entry.getValue())); } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 22e88b51..6f7260be 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -9,6 +9,7 @@ import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; import com.clickhouse.kafka.connect.sink.data.Data; import com.clickhouse.kafka.connect.sink.data.Record; +import com.clickhouse.kafka.connect.sink.data.StructToJsonMap; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.db.mapping.Column; import com.clickhouse.kafka.connect.sink.db.mapping.Table; @@ -32,6 +33,8 @@ import org.apache.kafka.connect.errors.DataException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Streams; +import reactor.util.function.Tuples; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -178,7 +181,7 @@ public void doInsert(List records, QueryIdentifier queryId, ErrorReporte private boolean validateDataSchema(Table table, Record record, boolean onlyFieldsName) { boolean validSchema = true; - for (Column col : table.getColumns()) { + for (Column col : table.getRootColumnsList()) { String colName = col.getName(); Type type = col.getType(); boolean isNullable = col.isNullable(); @@ -207,13 +210,19 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField break;//I notice we just break here, rather than actually validate the type default: if (!colTypeName.equals(dataTypeName)) { - if (!(colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))) { - 
LOGGER.debug("Data schema name: {}", objSchema.name()); - if (!("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal"))) { - validSchema = false; - LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName)); - } - } + LOGGER.debug("Data schema name: {}", objSchema.name()); + + if (colTypeName.equals("STRING") && dataTypeName.equals("BYTES")) + continue; + + if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT")) + continue; + + if (("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal"))) + continue; + + validSchema = false; + LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName)); } } } @@ -387,16 +396,80 @@ private void doWriteColValue(Column col, ClickHousePipedOutputStream stream, Dat BinaryStreamUtils.writeVarInt(stream, sizeArrObject); arrObject.forEach(v -> { try { - if (col.getSubType().isNullable() && v != null) { + if (col.getArrayType().isNullable() && v != null) { BinaryStreamUtils.writeNonNull(stream); } - doWriteColValue(col.getSubType(), stream, new Data(value.getNestedValueSchema(), v), defaultsSupport); + doWriteColValue(col.getArrayType(), stream, new Data(value.getNestedValueSchema(), v), defaultsSupport); } catch (IOException e) { throw new RuntimeException(e); } }); } break; + case TUPLE: + Map jsonMapValues; + + Object underlyingObject = value.getObject(); + if (underlyingObject.getClass() != Struct.class) { + // Tuples in the root structure are parsed using StructToJsonMap + jsonMapValues = (Map) underlyingObject; + } else { + jsonMapValues = StructToJsonMap.toJsonMap((Struct) underlyingObject); + } + + Streams.zip( + col.getTupleFields().stream(), value.getFields().stream(), Tuples::of + ).forEach((fields) -> { + Column column = fields.getT1(); + Field field = fields.getT2(); + + Data innerData = (Data) jsonMapValues.get(field.name()); + try { + doWriteColValue(column, stream, innerData, defaultsSupport); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + break; + case VARIANT: + // https://github.com/ClickHouse/ClickHouse/pull/58047/files#diff-f56b7f61d5a82c440bb1a078ea8e5dcf2679dc92adbbc28bd89638cbe499363dR368-R384 + // https://github.com/ClickHouse/ClickHouse/blob/658a8e9a9b1658cd12c78365f9829b35d016f1b2/src/Columns/ColumnVariant.h#L10-L56 + mapTmp = (Map) value.getObject(); + Optional variantValueOption = mapTmp.values().stream() + .map(o -> (Data) o) + .filter(data -> data.getObject() != null) + .findFirst(); + + // Null Discriminator (https://github.com/ClickHouse/ClickHouse/blob/658a8e9a9b1658cd12c78365f9829b35d016f1b2/src/Columns/ColumnVariant.h#L65) + int nullDiscriminator = 255; + if (variantValueOption.isEmpty()) { + BinaryStreamUtils.writeUnsignedInt8(stream, nullDiscriminator); + } else { + Data variantValue = variantValueOption.get(); + + String fieldTypeName = variantValue.getFieldType().getName(); + Optional globalDiscriminator = col.getVariantGlobalDiscriminator(fieldTypeName); + if (globalDiscriminator.isEmpty()) { + LOGGER.error("Unable to determine the global discriminator of {} variant! 
Writing NULL variant instead.", fieldTypeName); + BinaryStreamUtils.writeUnsignedInt8(stream, nullDiscriminator); + return; + } + BinaryStreamUtils.writeUnsignedInt8(stream, globalDiscriminator.get()); + + // Variants support parametrized types, such as Decimal(x, y). Because of that, we can't use + // the doWritePrimitive method. + doWriteColValue( + col.getVariantGlobalDiscriminators().get(globalDiscriminator.get()).getT1(), + stream, + variantValue, + defaultsSupport + ); + } + break; + default: + // If you wonder, how NESTED works in JDBC: + // https://github.com/ClickHouse/clickhouse-java/blob/6cbbd8fe3f86ac26d12a95e0c2b964f3a3755fc9/clickhouse-data/src/main/java/com/clickhouse/data/format/ClickHouseRowBinaryProcessor.java#L159 + LOGGER.error("Cannot serialize unsupported type {}", columnType); } } @@ -590,7 +663,7 @@ protected void doInsertRawBinary(List records, Table table, QueryIdentif // write bytes into the piped stream for (Record record : records) { if (record.getSinkRecord().value() != null) { - for (Column col : table.getColumns()) { + for (Column col : table.getRootColumnsList()) { long beforePushStream = System.currentTimeMillis(); doWriteCol(record, col, stream, supportDefaults); pushStreamTime += System.currentTimeMillis() - beforePushStream; diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseFieldDescriptor.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseFieldDescriptor.java new file mode 100644 index 00000000..44817dbb --- /dev/null +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseFieldDescriptor.java @@ -0,0 +1,46 @@ +package com.clickhouse.kafka.connect.sink.db.helper; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.Builder; +import lombok.Data; +import lombok.extern.jackson.Jacksonized; + +/** + * Java object representation of one DESCRIBE TABLE result row. + *

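To make the VARIANT branch above concrete: on the wire, a Variant value is one unsigned byte (the global discriminator, i.e. the index of the value's type among the variant's types sorted by type name, or 255 for NULL) followed by the value in that type's usual RowBinary encoding. A standalone sketch of that framing for a `Variant(Double, String)` column; it mirrors the logic rather than reusing the connector's classes, and the type list is made up:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class VariantWireSketch {
    static final int NULL_DISCRIMINATOR = 255;

    // For Variant(Double, String), the types sorted by name are ["Double", "String"],
    // so Double gets global discriminator 0 and String gets 1.
    static void writeStringVariant(ByteArrayOutputStream out, String value, List<String> sortedTypes) {
        if (value == null) {
            out.write(NULL_DISCRIMINATOR);        // one byte: NULL variant
            return;
        }
        out.write(sortedTypes.indexOf("String")); // one byte: global discriminator
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        writeVarInt(out, bytes.length);           // String encoding: varint length + bytes
        out.write(bytes, 0, bytes.length);
    }

    // Minimal unsigned LEB128 varint, as RowBinary uses for string lengths.
    static void writeVarInt(ByteArrayOutputStream out, long v) {
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
    }
}
```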
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseFieldDescriptor.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseFieldDescriptor.java
new file mode 100644
index 00000000..44817dbb
--- /dev/null
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseFieldDescriptor.java
@@ -0,0 +1,46 @@
+package com.clickhouse.kafka.connect.sink.db.helper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.annotation.JsonNaming;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.jackson.Jacksonized;
+
+/**
+ * Java object representation of one DESCRIBE TABLE result row.
+ * <p>
+ * We use Jackson to instantiate it from JSON.
+ */
+@Data
+@Builder
+@Jacksonized
+@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
+public class ClickHouseFieldDescriptor {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private String name;
+    private String type;
+    private String defaultType;
+    private String defaultExpression;
+    private String comment;
+    private String codecExpression;
+    private String ttlExpression;
+    private boolean isSubcolumn;
+
+    public boolean isAlias() {
+        return "ALIAS".equals(defaultType);
+    }
+
+    public boolean isMaterialized() {
+        return "MATERIALIZED".equals(defaultType);
+    }
+
+    public boolean hasDefault() {
+        return "DEFAULT".equals(defaultType);
+    }
+
+    public static ClickHouseFieldDescriptor fromJsonRow(String json) throws JsonProcessingException {
+        return OBJECT_MAPPER.readValue(json.replace("\n", "\\n"), ClickHouseFieldDescriptor.class);
+    }
+}
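With JSONEachRow, each DESCRIBE row arrives as one JSON object whose snake_case keys map onto the camelCase fields above via the `@JsonNaming` strategy (the `replace("\n", "\\n")` guards against raw newlines in multi-line default expressions). A hedged example of what parsing a row could look like; the JSON values are illustrative, not captured from a server:

```java
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor;
import com.fasterxml.jackson.core.JsonProcessingException;

public class DescribeRowSketch {
    public static void main(String[] args) throws JsonProcessingException {
        // Assumed shape of one DESCRIBE TABLE ... FORMAT JSONEachRow row when
        // describe_include_subcolumns = 1 is set; Jackson coerces the 0/1 flag to boolean.
        String row = "{\"name\":\"payload.tags\"," +
                "\"type\":\"Map(String, String)\"," +
                "\"default_type\":\"\",\"default_expression\":\"\"," +
                "\"comment\":\"\",\"codec_expression\":\"\",\"ttl_expression\":\"\"," +
                "\"is_subcolumn\":1}";

        ClickHouseFieldDescriptor d = ClickHouseFieldDescriptor.fromJsonRow(row);
        System.out.println(d.getName() + " -> " + d.getType() + ", subcolumn: " + d.isSubcolumn());
    }
}
```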
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
index 993414db..e36948f4 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
@@ -11,6 +11,8 @@
 import com.clickhouse.kafka.connect.sink.db.mapping.Column;
 import com.clickhouse.kafka.connect.sink.db.mapping.Table;
 import com.clickhouse.kafka.connect.util.Utils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.Getter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +34,7 @@ public class ClickHouseHelperClient {
     private final boolean sslEnabled;
     private final String jdbcConnectionProperties;
     private final int timeout;
+    @Getter
     private ClickHouseNode server = null;
     private final int retry;
     private ClickHouseProxyType proxyType = null;
@@ -86,7 +89,7 @@ private ClickHouseNode create() {
                 tmpJdbcConnectionProperties
         );
 
-        LOGGER.info("ClickHouse URL: " + url);
+        LOGGER.info("ClickHouse URL: {}", url);
 
         if (username != null && password != null) {
             LOGGER.debug(String.format("Adding username [%s]", username));
@@ -137,10 +140,6 @@ public String version() {
         }
     }
 
-    public ClickHouseNode getServer() {
-        return this.server;
-    }
-
     public ClickHouseResponse query(String query) {
         return query(query, null);
     }
@@ -167,7 +166,6 @@ public ClickHouseResponse query(String query, ClickHouseFormat clickHouseFormat)
         throw new RuntimeException(ce);
     }
 
-
     public List<String> showTables() {
         List<String> tablesNames = new ArrayList<>();
         try (ClickHouseClient client = ClickHouseClient.builder()
@@ -200,32 +198,30 @@ public Table describeTable(String tableName) {
                 .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                 .build();
              ClickHouseResponse response = client.read(server)
+                     .set("describe_include_subcolumns", true)
+                     .format(ClickHouseFormat.JSONEachRow)
                      .query(describeQuery)
                      .executeAndWait()) {
+
             Table table = new Table(tableName);
             for (ClickHouseRecord r : response.records()) {
-                boolean hasDefault = false;
                 ClickHouseValue v = r.getValue(0);
-                String value = v.asString();
-                String[] cols = value.split("\t");
-                if (cols.length > 2) {
-                    String defaultKind = cols[2];
-                    if ("ALIAS".equals(defaultKind) || "MATERIALIZED".equals(defaultKind)) {
-                        LOGGER.debug("Skipping column {} as it is an alias or materialized view", cols[0]);
-                        // Only insert into "real" columns
-                        continue;
-                    } else if("DEFAULT".equals(defaultKind)) {
-                        table.setHasDefaults(true);
-                        hasDefault = true;
-                    }
+
+                ClickHouseFieldDescriptor fieldDescriptor = ClickHouseFieldDescriptor.fromJsonRow(v.asString());
+                if (fieldDescriptor.isAlias() || fieldDescriptor.isMaterialized()) {
+                    LOGGER.debug("Skipping column {} as it is an alias or a materialized column", fieldDescriptor.getName());
+                    continue;
                 }
-                String name = cols[0];
-                String type = cols[1];
-                Column column = Column.extractColumn(name, type, false, hasDefault);
+
+                if (fieldDescriptor.hasDefault()) {
+                    table.hasDefaults(true);
+                }
+
+                Column column = Column.extractColumn(fieldDescriptor);
                 table.addColumn(column);
             }
             return table;
-        } catch (ClickHouseException e) {
+        } catch (ClickHouseException | JsonProcessingException e) {
            LOGGER.error(String.format("Exception when running describeTable %s", describeQuery), e);
            return null;
        }
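For orientation, this is roughly what `describe_include_subcolumns = 1` adds to the DESCRIBE output. The table and result rows below are assumptions for illustration (the connector sets the option via `.set(...)` rather than in SQL):

```java
public class DescribeSubcolumnsSketch {
    public static void main(String[] args) {
        // Hypothetical table, used only to illustrate the setting:
        //   CREATE TABLE t (m Map(String, Decimal(5))) ENGINE = MergeTree ORDER BY tuple()
        String describeQuery = "DESCRIBE TABLE t "
                + "SETTINGS describe_include_subcolumns = 1 FORMAT JSONEachRow";

        // Expected result shape (abridged): one row for the root column plus one per subcolumn.
        //   m          Map(String, Decimal(5))   is_subcolumn = 0
        //   m.keys     Array(String)             is_subcolumn = 1
        //   m.values   Array(Decimal(5))         is_subcolumn = 1
        // describeTable() builds the MAP column from the root row with only its key type
        // known; the later `m.values` row patches in the value type via Table#addColumn.
        System.out.println(describeQuery);
    }
}
```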
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java
index 58446437..56473a5f 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java
@@ -1,111 +1,78 @@
 package com.clickhouse.kafka.connect.sink.db.mapping;
 
-import com.clickhouse.kafka.connect.util.Utils;
+import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuple3;
+import reactor.util.function.Tuples;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.regex.MatchResult;
+import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
+@Builder
+@Getter
 public class Column {
     private static final Logger LOGGER = LoggerFactory.getLogger(Column.class);
+
     private String name;
     private Type type;
+
+    @Accessors(fluent = true)
+    private boolean hasDefault;
     private boolean isNullable;
-    private boolean hasDefaultValue;
-    private Column subType = null;
-    private Type mapKeyType = Type.NONE;
-    private Column mapValueType = null;
+    private boolean isSubColumn;
+
+    private int precision;
     private int scale;
-    private Map<String, Integer> enumValues = null;
-
-    private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue) {
-        this.name = name;
-        this.type = type;
-        this.isNullable = isNullable;
-        this.hasDefaultValue = hasDefaultValue;
-        this.subType = null;
-        this.precision = 0;
-    }
+    private Map<String, Integer> enumValues;
 
-    private Column(String name, Type type, boolean isNullable, Map<String, Integer> enumValues) {
-        this.name = name;
-        this.type = type;
-        this.isNullable = isNullable;
-        this.subType = null;
-        this.precision = 0;
-        this.enumValues = enumValues;
-    }
-    private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, Type mapKeyType, Column mapValueType) {
-        this.name = name;
-        this.type = type;
-        this.isNullable = isNullable;
-        this.hasDefaultValue = hasDefaultValue;
-        this.subType = null;
-        this.mapKeyType = mapKeyType;
-        this.mapValueType = mapValueType;
-        this.precision = 0;
-    }
+    private Type mapKeyType;
+    @Setter private Column mapValueType;
+    @Setter private Column arrayType;
+    private List<Column> tupleFields;
+    private List<Tuple2<Column, String>> variantTypes;
 
-    private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, Column subType) {
-        this.name = name;
-        this.type = type;
-        this.isNullable = isNullable;
-        this.hasDefaultValue = hasDefaultValue;
-        this.subType = subType;
-        this.precision = 0;
-    }
+    @Setter private int mapDepth;
+    private int arrayDepth;
 
-    private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, int precision, int scale) {
-        this.name = name;
-        this.type = type;
-        this.isNullable = isNullable;
-        this.hasDefaultValue = hasDefaultValue;
-        this.precision = precision;
-        this.scale = scale;
-    }
+    @Setter
+    private Column parent;
 
+    /**
+     * The Variant Global Discriminators are used to mark which variant is serialized on the wire.
+     * See Columns/ColumnVariant.h
+     */
+    @Getter(lazy = true)
+    private final List<Tuple2<Column, String>> variantGlobalDiscriminators = variantTypes.stream()
+            .sorted(Comparator.comparing(Tuple2::getT2))
+            .collect(Collectors.toList());
 
-    public String getName() {
-        return name;
-    }
+    /**
+     * We need to map the Kafka Connect type to a ClickHouse type. This is tricky and might not work as expected for
+     * parametrized types, such as Decimal(x, y). But this is only a problem when the Variant holds multiple variants
+     * of the same base type with different parameters.
+     */
+    public Optional<Integer> getVariantGlobalDiscriminator(String clickHouseType) {
+        int index = this.getVariantGlobalDiscriminators().stream()
+                .map(Tuple2::getT2)
+                .map(String::toUpperCase)
+                .collect(Collectors.toList())
+                .indexOf(clickHouseType.toUpperCase());
 
-    public Type getType() {
-        return type;
+        if (index < 0) return Optional.empty();
+        else return Optional.of(index);
     }
 
-    public Column getSubType() {
-        return subType;
-    }
-    public boolean isNullable() {
-        return isNullable;
-    }
-    public boolean hasDefault() {
-        return hasDefaultValue;
-    }
-
-    public int getPrecision() {
-        return precision;
-    }
-    public int getScale() {
-        return scale;
-    }
-    public Type getMapKeyType() {
-        return mapKeyType;
-    }
-    public Column getMapValueType() {
-        return mapValueType;
-    }
     private static Type dispatchPrimitive(String valueType) {
-        Type type = Type.NONE;
+        Type type = Type.UNKNOWN;
         switch (valueType) {
             case "Int8":
                 type = Type.INT8;
                 break;
@@ -172,6 +139,8 @@ private static Type dispatchPrimitive(String valueType) {
             // Need to understand why DateTime64(3)
             type = Type.DateTime64;
         } else if (valueType.startsWith("Decimal")) {
+            // FIXME: Map keys don't support Decimal:
+            //  "Type of Map key must be a type, that can be represented by integer or String or FixedString (possibly LowCardinality) or UUID or IPv6, but Decimal(5, 0) given."
            type = Type.Decimal;
        } else if (valueType.startsWith("FixedString")) {
            type = Type.FIXED_STRING;
@@ -182,43 +151,117 @@ private static Type dispatchPrimitive(String valueType) {
         return type;
     }
 
-    public static Column extractColumn(String name, String valueType, boolean isNull, boolean hasDefaultValue) {
+    public static Column extractColumn(ClickHouseFieldDescriptor fieldDescriptor) {
+        return Column.extractColumn(fieldDescriptor.getName(), fieldDescriptor.getType(), false, fieldDescriptor.hasDefault(), fieldDescriptor.isSubcolumn());
+    }
+
+    public static Column extractColumn(String name, String valueType, boolean isNull, boolean hasDefaultValue, boolean isSubColumn) {
+        return extractColumn(name, valueType, isNull, hasDefaultValue, isSubColumn, 0);
+    }
+
+    public static Column extractColumn(String name, String valueType, boolean isNull, boolean hasDefaultValue, boolean isSubColumn, int arrayDepth) {
         LOGGER.trace("Extracting column {} with type {}", name, valueType);
-        Type type = dispatchPrimitive(valueType);
+
+        ColumnBuilder builder = Column.builder()
+                .name(name)
+                .arrayDepth(arrayDepth)
+                .hasDefault(hasDefaultValue)
+                .isSubColumn(isSubColumn);
+
         if (valueType.startsWith("Enum")) {
+            Type type;
             if (valueType.startsWith("Enum16")) {
                 type = Type.Enum16;
-
             } else {
                 type = Type.Enum8;
             }
-            Map<String, Integer> enumValues = extractEnumValues(valueType);
-            return new Column(name, type, false, enumValues);
+
+            return builder.type(type)
+                    .enumValues(extractEnumValues(valueType))
+                    .build();
         } else if (valueType.startsWith("Array")) {
-            type = Type.ARRAY;
-            Column subType = extractColumn(name, valueType.substring("Array".length() + 1, valueType.length() - 1), false, hasDefaultValue);
-            return new Column(name, type, false, hasDefaultValue, subType);
-        } else if(valueType.startsWith("Map")) {
-            type = Type.MAP;
-            String value = valueType.substring("Map".length() + 1, valueType.length() - 1);
-            String[] val = value.split(",", 2);
-            String mapKey = val[0].trim();
-            String mapValue = val[1].trim();
-            Column mapValueType = extractColumn(name, mapValue, false, hasDefaultValue);
-            return new Column(name, type, false, hasDefaultValue, dispatchPrimitive(mapKey), mapValueType);
+            Column arrayType = extractColumn(name, valueType.substring("Array".length() + 1, valueType.length() - 1), false, hasDefaultValue, isSubColumn, arrayDepth + 1);
+
+            Column array = builder.type(Type.ARRAY)
+                    .arrayType(arrayType)
+                    .build();
+
+            arrayType.setParent(array);
+            return array;
+        } else if (valueType.startsWith("Map")) {
+            String mapDefinition = valueType.substring("Map".length() + 1, valueType.length() - 1);
+            String mapKey = mapDefinition.split(",", 2)[0].trim();
+
+            // We will fill in the map value type later (the describe_include_subcolumns option prints those details in a later row).
+            return builder.type(Type.MAP)
+                    .mapKeyType(dispatchPrimitive(mapKey))
+                    .build();
+        } else if (valueType.startsWith("Tuple")) {
+            // We will fill in the columns inside the tuple later (the describe_include_subcolumns option prints those details in later rows).
+            return builder.type(Type.TUPLE)
+                    .tupleFields(new ArrayList<>())
+                    .build();
+        } else if (valueType.startsWith("Nested")) {
+            throw new IllegalArgumentException("DESCRIBE TABLE is never supposed to return the Nested type. It should always yield its Array fields directly.");
+        } else if (valueType.startsWith("Variant")) {
+            String rawVariantTypes = valueType.substring("Variant".length() + 1, valueType.length() - 1);
+            List<Tuple2<Column, String>> variantTypes = splitUnlessInBrackets(rawVariantTypes, ',').stream().map(
+                    t -> {
+                        String definition = t.trim();
+
+                        // Variants support parametrized types, such as Decimal(x, y), which have to be described
+                        // including their parameters for proper serialization. We use Column just as a container
+                        // for those parameters. Variant types don't hold any names, just types.
+                        Tuple3<Type, Integer, Integer> typePrecisionAndScale = dispatchPrimitiveWithPrecisionAndScale(definition);
+
+                        ColumnBuilder variantTypeBuilder = Column.builder().type(typePrecisionAndScale.getT1());
+
+                        if (Pattern.compile(".+\\(.+\\)").asMatchPredicate().test(definition)) {
+                            variantTypeBuilder = variantTypeBuilder
+                                    .precision(typePrecisionAndScale.getT2())
+                                    .scale(typePrecisionAndScale.getT3());
+                        }
+
+                        if (definition.equalsIgnoreCase("bool")) {
+                            // So that we can match it from the Kafka Connect type
+                            definition = "Boolean";
+                        }
+
+                        return Tuples.of(variantTypeBuilder.build(), definition);
+                    }
+            ).collect(Collectors.toList());
+
+            return builder.type(Type.VARIANT)
+                    .variantTypes(variantTypes)
+                    .build();
         } else if (valueType.startsWith("LowCardinality")) {
-            return extractColumn(name, valueType.substring("LowCardinality".length() + 1, valueType.length() - 1), isNull, hasDefaultValue);
+            return extractColumn(name, valueType.substring("LowCardinality".length() + 1, valueType.length() - 1), isNull, hasDefaultValue, isSubColumn);
         } else if (valueType.startsWith("Nullable")) {
-            return extractColumn(name, valueType.substring("Nullable".length() + 1, valueType.length() - 1), true, hasDefaultValue);
-        } else if (type == Type.FIXED_STRING) {
-            int length = Integer.parseInt(valueType.substring("FixedString".length() + 1, valueType.length() - 1).trim());
-            return new Column(name, type, isNull, hasDefaultValue, length, 0);
+            return extractColumn(name, valueType.substring("Nullable".length() + 1, valueType.length() - 1), true, hasDefaultValue, isSubColumn);
+        }
+
+        // We're dealing with a primitive type here
+        Tuple3<Type, Integer, Integer> typePrecisionAndScale = dispatchPrimitiveWithPrecisionAndScale(valueType);
+
+        return builder
+                .type(typePrecisionAndScale.getT1())
+                .isNullable(isNull)
+                .precision(typePrecisionAndScale.getT2())
+                .scale(typePrecisionAndScale.getT3())
+                .build();
+    }
+
+    private static Tuple3<Type, Integer, Integer> dispatchPrimitiveWithPrecisionAndScale(String valueType) {
+        Type type = dispatchPrimitive(valueType);
+
+        int precision = 0;
+        int scale = 0;
+
+        if (type == Type.FIXED_STRING) {
+            precision = Integer.parseInt(valueType.substring("FixedString".length() + 1, valueType.length() - 1).trim());
         } else if (type == Type.DateTime64) {
             String[] scaleAndTimezone = valueType.substring("DateTime64".length() + 1, valueType.length() - 1).split(",");
-            int precision = Integer.parseInt(scaleAndTimezone[0].trim());
-            LOGGER.trace("Precision is {}", precision);
-            return new Column(name, type, isNull, hasDefaultValue, precision, 0);
+            precision = Integer.parseInt(scaleAndTimezone[0].trim());
+            LOGGER.trace("Parsed precision of DateTime64 is {}", precision);
         } else if (type == Type.Decimal) {
             final Pattern patter = Pattern.compile("Decimal(?<size>\\d{2,3})?\\s*(\\((?<a1>\\d{1,}\\s*)?,*\\s*(?<a2>\\d{1,})?\\))?");
             Matcher match = patter.matcher(valueType);
@@ -232,7 +275,6 @@ public static Column extractColumn(String name, String valueType, boolean isNull
             Optional<Integer> arg2 = Optional.ofNullable(match.group("a2")).map(Integer::parseInt);
 
             if (size.isPresent()) {
-                int precision;
                 switch (size.get()) {
                     case 32: precision = 9; break;
                     case 64: precision = 18; break;
@@ -241,17 +283,45 @@ public static Column extractColumn(String name, String valueType, boolean isNull
                     default: throw new RuntimeException("Not supported precision");
                 }
 
-                return new Column(name, type, isNull, hasDefaultValue, precision, arg1.orElseThrow());
+                scale = arg1.orElseThrow();
             } else if (arg2.isPresent()) {
-                return new Column(name, type, isNull, hasDefaultValue, arg1.orElseThrow(), arg2.orElseThrow());
+                precision = arg1.orElseThrow();
+                scale = arg2.orElseThrow();
             } else if (arg1.isPresent()) {
-                return new Column(name, type, isNull, hasDefaultValue, arg1.orElseThrow(), 0);
+                precision = arg1.orElseThrow();
             } else {
-                return new Column(name, type, isNull, hasDefaultValue, 10, 0);
+                precision = 10;
             }
         }
 
-        return new Column(name, type, isNull, hasDefaultValue);
+        return Tuples.of(type, precision, scale);
+    }
+
+    public static List<String> splitUnlessInBrackets(String input, char delimiter) {
+        List<String> parts = new ArrayList<>();
+        int bracketCounter = 0; // To keep track of whether we are inside brackets
+        StringBuilder part = new StringBuilder();
+
+        for (char ch : input.toCharArray()) {
+            if (ch == '(') {
+                bracketCounter++;
+            } else if (ch == ')') {
+                bracketCounter--;
+            }
+
+            if (ch == delimiter && bracketCounter == 0) {
+                // We've reached a delimiter outside of brackets; add the part to the list and reset the builder
+                parts.add(part.toString());
+                part = new StringBuilder();
+            } else {
+                part.append(ch); // Add the character to the current part
+            }
+        }
+
+        // Add the last part after the final delimiter, or the full string if no delimiter was found
+        parts.add(part.toString());
+
+        return parts;
    }
 
     private static Map<String, Integer> extractEnumValues(String valueType) {
@@ -273,7 +343,29 @@ public Integer convertEnumValues(String value) {
     }
 
     public String toString() {
-        return String.format("Column{name=%s, type=%s, isNullable=%s, hasDefaultValue=%s, subType=%s, mapKeyType=%s, mapValueType=%s, precision=%s, scale=%s}",
-                name, type, isNullable, hasDefaultValue, subType, mapKeyType, mapValueType, precision, scale);
+        return String.format(
+                "%s{name=%s",
+                type, name
+        ) + (!isNullable ? "" :
+                String.format(", isNullable=%s", isNullable)
+        ) + (!hasDefault ? "" :
+                String.format(", hasDefault=%s", hasDefault)
+        ) + (precision == 0 && type != Type.Decimal ? "" :
+                String.format(", precision=%s", precision)
+        ) + (scale == 0 && type != Type.Decimal ? "" :
+                String.format(", scale=%s", scale)
+        ) + (enumValues == null ? "" :
+                String.format(", enumValues=%s", enumValues)
+        ) + (arrayType == null ? "" :
+                String.format(", arrayType=%s", arrayType)
+        ) + (mapKeyType == null ? "" :
+                String.format(", mapKeyType=%s", mapKeyType)
+        ) + (mapValueType == null ? "" :
+                String.format(", mapValueType=%s", mapValueType)
+        ) + (tupleFields == null ? "" :
+                String.format(", tupleFields=%s", tupleFields.stream().map(Column::toString).collect(Collectors.joining(", ", "[", "]")))
+        ) + (variantTypes == null ? "" :
+                String.format(", variantTypes=%s", variantTypes.stream().map(Tuple2::getT2).collect(Collectors.joining(", ", "[", "]")))
+        ) + "}";
     }
 }
"" : + String.format(", variantTypes=%s", variantTypes.stream().map(Tuple2::getT2).collect(Collectors.joining(", ", "[", "]"))) + ) + "}"; } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java index 8e7dcfb8..5672ec57 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java @@ -1,45 +1,168 @@ package com.clickhouse.kafka.connect.sink.db.mapping; import com.clickhouse.kafka.connect.util.Utils; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.HashMap; -import java.util.Map; +import java.util.*; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +@Getter public class Table { - private String name; - private List columnsList = null; - private Map columnsMap = null; + private static final Logger LOGGER = LoggerFactory.getLogger(Table.class); + private static final Predicate SIZE_FIELD_MATCHER = Pattern.compile(".+\\.size[0-9]+$").asMatchPredicate(); + private static final Pattern MULTIPLE_MAP_VALUES_PATTERN = Pattern.compile("(\\.values)(?=((\\.values)+$))"); - private boolean hasDefaults = false; + private final String name; + + private final List rootColumnsList; + private final Map rootColumnsMap; + private final List allColumnsList; + private final Map allColumnsMap; + + @Setter + @Accessors(fluent = true) + private boolean hasDefaults; public Table(String name) { this.name = name; - this.columnsList = new ArrayList<>(); - this.columnsMap = new HashMap<>(); + this.rootColumnsList = new ArrayList<>(); + this.rootColumnsMap = new HashMap<>(); + + this.allColumnsList = new ArrayList<>(); + this.allColumnsMap = new HashMap<>(); } public String getName() { return Utils.escapeTopicName(name); } + private void registerValidColumn(Column column) { + allColumnsMap.put(column.getName(), column); + allColumnsList.add(column); + } + public void addColumn(Column column) { - columnsList.add(column); - columnsMap.put(column.getName(), column); + registerValidColumn(column); + + if (column.isSubColumn()) handleNonRoot(column); + else { + rootColumnsList.add(column); + rootColumnsMap.put(column.getName(), column); + } } - public Column getColumnByName(String name) { - return columnsMap.get(name); + private void handleNonRoot(Column column) { + String parentName = column.getName().substring(0, column.getName().lastIndexOf(".")); + Column parent = allColumnsMap.getOrDefault(parentName, null); + if (parent == null) { + LOGGER.error("Got non-root column, but its parent was not found to be updated. {}", column); + return; + } + + updateParent(parent, column); } - public List getColumns() { return columnsList; } + private void updateParent(Column parent, Column child) { + switch (parent.getType()) { + case VARIANT: + // Variants are handled fully in the Column class because its types are always primitive. Let's ignore them here. 
+ return; + case ARRAY: + if (SIZE_FIELD_MATCHER.test(child.getName())) + return; + + Column parentArrayType = parent.getArrayType(); + switch (parentArrayType.getType()) { + case MAP: + case TUPLE: + updateParent(parent.getArrayType(), child.getArrayType()); + return; + case ARRAY: + do { + child = child.getArrayType(); + parent = parent.getArrayType(); + } while (child.getType() == Type.ARRAY && parent.getType() == Type.ARRAY); + updateParent(parent, child); + return; + case VARIANT: + return; + default: + LOGGER.error("Unhandled complex type '{}' as a child of an array", parentArrayType.getType()); + return; + } + case MAP: + // Keys are parsed fully in the Column class as its type is always primitive. + if (child.getName().endsWith(".keys") || SIZE_FIELD_MATCHER.test(child.getName())) + return; + + if (child.getType() == Type.ARRAY && child.getName().endsWith(".values")) { + int depth = 1; + + Matcher matcher = MULTIPLE_MAP_VALUES_PATTERN.matcher(child.getName()); + while (matcher.find()) depth += 1; + + int remainingDepth = depth; - public boolean hasDefaults() { - return hasDefaults; + // ClickHouse outputs nested maps values as nested array types + while (remainingDepth-- > 0) { + child = child.getArrayType(); + } + + child.setParent(parent); + + parent.setMapDepth(depth); + parent.setMapValueType(child); + registerValidColumn(child); + } + return; + case TUPLE: + Column parentOfParent = parent.getParent(); + + if (parentOfParent != null) { + boolean anyTransitionalParentIsMap = parentOfParent.getType() == Type.MAP; + + if (!anyTransitionalParentIsMap && parentOfParent.getType() == Type.ARRAY) { + Column currentParent = parentOfParent.getParent(); + + while (currentParent != null) { + anyTransitionalParentIsMap = currentParent.getType() == Type.MAP; + + if (anyTransitionalParentIsMap) + break; + + currentParent = currentParent.getParent(); + } + } + + if (anyTransitionalParentIsMap) { + int remainingDepth = getRemainingDepth(parent, parentOfParent); + + while (remainingDepth-- > 0) { + child = child.getArrayType(); + } + } + } + parent.getTupleFields().add(child); + return; + default: + LOGGER.error("Unsupported complex parent type: {}", parent.getType()); + } } - public void setHasDefaults(boolean hasDefaults) { - this.hasDefaults = hasDefaults; + private static int getRemainingDepth(Column parent, Column parentOfParent) { + int compensationDepth = 0; + + // I don't really know why the ClickHouse describe table result wraps the type in an additional + // array only when the parent is a map which is under array. But we have to deal with it. 
+ Matcher matcher = MULTIPLE_MAP_VALUES_PATTERN.matcher(parent.getName()); + while (matcher.find()) compensationDepth += 1; + + return parentOfParent.getMapDepth() + parentOfParent.getArrayDepth() - compensationDepth; } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java index fed6c98a..609179fb 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java @@ -1,7 +1,7 @@ package com.clickhouse.kafka.connect.sink.db.mapping; public enum Type { - NONE, + UNKNOWN, INT8, INT16, INT32, @@ -14,6 +14,8 @@ public enum Type { BOOLEAN, ARRAY, MAP, + TUPLE, + VARIANT, Date, Date32, DateTime, diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/KeeperStateProvider.java b/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/KeeperStateProvider.java index b40653a9..6e2479bf 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/KeeperStateProvider.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/state/provider/KeeperStateProvider.java @@ -83,7 +83,7 @@ public StateRecord getStateRecord(String topic, int partition) { .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query(selectStr) .executeAndWait()) { - LOGGER.debug("return size: " + response.getSummary().getReadRows()); + LOGGER.debug("return size: {}", response.getSummary().getReadRows()); if ( response.getSummary().getReadRows() == 0) { LOGGER.info(String.format("read state record: topic %s partition %s with NONE state", topic, partition)); return new StateRecord(topic, partition, 0, 0, State.NONE); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java index 8d490687..03042299 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java @@ -6,24 +6,29 @@ import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.clickhouse.kafka.connect.sink.helper.SchemaTestData; +import com.clickhouse.kafka.connect.sink.junit.extension.FromVersionConditionExtension; +import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion; import com.clickhouse.kafka.connect.util.Utils; import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.jupiter.api.AfterAll; +import org.json.JSONObject; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.testcontainers.clickhouse.ClickHouseContainer; +import org.junit.jupiter.api.extension.ExtendWith; import org.testcontainers.containers.Network; import org.testcontainers.containers.ToxiproxyContainer; import java.io.IOException; import java.util.*; +import java.util.stream.LongStream; import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.assertFalse; +@ExtendWith(FromVersionConditionExtension.class) public class ClickHouseSinkTaskWithSchemaProxyTest extends ClickHouseBase { private static ToxiproxyContainer toxiproxy = null; 
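The test classes below register a `FromVersionConditionExtension` and gate individual tests with `@SinceClickHouseVersion("24.1")`. The extension itself is not part of this diff; purely as an illustration, a minimal JUnit 5 `ExecutionCondition` along these lines could look as follows (the version-lookup helper and the fallback values are hypothetical):

```java
import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion;
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
import org.junit.jupiter.api.extension.ExecutionCondition;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.platform.commons.support.AnnotationSupport;

public class FromVersionConditionSketch implements ExecutionCondition {
    @Override
    public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
        return AnnotationSupport.findAnnotation(context.getElement(), SinceClickHouseVersion.class)
                .map(annotation -> {
                    String current = currentClickHouseVersion();
                    if (compareVersions(current, annotation.value()) >= 0) {
                        return ConditionEvaluationResult.enabled("ClickHouse " + current + " is recent enough");
                    }
                    return ConditionEvaluationResult.disabled("Requires ClickHouse >= " + annotation.value());
                })
                .orElseGet(() -> ConditionEvaluationResult.enabled("No version requirement"));
    }

    // Hypothetical helper: reads the CLICKHOUSE_VERSION variable the CI matrix above sets,
    // treating "latest"/"cloud" as effectively unbounded.
    private static String currentClickHouseVersion() {
        String v = System.getenv("CLICKHOUSE_VERSION");
        return (v == null || v.equals("latest") || v.equals("cloud")) ? "999.0" : v;
    }

    // Compares dotted numeric versions segment by segment.
    private static int compareVersions(String a, String b) {
        String[] as = a.split("\\."), bs = b.split("\\.");
        for (int i = 0; i < Math.max(as.length, bs.length); i++) {
            int ai = i < as.length ? Integer.parseInt(as[i]) : 0;
            int bi = i < bs.length ? Integer.parseInt(bs[i]) : 0;
            if (ai != bi) return Integer.compare(ai, bi);
        }
        return 0;
    }
}
```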
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java
index 8d490687..03042299 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java
@@ -6,24 +6,29 @@
 import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
 import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
 import com.clickhouse.kafka.connect.sink.helper.SchemaTestData;
+import com.clickhouse.kafka.connect.sink.junit.extension.FromVersionConditionExtension;
+import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion;
 import com.clickhouse.kafka.connect.util.Utils;
 import eu.rekawek.toxiproxy.Proxy;
 import eu.rekawek.toxiproxy.ToxiproxyClient;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.jupiter.api.AfterAll;
+import org.json.JSONObject;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.ToxiproxyContainer;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.stream.LongStream;
 
 import static org.junit.jupiter.api.Assertions.*;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
+@ExtendWith(FromVersionConditionExtension.class)
 public class ClickHouseSinkTaskWithSchemaProxyTest extends ClickHouseBase {
     private static ToxiproxyContainer toxiproxy = null;
     private static Proxy proxy = null;
@@ -316,4 +321,49 @@ public void schemaWithBytesTest() {
 
         assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
     }
+
+    @Test
+    @Disabled("Variants are not supported in the current ClickHouse version that is compatible with toxiproxy.")
+    @SinceClickHouseVersion("24.1")
+    public void schemaWithTupleLikeInfluxTest() {
+        Map<String, String> props = getTestProperties();
+        ClickHouseHelperClient chc = createClient(props);
+
+        String topic = "tuple-like-influx-value-table-test";
+        ClickHouseTestHelpers.dropTable(chc, topic);
+        ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` (" +
+                        "`off16` Int16," +
+                        "`payload` Tuple(" +
+                        "  fields Map(String, Variant(Float64, Int64, String))," +
+                        "  tags Map(String, String)" +
+                        ")) Engine = MergeTree ORDER BY off16",
+                Map.of("allow_experimental_variant_type", 1)
+        );
+
+        Collection<SinkRecord> sr = SchemaTestData.createTupleLikeInfluxValueData(topic, 1);
+        ClickHouseSinkTask chst = new ClickHouseSinkTask();
+        chst.start(props);
+        chst.put(sr);
+        chst.stop();
+
+        assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
+        List<JSONObject> rows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic);
+
+        LongStream.range(0, sr.size()).forEachOrdered(n -> {
+            JSONObject row = rows.get((int) n);
+
+            assertEquals(n, row.getInt("off16"));
+
+            JSONObject payload = row.getJSONObject("payload");
+            JSONObject fields = payload.getJSONObject("fields");
+
+            assertEquals(1 / (float) (n + 1), fields.getFloat("field1"));
+            assertEquals(n, fields.getBigInteger("field2").longValue());
+            assertEquals(String.format("Value: '%d'", n), fields.getString("field3"));
+
+            JSONObject tags = payload.getJSONObject("tags");
+            assertEquals("tag1", tags.getString("tag1"));
+            assertEquals("tag2", tags.getString("tag2"));
+        });
+    }
 }
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
index 2af7ba44..8abeda82 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
@@ -1,25 +1,27 @@
 package com.clickhouse.kafka.connect.sink;
 
-import com.clickhouse.client.config.ClickHouseProxyType;
-import com.clickhouse.kafka.connect.ClickHouseSinkConnector;
 import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
 import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
 import com.clickhouse.kafka.connect.sink.helper.SchemaTestData;
+import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion;
+import com.clickhouse.kafka.connect.sink.junit.extension.FromVersionConditionExtension;
 import com.clickhouse.kafka.connect.util.Utils;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.json.JSONArray;
+import org.json.JSONObject;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.clickhouse.ClickHouseContainer;
 import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;
 
+import java.math.BigDecimal;
 import java.util.*;
 
 import static org.junit.jupiter.api.Assertions.*;
 
+@ExtendWith(FromVersionConditionExtension.class)
 public class ClickHouseSinkTaskWithSchemaTest extends ClickHouseBase {
     private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkTaskWithSchemaTest.class);
 
     @Test
@@ -468,4 +470,129 @@ public void supportEnumTest() {
         chst.stop();
         assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
     }
+
+    @Test
+    @SinceClickHouseVersion("24.1")
+    public void schemaWithTupleOfMapsWithVariantTest() {
+        Map<String, String> props = createProps();
+        ClickHouseHelperClient chc = createClient(props);
+        String topic = "tuple-array-map-variant-table-test";
+        ClickHouseTestHelpers.dropTable(chc, topic);
+
+        String simpleTuple = "Tuple(" +
+                "  `variant_with_string` Variant(Double, String)," +
+                "  `variant_with_double` Variant(Double, String)," +
+                "  `variant_array` Array(Variant(Double, String))," +
+                "  `variant_map` Map(String, Variant(Double, String))" +
+                "  )";
+
+        ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` (" +
+                        "`off16` Int16," +
+                        "`tuple` Tuple(" +
+                        "  `array` Array(String)," +
+                        "  `map` Map(String, String)," +
+                        "  `array_array` Array(Array(String))," +
+                        "  `map_map` Map(String, Map(String, Int64))," +
+                        "  `array_map` Array(Map(String, String))," +
+                        "  `map_array` Map(String, Array(String))," +
+                        "  `array_array_array` Array(Array(Array(String)))," +
+                        "  `map_map_map` Map(String, Map(String, Map(String, String)))," +
+                        "  `tuple` " + simpleTuple + "," +
+                        "  `array_tuple` Array(" + simpleTuple + ")," +
+                        "  `map_tuple` Map(String, " + simpleTuple + ")," +
+                        "  `array_array_tuple` Array(Array(" + simpleTuple + "))," +
+                        "  `map_map_tuple` Map(String, Map(String, " + simpleTuple + "))," +
+                        "  `array_map_tuple` Array(Map(String, " + simpleTuple + "))," +
+                        "  `map_array_tuple` Map(String, Array(" + simpleTuple + "))" +
+                        ")) Engine = MergeTree ORDER BY `off16`",
+                Map.of(
+                        "allow_experimental_variant_type", 1
+                ));
+        Collection<SinkRecord> sr = SchemaTestData.createTupleType(topic, 1, 5);
+
+        ClickHouseSinkTask chst = new ClickHouseSinkTask();
+        chst.start(props);
+        chst.put(sr);
+        chst.stop();
+
+        assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
+
+        List<JSONObject> allRows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic);
+        for (int i = 0; i < sr.size(); i++) {
+            JSONObject row = allRows.get(i);
+
+            assertEquals(i, row.getInt("off16"));
+            JSONObject tuple = row.getJSONObject("tuple");
+            JSONObject nestedTuple = tuple.getJSONObject("tuple");
+            assertEquals(1 / (double) 3, nestedTuple.getDouble("variant_with_double"));
+        }
+    }
+
+    @Test
+    @SinceClickHouseVersion("24.1")
+    public void schemaWithNestedTupleMapArrayAndVariant() {
+        Map<String, String> props = createProps();
+        ClickHouseHelperClient chc = createClient(props);
+        String topic = "nested-tuple-map-array-and-variant-table-test";
+        ClickHouseTestHelpers.dropTable(chc, topic);
+
+        ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` (" +
+                        "`off16` Int16," +
+                        "`nested` Nested(" +
+                        "  `string` String," +
+                        "  `decimal` Decimal(14, 2)," +
+                        "  `tuple` Tuple(" +
+                        "    `map` Map(String, String)," +
+                        "    `variant` Variant(Boolean, String)" +
+                        "))) Engine = MergeTree ORDER BY `off16`",
+                Map.of(
+                        "allow_experimental_variant_type", 1
+                ));
+        Collection<SinkRecord> sr = SchemaTestData.createNestedType(topic, 1);
+
+        ClickHouseSinkTask chst = new ClickHouseSinkTask();
+        chst.start(props);
+        chst.put(sr);
+        chst.stop();
+
+        assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
+
+        List<JSONObject> allRows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic);
+        for (int i = 0; i < allRows.size(); i++) {
+            JSONObject row = allRows.get(i);
+
+            assertEquals(i, row.getInt("off16"));
+
+            String expectedString = String.format("v%d", i);
+
+            int expectedNestedSize = i % 4;
+            assertEquals(expectedNestedSize, row.getJSONArray("nested.string").length());
+            assertEquals(expectedNestedSize, row.getJSONArray("nested.decimal").length());
+            assertEquals(expectedNestedSize, row.getJSONArray("nested.tuple").length());
+
+            BigDecimal expectedDecimal = new BigDecimal(String.format("%d.%d", i, 2));
+
+            assertEquals(
+                    expectedDecimal.multiply(new BigDecimal(expectedNestedSize)).doubleValue(),
+                    row.getJSONArray("nested.decimal").toList().stream()
+                            .map(object -> (BigDecimal) object)
+                            .reduce(BigDecimal::add)
+                            .orElseGet(() -> new BigDecimal(0)).doubleValue()
+            );
+
+            final int n = i;
+            row.getJSONArray("nested.tuple").toList().forEach(object -> {
+                Map<String, Object> tuple = (Map<String, Object>) object;
+                if (n % 2 == 0) {
+                    assertEquals(n % 8 >= 4, tuple.get("variant"));
+                } else {
+                    assertEquals(expectedString, tuple.get("variant"));
+                }
+            });
+
+            row.getJSONArray("nested.string").toList().forEach(object ->
+                    assertEquals(expectedString, object)
+            );
+        }
+    }
 }
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java
index e85d6337..16642687 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java
@@ -2,44 +2,52 @@
 
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
+
+import static com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers.col;
+import static com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers.newDescriptor;
 import static org.junit.jupiter.api.Assertions.*;
 
 class ColumnTest {
     @Test
     public void extractNullableColumn() {
-        Column col = Column.extractColumn("columnName", "Nullable(String)", true, false);
+        Column col = Column.extractColumn(newDescriptor("Nullable(String)"));
         assertEquals(Type.STRING, col.getType());
     }
 
     @Test
     public void extractLowCardinalityColumn() {
-        Column col = Column.extractColumn("columnName", "LowCardinality(String)", true, false);
+        Column col = Column.extractColumn(newDescriptor("LowCardinality(String)"));
         assertEquals(Type.STRING, col.getType());
     }
 
     @Test
     public void extractLowCardinalityNullableColumn() {
-        Column col = Column.extractColumn("columnName", "LowCardinality(Nullable(String))", true, false);
+        Column col = Column.extractColumn(newDescriptor("LowCardinality(Nullable(String))"));
         assertEquals(Type.STRING, col.getType());
     }
 
     @Test
     public void extractArrayOfLowCardinalityNullableColumn() {
-        Column col = Column.extractColumn("columnName", "Array(LowCardinality(Nullable(String)))", true, false);
+        Column col = Column.extractColumn(newDescriptor("Array(LowCardinality(Nullable(String)))"));
         assertEquals(Type.ARRAY, col.getType());
-        assertEquals(Type.STRING, col.getSubType().getType());
+        assertEquals(Type.STRING, col.getArrayType().getType());
+
+        assertNull(col.getMapKeyType());
+        assertNull(col.getMapValueType());
+        assertNull(col.getTupleFields());
     }
 
     @Test
     public void extractDecimalNullableColumn() {
-        Column col = Column.extractColumn("columnName", "Nullable(Decimal)", true, false);
+        Column col = Column.extractColumn(newDescriptor("Nullable(Decimal)"));
         assertEquals(Type.Decimal, col.getType());
     }
 
     @Test
     public void extractDecimal_default() {
-        Column col = Column.extractColumn("columnName", "Decimal", true, false);
+        Column col = Column.extractColumn(newDescriptor("Decimal"));
         assertEquals(Type.Decimal, col.getType());
         assertEquals(10, col.getPrecision());
         assertEquals(0, col.getScale());
@@ -47,7 +55,7 @@ public void extractDecimal_default() {
 
     @Test
     public void extractDecimal_default_5() {
-        Column col = Column.extractColumn("columnName", "Decimal(5)", true, false);
+        Column col = Column.extractColumn(newDescriptor("Decimal(5)"));
         assertEquals(Type.Decimal, col.getType());
         assertEquals(5, col.getPrecision());
         assertEquals(0, col.getScale());
@@ -55,7 +63,7 @@ public void extractDecimal_default_5() {
 
     @Test
     public void extractDecimal_sized_5() {
-        Column col = Column.extractColumn("columnName", "Decimal256(5)", true, false);
+        Column col = Column.extractColumn(newDescriptor("Decimal256(5)"));
         assertEquals(Type.Decimal, col.getType());
         assertEquals(76, col.getPrecision());
         assertEquals(5, col.getScale());
@@ -63,7 +71,7 @@ public void extractDecimal_sized_5() {
 
     @Test
     public void extractDecimal_14_2() {
-        Column col = Column.extractColumn("columnName", "Decimal(14,2)", true, false);
+        Column col = Column.extractColumn(newDescriptor("Decimal(14, 2)"));
         assertEquals(Type.Decimal, col.getType());
         assertEquals(14, col.getPrecision());
         assertEquals(2, col.getScale());
@@ -71,10 +79,14 @@ public void extractDecimal_14_2() {
 
     @Test
     public void extractArrayOfDecimalNullable_5() {
-        Column col = Column.extractColumn("columnName", "Array(Nullable(Decimal(5)))", true, false);
+        Column col = Column.extractColumn(newDescriptor("Array(Nullable(Decimal(5)))"));
         assertEquals(Type.ARRAY, col.getType());
 
-        Column subType = col.getSubType();
+        assertNull(col.getMapKeyType());
+        assertNull(col.getMapValueType());
+        assertNull(col.getTupleFields());
+
+        Column subType = col.getArrayType();
         assertEquals(Type.Decimal, subType.getType());
         assertEquals(5, subType.getPrecision());
         assertTrue(subType.isNullable());
@@ -82,65 +94,67 @@ public void extractArrayOfDecimalNullable_5() {
 
     @Test
     public void extractArrayOfArrayOfArrayOfString() {
-        Column col = Column.extractColumn("columnName", "Array(Array(Array(String)))", true, false);
+        Column col = Column.extractColumn(newDescriptor("Array(Array(Array(String)))"));
         assertEquals(Type.ARRAY, col.getType());
 
-        Column subType = col.getSubType();
+        assertNull(col.getMapKeyType());
+        assertNull(col.getMapValueType());
+        assertNull(col.getTupleFields());
+
+        Column subType = col.getArrayType();
         assertEquals(Type.ARRAY, subType.getType());
 
-        Column subSubType = subType.getSubType();
+        Column subSubType = subType.getArrayType();
         assertEquals(Type.ARRAY, subSubType.getType());
 
-        Column subSubSubType = subSubType.getSubType();
+        Column subSubSubType = subSubType.getArrayType();
         assertEquals(Type.STRING, subSubSubType.getType());
-        assertNull(subSubSubType.getSubType());
+        assertNull(subSubSubType.getArrayType());
     }
 
     @Test
     public void extractMapOfPrimitives() {
-        Column col = Column.extractColumn("columnName", "Map(String, Decimal(5))", true, false);
+        Column col = Column.extractColumn(newDescriptor("Map(String, Decimal(5))"));
         assertEquals(Type.MAP, col.getType());
-        assertNull(col.getSubType());
         assertEquals(Type.STRING, col.getMapKeyType());
 
-        Column mapValueType = col.getMapValueType();
-        assertEquals(Type.Decimal, mapValueType.getType());
-        assertEquals(5, mapValueType.getPrecision());
+        assertNull(col.getArrayType());
+        assertNull(col.getMapValueType());
+        assertNull(col.getTupleFields());
     }
 
     @Test
-    public void extractMapWithComplexValue() {
-        Column col = Column.extractColumn("columnName", "Map(String, Map(String, Decimal(5)))", true, false);
-        assertEquals(Type.MAP, col.getType());
-
-        assertNull(col.getSubType());
-        assertEquals(Type.STRING, col.getMapKeyType());
+    public void extractTupleOfPrimitives() {
+        Column col = Column.extractColumn(newDescriptor("Tuple(first String, second Decimal(5))"));
+        assertEquals(Type.TUPLE, col.getType());
 
-        Column mapValueType = col.getMapValueType();
-        assertEquals(Type.MAP, mapValueType.getType());
-        assertEquals(Type.STRING, mapValueType.getMapKeyType());
-
-        Column nestedMapValue = mapValueType.getMapValueType();
-        assertEquals(Type.Decimal, nestedMapValue.getType());
-        assertEquals(5, nestedMapValue.getPrecision());
+        assertNull(col.getArrayType());
+        assertNull(col.getMapValueType());
+        assertEquals(List.of(), col.getTupleFields());
     }
 
     @Test
-    public void extractMapOfMapOfMapOfString() {
-        Column col = Column.extractColumn("columnName", "Map(String, Map(String, Map(String, String)))", true, false);
-        assertEquals(Type.MAP, col.getType());
-        assertEquals(Type.STRING, col.getMapKeyType());
-        assertNull(col.getSubType());
-
-        Column subType = col.getMapValueType();
-        assertEquals(Type.MAP, subType.getType());
-        assertEquals(Type.STRING, subType.getMapKeyType());
-        assertNull(col.getSubType());
-
-        Column subSubType = subType.getMapValueType();
-        assertEquals(Type.MAP, subSubType.getType());
-        assertEquals(Type.STRING, subSubType.getMapKeyType());
-        assertNull(col.getSubType());
+    public void extractVariantOfPrimitives() {
+        Column col = Column.extractColumn(newDescriptor("Variant(String, Decimal256(5), Decimal(14, 2), Decimal(5))"));
+        assertEquals(Type.VARIANT, col.getType());
+        assertEquals(4, col.getVariantTypes().size());
+
+        List<Column> expectedSubtypes = List.of(
+                col(Type.STRING),
+                col(Type.Decimal, 76, 5),
+                col(Type.Decimal, 14, 2),
+                col(Type.Decimal, 5, 0)
+        );
+
+        for (int i = 0; i < expectedSubtypes.size(); i++) {
+            Column expectedSubtype = expectedSubtypes.get(i);
+            Column actualSubtype = col.getVariantTypes().get(i).getT1();
+
+            assertEquals(expectedSubtype.getType(), actualSubtype.getType());
+            assertEquals(expectedSubtype.getPrecision(), actualSubtype.getPrecision());
+            assertEquals(expectedSubtype.getScale(), actualSubtype.getScale());
+        }
     }
 }
+
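Both test classes statically import `newDescriptor` and `col` from `ClickHouseTestHelpers`, which is not part of this diff. Purely as a reading aid, plausible shapes inferred from the call sites (the single-argument `newDescriptor` presumably defaults the column name, and the subcolumn flag is presumably derived from the dotted path):

```java
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor;
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Type;

public class TestHelpersSketch {
    // Hypothetical: builds a descriptor as DESCRIBE TABLE would describe the column.
    public static ClickHouseFieldDescriptor newDescriptor(String type) {
        return newDescriptor("columnName", type);
    }

    public static ClickHouseFieldDescriptor newDescriptor(String name, String type) {
        return ClickHouseFieldDescriptor.builder()
                .name(name)
                .type(type)
                .isSubcolumn(name.contains(".")) // subcolumn rows are dot-separated paths
                .build();
    }

    // Hypothetical: expected-value containers for the variant subtype assertions.
    public static Column col(Type type) {
        return col(type, 0, 0);
    }

    public static Column col(Type type, int precision, int scale) {
        return Column.builder().type(type).precision(precision).scale(scale).build();
    }
}
```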
+        Column mapValuesValues = Column.extractColumn(newDescriptor("map.values.values", "Array(Array(Decimal(5)))"));
+
+        assertEquals(Type.MAP, map.getType());
+        assertNull(map.getMapValueType());
+
+        table.addColumn(map);
+        table.addColumn(mapValues);
+        table.addColumn(mapValuesValues);
+
+        Column mapValueType = map.getMapValueType();
+        assertEquals(Type.MAP, mapValueType.getType());
+        assertEquals(Type.STRING, mapValueType.getMapKeyType());
+
+        Column nestedMapValue = mapValueType.getMapValueType();
+        assertEquals(Type.Decimal, nestedMapValue.getType());
+        assertEquals(5, nestedMapValue.getPrecision());
+    }
+
+    @Test
+    public void extractMapOfMapOfMapOfString() {
+        Table table = new Table("t");
+
+        Column map = Column.extractColumn(newDescriptor("map", "Map(String, Map(String, Map(String, String)))"));
+        Column mapValues = Column.extractColumn(newDescriptor("map.values", "Array(Map(String, Map(String, String)))"));
+        Column mapValuesValues = Column.extractColumn(newDescriptor("map.values.values", "Array(Array(Map(String, String)))"));
+        Column mapValuesValuesValues = Column.extractColumn(newDescriptor("map.values.values.values", "Array(Array(Array(String)))"));
+
+        assertEquals(Type.MAP, map.getType());
+        assertNull(map.getMapValueType());
+
+        table.addColumn(map);
+        table.addColumn(mapValues);
+        table.addColumn(mapValuesValues);
+        table.addColumn(mapValuesValuesValues);
+
+        Column mapValueType = map.getMapValueType();
+        assertEquals(Type.MAP, mapValueType.getType());
+        assertEquals(Type.STRING, mapValueType.getMapKeyType());
+
+        Column nestedMapValue = mapValueType.getMapValueType();
+        assertEquals(Type.MAP, nestedMapValue.getType());
+        assertEquals(Type.STRING, nestedMapValue.getMapKeyType());
+
+        Column againNestedMapValue = nestedMapValue.getMapValueType();
+        assertEquals(Type.STRING, againNestedMapValue.getType());
+    }
+
+    @Test
+    public void extractTupleOfPrimitives() {
+        Table table = new Table("t");
+        Column tuple = Column.extractColumn(newDescriptor("tuple", "Tuple(first String, second Decimal(5))"));
+        Column tupleFirst = Column.extractColumn(newDescriptor("tuple.first", "String"));
+        Column tupleSecond = Column.extractColumn(newDescriptor("tuple.second", "Decimal(5)"));
+
+        assertEquals(Type.TUPLE, tuple.getType());
+        assertEquals(List.of(), tuple.getTupleFields());
+
+        table.addColumn(tuple);
+        table.addColumn(tupleFirst);
+        table.addColumn(tupleSecond);
+
+        assertEquals(2, tuple.getTupleFields().size());
+        assertEquals(List.of("tuple.first", "tuple.second"), tuple.getTupleFields().stream().map(Column::getName).collect(Collectors.toList()));
+        assertEquals(List.of(Type.STRING, Type.Decimal), tuple.getTupleFields().stream().map(Column::getType).collect(Collectors.toList()));
+        assertEquals(List.of(0, 5), tuple.getTupleFields().stream().map(Column::getPrecision).collect(Collectors.toList()));
+    }
+
+    @Test
+    public void extractTupleOfTupleOfTuple() {
+        Table table = new Table("t");
+        Column tuple = Column.extractColumn(newDescriptor("tuple", "Tuple(tuple Tuple(tuple Tuple(string String)))"));
+        Column tupleTuple = Column.extractColumn(newDescriptor("tuple.tuple", "Tuple(tuple Tuple(string String))"));
+        Column tupleTupleTuple = Column.extractColumn(newDescriptor("tuple.tuple.tuple", "Tuple(string String)"));
+        Column tupleTupleTupleString = Column.extractColumn(newDescriptor("tuple.tuple.tuple.string", "String"));
+
+        assertEquals(Type.TUPLE, tuple.getType());
+        assertEquals(List.of(), tuple.getTupleFields());
+
+        table.addColumn(tuple);
+        table.addColumn(tupleTuple);
+        table.addColumn(tupleTupleTuple);
+        table.addColumn(tupleTupleTupleString);
+
+        assertEquals(1, tuple.getTupleFields().size());
+        assertEquals(1, tuple.getTupleFields().get(0).getTupleFields().size());
+        assertEquals(1, tuple.getTupleFields().get(0).getTupleFields().get(0).getTupleFields().size());
+
+        Column stringColumn = tuple.getTupleFields().get(0).getTupleFields().get(0).getTupleFields().get(0);
+        assertEquals("tuple.tuple.tuple.string", stringColumn.getName());
+        assertEquals(Type.STRING, stringColumn.getType());
+    }
+}
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java b/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java
index 7b55bf75..ecac6411 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java
@@ -1,7 +1,19 @@
 package com.clickhouse.kafka.connect.sink.helper;
 
 import com.clickhouse.client.*;
+import com.clickhouse.data.ClickHouseFormat;
+import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor;
 import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
+import com.clickhouse.kafka.connect.sink.db.mapping.Column;
+import com.clickhouse.kafka.connect.sink.db.mapping.Type;
+import org.json.JSONObject;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 public class ClickHouseTestHelpers {
@@ -44,6 +56,10 @@ public static ClickHouseResponseSummary dropTable(ClickHouseHelperClient chc, St
     }
 
     public static ClickHouseResponseSummary createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
+        return createTable(chc, tableName, createTableQuery, new HashMap<>());
+    }
+
+    public static ClickHouseResponseSummary createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery, Map<String, Serializable> clientSettings) {
         String createTableQueryTmp = String.format(createTableQuery, tableName);
         try (ClickHouseClient client = ClickHouseClient.builder()
                 .options(chc.getDefaultClientOptions())
                 .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                 .build();
             ClickHouseResponse response = client.read(chc.getServer())
+                    .settings(clientSettings)
                     .query(createTableQueryTmp)
                     .executeAndWait()) {
             return response.getSummary();
@@ -59,6 +76,26 @@ public static ClickHouseResponseSummary createTable(ClickHouseHelperClient chc,
         }
     }
 
+    public static List<JSONObject> getAllRowsAsJson(ClickHouseHelperClient chc, String tableName) {
+        String query = String.format("SELECT * FROM `%s`", tableName);
+        try (ClickHouseClient client = ClickHouseClient.builder()
+                .options(chc.getDefaultClientOptions())
+                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
+                .build();
+             ClickHouseResponse response = client.read(chc.getServer())
+                     .query(query)
+                     .format(ClickHouseFormat.JSONEachRow)
+                     .executeAndWait()) {
+
+            return StreamSupport.stream(response.records().spliterator(), false)
+                    .map(record -> record.getValue(0).asString())
+                    .map(JSONObject::new)
+                    .collect(Collectors.toList());
+        } catch (ClickHouseException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static int countRows(ClickHouseHelperClient chc, String tableName) {
         String queryCount = String.format("SELECT COUNT(*) FROM `%s`", tableName);
         try (ClickHouseClient client = ClickHouseClient.builder()
@@ -107,7 +144,7 @@ public static int countRowsWithEmojis(ClickHouseHelperClient chc, String tableNa
 
     @Deprecated(since = "for debug purposes only")
     public static void showRows(ClickHouseHelperClient chc, String topic) {
-        String queryCount = String.format("select * from %s", topic);
+        String queryCount = String.format("select * from `%s`", topic);
         try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
              ClickHouseResponse response = client.read(chc.getServer())
                 // or client.connect(endpoints)
                 // you'll have to parse response manually if using a different format
@@ -122,4 +159,29 @@ public static void showRows(ClickHouseHelperClient chc, String topic) {
             throw new RuntimeException(e);
         }
     }
+
+    public static ClickHouseFieldDescriptor newDescriptor(String name, String valueType) {
+        return ClickHouseFieldDescriptor
+                .builder()
+                .name(name)
+                .type(valueType)
+                .isSubcolumn(name.contains("."))
+                .build();
+    }
+
+    public static ClickHouseFieldDescriptor newDescriptor(String valueType) {
+        return ClickHouseFieldDescriptor
+                .builder()
+                .name("columnName")
+                .type(valueType)
+                .build();
+    }
+
+    public static Column col(Type type) {
+        return Column.builder().type(type).build();
+    }
+
+    public static Column col(Type type, int precision, int scale) {
+        return Column.builder().type(type).precision(precision).scale(scale).build();
+    }
 }
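The settings-aware createTable overload above lets a test enable per-query server settings when its table is created. A minimal sketch of the intended use, under the assumption that a Variant table needs the experimental-type flag turned on (the table name, DDL, and setting values here are illustrative, not taken from this diff):

    // hypothetical test snippet, given a ClickHouseHelperClient chc
    Map<String, Serializable> settings = Map.of(
            "allow_experimental_variant_type", 1,  // server setting that unlocks Variant columns
            "allow_suspicious_variant_types", 1);  // assumed: only needed when the variant mixes similar types
    createTable(chc, "variant_table",
            "CREATE TABLE `%s` (`off16` Int16, `v` Variant(String, Float64)) ENGINE = MergeTree ORDER BY `off16`",
            settings);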
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java
index ce0e4b09..f886aa50 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java
@@ -398,6 +398,132 @@ public static Collection createMapType(String topic, int partition,
         });
         return array;
     }
+    public static Collection<SinkRecord> createTupleType(String topic, int partition) {
+        return createTupleType(topic, partition, DEFAULT_TOTAL_RECORDS);
+    }
+    public static Collection<SinkRecord> createTupleType(String topic, int partition, int totalRecords) {
+
+        Schema ARRAY_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA);
+        Schema MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA);
+
+        Schema ARRAY_ARRAY_SCHEMA = SchemaBuilder.array(SchemaBuilder.array(Schema.STRING_SCHEMA));
+        Schema MAP_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT64_SCHEMA));
+
+        Schema ARRAY_MAP_SCHEMA = SchemaBuilder.array(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+        Schema MAP_ARRAY_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.array(Schema.STRING_SCHEMA));
+
+        Schema ARRAY_ARRAY_ARRAY_SCHEMA = SchemaBuilder.array(SchemaBuilder.array(SchemaBuilder.array(Schema.STRING_SCHEMA)));
+        Schema MAP_MAP_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)));
+
+        Schema VARIANT_SCHEMA = SchemaBuilder.struct()
+                .field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
+                .field("string", Schema.OPTIONAL_STRING_SCHEMA)
+                .build();
+
+        Schema VARIANT_TUPLE_SCHEMA = SchemaBuilder.struct()
+                .field("variant_with_string", VARIANT_SCHEMA)
+                .field("variant_with_double", VARIANT_SCHEMA)
+                .field("variant_array", SchemaBuilder.array(VARIANT_SCHEMA).build())
+                .field("variant_map", SchemaBuilder.map(Schema.STRING_SCHEMA, VARIANT_SCHEMA).build())
+                .build();
+
+        Schema TUPLE_SCHEMA = SchemaBuilder.struct()
+                .field("array", ARRAY_SCHEMA)
+                .field("map", MAP_SCHEMA)
+                .field("array_array", ARRAY_ARRAY_SCHEMA)
+                .field("map_map", MAP_MAP_SCHEMA)
+                .field("array_map", ARRAY_MAP_SCHEMA)
+                .field("map_array", MAP_ARRAY_SCHEMA)
+                .field("array_array_array", ARRAY_ARRAY_ARRAY_SCHEMA)
+                .field("map_map_map", MAP_MAP_MAP_SCHEMA)
+                .field("tuple", VARIANT_TUPLE_SCHEMA)
+                .field("array_tuple", SchemaBuilder.array(VARIANT_TUPLE_SCHEMA).build())
+                .field("map_tuple", SchemaBuilder.map(Schema.STRING_SCHEMA, VARIANT_TUPLE_SCHEMA).build())
+                .field("array_array_tuple", SchemaBuilder.array(SchemaBuilder.array(VARIANT_TUPLE_SCHEMA)).build())
+                .field("map_map_tuple", SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.map(Schema.STRING_SCHEMA, VARIANT_TUPLE_SCHEMA)).build())
+                .field("array_map_tuple", SchemaBuilder.array(SchemaBuilder.map(Schema.STRING_SCHEMA, VARIANT_TUPLE_SCHEMA)).build())
+                .field("map_array_tuple", SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.array(VARIANT_TUPLE_SCHEMA)).build())
+                .build();
+
+        Schema ROOT_SCHEMA = SchemaBuilder.struct()
+                .field("off16", Schema.INT16_SCHEMA)
+                .field("tuple", TUPLE_SCHEMA)
+                .build();
+
+        List<SinkRecord> result = new ArrayList<>();
+        LongStream.range(0, totalRecords).forEachOrdered(n -> {
+
+            List<String> array = Arrays.asList("1", "2");
+            List<List<String>> arrayArray = Arrays.asList(array, array);
+            List<List<List<String>>> arrayArrayArray = Arrays.asList(arrayArray, arrayArray);
+
+            Map<String, String> map = Map.of("k1", "v1", "k2", "v2");
+            Map<String, Map<String, Long>> mapMap = Map.of(
+                    "k1", Map.of("nk1", (long) 1),
+                    "k2", Map.of("nk1", (long) 2)
+            );
+            Map<String, Map<String, Map<String, String>>> mapMapMap = Map.of(
+                    "k1", Map.of("nk1", Map.of("nk2", "v1")),
+                    "k2", Map.of("nk1", Map.of("nk2", "v2"))
+            );
+
+            List<Map<String, String>> arrayMap = Arrays.asList(map, map);
+            Map<String, List<String>> mapArray = Map.of(
+                    "k1", array,
+                    "k2", array
+            );
+
+            Struct stringVariant = new Struct(VARIANT_SCHEMA).put("string", "v1");
+            Struct doubleVariant = new Struct(VARIANT_SCHEMA).put("double", (double) 1 / 3);
+
+            List<Struct> variantArray = Arrays.asList(stringVariant, doubleVariant);
+
+            Struct nestedTuple = new Struct(VARIANT_TUPLE_SCHEMA)
+                    .put("variant_with_string", stringVariant)
+                    .put("variant_with_double", doubleVariant)
+                    .put("variant_array", variantArray)
+                    .put("variant_map", Map.of("s1", stringVariant, "d1", doubleVariant));
+
+            List<Struct> arrayTuple = Arrays.asList(nestedTuple, nestedTuple);
+            Map<String, Struct> mapTuple = Map.of("k1", nestedTuple, "k2", nestedTuple);
+
+            Struct tupleStruct = new Struct(TUPLE_SCHEMA)
+                    .put("array", array)
+                    .put("map", map)
+                    .put("array_array", arrayArray)
+                    .put("map_map", mapMap)
+                    .put("array_map", arrayMap)
+                    .put("map_array", mapArray)
+                    .put("array_array_array", arrayArrayArray)
+                    .put("map_map_map", mapMapMap)
+                    .put("tuple", nestedTuple)
+                    .put("array_tuple", arrayTuple)
+                    .put("map_tuple", mapTuple)
+                    .put("array_array_tuple", Arrays.asList(arrayTuple, arrayTuple))
+                    .put("map_map_tuple", Map.of("r1", mapTuple, "r2", mapTuple))
+                    .put("array_map_tuple", Arrays.asList(mapTuple, mapTuple))
+                    .put("map_array_tuple", Map.of("r1", arrayTuple, "r2", arrayTuple));
+
+            Struct rootStruct = new Struct(ROOT_SCHEMA)
+                    .put("off16", (short) n)
+                    .put("tuple", tupleStruct);
+
+            SinkRecord sr = new SinkRecord(
+                    topic,
+                    partition,
+                    null,
+                    null, ROOT_SCHEMA,
+                    rootStruct,
+                    n,
+                    System.currentTimeMillis(),
+                    TimestampType.CREATE_TIME
+            );
+
+            result.add(sr);
+        });
+        return result;
+    }
     public static Collection<SinkRecord> createNullValueData(String topic, int partition) {
         return createNullValueData(topic, partition, DEFAULT_TOTAL_RECORDS);
@@ -470,6 +596,61 @@ public static Collection createBytesValueData(String topic, int part
         return array;
     }
 
+    public static Collection<SinkRecord> createTupleLikeInfluxValueData(String topic, int partition) {
+        return createTupleLikeInfluxValueData(topic, partition, DEFAULT_TOTAL_RECORDS);
+    }
+    public static Collection<SinkRecord> createTupleLikeInfluxValueData(String topic, int partition, int totalRecords) {
+
+        Schema variantSchema = SchemaBuilder.struct()
+                .field("float64", Schema.OPTIONAL_FLOAT64_SCHEMA)
+                .field("int64", Schema.OPTIONAL_INT64_SCHEMA)
+                .field("string", Schema.OPTIONAL_STRING_SCHEMA)
+                .build();
+
+        Schema payloadSchema = SchemaBuilder.struct()
+                .field("fields", SchemaBuilder.map(Schema.STRING_SCHEMA, variantSchema))
+                .field("tags", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
+                .build();
+
+        Schema nestedSchema = SchemaBuilder.struct()
+                .field("off16", Schema.INT16_SCHEMA)
+                .field("payload", payloadSchema)
+                .build();
+
+
+        List<SinkRecord> array = new ArrayList<>();
+        LongStream.range(0, totalRecords).forEachOrdered(n -> {
+            Struct payload = new Struct(payloadSchema)
+                    .put("fields", Map.of(
+                            // FLOAT64 fields must hold Java doubles; a float would fail Connect's schema validation
+                            "field1", new Struct(variantSchema).put("float64", 1 / (double) (n + 1)),
+                            "field2", new Struct(variantSchema).put("int64", n),
+                            "field3", new Struct(variantSchema).put("string", String.format("Value '%d'", n))
+                    ))
+                    .put("tags", Map.of(
+                            "tag1", "tag1",
+                            "tag2", "tag2"
+                    ));
+
+            Struct valueStruct = new Struct(nestedSchema)
+                    .put("off16", (short) n)
+                    .put("payload", payload);
+
+            SinkRecord sr = new SinkRecord(
+                    topic,
+                    partition,
+                    null,
+                    null, nestedSchema,
+                    valueStruct,
+                    n,
+                    System.currentTimeMillis(),
+                    TimestampType.CREATE_TIME
+            );
+
+            array.add(sr);
+        });
+        return array;
+    }
+
     public static Collection<SinkRecord> createDateType(String topic, int partition) {
         return createDateType(topic, partition, DEFAULT_TOTAL_RECORDS);
     }
@@ -764,7 +945,6 @@ public static Collection createFixedStringData(String topic, int par
         return array;
     }
 
-
     public static Collection<SinkRecord> createEnumValueData(String topic, int i) {
         return createEnumValueData(topic, i, DEFAULT_TOTAL_RECORDS);
     }
@@ -801,4 +981,63 @@ public static Collection createEnumValueData(String topic, int parti
         return array;
     }
 
+    public static Collection<SinkRecord> createNestedType(String topic, int partition) {
+        return createNestedType(topic, partition, DEFAULT_TOTAL_RECORDS);
+    }
+    public static Collection<SinkRecord> createNestedType(String topic, int partition, int totalRecords) {
+
+        Schema VARIANT_SCHEMA = SchemaBuilder.struct()
+                .field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA)
+                .field("string", Schema.OPTIONAL_STRING_SCHEMA)
+                .build();
+
+        Schema TUPLE_SCHEMA = SchemaBuilder.struct()
+                .field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
+                .field("variant", VARIANT_SCHEMA)
+                .build();
+
+        Schema ROOT_SCHEMA = SchemaBuilder.struct()
+                .field("off16", Schema.INT16_SCHEMA)
+                .field("nested.string", SchemaBuilder.array(Schema.STRING_SCHEMA))
+                .field("nested.decimal", SchemaBuilder.array(Decimal.schema(2)))
+                .field("nested.tuple", SchemaBuilder.array(TUPLE_SCHEMA))
+                .build();
+
+        List<SinkRecord> result = new ArrayList<>();
+        LongStream.range(0, totalRecords).forEachOrdered(n -> {
+
+            String currentStringKey = String.format("k%d", n);
+            String currentStringValue = String.format("v%d", n);
+
+            Struct booleanVariant = new Struct(VARIANT_SCHEMA).put("boolean", n % 8 >= 4);
+            Struct stringVariant = new Struct(VARIANT_SCHEMA).put("string", currentStringValue);
+
+            Struct tupleStruct = new Struct(TUPLE_SCHEMA)
+                    .put("map", Map.of(currentStringKey, currentStringValue))
+                    .put("variant", n % 2 == 0 ? booleanVariant : stringVariant);
+
+            int nestedSize = (int) n % 4;
+
+            // all nested.* arrays share one length (nestedSize): ClickHouse stores a Nested column as parallel arrays
+            Struct rootStruct = new Struct(ROOT_SCHEMA)
+                    .put("off16", (short) n)
+                    .put("nested.string", new ArrayList<>(Collections.nCopies(nestedSize, currentStringValue)))
+                    .put("nested.decimal", new ArrayList<>(Collections.nCopies(nestedSize, new BigDecimal(String.format("%d.%d", n, 2)))))
+                    .put("nested.tuple", new ArrayList<>(Collections.nCopies(nestedSize, tupleStruct)));
+
+            SinkRecord sr = new SinkRecord(
+                    topic,
+                    partition,
+                    null,
+                    null, ROOT_SCHEMA,
+                    rootStruct,
+                    n,
+                    System.currentTimeMillis(),
+                    TimestampType.CREATE_TIME
+            );
+
+            result.add(sr);
+        });
+        return result;
+    }
+
 }
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/junit/extension/FromVersionConditionExtension.java b/src/test/java/com/clickhouse/kafka/connect/sink/junit/extension/FromVersionConditionExtension.java
new file mode 100644
index 00000000..ca15cbaf
--- /dev/null
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/junit/extension/FromVersionConditionExtension.java
@@ -0,0 +1,49 @@
+package com.clickhouse.kafka.connect.sink.junit.extension;
+
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.platform.commons.util.AnnotationUtils;
+
+import java.util.List;
+import java.util.Optional;
+
+public class FromVersionConditionExtension implements BeforeTestExecutionCallback {
+
+    @Override
+    public void beforeTestExecution(ExtensionContext context) {
+        Optional<SinceClickHouseVersion> optionalFromVersion = AnnotationUtils.findAnnotation(context.getElement(), SinceClickHouseVersion.class);
+        if (optionalFromVersion.isPresent()) {
+            String requiredVersion = optionalFromVersion.get().value();
+            String currentVersion = System.getenv("CLICKHOUSE_VERSION");
+            if (currentVersion == null) {
+                // We assume latest if the version env is not set
                return;
+            }
+            if (compareVersions(currentVersion, requiredVersion) < 0) {
+                // JUnit Jupiter also honors JUnit 4's AssumptionViolatedException, aborting (skipping) the test instead of failing it
+                throw new org.junit.AssumptionViolatedException("Test skipped because CLICKHOUSE_VERSION is lower than required");
+            }
+        }
+    }
+
+    private int compareVersions(String currentVersion, String requiredVersion) {
+        if (List.of("latest", "cloud").contains(currentVersion))
+            return 0;
+
+        String[] currentParts = currentVersion.split("\\.");
+        String[] requiredParts = requiredVersion.split("\\.");
+
+        try {
+            int length = Math.max(currentParts.length, requiredParts.length);
+            for (int i = 0; i < length; i++) {
+                int currentPart = i < currentParts.length ? Integer.parseInt(currentParts[i]) : 0;
+                int requiredPart = i < requiredParts.length ? Integer.parseInt(requiredParts[i]) : 0;
+                if (currentPart != requiredPart) {
+                    return Integer.compare(currentPart, requiredPart);
+                }
+            }
+        } catch (NumberFormatException e) {
+            return 0;
+        }
+        return 0;
+    }
+}
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/junit/extension/SinceClickHouseVersion.java b/src/test/java/com/clickhouse/kafka/connect/sink/junit/extension/SinceClickHouseVersion.java
new file mode 100644
index 00000000..590becc3
--- /dev/null
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/junit/extension/SinceClickHouseVersion.java
@@ -0,0 +1,12 @@
+package com.clickhouse.kafka.connect.sink.junit.extension;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SinceClickHouseVersion {
+    String value();
+}
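The diff itself never shows the extension being registered, so for context, a minimal sketch of how the annotation and extension presumably work together; registration uses JUnit 5's standard @ExtendWith, and the class name, test body, and version value below are hypothetical:

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(FromVersionConditionExtension.class)
    class VariantTypeTest {

        @Test
        @SinceClickHouseVersion("24.1") // illustrative: run only against servers that know the Variant type
        void writesVariantColumn() {
            // the extension aborts this test when CLICKHOUSE_VERSION is set to a release
            // older than 24.1; an unset version, "latest", and "cloud" always run
        }
    }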