Commit

[Feature][Connector V2][File] Add config of 'file_filter_pattern', which is used for filtering files.

FlechazoW authored and wentz committed Jul 27, 2023
1 parent 1f94676 commit eec21db
Showing 27 changed files with 943 additions and 166 deletions.
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/CosFile.md
@@ -55,6 +55,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -247,6 +248,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook; only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.
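
The value is compiled as a Java regular expression and matched against each file name (see the connector code changes in this commit). A minimal, hedged sketch; the pattern below is purely illustrative and all other required options are omitted:

```hocon
CosFile {
  # hypothetical pattern: read only files whose names end with "txt"
  file_filter_pattern = ".*txt"
}
```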

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/HdfsFile.md
@@ -53,6 +53,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -245,6 +246,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook; only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.
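
The pattern is compiled as a Java regular expression and matched against each file name. A hedged sketch (the pattern is an assumption and required options are omitted):

```hocon
HdfsFile {
  # hypothetical pattern: read only files whose names start with "orders_"
  file_filter_pattern = "orders_.*"
}
```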

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/LocalFile.md
@@ -49,6 +49,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -225,6 +226,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook; only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.
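
For instance (an illustrative sketch only; the value is treated as a Java regular expression against the file name, and other required options are left out):

```hocon
LocalFile {
  # hypothetical pattern: read only files whose names contain "2023"
  file_filter_pattern = ".*2023.*"
}
```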

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/OssFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -282,6 +283,10 @@ Reads the sheet of the workbook; only used when file_format is excel.
```

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.
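
A hedged example (the pattern is assumed to be a Java regular expression matched against file names; other required options are omitted):

```hocon
OssFile {
  # hypothetical pattern: read only files whose names end with "orc"
  file_filter_pattern = ".*orc"
}
```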

## Changelog

### 2.2.0-beta 2022-09-26
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/OssJindoFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -248,6 +249,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook; only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.
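
For example (illustrative only; the value is compiled as a Java regular expression and matched against file names, and required options are omitted):

```hocon
OssJindoFile {
  # hypothetical pattern: read only files whose names end with "json"
  file_filter_pattern = ".*json"
}
```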

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/S3File.md
@@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -299,6 +300,10 @@ Reads the sheet of the workbook; only used when file_format is excel.
```

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.
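
A minimal, hedged sketch (the pattern value is an assumption; it is matched as a Java regular expression against file names, and other required options are omitted):

```hocon
S3File {
  # hypothetical pattern: read only files whose names start with "part-"
  file_filter_pattern = "part-.*"
}
```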

## Changelog

### 2.3.0-beta 2022-10-20
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/SftpFile.md
@@ -47,6 +47,7 @@ If you use SeaTunnel Engine, it automatically integrates the hadoop jar when you
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### host [string]

@@ -226,6 +227,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook; only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.
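
For example (a hedged sketch; the value is treated as a Java regular expression against file names, and the pattern plus all other options here are illustrative):

```hocon
SftpFile {
  # hypothetical pattern: read only files whose names start with "report_"
  file_filter_pattern = "report_.*"
}
```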

## Example

```hocon
@@ -112,4 +112,11 @@ public class BaseSourceConfig {
.stringType()
.noDefaultValue()
.withDescription("To be read sheet name,only valid for excel files");

public static final Option<String> FILE_FILTER_PATTERN =
Options.key("file_filter_pattern")
.stringType()
.defaultValue("*")
.withDescription(
"File pattern. The connector will filter some files base on the pattern.");
}
@@ -43,6 +43,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
@@ -74,6 +77,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
protected boolean isKerberosAuthorization = false;

protected Pattern pattern;

@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
@@ -126,7 +131,7 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString()));
continue;
}
if (fileStatus.isFile()) {
if (fileStatus.isFile() && filterFileByPattern(fileStatus)) {
// filter '_SUCCESS' file
if (!fileStatus.getPath().getName().equals("_SUCCESS")
&& !fileStatus.getPath().getName().startsWith(".")) {
@@ -166,6 +171,11 @@ public void setPluginConfig(Config pluginConfig) {
if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
readColumns.addAll(pluginConfig.getStringList(BaseSourceConfig.READ_COLUMNS.key()));
}
if (pluginConfig.hasPath(BaseSourceConfig.FILE_FILTER_PATTERN.key())) {
String filterPattern =
pluginConfig.getString(BaseSourceConfig.FILE_FILTER_PATTERN.key());
this.pattern = Pattern.compile(Matcher.quoteReplacement(filterPattern));
}
}

@Override
@@ -214,4 +224,11 @@ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType sea
// return merge row type
return new SeaTunnelRowType(newFieldNames, newFieldTypes);
}

protected boolean filterFileByPattern(FileStatus fileStatus) {
if (Objects.nonNull(pattern)) {
return pattern.matcher(fileStatus.getPath().getName()).matches();
}
return true;
}
}
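
As a quick, standalone illustration of the filtering behavior added above (a hedged sketch, not the connector's actual code; here the pattern is compiled directly and the file names are made up):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class FileFilterPatternSketch {
    public static void main(String[] args) {
        // Hypothetical filter: keep only files whose names end with "txt".
        Pattern pattern = Pattern.compile(".*txt");
        List<String> names = Arrays.asList("e2e.txt", "e2e_filter.txt", "e2e.json", "_SUCCESS");
        // Mirrors filterFileByPattern(): a file is kept when its name matches the compiled pattern.
        List<String> kept = names.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
        System.out.println(kept); // prints [e2e.txt, e2e_filter.txt]
    }
}
```
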
@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -57,6 +57,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -56,6 +56,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -65,6 +65,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -21,23 +21,20 @@
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestHelper;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.stream.Stream;

@@ -87,71 +84,65 @@ public void startUp() throws Exception {
Startables.deepStart(Stream.of(ftpContainer)).join();
log.info("ftp container started");

Path jsonPath = ContainerUtil.getResourcesFile("/json/e2e.json").toPath();
Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath();
Path excelPath = ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath();

ftpContainer.copyFileToContainer(
MountableFile.forHostPath(jsonPath),
"/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json");
ftpContainer.copyFileToContainer(
MountableFile.forHostPath(textPath),
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt");
ftpContainer.copyFileToContainer(
MountableFile.forHostPath(excelPath),
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx");
ContainerUtil.copyFileIntoContainers(
"/json/e2e.json",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/json/e2e.json",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e_filter.json",
ftpContainer);

ContainerUtil.copyFileIntoContainers(
"/text/e2e.txt",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/text/e2e.txt",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e_filter.txt",
ftpContainer);

ContainerUtil.copyFileIntoContainers(
"/excel/e2e.xlsx",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/excel/e2e.xlsx",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
ftpContainer);

ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/");
ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/");
}

@TestTemplate
public void testFtpFileReadAndWrite(TestContainer container)
throws IOException, InterruptedException {
TestHelper helper = new TestHelper(container);
// test write ftp excel file
Container.ExecResult excelWriteResult =
container.executeJob("/excel/fake_source_to_ftp_excel.conf");
Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr());
helper.execute("/excel/fake_source_to_ftp_excel.conf");
// test read ftp excel file
Container.ExecResult excelReadResult =
container.executeJob("/excel/ftp_excel_to_assert.conf");
Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr());
helper.execute("/excel/ftp_excel_to_assert.conf");
// test read ftp excel file with projection
Container.ExecResult excelProjectionReadResult =
container.executeJob("/excel/ftp_excel_projection_to_assert.conf");
Assertions.assertEquals(
0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr());
helper.execute("/excel/ftp_excel_projection_to_assert.conf");
// test read ftp excel file with filter
helper.execute("/excel/ftp_filter_excel_to_assert.conf");
// test write ftp text file
Container.ExecResult textWriteResult =
container.executeJob("/text/fake_to_ftp_file_text.conf");
Assertions.assertEquals(0, textWriteResult.getExitCode());
helper.execute("/text/fake_to_ftp_file_text.conf");
// test read skip header
Container.ExecResult textWriteAndSkipResult =
container.executeJob("/text/ftp_file_text_skip_headers.conf");
Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
helper.execute("/text/ftp_file_text_skip_headers.conf");
// test read ftp text file
Container.ExecResult textReadResult =
container.executeJob("/text/ftp_file_text_to_assert.conf");
Assertions.assertEquals(0, textReadResult.getExitCode());
helper.execute("/text/ftp_file_text_to_assert.conf");
// test read ftp text file with projection
Container.ExecResult textProjectionResult =
container.executeJob("/text/ftp_file_text_projection_to_assert.conf");
Assertions.assertEquals(0, textProjectionResult.getExitCode());
helper.execute("/text/ftp_file_text_projection_to_assert.conf");
// test write ftp json file
Container.ExecResult jsonWriteResult =
container.executeJob("/json/fake_to_ftp_file_json.conf");
Assertions.assertEquals(0, jsonWriteResult.getExitCode());
helper.execute("/json/fake_to_ftp_file_json.conf");
// test read ftp json file
Container.ExecResult jsonReadResult =
container.executeJob("/json/ftp_file_json_to_assert.conf");
Assertions.assertEquals(0, jsonReadResult.getExitCode());
helper.execute("/json/ftp_file_json_to_assert.conf");
// test write ftp parquet file
Container.ExecResult parquetWriteResult =
container.executeJob("/parquet/fake_to_ftp_file_parquet.conf");
Assertions.assertEquals(0, parquetWriteResult.getExitCode());
helper.execute("/parquet/fake_to_ftp_file_parquet.conf");
// test write ftp orc file
Container.ExecResult orcWriteResult =
container.executeJob("/orc/fake_to_ftp_file_orc.conf");
Assertions.assertEquals(0, orcWriteResult.getExitCode());
helper.execute("/orc/fake_to_ftp_file_orc.conf");
}
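
For orientation, a hedged sketch of what a filter-oriented job such as `/excel/ftp_filter_excel_to_assert.conf` might contain. The actual config file is not shown in this diff, so the host, credentials, schema, and assertion below are illustrative assumptions only:

```hocon
env {
  job.mode = "BATCH"
}

source {
  FtpFile {
    host = "ftp-host"                  # illustrative
    port = 21
    user = "seatunnel"                 # illustrative
    password = "pass"                  # illustrative
    path = "/tmp/seatunnel/read/excel"
    file_format_type = "excel"
    sheet_name = "Sheet1"
    # Only pick up files whose names contain "filter" (Java regex against the file name).
    file_filter_pattern = ".*filter.*"
    schema = {
      fields {
        name = string
        age = int
      }
    }
  }
}

sink {
  Assert {
    rules {
      row_rules = [
        { rule_type = MIN_ROW, rule_value = 1 }  # illustrative check
      ]
    }
  }
}
```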

@AfterAll