From fc2a7a49d1d1d24eb03e3bece8c7806ab8a85a9c Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Thu, 14 Nov 2024 12:24:53 +0800 Subject: [PATCH] [Hotfix][Transforms-v2] DynamicCompile Plugin compatibility fix. --- .../e2e/transform/TestDynamicCompileIT.java | 9 ++ ...mic_java_compile_transform_compatible.conf | 115 ++++++++++++++++++ .../DynamicCompileTransform.java | 21 +++- 3 files changed, 143 insertions(+), 2 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java index 2528499fc1b..2ad2f32e4b9 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java @@ -116,6 +116,15 @@ public void testDynamicSingleCompileJava(TestContainer container) Assertions.assertEquals(0, execResult.getExitCode()); } + @TestTemplate + public void testDynamicSingleCompileJavaOldVersionCompatible(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob( + basePath + "single_dynamic_java_compile_transform_compatible.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + @TestTemplate public void testDynamicMultipleCompileGroovy(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf new file mode 100644 index 00000000000..f26dd210604 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf @@ -0,0 +1,115 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + } + } + } +} + +transform { + DynamicCompile { + source_table_name = "fake" + result_table_name = "fake1" + compile_language = "JAVA" + compile_pattern = "SOURCE_CODE" + source_code = """ + import org.apache.seatunnel.api.table.catalog.Column; + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.catalog.*; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + + + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + + ArrayList columns = new ArrayList(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "col1", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + return new Column[]{ + destColumn + }; + + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + + Object[] fieldValues = new Object[1]; + fieldValues[0]="test1"; + return fieldValues; + } + """ + + } +} + +sink { + Assert { + source_table_name = "fake1" + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = id + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = col1 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "test1" + + } + + ] + } + ] + } + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java index bfae2b8d2a1..7ff88d85d49 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.common.utils.ReflectionUtils; @@ -30,6 +31,7 @@ import org.apache.seatunnel.transform.exception.TransformException; import java.nio.file.Paths; +import java.util.Optional; import static org.apache.seatunnel.transform.dynamiccompile.CompileTransformErrorCode.COMPILE_TRANSFORM_ERROR_CODE; @@ -42,6 +44,8 @@ public class DynamicCompileTransform extends MultipleFieldOutputTransform { private final String sourceCode; + private final boolean compatibilityMode; + private final CompilePattern compilePattern; private AbstractParse DynamicCompileParse; @@ -68,6 +72,9 @@ public DynamicCompileTransform(ReadonlyConfig readonlyConfig, CatalogTable catal readonlyConfig.get( DynamicCompileTransformConfig.ABSOLUTE_PATH))); } + compatibilityMode = + sourceCode.contains( + org.apache.seatunnel.transform.common.SeaTunnelRowAccessor.class.getName()); } @Override @@ -98,14 +105,24 @@ protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) { try { result = ReflectionUtils.invoke( - getCompileLanguageInstance(), getInlineOutputFieldValues, inputRow); - + getCompileLanguageInstance(), + getInlineOutputFieldValues, + getCompatibilityAccessor(inputRow)); } catch (Exception e) { throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE, e.getMessage()); } return (Object[]) result; } + private Object getCompatibilityAccessor(SeaTunnelRowAccessor inputRow) { + if (compatibilityMode) { + Optional field = ReflectionUtils.getField(inputRow, "row"); + SeaTunnelRow row = (SeaTunnelRow) field.get(); + return new org.apache.seatunnel.transform.common.SeaTunnelRowAccessor(row); + } + return inputRow; + } + private Object getCompileLanguageInstance() throws InstantiationException, IllegalAccessException { Class compileClass = DynamicCompileParse.parseClassSourceCode(sourceCode);