From eec21db80060ad7b91726e6b6c88ebc7986b88d1 Mon Sep 17 00:00:00 2001 From: FlechazoW Date: Thu, 27 Jul 2023 09:45:18 +0800 Subject: [PATCH] [Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. --- docs/en/connector-v2/source/CosFile.md | 5 + docs/en/connector-v2/source/HdfsFile.md | 5 + docs/en/connector-v2/source/LocalFile.md | 5 + docs/en/connector-v2/source/OssFile.md | 5 + docs/en/connector-v2/source/OssJindoFile.md | 5 + docs/en/connector-v2/source/S3File.md | 5 + docs/en/connector-v2/source/SftpFile.md | 5 + .../file/config/BaseSourceConfig.java | 7 + .../source/reader/AbstractReadStrategy.java | 19 ++- .../file/cos/source/CosFileSourceFactory.java | 1 + .../file/ftp/source/FtpFileSourceFactory.java | 1 + .../hdfs/source/HdfsFileSourceFactory.java | 1 + .../file/oss/source/OssFileSourceFactory.java | 1 + .../local/source/LocalFileSourceFactory.java | 1 + .../file/oss/source/OssFileSourceFactory.java | 1 + .../file/s3/source/S3FileSourceFactory.java | 1 + .../sftp/source/SftpFileSourceFactory.java | 1 + .../e2e/connector/file/ftp/FtpFileIT.java | 93 ++++++------ .../excel/ftp_filter_excel_to_assert.conf | 141 ++++++++++++++++++ .../e2e/connector/file/local/LocalFileIT.java | 134 ++++++++--------- .../excel/local_filter_excel_to_assert.conf | 131 ++++++++++++++++ .../e2e/connector/file/fstp/SftpFileIT.java | 89 +++++------ .../excel/sftp_filter_excel_to_assert.conf | 132 ++++++++++++++++ .../json/sftp_filter_file_json_to_assert.conf | 137 +++++++++++++++++ .../text/sftp_filter_file_text_to_assert.conf | 137 +++++++++++++++++ .../e2e/common/container/TestHelper.java | 40 +++++ .../e2e/common/util/ContainerUtil.java | 6 + 27 files changed, 943 insertions(+), 166 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf create mode 100644 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_filter_file_json_to_assert.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_filter_file_text_to_assert.conf create mode 100644 seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md index 18fc0299c9e2..2cf6d6eb9377 100644 --- a/docs/en/connector-v2/source/CosFile.md +++ b/docs/en/connector-v2/source/CosFile.md @@ -55,6 +55,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | schema | config | no | - | | common-options | | no | - | | sheet_name | string | no | - | +| file_filter_pattern | string | no | * | ### path [string] @@ -247,6 +248,10 @@ Source plugin common parameters, please refer to [Source Common Options](common- Reader the sheet of the workbook,Only used when file_format is excel. +### file_filter_pattern [string] + +Filter pattern, which is used for filtering files. + ## Example ```hocon diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md index d255f4fd3a7c..60820affade1 100644 --- a/docs/en/connector-v2/source/HdfsFile.md +++ b/docs/en/connector-v2/source/HdfsFile.md @@ -53,6 +53,7 @@ Read all the data in a split in a pollNext call. 
What splits are read will be sa | schema | config | no | - | | common-options | | no | - | | sheet_name | string | no | - | +| file_filter_pattern | string | no | * | ### path [string] @@ -245,6 +246,10 @@ Source plugin common parameters, please refer to [Source Common Options](common- Reader the sheet of the workbook,Only used when file_format is excel. +### file_filter_pattern [string] + +Filter pattern, which is used for filtering files. + ## Example ```hocon diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index d33288b7a57a..8452a9480fe1 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -49,6 +49,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | schema | config | no | - | | common-options | | no | - | | sheet_name | string | no | - | +| file_filter_pattern | string | no | * | ### path [string] @@ -225,6 +226,10 @@ Source plugin common parameters, please refer to [Source Common Options](common- Reader the sheet of the workbook,Only used when file_format is excel. +### file_filter_pattern [string] + +Filter pattern, which is used for filtering files. + ## Example ```hocon diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md index 532b4d03aa79..11abeb8eabfe 100644 --- a/docs/en/connector-v2/source/OssFile.md +++ b/docs/en/connector-v2/source/OssFile.md @@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | schema | config | no | - | | common-options | | no | - | | sheet_name | string | no | - | +| file_filter_pattern | string | no | * | ### path [string] @@ -282,6 +283,10 @@ Reader the sheet of the workbook,Only used when file_format is excel. ``` +### file_filter_pattern [string] + +Filter pattern, which is used for filtering files. 
+ ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md index 3e3649e19b92..5e2d6b42dd49 100644 --- a/docs/en/connector-v2/source/OssJindoFile.md +++ b/docs/en/connector-v2/source/OssJindoFile.md @@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | schema | config | no | - | | common-options | | no | - | | sheet_name | string | no | - | +| file_filter_pattern | string | no | * | ### path [string] @@ -248,6 +249,10 @@ Source plugin common parameters, please refer to [Source Common Options](common- Reader the sheet of the workbook,Only used when file_format is excel. +### file_filter_pattern [string] + +Filter pattern, which is used for filtering files. + ## Example ```hocon diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index f58a1a6bc36e..462b17913925 100644 --- a/docs/en/connector-v2/source/S3File.md +++ b/docs/en/connector-v2/source/S3File.md @@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | schema | config | no | - | | common-options | | no | - | | sheet_name | string | no | - | +| file_filter_pattern | string | no | * | ### path [string] @@ -299,6 +300,10 @@ Reader the sheet of the workbook,Only used when file_format is excel. ``` +### file_filter_pattern [string] + +Filter pattern, which is used for filtering files. 
+ ## Changelog ### 2.3.0-beta 2022-10-20 diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md index 500ec2af5b57..62442a439450 100644 --- a/docs/en/connector-v2/source/SftpFile.md +++ b/docs/en/connector-v2/source/SftpFile.md @@ -47,6 +47,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | schema | config | no | - | | common-options | | no | - | | sheet_name | string | no | - | +| file_filter_pattern | string | no | * | ### host [string] @@ -226,6 +227,10 @@ Source plugin common parameters, please refer to [Source Common Options](common- Reader the sheet of the workbook,Only used when file_format is excel. +### file_filter_pattern [string] + +Filter pattern, which is used for filtering files. + ## Example ```hocon diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java index fa65628bd561..f0db314a0118 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java @@ -112,4 +112,11 @@ public class BaseSourceConfig { .stringType() .noDefaultValue() .withDescription("To be read sheet name,only valid for excel files"); + + public static final Option FILE_FILTER_PATTERN = + Options.key("file_filter_pattern") + .stringType() + .defaultValue("*") + .withDescription( + "File pattern. 
The connector will filter some files base on the pattern."); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java index 54e70662425d..a2e462fba4e3 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java @@ -43,6 +43,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED; import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS; @@ -74,6 +77,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy { protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue(); protected boolean isKerberosAuthorization = false; + protected Pattern pattern; + @Override public void init(HadoopConf conf) { this.hadoopConf = conf; @@ -126,7 +131,7 @@ public List getFileNamesByPath(HadoopConf hadoopConf, String path) throw fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString())); continue; } - if (fileStatus.isFile()) { + if (fileStatus.isFile() && filterFileByPattern(fileStatus)) { // filter '_SUCCESS' file if (!fileStatus.getPath().getName().equals("_SUCCESS") && !fileStatus.getPath().getName().startsWith(".")) { @@ -166,6 +171,11 @@ public void setPluginConfig(Config pluginConfig) { if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) { 
readColumns.addAll(pluginConfig.getStringList(BaseSourceConfig.READ_COLUMNS.key())); } + if (pluginConfig.hasPath(BaseSourceConfig.FILE_FILTER_PATTERN.key())) { + String filterPattern = + pluginConfig.getString(BaseSourceConfig.FILE_FILTER_PATTERN.key()); + this.pattern = Pattern.compile(Matcher.quoteReplacement(filterPattern)); + } } @Override @@ -214,4 +224,11 @@ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType sea // return merge row type return new SeaTunnelRowType(newFieldNames, newFieldTypes); } + + protected boolean filterFileByPattern(FileStatus fileStatus) { + if (Objects.nonNull(pattern)) { + return pattern.matcher(fileStatus.getPath().getName()).matches(); + } + return true; + } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java index d0b781f1a144..496e9277f4e3 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java @@ -60,6 +60,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATE_FORMAT) .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) + .optional(BaseSourceConfig.FILE_FILTER_PATTERN) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java index 
d2d11da5b467..4ab637c43484 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java @@ -60,6 +60,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATE_FORMAT) .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) + .optional(BaseSourceConfig.FILE_FILTER_PATTERN) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java index d4c17384904e..c3d406d62c79 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java @@ -57,6 +57,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATE_FORMAT) .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) + .optional(BaseSourceConfig.FILE_FILTER_PATTERN) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java index 502567676254..eaea7bccb61e 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java @@ -60,6 +60,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATE_FORMAT) .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) + .optional(BaseSourceConfig.FILE_FILTER_PATTERN) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java index 4ae2ae3a9b61..03ec8660ce2d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java @@ -56,6 +56,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATE_FORMAT) .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) + .optional(BaseSourceConfig.FILE_FILTER_PATTERN) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java index c6a2d7040923..e7d862bd44ad 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java @@ -60,6 +60,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATE_FORMAT) .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) + .optional(BaseSourceConfig.FILE_FILTER_PATTERN) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java index 71156a21b66c..a3b48088650c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java @@ -65,6 +65,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATE_FORMAT) .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) + .optional(BaseSourceConfig.FILE_FILTER_PATTERN) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java index 18cda2fbe5ec..e9efe1cdf9b3 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java @@ -60,6 +60,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATE_FORMAT) .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) + .optional(BaseSourceConfig.FILE_FILTER_PATTERN) .build(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java index 5fc0e486091c..879a586f1d0d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java @@ -21,23 +21,20 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.container.TestHelper; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; -import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; import 
lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.nio.file.Path; import java.util.Collections; import java.util.stream.Stream; @@ -87,19 +84,33 @@ public void startUp() throws Exception { Startables.deepStart(Stream.of(ftpContainer)).join(); log.info("ftp container started"); - Path jsonPath = ContainerUtil.getResourcesFile("/json/e2e.json").toPath(); - Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath(); - Path excelPath = ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath(); - - ftpContainer.copyFileToContainer( - MountableFile.forHostPath(jsonPath), - "/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json"); - ftpContainer.copyFileToContainer( - MountableFile.forHostPath(textPath), - "/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt"); - ftpContainer.copyFileToContainer( - MountableFile.forHostPath(excelPath), - "/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx"); + ContainerUtil.copyFileIntoContainers( + "/json/e2e.json", + "/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json", + ftpContainer); + ContainerUtil.copyFileIntoContainers( + "/json/e2e.json", + "/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e_filter.json", + ftpContainer); + + ContainerUtil.copyFileIntoContainers( + "/text/e2e.txt", + "/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", + ftpContainer); + ContainerUtil.copyFileIntoContainers( + "/text/e2e.txt", + "/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e_filter.txt", + ftpContainer); + + ContainerUtil.copyFileIntoContainers( + "/excel/e2e.xlsx", + "/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx", + ftpContainer); + ContainerUtil.copyFileIntoContainers( + "/excel/e2e.xlsx", + 
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx", + ftpContainer); + ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/"); ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/"); } @@ -107,51 +118,31 @@ public void startUp() throws Exception { @TestTemplate public void testFtpFileReadAndWrite(TestContainer container) throws IOException, InterruptedException { + TestHelper helper = new TestHelper(container); // test write ftp excel file - Container.ExecResult excelWriteResult = - container.executeJob("/excel/fake_source_to_ftp_excel.conf"); - Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr()); + helper.execute("/excel/fake_source_to_ftp_excel.conf"); // test read ftp excel file - Container.ExecResult excelReadResult = - container.executeJob("/excel/ftp_excel_to_assert.conf"); - Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr()); + helper.execute("/excel/ftp_excel_to_assert.conf"); // test read ftp excel file with projection - Container.ExecResult excelProjectionReadResult = - container.executeJob("/excel/ftp_excel_projection_to_assert.conf"); - Assertions.assertEquals( - 0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr()); + helper.execute("/excel/ftp_excel_projection_to_assert.conf"); + // test read ftp excel file with filter + helper.execute("/excel/ftp_filter_excel_to_assert.conf"); // test write ftp text file - Container.ExecResult textWriteResult = - container.executeJob("/text/fake_to_ftp_file_text.conf"); - Assertions.assertEquals(0, textWriteResult.getExitCode()); + helper.execute("/text/fake_to_ftp_file_text.conf"); // test read skip header - Container.ExecResult textWriteAndSkipResult = - container.executeJob("/text/ftp_file_text_skip_headers.conf"); - Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode()); + 
helper.execute("/text/ftp_file_text_skip_headers.conf"); // test read ftp text file - Container.ExecResult textReadResult = - container.executeJob("/text/ftp_file_text_to_assert.conf"); - Assertions.assertEquals(0, textReadResult.getExitCode()); + helper.execute("/text/ftp_file_text_to_assert.conf"); // test read ftp text file with projection - Container.ExecResult textProjectionResult = - container.executeJob("/text/ftp_file_text_projection_to_assert.conf"); - Assertions.assertEquals(0, textProjectionResult.getExitCode()); + helper.execute("/text/ftp_file_text_projection_to_assert.conf"); // test write ftp json file - Container.ExecResult jsonWriteResult = - container.executeJob("/json/fake_to_ftp_file_json.conf"); - Assertions.assertEquals(0, jsonWriteResult.getExitCode()); + helper.execute("/json/fake_to_ftp_file_json.conf"); // test read ftp json file - Container.ExecResult jsonReadResult = - container.executeJob("/json/ftp_file_json_to_assert.conf"); - Assertions.assertEquals(0, jsonReadResult.getExitCode()); + helper.execute("/json/ftp_file_json_to_assert.conf"); // test write ftp parquet file - Container.ExecResult parquetWriteResult = - container.executeJob("/parquet/fake_to_ftp_file_parquet.conf"); - Assertions.assertEquals(0, parquetWriteResult.getExitCode()); + helper.execute("/parquet/fake_to_ftp_file_parquet.conf"); // test write ftp orc file - Container.ExecResult orcWriteResult = - container.executeJob("/orc/fake_to_ftp_file_orc.conf"); - Assertions.assertEquals(0, orcWriteResult.getExitCode()); + helper.execute("/orc/fake_to_ftp_file_orc.conf"); } @AfterAll diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf new file mode 100644 index 000000000000..d4fd0727cad7 --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf @@ -0,0 +1,141 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FtpFile { + host = "ftp" + port = 21 + user = seatunnel + password = pass + path = "/tmp/seatunnel/read/excel" + result_table_name = "ftp" + file_format_type = excel + delimiter = ; + skip_header_row_number = 1 + file_filter_pattern = "e2e_filter.*" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double 
= double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + + +sink { + Assert { + source_table_name = "ftp" + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = hobby + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index f5c220deabd2..e2b6949cbe7a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -21,17 +21,14 @@ import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.container.TestContainerId; +import org.apache.seatunnel.e2e.common.container.TestHelper; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.apache.seatunnel.e2e.common.util.ContainerUtil; -import org.junit.jupiter.api.Assertions; import 
org.junit.jupiter.api.TestTemplate; -import org.testcontainers.containers.Container; -import org.testcontainers.utility.MountableFile; import java.io.IOException; -import java.nio.file.Path; @DisabledOnContainer( value = {TestContainerId.SPARK_2_4}, @@ -43,88 +40,85 @@ public class LocalFileIT extends TestSuiteBase { @TestContainerExtension private final ContainerExtendedFactory extendedFactory = container -> { - Path jsonPath = ContainerUtil.getResourcesFile("/json/e2e.json").toPath(); - Path orcPath = ContainerUtil.getResourcesFile("/orc/e2e.orc").toPath(); - Path parquetPath = ContainerUtil.getResourcesFile("/parquet/e2e.parquet").toPath(); - Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath(); - Path excelPath = ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath(); - container.copyFileToContainer( - MountableFile.forHostPath(jsonPath), - "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json"); - container.copyFileToContainer( - MountableFile.forHostPath(orcPath), - "/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc"); - container.copyFileToContainer( - MountableFile.forHostPath(parquetPath), - "/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet"); - container.copyFileToContainer( - MountableFile.forHostPath(textPath), - "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt"); - container.copyFileToContainer( - MountableFile.forHostPath(excelPath), - "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx"); + ContainerUtil.copyFileIntoContainers( + "/json/e2e.json", + "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json", + container); + ContainerUtil.copyFileIntoContainers( + "/json/e2e.json", + "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e_filter.json", + container); + + ContainerUtil.copyFileIntoContainers( + "/text/e2e.txt", + "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", + container); + ContainerUtil.copyFileIntoContainers( + 
"/text/e2e.txt", + "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e_filter.txt", + container); + + ContainerUtil.copyFileIntoContainers( + "/excel/e2e.xlsx", + "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx", + container); + ContainerUtil.copyFileIntoContainers( + "/excel/e2e.xlsx", + "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx", + container); + + ContainerUtil.copyFileIntoContainers( + "/orc/e2e.orc", + "/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc", + container); + ContainerUtil.copyFileIntoContainers( + "/orc/e2e.orc", + "/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e_filter.orc", + container); + + ContainerUtil.copyFileIntoContainers( + "/parquet/e2e.parquet", + "/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet", + container); + ContainerUtil.copyFileIntoContainers( + "/parquet/e2e.parquet", + "/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e_filter.parquet", + container); }; @TestTemplate public void testLocalFileReadAndWrite(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult excelWriteResult = - container.executeJob("/excel/fake_to_local_excel.conf"); - Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr()); - Container.ExecResult excelReadResult = - container.executeJob("/excel/local_excel_to_assert.conf"); - Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr()); - Container.ExecResult excelProjectionReadResult = - container.executeJob("/excel/local_excel_projection_to_assert.conf"); - Assertions.assertEquals( - 0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr()); + TestHelper helper = new TestHelper(container); + + helper.execute("/excel/fake_to_local_excel.conf"); + helper.execute("/excel/local_excel_to_assert.conf"); + helper.execute("/excel/local_excel_projection_to_assert.conf"); // test write local text file - 
Container.ExecResult textWriteResult = - container.executeJob("/text/fake_to_local_file_text.conf"); - Assertions.assertEquals(0, textWriteResult.getExitCode()); + helper.execute("/text/fake_to_local_file_text.conf"); // test read skip header - Container.ExecResult textWriteAndSkipResult = - container.executeJob("/text/local_file_text_skip_headers.conf"); - Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode()); + helper.execute("/text/local_file_text_skip_headers.conf"); // test read local text file - Container.ExecResult textReadResult = - container.executeJob("/text/local_file_text_to_assert.conf"); - Assertions.assertEquals(0, textReadResult.getExitCode()); + helper.execute("/text/local_file_text_to_assert.conf"); // test read local text file with projection - Container.ExecResult textProjectionResult = - container.executeJob("/text/local_file_text_projection_to_assert.conf"); - Assertions.assertEquals(0, textProjectionResult.getExitCode()); + helper.execute("/text/local_file_text_projection_to_assert.conf"); // test write local json file - Container.ExecResult jsonWriteResult = - container.executeJob("/json/fake_to_local_file_json.conf"); - Assertions.assertEquals(0, jsonWriteResult.getExitCode()); + helper.execute("/json/fake_to_local_file_json.conf"); // test read local json file - Container.ExecResult jsonReadResult = - container.executeJob("/json/local_file_json_to_assert.conf"); - Assertions.assertEquals(0, jsonReadResult.getExitCode()); + helper.execute("/json/local_file_json_to_assert.conf"); // test write local orc file - Container.ExecResult orcWriteResult = - container.executeJob("/orc/fake_to_local_file_orc.conf"); - Assertions.assertEquals(0, orcWriteResult.getExitCode()); + helper.execute("/orc/fake_to_local_file_orc.conf"); // test read local orc file - Container.ExecResult orcReadResult = - container.executeJob("/orc/local_file_orc_to_assert.conf"); - Assertions.assertEquals(0, orcReadResult.getExitCode()); + 
helper.execute("/orc/local_file_orc_to_assert.conf"); // test read local orc file with projection - Container.ExecResult orcProjectionResult = - container.executeJob("/orc/local_file_orc_projection_to_assert.conf"); - Assertions.assertEquals(0, orcProjectionResult.getExitCode()); + helper.execute("/orc/local_file_orc_projection_to_assert.conf"); // test write local parquet file - Container.ExecResult parquetWriteResult = - container.executeJob("/parquet/fake_to_local_file_parquet.conf"); - Assertions.assertEquals(0, parquetWriteResult.getExitCode()); + helper.execute("/parquet/fake_to_local_file_parquet.conf"); // test read local parquet file - Container.ExecResult parquetReadResult = - container.executeJob("/parquet/local_file_parquet_to_assert.conf"); - Assertions.assertEquals(0, parquetReadResult.getExitCode()); + helper.execute("/parquet/local_file_parquet_to_assert.conf"); // test read local parquet file with projection - Container.ExecResult parquetProjectionResult = - container.executeJob("/parquet/local_file_parquet_projection_to_assert.conf"); - Assertions.assertEquals(0, parquetProjectionResult.getExitCode()); + helper.execute("/parquet/local_file_parquet_projection_to_assert.conf"); + // test read filtered local file + helper.execute("/excel/local_filter_excel_to_assert.conf"); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf new file mode 100644 index 000000000000..a4e53ef2cec4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/seatunnel/read/excel" + result_table_name = "fake" + file_format_type = excel + delimiter = ; + skip_header_row_number = 1 + file_filter_pattern = "e2e_filter.*" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + 
field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = hobby + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java index 82d1be73db16..82ed88573927 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java @@ -21,22 +21,19 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.container.TestContainerId; +import org.apache.seatunnel.e2e.common.container.TestHelper; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; -import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.nio.file.Path; import java.util.Collections; import java.util.stream.Stream; @@ -75,61 +72,65 @@ public void startUp() throws Exception { 
sftpContainer.start(); Startables.deepStart(Stream.of(sftpContainer)).join(); log.info("Sftp container started"); - Path jsonPath = ContainerUtil.getResourcesFile("/json/e2e.json").toPath(); - Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath(); - Path excelPath = ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath(); - sftpContainer.copyFileToContainer( - MountableFile.forHostPath(jsonPath), - "/home/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json"); - sftpContainer.copyFileToContainer( - MountableFile.forHostPath(textPath), - "/home/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt"); - sftpContainer.copyFileToContainer( - MountableFile.forHostPath(excelPath), - "/home/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx"); + + ContainerUtil.copyFileIntoContainers( + "/json/e2e.json", + "/home/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json", + sftpContainer); + ContainerUtil.copyFileIntoContainers( + "/json/e2e.json", + "/home/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e_filter.json", + sftpContainer); + + ContainerUtil.copyFileIntoContainers( + "/text/e2e.txt", + "/home/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", + sftpContainer); + ContainerUtil.copyFileIntoContainers( + "/text/e2e.txt", + "/home/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e_filter.txt", + sftpContainer); + + ContainerUtil.copyFileIntoContainers( + "/excel/e2e.xlsx", + "/home/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx", + sftpContainer); + ContainerUtil.copyFileIntoContainers( + "/excel/e2e.xlsx", + "/home/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx", + sftpContainer); + sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel /home/seatunnel/tmp/"); } @TestTemplate public void 
testSftpFileReadAndWrite(TestContainer container) throws IOException, InterruptedException { + TestHelper helper = new TestHelper(container); // test write sftp excel file - Container.ExecResult excelWriteResult = - container.executeJob("/excel/fakesource_to_sftp_excel.conf"); - Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr()); + helper.execute("/excel/fakesource_to_sftp_excel.conf"); // test read sftp excel file - Container.ExecResult excelReadResult = - container.executeJob("/excel/sftp_excel_to_assert.conf"); - Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr()); + helper.execute("/excel/sftp_excel_to_assert.conf"); // test read sftp excel file with projection - Container.ExecResult excelProjectionReadResult = - container.executeJob("/excel/sftp_excel_projection_to_assert.conf"); - Assertions.assertEquals( - 0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr()); + helper.execute("/excel/sftp_excel_projection_to_assert.conf"); + // test read sftp excel file with filter pattern + helper.execute("/excel/sftp_filter_excel_to_assert.conf"); // test write sftp text file - Container.ExecResult textWriteResult = - container.executeJob("/text/fake_to_sftp_file_text.conf"); - Assertions.assertEquals(0, textWriteResult.getExitCode()); + helper.execute("/text/fake_to_sftp_file_text.conf"); // test read skip header - Container.ExecResult textWriteAndSkipResult = - container.executeJob("/text/sftp_file_text_skip_headers.conf"); - Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode()); + helper.execute("/text/sftp_file_text_skip_headers.conf"); // test read sftp text file - Container.ExecResult textReadResult = - container.executeJob("/text/sftp_file_text_to_assert.conf"); - Assertions.assertEquals(0, textReadResult.getExitCode()); + helper.execute("/text/sftp_file_text_to_assert.conf"); // test read sftp text file with projection - Container.ExecResult 
textProjectionResult = - container.executeJob("/text/sftp_file_text_projection_to_assert.conf"); - Assertions.assertEquals(0, textProjectionResult.getExitCode()); + helper.execute("/text/sftp_file_text_projection_to_assert.conf"); + // test read and filter text file + helper.execute("/text/sftp_filter_file_text_to_assert.conf"); // test write sftp json file - Container.ExecResult jsonWriteResult = - container.executeJob("/json/fake_to_sftp_file_json.conf"); - Assertions.assertEquals(0, jsonWriteResult.getExitCode()); + helper.execute("/json/fake_to_sftp_file_json.conf"); // test read sftp json file - Container.ExecResult jsonReadResult = - container.executeJob("/json/sftp_file_json_to_assert.conf"); - Assertions.assertEquals(0, jsonReadResult.getExitCode()); + helper.execute("/json/sftp_file_json_to_assert.conf"); + // test read and filter sftp json file + helper.execute("/json/sftp_filter_file_json_to_assert.conf"); } @AfterAll diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf new file mode 100644 index 000000000000..b75e25b1d04e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf @@ -0,0 +1,132 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + SftpFile { + path = "tmp/seatunnel/read/excel" + result_table_name = "sftp" + file_format_type = excel + host = "sftp" + port = 22 + user = seatunnel + password = pass + delimiter = ";" + file_filter_pattern = "e2e_filter.*" + skip_header_row_number = 1 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + Assert { + source_table_name = "sftp" + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + 
field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} + + + + + + + + + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_filter_file_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_filter_file_json_to_assert.conf new file mode 100644 index 000000000000..cc941a732467 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_filter_file_json_to_assert.conf @@ -0,0 +1,137 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/seatunnel/read/json" + file_format_type = "json" + result_table_name = "sftp" + file_filter_pattern = "e2e_filter.*" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + C_MAP = "map" + C_ARRAY = "array" + C_STRING = string + C_BOOLEAN = boolean + C_TINYINT = tinyint + C_SMALLINT = smallint + C_INT = int + C_BIGINT = bigint + C_FLOAT = float + C_DOUBLE = double + C_BYTES = bytes + C_DATE = date + C_DECIMAL = "decimal(38, 18)" + C_TIMESTAMP = timestamp + } + } + } + } +} + +sink { + Assert { + source_table_name = "sftp" + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = hobby + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_filter_file_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_filter_file_text_to_assert.conf new file mode 100644 index 000000000000..a79f33c33b01 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_filter_file_text_to_assert.conf @@ -0,0 +1,137 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/seatunnel/read/text" + file_format_type = "text" + result_table_name = "sftp" + file_filter_pattern = "e2e_filter.*" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + Assert { + source_table_name = "sftp" + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = hobby + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java new file mode 100644 index 000000000000..a88723f82012 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.common.container; + +import org.junit.jupiter.api.Assertions; +import org.testcontainers.containers.Container; + +import java.io.IOException; + +public class TestHelper { + private final TestContainer container; + + public TestHelper(TestContainer container) { + this.container = container; + } + + public void execute(String file) throws IOException, InterruptedException { + execute(0, file); + } + + public void execute(int expectedResult, String file) throws IOException, InterruptedException { + Container.ExecResult result = container.executeJob(file); + Assertions.assertEquals(expectedResult, result.getExitCode(), result.getStderr()); + } +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java index 92d6100a7cea..fa5660a17002 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java @@ -246,4 +246,10 @@ public static List discoverTestContainers() { throw new FactoryException("Could not load service provider for containers.", e); } } + + public static void copyFileIntoContainers( + String fileName, String targetPath, GenericContainer container) { + Path path = getResourcesFile(fileName).toPath(); + container.copyFileToContainer(MountableFile.forHostPath(path), targetPath); + } }