Skip to content

Commit

Permalink
[Feature][Catalog] Add JDBC Catalog auto create table (#4917)
Browse files Browse the repository at this point in the history
  • Loading branch information
XiaoJiang521 authored Jul 31, 2023
1 parent bc28eb1 commit 63eb137
Show file tree
Hide file tree
Showing 53 changed files with 4,221 additions and 209 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,30 @@ jobs:
env:
MAVEN_OPTS: -Xmx4096m

jdbc-connectors-it-part-4:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
runs-on: ${{ matrix.os }}
strategy:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run jdbc connectors integration test (part-4)
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1C verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-4 -am -Pci
env:
MAVEN_OPTS: -Xmx4096m

kafka-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public final class CatalogTable implements Serializable {

private final String comment;

private final String catalogName;

public static CatalogTable of(
TableIdentifier tableId,
TableSchema tableSchema,
Expand All @@ -47,17 +49,38 @@ public static CatalogTable of(
return new CatalogTable(tableId, tableSchema, options, partitionKeys, comment);
}

public static CatalogTable of(
TableIdentifier tableId,
TableSchema tableSchema,
Map<String, String> options,
List<String> partitionKeys,
String comment,
String catalogName) {
return new CatalogTable(tableId, tableSchema, options, partitionKeys, comment, catalogName);
}

private CatalogTable(
TableIdentifier tableId,
TableSchema tableSchema,
Map<String, String> options,
List<String> partitionKeys,
String comment) {
this(tableId, tableSchema, options, partitionKeys, comment, "");
}

private CatalogTable(
TableIdentifier tableId,
TableSchema tableSchema,
Map<String, String> options,
List<String> partitionKeys,
String comment,
String catalogName) {
this.tableId = tableId;
this.tableSchema = tableSchema;
this.options = options;
this.partitionKeys = partitionKeys;
this.comment = comment;
this.catalogName = catalogName;
}

public TableIdentifier getTableId() {
Expand All @@ -80,6 +103,10 @@ public String getComment() {
return comment;
}

public String getCatalogName() {
return catalogName;
}

@Override
public String toString() {
return "CatalogTable{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.Data;

import java.io.Serializable;
import java.util.Map;

/**
* Represent the column of {@link TableSchema}.
Expand Down Expand Up @@ -54,19 +55,71 @@ public abstract class Column implements Serializable {

protected final String comment;

/** Field type in the database * */
protected final String sourceType;

/** Unsigned bit * */
protected final boolean isUnsigned;

/** Whether to use the 0 bit * */
protected final boolean isZeroFill;

/** Bit length * */
protected final Long bitLen;

/** integer may be cross the border * */
protected final Long longColumnLength;

/** your options * */
protected final Map<String, Object> options;

protected Column(
String name,
SeaTunnelDataType<?> dataType,
Integer columnLength,
boolean nullable,
Object defaultValue,
String comment) {
this(
name,
dataType,
columnLength,
nullable,
defaultValue,
comment,
null,
false,
false,
null,
0L,
null);
}

protected Column(
String name,
SeaTunnelDataType<?> dataType,
Integer columnLength,
boolean nullable,
Object defaultValue,
String comment,
String sourceType,
boolean isUnsigned,
boolean isZeroFill,
Long bitLen,
Long longColumnLength,
Map<String, Object> options) {
this.name = name;
this.dataType = dataType;
this.columnLength = columnLength;
this.nullable = nullable;
this.defaultValue = defaultValue;
this.comment = comment;
this.sourceType = sourceType;
this.isUnsigned = isUnsigned;
this.isZeroFill = isZeroFill;
this.bitLen = bitLen;
this.longColumnLength = longColumnLength;
this.options = options;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.Map;

/** Representation of a physical column. */
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
Expand All @@ -38,6 +40,34 @@ protected PhysicalColumn(
super(name, dataType, columnLength, nullable, defaultValue, comment);
}

protected PhysicalColumn(
String name,
SeaTunnelDataType<?> dataType,
Integer columnLength,
boolean nullable,
Object defaultValue,
String comment,
String sourceType,
boolean isUnsigned,
boolean isZeroFill,
Long bitLen,
Long longColumnLength,
Map<String, Object> options) {
super(
name,
dataType,
columnLength,
nullable,
defaultValue,
comment,
sourceType,
isUnsigned,
isZeroFill,
bitLen,
longColumnLength,
options);
}

public static PhysicalColumn of(
String name,
SeaTunnelDataType<?> dataType,
Expand All @@ -48,18 +78,70 @@ public static PhysicalColumn of(
return new PhysicalColumn(name, dataType, columnLength, nullable, defaultValue, comment);
}

public static PhysicalColumn of(
String name,
SeaTunnelDataType<?> dataType,
Integer columnLength,
boolean nullable,
Object defaultValue,
String comment,
String sourceType,
boolean isUnsigned,
boolean isZeroFill,
Long bitLen,
Map<String, Object> options,
Long longColumnLength) {
return new PhysicalColumn(
name,
dataType,
columnLength,
nullable,
defaultValue,
comment,
sourceType,
isUnsigned,
isZeroFill,
bitLen,
longColumnLength,
options);
}

@Override
public boolean isPhysical() {
return true;
}

@Override
public Column copy(SeaTunnelDataType<?> newType) {
return PhysicalColumn.of(name, newType, columnLength, nullable, defaultValue, comment);
return PhysicalColumn.of(
name,
newType,
columnLength,
nullable,
defaultValue,
comment,
sourceType,
isUnsigned,
isZeroFill,
bitLen,
options,
longColumnLength);
}

@Override
public Column copy() {
return PhysicalColumn.of(name, dataType, columnLength, nullable, defaultValue, comment);
return PhysicalColumn.of(
name,
dataType,
columnLength,
nullable,
defaultValue,
comment,
sourceType,
isUnsigned,
isZeroFill,
bitLen,
options,
longColumnLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,16 @@ public abstract class AbstractJdbcCatalog implements Catalog {
protected final String suffix;
protected final String defaultUrl;

protected final Optional<String> defaultSchema;

protected Connection defaultConnection;

public AbstractJdbcCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
String catalogName,
String username,
String pwd,
JdbcUrlUtil.UrlInfo urlInfo,
String defaultSchema) {

checkArgument(StringUtils.isNotBlank(username));
urlInfo.getDefaultDatabase()
Expand All @@ -78,10 +84,10 @@ public AbstractJdbcCatalog(
this.defaultDatabase = urlInfo.getDefaultDatabase().get();
this.username = username;
this.pwd = pwd;
String baseUrl = urlInfo.getUrlWithoutDatabase();
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.baseUrl = urlInfo.getUrlWithoutDatabase();
this.defaultUrl = urlInfo.getOrigin();
this.suffix = urlInfo.getSuffix();
this.defaultSchema = Optional.ofNullable(defaultSchema);
}

@Override
Expand Down Expand Up @@ -248,6 +254,13 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
if (!databaseExists(tablePath.getDatabaseName())) {
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
}
if (defaultSchema.isPresent()) {
tablePath =
new TablePath(
tablePath.getDatabaseName(),
defaultSchema.get(),
tablePath.getTableName());
}
if (!createTableInternal(tablePath, table) && !ignoreIfExists) {
throw new TableAlreadyExistException(catalogName, tablePath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,27 @@ public interface JdbcCatalogOptions {
.noDefaultValue()
.withDescription("Password to use when connecting to the database server.");

Option<String> SCHEMA =
Options.key("schema")
.stringType()
.noDefaultValue()
.withDescription(
"for databases that support the schema parameter, give it priority.");

OptionRule.Builder BASE_RULE =
OptionRule.builder().required(BASE_URL).required(USERNAME, PASSWORD);
OptionRule.builder().required(BASE_URL).required(USERNAME, PASSWORD).optional(SCHEMA);

Option<String> TABLE_PREFIX =
Options.key("tablePrefix")
.stringType()
.noDefaultValue()
.withDescription(
"The table prefix name added when the table is automatically created");

Option<String> TABLE_SUFFIX =
Options.key("tableSuffix")
.stringType()
.noDefaultValue()
.withDescription(
"The table suffix name added when the table is automatically created");
}
Loading

0 comments on commit 63eb137

Please sign in to comment.