Add support for Tuples & Variants #368

Merged · 5 commits · May 12, 2024
16 changes: 15 additions & 1 deletion .github/workflows/tests.yaml
@@ -6,18 +6,32 @@ jobs:
   build:
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
-        clickhouse: [ "23.7", "24.3", "latest", "cloud" ]
+        clickhouse: ["23.7", "24.3", "latest", "cloud"]
     name: ClickHouse ${{ matrix.clickhouse }} tests
     steps:
+      - name: Check for Cloud Credentials
+        id: check-cloud-credentials
+        run: |
+          if [[ "${{ matrix.clickhouse }}" == "cloud" && (-z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}" || -z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}") ]]; then
+            echo "SKIP_STEP=true" >> $GITHUB_ENV
+          else
+            echo "SKIP_STEP=false" >> $GITHUB_ENV
+          fi
+        shell: bash
+
       - uses: actions/checkout@v3
+        if: env.SKIP_STEP != 'true'
       - name: Set up JDK 17
+        if: env.SKIP_STEP != 'true'
         uses: actions/setup-java@v3
         with:
           java-version: '17'
           distribution: 'adopt'
           architecture: x64
       - name: Setup and execute Gradle 'test' task
+        if: env.SKIP_STEP != 'true'
         uses: gradle/gradle-build-action@v2
         env:
           CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,10 @@
+## 1.1.0
+* Added support for Tuple type
+* Added support for Variant type
+* Added support for Nested type
+* Refactored Column class so that we use Builder pattern using Lombok
+* Refactored recursive Map type parsing to iterative approach using describe_include_subcolumns=1
+
## 1.0.17
* Added support for ClickHouse Enum type #370
* Added extra break down of time measurement for insert operations
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -123,10 +123,10 @@ A sample REST call you could use to create the connector (POST to `localhost:808


## Proposing code changes
-This is a relatively straightfoward process:
+This is a relatively straightforward process:
* Ensure there's unit test coverage for the changes (and that prior tests work still, of course).
* Update VERSION to the next logical version number
-* Add changes to CHANGELOG in a human readable way
+* Add changes to CHANGELOG in a human-readable way
* Submit a PR

## Releasing a new version
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-v1.0.17
+v1.1.0
11 changes: 11 additions & 0 deletions build.gradle.kts
@@ -79,6 +79,17 @@ dependencies {
implementation("com.google.code.gson:gson:2.10.1")
// https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5
implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1")
+// https://mvnrepository.com/artifact/com.google.guava/guava
+implementation("com.google.guava:guava:33.1.0-jre")
+
+
+// Avoid telescoping constructors problem with the builder pattern using Lombok
+compileOnly("org.projectlombok:lombok:1.18.32")
+annotationProcessor("org.projectlombok:lombok:1.18.32")
+
+// To parse JSON response from ClickHouse to parse complex data types correctly
+implementation("com.fasterxml.jackson.core:jackson-databind:2.17.0")
+

// TODO: need to remove ???
implementation("org.slf4j:slf4j-reload4j:2.0.11")
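The Lombok dependency above is what the CHANGELOG's "Builder pattern using Lombok" entry refers to: instead of stacking constructor overloads as optional column attributes accumulate (the "telescoping constructors" problem named in the comment), `@Builder` generates named, order-independent setters. A minimal sketch of the idea, using a hypothetical trimmed-down stand-in for the real `Column` class:

```java
import lombok.Builder;
import lombok.Getter;

// Hypothetical, cut-down stand-in for the real Column class, for illustration only.
@Builder
@Getter
class ColumnSketch {
    private final String name;
    private final String type;
    private final boolean nullable;
    private final boolean hasDefault;
}

class BuilderDemo {
    public static void main(String[] args) {
        // Named, order-independent setters instead of a pile of positional
        // constructor overloads.
        ColumnSketch col = ColumnSketch.builder()
                .name("payload")
                .type("Tuple(a String, b Int32)")
                .nullable(false)
                .build();
        System.out.println(col.getName() + " -> " + col.getType());
    }
}
```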
src/main/java/com/clickhouse/kafka/connect/sink/data/Data.java
@@ -1,7 +1,10 @@
package com.clickhouse.kafka.connect.sink.data;

+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

+import java.util.List;
+
public class Data {
private Schema schema;
private Object object;
@@ -11,6 +14,10 @@ public Data(Schema schema, Object object) {
this.object = object;
}

+public List<Field> getFields() {
+return schema.fields();
+}
+
public Schema.Type getFieldType() {
return schema.type();
}
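The new `getFields()` accessor simply surfaces `schema.fields()`; the Tuple writer further down zips this list against the table's tuple columns. A small usage sketch with the stock Kafka Connect `SchemaBuilder` (assuming the same package as the `Data` class above):

```java
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

class DataFieldsDemo {
    public static void main(String[] args) {
        // A STRUCT schema of the shape that maps onto a ClickHouse Tuple(a String, b Int32).
        Schema tupleSchema = SchemaBuilder.struct()
                .field("a", Schema.STRING_SCHEMA)
                .field("b", Schema.INT32_SCHEMA)
                .build();
        Struct value = new Struct(tupleSchema).put("a", "x").put("b", 1);

        Data data = new Data(tupleSchema, value);
        for (Field field : data.getFields()) { // delegates to schema.fields()
            System.out.println(field.name() + " -> " + field.schema().type());
        }
    }
}
```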
src/main/java/com/clickhouse/kafka/connect/sink/data/StructToJsonMap.java
@@ -79,9 +79,10 @@ public static Map<String, Data> toJsonMap(Struct struct) {
jsonMap.put(fieldName, new Data(field.schema(), toJsonMap(struct.getStruct(fieldName))));
break;
case MAP:
-Map<Object, Object> fieldMap = struct.getMap(fieldName);
-if (fieldMap != null && !fieldMap.isEmpty() && fieldMap.values().iterator().next() instanceof Struct) {
+Map<Object, Object> fieldMap = new HashMap<>(struct.getMap(fieldName));
+if (!fieldMap.isEmpty() && fieldMap.values().iterator().next() instanceof Struct) {
// Map values are `Struct`

for (Map.Entry<Object, Object> entry : fieldMap.entrySet()) {
entry.setValue(toJsonMap((Struct) entry.getValue()));
}
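The switch to `new HashMap<>(struct.getMap(fieldName))` presumably guarantees the entries are mutable before `entry.setValue` rewrites `Struct` values in place; an unmodifiable map would reject that. A minimal JDK-only sketch of the failure mode the copy sidesteps:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class EntrySetValueDemo {
    public static void main(String[] args) {
        Map<String, Object> original = Collections.unmodifiableMap(new HashMap<>(Map.of("k", "v")));

        // A fresh HashMap copy has freely mutable entries.
        Map<String, Object> copy = new HashMap<>(original);
        copy.entrySet().forEach(entry -> entry.setValue("converted")); // fine

        try {
            original.entrySet().forEach(entry -> entry.setValue("converted"));
        } catch (UnsupportedOperationException e) {
            System.out.println("unmodifiable maps reject entry.setValue");
        }
    }
}
```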
src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
@@ -9,6 +9,7 @@
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
import com.clickhouse.kafka.connect.sink.data.Data;
import com.clickhouse.kafka.connect.sink.data.Record;
+import com.clickhouse.kafka.connect.sink.data.StructToJsonMap;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
@@ -32,6 +33,8 @@
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Streams;
+import reactor.util.function.Tuples;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -178,7 +181,7 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte

private boolean validateDataSchema(Table table, Record record, boolean onlyFieldsName) {
boolean validSchema = true;
-for (Column col : table.getColumns()) {
+for (Column col : table.getRootColumnsList()) {
String colName = col.getName();
Type type = col.getType();
boolean isNullable = col.isNullable();
@@ -207,13 +210,19 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField
break;//I notice we just break here, rather than actually validate the type
default:
if (!colTypeName.equals(dataTypeName)) {
if (!(colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))) {
LOGGER.debug("Data schema name: {}", objSchema.name());
if (!("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal"))) {
validSchema = false;
LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName));
}
}
LOGGER.debug("Data schema name: {}", objSchema.name());

if (colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))
continue;

if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
continue;

if (("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal")))
continue;

validSchema = false;
LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName));
}
}
}
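The rework flattens the old nested negations into early `continue` exemptions: STRING columns may receive BYTES, TUPLE columns may receive STRUCT, and DECIMAL columns are matched by the Connect logical schema name rather than the physical type. The Decimal rule exists because Connect transports decimals as BYTES tagged with a logical name, which a small sketch against the stock Kafka Connect API shows:

```java
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;

class DecimalSchemaDemo {
    public static void main(String[] args) {
        Schema decimalSchema = Decimal.schema(2); // scale 2
        // Physical type is BYTES, so comparing type names against "DECIMAL" fails...
        System.out.println(decimalSchema.type()); // BYTES
        // ...which is why the validator checks the logical name instead.
        System.out.println(decimalSchema.name()); // org.apache.kafka.connect.data.Decimal
    }
}
```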
@@ -387,16 +396,80 @@ private void doWriteColValue(Column col, ClickHousePipedOutputStream stream, Dat
BinaryStreamUtils.writeVarInt(stream, sizeArrObject);
arrObject.forEach(v -> {
try {
-if (col.getSubType().isNullable() && v != null) {
+if (col.getArrayType().isNullable() && v != null) {
BinaryStreamUtils.writeNonNull(stream);
}
-doWriteColValue(col.getSubType(), stream, new Data(value.getNestedValueSchema(), v), defaultsSupport);
+doWriteColValue(col.getArrayType(), stream, new Data(value.getNestedValueSchema(), v), defaultsSupport);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
break;
+case TUPLE:
+Map<?, ?> jsonMapValues;
+
+Object underlyingObject = value.getObject();
+if (underlyingObject.getClass() != Struct.class) {
+// Tuples in the root structure are parsed using StructToJsonMap
+jsonMapValues = (Map<?, ?>) underlyingObject;
+} else {
+jsonMapValues = StructToJsonMap.toJsonMap((Struct) underlyingObject);
+}
+
+Streams.zip(
+col.getTupleFields().stream(), value.getFields().stream(), Tuples::of
+).forEach((fields) -> {
+Column column = fields.getT1();
+Field field = fields.getT2();
+
+Data innerData = (Data) jsonMapValues.get(field.name());
+try {
+doWriteColValue(column, stream, innerData, defaultsSupport);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
+break;
+case VARIANT:
+// https://github.com/ClickHouse/ClickHouse/pull/58047/files#diff-f56b7f61d5a82c440bb1a078ea8e5dcf2679dc92adbbc28bd89638cbe499363dR368-R384
+// https://github.com/ClickHouse/ClickHouse/blob/658a8e9a9b1658cd12c78365f9829b35d016f1b2/src/Columns/ColumnVariant.h#L10-L56
+mapTmp = (Map<?, ?>) value.getObject();
+Optional<Data> variantValueOption = mapTmp.values().stream()
+.map(o -> (Data) o)
+.filter(data -> data.getObject() != null)
+.findFirst();
+
+// Null Discriminator (https://github.com/ClickHouse/ClickHouse/blob/658a8e9a9b1658cd12c78365f9829b35d016f1b2/src/Columns/ColumnVariant.h#L65)
+int nullDiscriminator = 255;
+if (variantValueOption.isEmpty()) {
+BinaryStreamUtils.writeUnsignedInt8(stream, nullDiscriminator);
+} else {
+Data variantValue = variantValueOption.get();
+
+String fieldTypeName = variantValue.getFieldType().getName();
+Optional<Integer> globalDiscriminator = col.getVariantGlobalDiscriminator(fieldTypeName);
+if (globalDiscriminator.isEmpty()) {
+LOGGER.error("Unable to determine the global discriminator of {} variant! Writing NULL variant instead.", fieldTypeName);
+BinaryStreamUtils.writeUnsignedInt8(stream, nullDiscriminator);
+return;
+}
+BinaryStreamUtils.writeUnsignedInt8(stream, globalDiscriminator.get());
+
+// Variants support parametrized types, such as Decimal(x, y). Because of that, we can't use
+// the doWritePrimitive method.
+doWriteColValue(
+col.getVariantGlobalDiscriminators().get(globalDiscriminator.get()).getT1(),
+stream,
+variantValue,
+defaultsSupport
+);
+}
+break;
default:
+// If you wonder, how NESTED works in JDBC:
+// https://github.com/ClickHouse/clickhouse-java/blob/6cbbd8fe3f86ac26d12a95e0c2b964f3a3755fc9/clickhouse-data/src/main/java/com/clickhouse/data/format/ClickHouseRowBinaryProcessor.java#L159
LOGGER.error("Cannot serialize unsupported type {}", columnType);
}
}

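The Variant branch above frames every value with a one-byte global discriminator: 255 is the reserved NULL discriminator, and any other value is an index identifying the concrete type inside the Variant (see the ColumnVariant.h links in the code). A standalone sketch of just that framing, with a hypothetical Variant(String, UInt64) column and an assumed type order — the connector takes the real mapping from `col.getVariantGlobalDiscriminators()`:

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

class VariantFramingSketch {
    // 255 is the reserved NULL discriminator (see ColumnVariant.h).
    static final int NULL_DISCRIMINATOR = 255;

    // Hypothetical order for a Variant(String, UInt64) column; the real
    // mapping comes from the column's parsed type metadata.
    static final List<String> VARIANT_TYPES = List.of("String", "UInt64");

    static void writeVariant(ByteArrayOutputStream out, String typeName, byte[] encodedValue) {
        int discriminator = VARIANT_TYPES.indexOf(typeName);
        if (discriminator < 0) {
            // Unknown type: degrade to a NULL variant, as the writer above does.
            out.write(NULL_DISCRIMINATOR);
            return;
        }
        out.write(discriminator);     // one-byte global discriminator
        out.writeBytes(encodedValue); // then the value in its concrete encoding
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVariant(out, "String", new byte[] {1, 'x'}); // discriminator 0 + RowBinary string
        writeVariant(out, "Int32", new byte[0]);          // not part of the Variant -> 255
        System.out.println(out.size() + " bytes framed");
    }
}
```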
@@ -590,7 +663,7 @@ protected void doInsertRawBinary(List<Record> records, Table table, QueryIdentif
// write bytes into the piped stream
for (Record record : records) {
if (record.getSinkRecord().value() != null) {
-for (Column col : table.getColumns()) {
+for (Column col : table.getRootColumnsList()) {
long beforePushStream = System.currentTimeMillis();
doWriteCol(record, col, stream, supportDefaults);
pushStreamTime += System.currentTimeMillis() - beforePushStream;
src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseFieldDescriptor.java (new file)
@@ -0,0 +1,46 @@
package com.clickhouse.kafka.connect.sink.db.helper;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;

/**
* Java object representation of one DESCRIBE TABLE result row.
* <p>
* We use Jackson to instantiate it from JSON.
*/
@Data
@Builder
@Jacksonized
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class ClickHouseFieldDescriptor {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private String name;
private String type;
private String defaultType;
private String defaultExpression;
private String comment;
private String codecExpression;
private String ttlExpression;
private boolean isSubcolumn;

public boolean isAlias() {
return "ALIAS".equals(defaultType);
}

public boolean isMaterialized() {
return "MATERIALIZED".equals(defaultType);
}

public boolean hasDefault() {
return "DEFAULT".equals(defaultType);
}

public static ClickHouseFieldDescriptor fromJsonRow(String json) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(json.replace("\n", "\\n"), ClickHouseFieldDescriptor.class);
}
}
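For illustration, a hypothetical DESCRIBE row in the JSONEachRow shape this class consumes: the snake_case keys line up with the `SnakeCaseStrategy` annotation, and `is_subcolumn` arrives as 0/1, which Jackson's default coercion maps onto the boolean field.

```java
import com.fasterxml.jackson.core.JsonProcessingException;

class DescriptorDemo {
    public static void main(String[] args) throws JsonProcessingException {
        // Hypothetical DESCRIBE TABLE row as JSONEachRow would emit it.
        String row = "{\"name\":\"answer.score\",\"type\":\"Int32\",\"default_type\":\"\","
                + "\"default_expression\":\"\",\"comment\":\"\",\"codec_expression\":\"\","
                + "\"ttl_expression\":\"\",\"is_subcolumn\":1}";

        ClickHouseFieldDescriptor fd = ClickHouseFieldDescriptor.fromJsonRow(row);
        System.out.println(fd.getType());     // Int32
        System.out.println(fd.isSubcolumn()); // true (Jackson coerces 1 to true)
        System.out.println(fd.hasDefault());  // false
    }
}
```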
src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
@@ -11,6 +11,8 @@
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.util.Utils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -32,6 +34,7 @@ public class ClickHouseHelperClient {
private final boolean sslEnabled;
private final String jdbcConnectionProperties;
private final int timeout;
+@Getter
private ClickHouseNode server = null;
private final int retry;
private ClickHouseProxyType proxyType = null;
@@ -86,7 +89,7 @@ private ClickHouseNode create() {
tmpJdbcConnectionProperties
);

LOGGER.info("ClickHouse URL: " + url);
LOGGER.info("ClickHouse URL: {}", url);

if (username != null && password != null) {
LOGGER.debug(String.format("Adding username [%s]", username));
@@ -137,10 +140,6 @@ public String version() {
}
}

-public ClickHouseNode getServer() {
-return this.server;
-}

public ClickHouseResponse query(String query) {
return query(query, null);
}
@@ -167,7 +166,6 @@ public ClickHouseResponse query(String query, ClickHouseFormat clickHouseFormat)
throw new RuntimeException(ce);
}


public List<String> showTables() {
List<String> tablesNames = new ArrayList<>();
try (ClickHouseClient client = ClickHouseClient.builder()
@@ -200,32 +198,30 @@ public Table describeTable(String tableName) {
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
ClickHouseResponse response = client.read(server)
.set("describe_include_subcolumns", true)
.format(ClickHouseFormat.JSONEachRow)
.query(describeQuery)
.executeAndWait()) {

Table table = new Table(tableName);
for (ClickHouseRecord r : response.records()) {
-boolean hasDefault = false;
ClickHouseValue v = r.getValue(0);
-String value = v.asString();
-String[] cols = value.split("\t");
-if (cols.length > 2) {
-String defaultKind = cols[2];
-if ("ALIAS".equals(defaultKind) || "MATERIALIZED".equals(defaultKind)) {
-LOGGER.debug("Skipping column {} as it is an alias or materialized view", cols[0]);
-// Only insert into "real" columns
-continue;
-} else if("DEFAULT".equals(defaultKind)) {
-table.setHasDefaults(true);
-hasDefault = true;
-}

+ClickHouseFieldDescriptor fieldDescriptor = ClickHouseFieldDescriptor.fromJsonRow(v.asString());
+if (fieldDescriptor.isAlias() || fieldDescriptor.isMaterialized()) {
+LOGGER.debug("Skipping column {} as it is an alias or materialized view", fieldDescriptor.getName());
+continue;
+}
-String name = cols[0];
-String type = cols[1];
-Column column = Column.extractColumn(name, type, false, hasDefault);

+if (fieldDescriptor.hasDefault()) {
+table.hasDefaults(true);
+}
+
+Column column = Column.extractColumn(fieldDescriptor);
table.addColumn(column);
}
return table;
-} catch (ClickHouseException e) {
+} catch (ClickHouseException | JsonProcessingException e) {
LOGGER.error(String.format("Exception when running describeTable %s", describeQuery), e);
return null;
}
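The `describe_include_subcolumns=1` setting plus JSONEachRow is what enables the CHANGELOG's "iterative" Map/Tuple parsing: every subcolumn of a complex column arrives as its own DESCRIBE row, flagged `is_subcolumn`, so no recursive type-string parsing is needed. A sketch of the call shape, assuming a client and server configured as elsewhere in this class (the table name is hypothetical):

```java
// A sketch, not the PR's code; types and imports are the ones this file
// already uses.
void printDescribeRows(ClickHouseClient client, ClickHouseNode server) throws ClickHouseException {
    try (ClickHouseResponse response = client.read(server)
            .set("describe_include_subcolumns", true)
            .format(ClickHouseFormat.JSONEachRow)
            .query("DESCRIBE TABLE events")
            .executeAndWait()) {
        for (ClickHouseRecord r : response.records()) {
            // A column `m Map(String, Int64)` yields its own row plus rows for
            // the subcolumns `m.keys` and `m.values`, flagged is_subcolumn = 1.
            System.out.println(r.getValue(0).asString());
        }
    }
}
```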