From ce11d265196ebc1ff3507bbe17e015fcaf047260 Mon Sep 17 00:00:00 2001 From: XiaoJiang521 <131635688+XiaoJiang521@users.noreply.github.com> Date: Thu, 24 Aug 2023 14:07:45 +0800 Subject: [PATCH] [feature] add transform sql interface (#102) --- .../SchemaDerivationController.java | 49 +++++ .../app/service/ISchemaDerivationService.java | 26 +++ .../impl/SchemaDerivationServiceImpl.java | 186 ++++++++++++++++++ 3 files changed, 261 insertions(+) create mode 100644 seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SchemaDerivationController.java create mode 100644 seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ISchemaDerivationService.java create mode 100644 seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SchemaDerivationController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SchemaDerivationController.java new file mode 100644 index 000000000..03d2afb07 --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SchemaDerivationController.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.controller; + +import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.domain.request.job.TableSchemaReq; +import org.apache.seatunnel.app.domain.request.job.transform.SQL; +import org.apache.seatunnel.app.service.ISchemaDerivationService; + +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.ApiParam; + +import javax.annotation.Resource; + +@RestController +@RequestMapping("/seatunnel/api/v1/schema/derivation") +public class SchemaDerivationController { + + @Resource private ISchemaDerivationService schemaDerivationService; + + @PostMapping("/sql") + Result SQLSchemaDerivation( + @ApiParam(value = "job version id", required = true) @RequestParam long jobVersionId, + @ApiParam(value = "inputPluginId", required = true) @RequestParam String inputPluginId, + @RequestBody SQL sql) { + return Result.success( + schemaDerivationService.derivationSQL(jobVersionId, inputPluginId, sql)); + } +} diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ISchemaDerivationService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ISchemaDerivationService.java new file mode 100644 index 000000000..922f24d8f --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ISchemaDerivationService.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.service; + +import org.apache.seatunnel.app.domain.request.job.TableSchemaReq; +import org.apache.seatunnel.app.domain.request.job.transform.SQL; + +public interface ISchemaDerivationService { + + TableSchemaReq derivationSQL(long jobVersionId, String inputPluginId, SQL sql); +} diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java new file mode 100644 index 000000000..03cacb2b3 --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.service.impl; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.connector.TableTransform; +import org.apache.seatunnel.api.table.factory.FactoryUtil; +import org.apache.seatunnel.api.table.factory.TableFactoryContext; +import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.app.domain.request.job.DatabaseTableSchemaReq; +import org.apache.seatunnel.app.domain.request.job.PluginConfig; +import org.apache.seatunnel.app.domain.request.job.TableSchemaReq; +import org.apache.seatunnel.app.domain.request.job.transform.SQL; +import org.apache.seatunnel.app.service.IJobTaskService; +import org.apache.seatunnel.app.service.ISchemaDerivationService; +import org.apache.seatunnel.datasource.plugin.api.model.TableField; +import org.apache.seatunnel.transform.sql.SQLTransform; + +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Service +public class SchemaDerivationServiceImpl implements ISchemaDerivationService { + + @Resource private IJobTaskService jobTaskService; + + private static final Pattern decimalPattern = Pattern.compile("DECIMAL\\((\\d+), (\\d+)\\)"); + + @Override + public TableSchemaReq derivationSQL(long jobVersionId, String inputPluginId, SQL sql) { + + PluginConfig pluginConfig = jobTaskService.getSingleTask(jobVersionId, inputPluginId); + TableTransformFactory factory = + FactoryUtil.discoverFactory( + Thread.currentThread().getContextClassLoader(), + TableTransformFactory.class, + "Sql"); + List tableSchemaReqs = pluginConfig.getOutputSchema(); + if (tableSchemaReqs.isEmpty()) { + throw new IllegalArgumentException("outputSchema is empty, please add input plugin"); + } + DatabaseTableSchemaReq tableSchema = tableSchemaReqs.get(0); + TableSchema.Builder builder = TableSchema.builder(); + List primaryKeys = new ArrayList<>(); + for (TableField f : tableSchema.getFields()) { + if (f.getPrimaryKey()) { + primaryKeys.add(f.getName()); + } + builder.column( + PhysicalColumn.of( + f.getName(), + stringToDataType(f.getOutputDataType()), + 0, + f.getNullable(), + f.getDefaultValue(), + f.getComment())); + } + builder.primaryKey(PrimaryKey.of("PrimaryKeys", primaryKeys)); + + CatalogTable table = + CatalogTable.of( + TableIdentifier.of( + "default", tableSchema.getDatabase(), tableSchema.getTableName()), + builder.build(), + Collections.emptyMap(), + Collections.emptyList(), + tableSchema.getTableName()); + Map config = new HashMap<>(); + config.put(SQLTransform.KEY_QUERY.key(), sql.getQuery()); + TableFactoryContext context = + new TableFactoryContext( + Collections.singletonList(table), + ReadonlyConfig.fromMap(config), + Thread.currentThread().getContextClassLoader()); + TableTransform transform = factory.createTransform(context); + SQLTransform sqlTransform = (SQLTransform) transform.createTransform(); + CatalogTable result = sqlTransform.getProducedCatalogTable(); + List primaryKeysList = new ArrayList<>(); + if (result.getTableSchema().getPrimaryKey() != null) { + primaryKeysList.addAll(result.getTableSchema().getPrimaryKey().getColumnNames()); + } + List fields = new ArrayList<>(); + for (Column column : result.getTableSchema().getColumns()) { + TableField field = new TableField(); + field.setName(column.getName()); + field.setComment(column.getComment()); + field.setDefaultValue( + column.getDefaultValue() != null ? column.getDefaultValue().toString() : null); + field.setNullable(column.isNullable()); + field.setOutputDataType(column.getDataType().toString()); + field.setPrimaryKey(primaryKeysList.contains(column.getName())); + field.setType(column.getDataType().toString()); + fields.add(field); + } + + TableSchemaReq tableSchemaRes = new TableSchemaReq(); + tableSchemaRes.setFields(fields); + tableSchemaRes.setTableName(tableSchema.getTableName()); + return tableSchemaRes; + } + + private SeaTunnelDataType stringToDataType(String dataTypeStr) { + dataTypeStr = dataTypeStr.toUpperCase(); + switch (dataTypeStr) { + case "STRING": + return BasicType.STRING_TYPE; + case "BOOLEAN": + return BasicType.BOOLEAN_TYPE; + case "TINYINT": + return BasicType.BYTE_TYPE; + case "SMALLINT": + return BasicType.SHORT_TYPE; + case "INT": + return BasicType.INT_TYPE; + case "BIGINT": + return BasicType.LONG_TYPE; + case "FLOAT": + return BasicType.FLOAT_TYPE; + case "DOUBLE": + return BasicType.DOUBLE_TYPE; + case "NULL": + return BasicType.VOID_TYPE; + case "BYTES": + return ArrayType.BYTE_ARRAY_TYPE; + case "DATE": + return LocalTimeType.LOCAL_DATE_TYPE; + case "TIME": + return LocalTimeType.LOCAL_TIME_TYPE; + case "TIMESTAMP": + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case "DECIMAL": + return new DecimalType(38, 18); + case "ARRAY": + case "MAP": + case "ROW": + case "MULTIPLE_ROW": + return BasicType.STRING_TYPE; + default: + break; + } + + Matcher matcher = decimalPattern.matcher(dataTypeStr); + if (matcher.matches()) { + int precision = Integer.parseInt(matcher.group(1)); + int scale = Integer.parseInt(matcher.group(2)); + return new DecimalType(precision, scale); + } + return BasicType.STRING_TYPE; + } +}