Skip to content

Commit

Permalink
Merge branch 'apache:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
happyboy1024 authored Dec 11, 2023
2 parents 3df3e1a + d4a323e commit 4e7302b
Show file tree
Hide file tree
Showing 187 changed files with 3,952 additions and 2,154 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,6 @@ jobs:
distribution: 'temurin'
cache: 'maven'
- name: run kudu connector integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kudu-e2e -am -Pci
env:
Expand All @@ -1018,7 +1017,6 @@ jobs:
distribution: 'temurin'
cache: 'maven'
- name: run amazonsqs connector integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-amazonsqs-e2e -am -Pci
env:
Expand Down
14 changes: 13 additions & 1 deletion docs/en/connector-v2/sink/LocalFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ By default, we use 2PC commit to ensure `exactly-once`

### path [string]

The target dir path is required.
The target dir path is required, you can inject the upstream CatalogTable into the path by using: `${database_name}`, `${table_name}` and `${schema_name}`.

### custom_filename [boolean]

Expand Down Expand Up @@ -237,6 +237,18 @@ LocalFile {

```

For extract source metadata from upstream, you can use `${database_name}`, `${table_name}` and `${schema_name}` in the path.

```bash

LocalFile {
path = "/tmp/hive/warehouse/${table_name}"
file_format_type = "parquet"
sink_columns = ["name","age"]
}

```

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
285 changes: 115 additions & 170 deletions docs/en/connector-v2/sink/OssFile.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ You can use the following placeholders
- rowtype_fields: Used to get all the fields in the upstream schema, we will automatically map to the field
description of StarRocks
- rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list)
- rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list)

## Data Type Mapping

Expand Down
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/Http.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
| pageing.total_page_size | Int | No | - | This parameter is used to control the total number of pages |
| pageing.batch_size | Int | No | - | The batch size returned per request is used to determine whether to continue when the total number of pages is unknown |
| content_json | String | No | - | This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. |
| format | String | No | json | The format of upstream data, now only support `json` `text`, default `json`. |
| format | String | No | text | The format of upstream data, now only support `json` `text`, default `text`. |
| method | String | No | get | Http request method, only supports GET, POST method. |
| headers | Map | No | - | Http headers. |
| params | Map | No | - | Http params,the program will automatically add http header application/x-www-form-urlencoded. |
Expand Down
225 changes: 85 additions & 140 deletions docs/en/connector-v2/source/OssFile.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@
- [Core] [Starter] Add check of sink and source config to avoid null pointer exception. (#4734)
- [Core] [Flink] Remove useless stage type related codes. (#5650)

### Formats

- [Json] Remove assert key word. (#5919)

### Connector-V2

- [Connector-V2] [CDC] Improve startup.mode/stop.mode options (#4360)
Expand Down Expand Up @@ -146,6 +150,7 @@
- [Zeta] Improve Zeta operation max count and ignore NPE (#4787)
- [Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722)
- [zeta] Checkpoint exception status messages exclude state data (#5547)
- [Zeta] Remove assert key words (#5947)

## Feature

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@ public void validate(OptionRule rule) {
List<RequiredOption> requiredOptions = rule.getRequiredOptions();
for (RequiredOption requiredOption : requiredOptions) {
validate(requiredOption);
requiredOption
.getOptions()
.forEach(
option -> {
if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
validateSingleChoice(option);
}
});

for (Option<?> option : requiredOption.getOptions()) {
if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
// is required option and not match condition, skip validate
if (isConditionOption(requiredOption)
&& !matchCondition(
(RequiredOption.ConditionalRequiredOptions) requiredOption)) {
continue;
}
validateSingleChoice(option);
}
}
}

for (Option option : rule.getOptionalOptions()) {
Expand All @@ -74,15 +78,15 @@ void validateSingleChoice(Option option) {
Object o = singleChoiceOption.defaultValue();
if (o != null && !optionValues.contains(o)) {
throw new OptionValidationException(
"These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues.",
getOptionKeys(Arrays.asList(singleChoiceOption)), o);
"These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues(%s).",
getOptionKeys(Arrays.asList(singleChoiceOption)), o, optionValues);
}

Object value = config.get(option);
if (value != null && !optionValues.contains(value)) {
throw new OptionValidationException(
"These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues.",
getOptionKeys(Arrays.asList(singleChoiceOption)), value);
"These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues(%s).",
getOptionKeys(Arrays.asList(singleChoiceOption)), value, optionValues);
}
}

Expand All @@ -99,7 +103,7 @@ void validate(RequiredOption requiredOption) {
validate((RequiredOption.ExclusiveRequiredOptions) requiredOption);
return;
}
if (requiredOption instanceof RequiredOption.ConditionalRequiredOptions) {
if (isConditionOption(requiredOption)) {
validate((RequiredOption.ConditionalRequiredOptions) requiredOption);
return;
}
Expand Down Expand Up @@ -181,8 +185,7 @@ void validate(RequiredOption.ExclusiveRequiredOptions exclusiveRequiredOptions)
}

void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) {
Expression expression = conditionalRequiredOptions.getExpression();
boolean match = validate(expression);
boolean match = matchCondition(conditionalRequiredOptions);
if (!match) {
return;
}
Expand All @@ -193,7 +196,8 @@ void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptio
}
throw new OptionValidationException(
"There are unconfigured options, the options(%s) are required because [%s] is true.",
getOptionKeys(absentOptions), expression.toString());
getOptionKeys(absentOptions),
conditionalRequiredOptions.getExpression().toString());
}

private boolean validate(Expression expression) {
Expand Down Expand Up @@ -222,4 +226,14 @@ private <T> boolean validate(Condition<T> condition) {
return match || validate(condition.getNext());
}
}

private boolean isConditionOption(RequiredOption requiredOption) {
return requiredOption instanceof RequiredOption.ConditionalRequiredOptions;
}

private boolean matchCondition(
RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) {
Expression expression = conditionalRequiredOptions.getExpression();
return validate(expression);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class SaveModeConstants {

public static final String ROWTYPE_PRIMARY_KEY = "rowtype_primary_key";
public static final String ROWTYPE_FIELDS = "rowtype_fields";
public static final String ROWTYPE_UNIQUE_KEY = "rowtype_unique_key";

public static final String TABLE_NAME = "table_name";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,20 @@
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;

/**
Expand Down Expand Up @@ -58,6 +65,9 @@ default Optional<Factory> getFactory() {
*/
void close() throws CatalogException;

/** Get the name of the catalog. */
String name();

// --------------------------------------------------------------------------------------------
// database
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -124,15 +134,10 @@ default Optional<Factory> getFactory() {
default List<CatalogTable> getTables(ReadonlyConfig config) throws CatalogException {
// Get the list of specified tables
List<String> tableNames = config.get(CatalogOptions.TABLE_NAMES);
List<CatalogTable> catalogTables = new ArrayList<>();
if (tableNames != null && !tableNames.isEmpty()) {
for (String tableName : tableNames) {
TablePath tablePath = TablePath.of(tableName);
if (this.tableExists(tablePath)) {
catalogTables.add(this.getTable(tablePath));
}
}
return catalogTables;
Iterator<TablePath> tablePaths =
tableNames.stream().map(TablePath::of).filter(this::tableExists).iterator();
return buildCatalogTablesWithErrorCheck(tablePaths);
}

// Get the list of table pattern
Expand All @@ -144,17 +149,66 @@ default List<CatalogTable> getTables(ReadonlyConfig config) throws CatalogExcept
Pattern tablePattern = Pattern.compile(config.get(CatalogOptions.TABLE_PATTERN));
List<String> allDatabase = this.listDatabases();
allDatabase.removeIf(s -> !databasePattern.matcher(s).matches());
List<TablePath> tablePaths = new ArrayList<>();
for (String databaseName : allDatabase) {
tableNames = this.listTables(databaseName);
for (String tableName : tableNames) {
if (tablePattern.matcher(databaseName + "." + tableName).matches()) {
catalogTables.add(this.getTable(TablePath.of(databaseName, tableName)));
tableNames.forEach(
tableName -> {
if (tablePattern.matcher(databaseName + "." + tableName).matches()) {
tablePaths.add(TablePath.of(databaseName, tableName));
}
});
}
return buildCatalogTablesWithErrorCheck(tablePaths.iterator());
}

default List<CatalogTable> buildCatalogTablesWithErrorCheck(Iterator<TablePath> tablePaths) {
Map<String, Map<String, String>> unsupportedTable = new LinkedHashMap<>();
List<CatalogTable> catalogTables = new ArrayList<>();
while (tablePaths.hasNext()) {
try {
catalogTables.add(getTable(tablePaths.next()));
} catch (SeaTunnelRuntimeException e) {
if (e.getSeaTunnelErrorCode()
.equals(CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR)) {
unsupportedTable.put(
e.getParams().get("tableName"),
e.getParamsValueAsMap("fieldWithDataTypes"));
} else {
throw e;
}
}
}
if (!unsupportedTable.isEmpty()) {
throw CommonError.getCatalogTablesWithUnsupportedType(name(), unsupportedTable);
}
return catalogTables;
}

default <T> void buildColumnsWithErrorCheck(
TablePath tablePath,
TableSchema.Builder builder,
Iterator<T> keys,
Function<T, Column> getColumn) {
Map<String, String> unsupported = new LinkedHashMap<>();
while (keys.hasNext()) {
try {
builder.column(getColumn.apply(keys.next()));
} catch (SeaTunnelRuntimeException e) {
if (e.getSeaTunnelErrorCode()
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
unsupported.put(e.getParams().get("field"), e.getParams().get("dataType"));
} else {
throw e;
}
}
}
if (!unsupported.isEmpty()) {
throw CommonError.getCatalogTableWithUnsupportedType(
name(), tablePath.getFullName(), unsupported);
}
}

/**
* Create a new table in this catalog.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ public static CatalogTable buildWithConfig(String catalogName, ReadonlyConfig re
schemaConfig.get(
TableSchemaOptions.TableIdentifierOptions.SCHEMA_FIRST));
} else {
tablePath = TablePath.EMPTY;
Optional<String> resultTableNameOptional =
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME);
tablePath = resultTableNameOptional.map(TablePath::of).orElse(TablePath.EMPTY);
}

return CatalogTable.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public SeaTunnelRowType toPhysicalRowDataType() {
return new SeaTunnelRowType(fields, fieldTypes);
}

public String[] getFieldNames() {
return columns.stream().map(Column::getName).toArray(String[]::new);
}

public static final class Builder {
private final List<Column> columns = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public void testSingleChoiceOptionDefaultValueValidator() {
config.put(SINGLE_CHOICE_TEST.key(), "A");
Executable executable = () -> validate(config, optionRule);
assertEquals(
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the defaultValue(M) must be one of the optionValues.",
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the defaultValue(M) must be one of the optionValues([A, B, C]).",
assertThrows(OptionValidationException.class, executable).getMessage());
}

Expand All @@ -290,7 +290,7 @@ public void testSingleChoiceOptionValueValidator() {
config.put(SINGLE_CHOICE_VALUE_TEST.key(), "N");
executable = () -> validate(config, optionRule);
assertEquals(
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the value(N) must be one of the optionValues.",
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the value(N) must be one of the optionValues([A, B, C]).",
assertThrows(OptionValidationException.class, executable).getMessage());
}
}
Loading

0 comments on commit 4e7302b

Please sign in to comment.