From 7a44e95e7754199ae6bb66d6afa693ecfd8e0c41 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Mon, 11 Sep 2023 09:47:54 +0800 Subject: [PATCH 01/42] [Feature] [File Connector] Supports writing column names when the output type is file (CSV) #5443 --- .../seatunnel/file/sink/writer/TextWriteStrategy.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index f309edb70f2..f2ee544fb50 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -133,15 +133,21 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); + fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(System.lineSeparator().getBytes()); break; case NONE: fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(System.lineSeparator().getBytes()); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(System.lineSeparator().getBytes()); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); From de2859ba0e5b362962b44452e3f09df61d31ddfd Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 13 Sep 2023 10:22:08 +0800 Subject: [PATCH 02/42] [Feature] [File Connector] fix code style and lineSeparator #5443 --- .../file/sink/writer/TextWriteStrategy.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index f2ee544fb50..5659e26bd1d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -133,21 +133,24 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); - fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(System.lineSeparator().getBytes()); + fsDataOutputStream.write( + String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); break; case NONE: fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(System.lineSeparator().getBytes()); + fsDataOutputStream.write( + String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(System.lineSeparator().getBytes()); + fsDataOutputStream.write( + String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); From 4acdda8aa932cd10eb06cbda91d37dd676828e20 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 13 Sep 2023 16:08:58 +0800 Subject: [PATCH 03/42] [Feature] [File Connector] add enable_header_write,false:dont write header,true:write header. #5443 --- docs/en/connector-v2/sink/LocalFile.md | 7 ++++- .../file/config/BaseFileSinkConfig.java | 5 ++++ .../seatunnel/file/config/BaseSinkConfig.java | 6 ++++ .../file/sink/writer/TextWriteStrategy.java | 30 +++++++++++++------ 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 8e2c1526e90..0d650cee7c6 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | +| name | type | required | default value | remarks | |----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| | path | string | yes | - | | | custom_filename | boolean | no | false | Whether you need custom the filename | @@ -50,6 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| enable_header_write | boolean | no | false | false:dont write header,true:write header. | ### path [string] @@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### enable_header_write [boolean] + +false:dont write header,true:write header. + ## Example For orc file format simple config diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java index 9a0ac6c678b..112ab9fa1c6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java @@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable { protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD; protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS; protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS; + protected Boolean enableHeaderWriter = false; public BaseFileSinkConfig(@NonNull Config config) { if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) { @@ -99,6 +100,10 @@ public BaseFileSinkConfig(@NonNull Config config) { timeFormat = TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key())); } + + if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) { + enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key()); + } } public BaseFileSinkConfig() {} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index 431a5d1daa5..4f4d09d75cd 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -232,4 +232,10 @@ public class BaseSinkConfig { .stringType() .noDefaultValue() .withDescription("To be written sheet name,only valid for excel files"); + + public static final Option ENABLE_HEADER_WRITE = + Options.key("enable_header_write") + .booleanType() + .defaultValue(false) + .withDescription("false:dont write header,true:write header"); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index 5659e26bd1d..83121f84c6c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.format.text.TextSerializationSchema; @@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy { private final DateUtils.Formatter dateFormat; private final DateTimeUtils.Formatter dateTimeFormat; private final TimeUtils.Formatter timeFormat; + private final FileFormat fileFormat; + private final Boolean enableHeaderWriter; private SerializationSchema serializationSchema; public TextWriteStrategy(FileSinkConfig fileSinkConfig) { @@ -58,6 +61,8 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) { this.dateFormat = fileSinkConfig.getDateFormat(); this.dateTimeFormat = fileSinkConfig.getDatetimeFormat(); this.timeFormat = fileSinkConfig.getTimeFormat(); + this.fileFormat = fileSinkConfig.getFileFormat(); + this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter(); } @Override @@ -133,24 +138,18 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); - fsDataOutputStream.write( - String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(rowDelimiter.getBytes()); + enableWriteHeader(fsDataOutputStream); break; case NONE: fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write( - String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(rowDelimiter.getBytes()); + enableWriteHeader(fsDataOutputStream); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write( - String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(rowDelimiter.getBytes()); + enableWriteHeader(fsDataOutputStream); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); @@ -164,4 +163,17 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { } return fsDataOutputStream; } + + private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException { + if (enableHeaderWriter) { + fsDataOutputStream.write( + String.join( + FileFormat.CSV.equals(fileFormat) + ? "," + : fieldDelimiter, + seaTunnelRowType.getFieldNames()) + .getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); + } + } } From b7c90dc1c2545d29c50f5385236b3942279c51aa Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 13 Sep 2023 16:10:54 +0800 Subject: [PATCH 04/42] [Feature] [File Connector] fix code style #5443 --- docs/en/connector-v2/sink/LocalFile.md | 2 +- .../seatunnel/file/sink/writer/TextWriteStrategy.java | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 0d650cee7c6..4180f1ee1f2 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | +| name | type | required | default value | remarks | |----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| | path | string | yes | - | | | custom_filename | boolean | no | false | Whether you need custom the filename | diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index 83121f84c6c..b4b7bdb9558 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -168,9 +168,7 @@ private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOE if (enableHeaderWriter) { fsDataOutputStream.write( String.join( - FileFormat.CSV.equals(fileFormat) - ? "," - : fieldDelimiter, + FileFormat.CSV.equals(fileFormat) ? "," : fieldDelimiter, seaTunnelRowType.getFieldNames()) .getBytes()); fsDataOutputStream.write(rowDelimiter.getBytes()); From d1da88a7ba987ef43fe474fa97d01ebbdbd32a53 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Thu, 14 Sep 2023 17:41:05 +0800 Subject: [PATCH 05/42] [Feature] [File Connector] add enable_header_write explain #5443 --- docs/en/connector-v2/sink/LocalFile.md | 44 +++++++++++++------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 4180f1ee1f2..d55c59992d5 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,27 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| -| path | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| enable_header_write | boolean | no | false | false:dont write header,true:write header. | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------| +| path | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:dont write header,true:write header. | ### path [string] @@ -169,7 +169,7 @@ Writer the sheet of the workbook ### enable_header_write [boolean] -false:dont write header,true:write header. +Only used when file_format_type is text,csv.false:dont write header,true:write header. ## Example From dc26b7314f52850414647950eae0bfd6fc77c253 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 15 Sep 2023 09:25:16 +0800 Subject: [PATCH 06/42] [Feature] [File Connector]fix code style #5443 --- docs/en/connector-v2/sink/LocalFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index d55c59992d5..8fe57d1b9ba 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | +| name | type | required | default value | remarks | |----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------| | path | string | yes | - | | | custom_filename | boolean | no | false | Whether you need custom the filename | From 6da518a03ba0aa580b3bcea9c86617b7ecd2726b Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 15 Sep 2023 11:52:05 +0800 Subject: [PATCH 07/42] Update docs/en/connector-v2/sink/LocalFile.md --- docs/en/connector-v2/sink/LocalFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 8fe57d1b9ba..1b2ac24bf1f 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -50,7 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:dont write header,true:write header. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:don't write header,true:write header. | ### path [string] From 98d257166533e57cd2245237bb9fed464e60848e Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 15 Sep 2023 11:52:11 +0800 Subject: [PATCH 08/42] Update docs/en/connector-v2/sink/LocalFile.md --- docs/en/connector-v2/sink/LocalFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 1b2ac24bf1f..037db2feeee 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -169,7 +169,7 @@ Writer the sheet of the workbook ### enable_header_write [boolean] -Only used when file_format_type is text,csv.false:dont write header,true:write header. +Only used when file_format_type is text,csv.false:don't write header,true:write header. ## Example From 17e67107ca44e2a8439e01cfb16690a299b7365a Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 15 Sep 2023 14:11:35 +0800 Subject: [PATCH 09/42] [Feature] [File Connector]fix code style #5443 --- docs/en/connector-v2/sink/LocalFile.md | 40 +++++++++++++------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 037db2feeee..c8ea48d3f1c 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,26 +30,26 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------| -| path | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | | enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:don't write header,true:write header. | ### path [string] From 6846e376fd98a5224bbbbda51b21e2be5843d77b Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Tue, 19 Sep 2023 10:27:54 +0800 Subject: [PATCH 10/42] [Feature] [File Connector]add junit test #5443 --- .../seatunnel/engine/e2e/TextHeaderT.java | 166 ++++++++++++++++++ .../batch_fakesource_to_file_header.conf | 30 ++++ 2 files changed, 196 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java new file mode 100644 index 00000000000..86163b7a68b --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair; + +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import scala.Tuple3; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * Cluster fault tolerance test. Test the job recovery capability and data consistency assurance + * capability in case of cluster node failure + */ +@Slf4j +public class TextHeaderT { + + private String FILE_FORMAT_TYPE = "file_format_type"; + private String ENABLE_HEADER_WRITE = "enable_header_write"; + + @Test + public void testEnableWriteHeader() { + List lists = new ArrayList<>(); + lists.add(new Tuple3<>("text", "true", 2)); + lists.add(new Tuple3<>("text", "false", 1)); + lists.add(new Tuple3<>("csv", "true", 2)); + lists.add(new Tuple3<>("csv", "false", 1)); + lists.forEach( + t -> { + try { + enableWriteHeader( + t._1().toString(), + t._2().toString(), + Integer.parseInt(t._3().toString())); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + public void enableWriteHeader(String file_format_type, String headerWrite, int lineNumber) + throws ExecutionException, InterruptedException { + String testClusterName = "ClusterFaultToleranceIT_EnableWriteHeaderNode"; + HazelcastInstanceImpl node1 = null; + SeaTunnelClient engineClient = null; + + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig + .getHazelcastConfig() + .setClusterName(TestUtils.getClusterName(testClusterName)); + + try { + node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + // waiting all node added to cluster + HazelcastInstanceImpl finalNode = node1; + Awaitility.await() + .atMost(10000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + 1, finalNode.getCluster().getMembers().size())); + + Common.setDeployMode(DeployMode.CLIENT); + ImmutablePair testResources = + createTestResources(headerWrite, file_format_type); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(headerWrite); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); + engineClient = new SeaTunnelClient(clientConfig); + JobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(testResources.getRight(), jobConfig); + ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + + CompletableFuture objectCompletableFuture = + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); + Awaitility.await() + .atMost(600000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Thread.sleep(2000); + Assertions.assertTrue( + objectCompletableFuture.isDone() + && JobStatus.FINISHED.equals( + objectCompletableFuture.get())); + }); + + Long fileLineNumberFromDir = + FileUtils.getFileLineNumberFromDir(testResources.getLeft()); + Assertions.assertEquals(lineNumber, fileLineNumberFromDir); + log.info("========================clean test resource===================="); + } finally { + if (engineClient != null) { + engineClient.shutdown(); + } + if (node1 != null) { + node1.shutdown(); + } + } + } + + private ImmutablePair createTestResources( + @NonNull String headerWrite, @NonNull String formatType) { + Map valueMap = new HashMap<>(); + valueMap.put(ENABLE_HEADER_WRITE, headerWrite); + valueMap.put(FILE_FORMAT_TYPE, formatType); + String targetDir = "/tmp/text"; + targetDir = targetDir.replace("/", File.separator); + // clear target dir before test + FileUtils.createNewDir(targetDir); + String targetConfigFilePath = + File.separator + + "tmp" + + File.separator + + "test_conf" + + File.separator + + headerWrite + + ".conf"; + TestUtils.createTestConfigFileFromTemplate( + "batch_fakesource_to_file_header.conf", valueMap, targetConfigFilePath); + return new ImmutablePair<>(targetDir, targetConfigFilePath); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf new file mode 100644 index 00000000000..561afab63ac --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf @@ -0,0 +1,30 @@ +# Set the basic configuration of the task to be performed +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +# Create a source to connect to Mongodb +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +sink { + +LocalFile { + path = "/tmp/text" + file_format_type="${file_format_type}" + enable_header_write="${enable_header_write}" +} +} From 03a24a90f2e7417c2c010f26cfe497e3bfb380d3 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 20 Sep 2023 14:44:24 +0800 Subject: [PATCH 11/42] [Feature] [File Connector]add license header: #5443 --- .../batch_fakesource_to_file_header.conf | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf index 561afab63ac..96ec46dc2ee 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf @@ -1,4 +1,23 @@ -# Set the basic configuration of the task to be performed +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + env { execution.parallelism = 1 job.mode = "BATCH" From 1ffa54f1be0c28df31b969bb2624cdff1b1911bb Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Mon, 11 Sep 2023 09:47:54 +0800 Subject: [PATCH 12/42] [Feature] [File Connector] Supports writing column names when the output type is file (CSV) #5443 --- .../seatunnel/file/sink/writer/TextWriteStrategy.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index f309edb70f2..f2ee544fb50 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -133,15 +133,21 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); + fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(System.lineSeparator().getBytes()); break; case NONE: fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(System.lineSeparator().getBytes()); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(System.lineSeparator().getBytes()); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); From cbc07f6ea9d0a499a13d4ce1fc324d4cb1aaefba Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 13 Sep 2023 10:22:08 +0800 Subject: [PATCH 13/42] [Feature] [File Connector] fix code style and lineSeparator #5443 --- .../file/sink/writer/TextWriteStrategy.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index f2ee544fb50..5659e26bd1d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -133,21 +133,24 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); - fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(System.lineSeparator().getBytes()); + fsDataOutputStream.write( + String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); break; case NONE: fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(System.lineSeparator().getBytes()); + fsDataOutputStream.write( + String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(System.lineSeparator().getBytes()); + fsDataOutputStream.write( + String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); From 2b1e15cc7bafa0f7ad3648c9b4b9a517d663b4c2 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 13 Sep 2023 16:08:58 +0800 Subject: [PATCH 14/42] [Feature] [File Connector] add enable_header_write,false:dont write header,true:write header. #5443 --- docs/en/connector-v2/sink/LocalFile.md | 7 ++++- .../file/config/BaseFileSinkConfig.java | 5 ++++ .../seatunnel/file/config/BaseSinkConfig.java | 6 ++++ .../file/sink/writer/TextWriteStrategy.java | 30 +++++++++++++------ 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 8e2c1526e90..0d650cee7c6 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | +| name | type | required | default value | remarks | |----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| | path | string | yes | - | | | custom_filename | boolean | no | false | Whether you need custom the filename | @@ -50,6 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| enable_header_write | boolean | no | false | false:dont write header,true:write header. | ### path [string] @@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### enable_header_write [boolean] + +false:dont write header,true:write header. + ## Example For orc file format simple config diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java index 9a0ac6c678b..112ab9fa1c6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java @@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable { protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD; protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS; protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS; + protected Boolean enableHeaderWriter = false; public BaseFileSinkConfig(@NonNull Config config) { if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) { @@ -99,6 +100,10 @@ public BaseFileSinkConfig(@NonNull Config config) { timeFormat = TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key())); } + + if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) { + enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key()); + } } public BaseFileSinkConfig() {} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index 431a5d1daa5..4f4d09d75cd 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -232,4 +232,10 @@ public class BaseSinkConfig { .stringType() .noDefaultValue() .withDescription("To be written sheet name,only valid for excel files"); + + public static final Option ENABLE_HEADER_WRITE = + Options.key("enable_header_write") + .booleanType() + .defaultValue(false) + .withDescription("false:dont write header,true:write header"); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index 5659e26bd1d..83121f84c6c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.format.text.TextSerializationSchema; @@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy { private final DateUtils.Formatter dateFormat; private final DateTimeUtils.Formatter dateTimeFormat; private final TimeUtils.Formatter timeFormat; + private final FileFormat fileFormat; + private final Boolean enableHeaderWriter; private SerializationSchema serializationSchema; public TextWriteStrategy(FileSinkConfig fileSinkConfig) { @@ -58,6 +61,8 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) { this.dateFormat = fileSinkConfig.getDateFormat(); this.dateTimeFormat = fileSinkConfig.getDatetimeFormat(); this.timeFormat = fileSinkConfig.getTimeFormat(); + this.fileFormat = fileSinkConfig.getFileFormat(); + this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter(); } @Override @@ -133,24 +138,18 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); - fsDataOutputStream.write( - String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(rowDelimiter.getBytes()); + enableWriteHeader(fsDataOutputStream); break; case NONE: fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write( - String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(rowDelimiter.getBytes()); + enableWriteHeader(fsDataOutputStream); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write( - String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(rowDelimiter.getBytes()); + enableWriteHeader(fsDataOutputStream); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); @@ -164,4 +163,17 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { } return fsDataOutputStream; } + + private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException { + if (enableHeaderWriter) { + fsDataOutputStream.write( + String.join( + FileFormat.CSV.equals(fileFormat) + ? "," + : fieldDelimiter, + seaTunnelRowType.getFieldNames()) + .getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); + } + } } From ce3fdb1a23101d9080a301ae1c45f29a802f57d4 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 13 Sep 2023 16:10:54 +0800 Subject: [PATCH 15/42] [Feature] [File Connector] fix code style #5443 --- docs/en/connector-v2/sink/LocalFile.md | 2 +- .../seatunnel/file/sink/writer/TextWriteStrategy.java | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 0d650cee7c6..4180f1ee1f2 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | +| name | type | required | default value | remarks | |----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| | path | string | yes | - | | | custom_filename | boolean | no | false | Whether you need custom the filename | diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index 83121f84c6c..b4b7bdb9558 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -168,9 +168,7 @@ private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOE if (enableHeaderWriter) { fsDataOutputStream.write( String.join( - FileFormat.CSV.equals(fileFormat) - ? "," - : fieldDelimiter, + FileFormat.CSV.equals(fileFormat) ? "," : fieldDelimiter, seaTunnelRowType.getFieldNames()) .getBytes()); fsDataOutputStream.write(rowDelimiter.getBytes()); From 8388a3195b053490aa21e13dd4143bd9f355b2ff Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Thu, 14 Sep 2023 17:41:05 +0800 Subject: [PATCH 16/42] [Feature] [File Connector] add enable_header_write explain #5443 --- docs/en/connector-v2/sink/LocalFile.md | 44 +++++++++++++------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 4180f1ee1f2..d55c59992d5 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,27 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| -| path | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| enable_header_write | boolean | no | false | false:dont write header,true:write header. | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------| +| path | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:dont write header,true:write header. | ### path [string] @@ -169,7 +169,7 @@ Writer the sheet of the workbook ### enable_header_write [boolean] -false:dont write header,true:write header. +Only used when file_format_type is text,csv.false:dont write header,true:write header. ## Example From dc8ce8a439dc1f250be9f75dc418f74570107f52 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 15 Sep 2023 09:25:16 +0800 Subject: [PATCH 17/42] [Feature] [File Connector]fix code style #5443 --- docs/en/connector-v2/sink/LocalFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index d55c59992d5..8fe57d1b9ba 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | +| name | type | required | default value | remarks | |----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------| | path | string | yes | - | | | custom_filename | boolean | no | false | Whether you need custom the filename | From 3271580ece4ebd76bb2672ef437e625dec3a5f7d Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 15 Sep 2023 11:52:05 +0800 Subject: [PATCH 18/42] Update docs/en/connector-v2/sink/LocalFile.md --- docs/en/connector-v2/sink/LocalFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 8fe57d1b9ba..1b2ac24bf1f 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -50,7 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:dont write header,true:write header. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:don't write header,true:write header. | ### path [string] From d912a39e3c63975f3fcf6fd51b7bd963ca499485 Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 15 Sep 2023 11:52:11 +0800 Subject: [PATCH 19/42] Update docs/en/connector-v2/sink/LocalFile.md --- docs/en/connector-v2/sink/LocalFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 1b2ac24bf1f..037db2feeee 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -169,7 +169,7 @@ Writer the sheet of the workbook ### enable_header_write [boolean] -Only used when file_format_type is text,csv.false:dont write header,true:write header. +Only used when file_format_type is text,csv.false:don't write header,true:write header. ## Example From 8ae9ca02acbc2450ee01abdab54864982cd0bdd6 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 15 Sep 2023 14:11:35 +0800 Subject: [PATCH 20/42] [Feature] [File Connector]fix code style #5443 --- docs/en/connector-v2/sink/LocalFile.md | 40 +++++++++++++------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 037db2feeee..c8ea48d3f1c 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,26 +30,26 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------| -| path | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | | enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:don't write header,true:write header. | ### path [string] From f899f6bb5720f28bbfdc50c796d9164cd7b31a51 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Tue, 19 Sep 2023 10:27:54 +0800 Subject: [PATCH 21/42] [Feature] [File Connector]add junit test #5443 --- .../seatunnel/engine/e2e/TextHeaderT.java | 166 ++++++++++++++++++ .../batch_fakesource_to_file_header.conf | 30 ++++ 2 files changed, 196 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java new file mode 100644 index 00000000000..86163b7a68b --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair; + +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import scala.Tuple3; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * Cluster fault tolerance test. Test the job recovery capability and data consistency assurance + * capability in case of cluster node failure + */ +@Slf4j +public class TextHeaderT { + + private String FILE_FORMAT_TYPE = "file_format_type"; + private String ENABLE_HEADER_WRITE = "enable_header_write"; + + @Test + public void testEnableWriteHeader() { + List lists = new ArrayList<>(); + lists.add(new Tuple3<>("text", "true", 2)); + lists.add(new Tuple3<>("text", "false", 1)); + lists.add(new Tuple3<>("csv", "true", 2)); + lists.add(new Tuple3<>("csv", "false", 1)); + lists.forEach( + t -> { + try { + enableWriteHeader( + t._1().toString(), + t._2().toString(), + Integer.parseInt(t._3().toString())); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + public void enableWriteHeader(String file_format_type, String headerWrite, int lineNumber) + throws ExecutionException, InterruptedException { + String testClusterName = "ClusterFaultToleranceIT_EnableWriteHeaderNode"; + HazelcastInstanceImpl node1 = null; + SeaTunnelClient engineClient = null; + + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig + .getHazelcastConfig() + .setClusterName(TestUtils.getClusterName(testClusterName)); + + try { + node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + // waiting all node added to cluster + HazelcastInstanceImpl finalNode = node1; + Awaitility.await() + .atMost(10000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + 1, finalNode.getCluster().getMembers().size())); + + Common.setDeployMode(DeployMode.CLIENT); + ImmutablePair testResources = + createTestResources(headerWrite, file_format_type); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(headerWrite); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); + engineClient = new SeaTunnelClient(clientConfig); + JobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(testResources.getRight(), jobConfig); + ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + + CompletableFuture objectCompletableFuture = + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); + Awaitility.await() + .atMost(600000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Thread.sleep(2000); + Assertions.assertTrue( + objectCompletableFuture.isDone() + && JobStatus.FINISHED.equals( + objectCompletableFuture.get())); + }); + + Long fileLineNumberFromDir = + FileUtils.getFileLineNumberFromDir(testResources.getLeft()); + Assertions.assertEquals(lineNumber, fileLineNumberFromDir); + log.info("========================clean test resource===================="); + } finally { + if (engineClient != null) { + engineClient.shutdown(); + } + if (node1 != null) { + node1.shutdown(); + } + } + } + + private ImmutablePair createTestResources( + @NonNull String headerWrite, @NonNull String formatType) { + Map valueMap = new HashMap<>(); + valueMap.put(ENABLE_HEADER_WRITE, headerWrite); + valueMap.put(FILE_FORMAT_TYPE, formatType); + String targetDir = "/tmp/text"; + targetDir = targetDir.replace("/", File.separator); + // clear target dir before test + FileUtils.createNewDir(targetDir); + String targetConfigFilePath = + File.separator + + "tmp" + + File.separator + + "test_conf" + + File.separator + + headerWrite + + ".conf"; + TestUtils.createTestConfigFileFromTemplate( + "batch_fakesource_to_file_header.conf", valueMap, targetConfigFilePath); + return new ImmutablePair<>(targetDir, targetConfigFilePath); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf new file mode 100644 index 00000000000..561afab63ac --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf @@ -0,0 +1,30 @@ +# Set the basic configuration of the task to be performed +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +# Create a source to connect to Mongodb +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +sink { + +LocalFile { + path = "/tmp/text" + file_format_type="${file_format_type}" + enable_header_write="${enable_header_write}" +} +} From 52a3f8dfe6abc0b1ca2d9e1dbb359e26c64f7a7d Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 20 Sep 2023 14:44:24 +0800 Subject: [PATCH 22/42] [Feature] [File Connector]add license header: #5443 --- .../batch_fakesource_to_file_header.conf | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf index 561afab63ac..96ec46dc2ee 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf @@ -1,4 +1,23 @@ -# Set the basic configuration of the task to be performed +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + env { execution.parallelism = 1 job.mode = "BATCH" From 921e7d87d805d996958079572c19f603ce96ddb8 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Thu, 21 Sep 2023 16:45:47 +0800 Subject: [PATCH 23/42] [Feature] [File Connector]add junit: #5443 --- .../seatunnel/engine/e2e/TextHeaderT.java | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java index 86163b7a68b..ed8696502e8 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; @@ -28,6 +29,7 @@ import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +import org.apache.seatunnel.format.text.constant.TextFormatConstant; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; @@ -41,10 +43,7 @@ import scala.Tuple3; import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -62,24 +61,21 @@ public class TextHeaderT { @Test public void testEnableWriteHeader() { List lists = new ArrayList<>(); - lists.add(new Tuple3<>("text", "true", 2)); - lists.add(new Tuple3<>("text", "false", 1)); - lists.add(new Tuple3<>("csv", "true", 2)); - lists.add(new Tuple3<>("csv", "false", 1)); + lists.add(new Tuple3<>("text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add(new Tuple3<>("text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add(new Tuple3<>("csv", "true", "name,age")); + lists.add(new Tuple3<>("csv", "false", "name,age")); lists.forEach( t -> { try { - enableWriteHeader( - t._1().toString(), - t._2().toString(), - Integer.parseInt(t._3().toString())); + enableWriteHeader(t._1().toString(), t._2().toString(), t._3().toString()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } }); } - public void enableWriteHeader(String file_format_type, String headerWrite, int lineNumber) + public void enableWriteHeader(String file_format_type, String headerWrite, String headerContent) throws ExecutionException, InterruptedException { String testClusterName = "ClusterFaultToleranceIT_EnableWriteHeaderNode"; HazelcastInstanceImpl node1 = null; @@ -127,10 +123,17 @@ public void enableWriteHeader(String file_format_type, String headerWrite, int l && JobStatus.FINISHED.equals( objectCompletableFuture.get())); }); - - Long fileLineNumberFromDir = - FileUtils.getFileLineNumberFromDir(testResources.getLeft()); - Assertions.assertEquals(lineNumber, fileLineNumberFromDir); + File file = new File(testResources.getLeft()); + for (File targetFile : Objects.requireNonNull(file.listFiles())) { + String[] texts = + FileUtils.readFileToStr(targetFile.toPath()) + .split(BaseSinkConfig.ROW_DELIMITER.defaultValue()); + if (headerWrite.equals("true")) { + Assertions.assertEquals(headerContent, texts[0]); + } else { + Assertions.assertNotEquals(headerContent, texts[0]); + } + } log.info("========================clean test resource===================="); } finally { if (engineClient != null) { From 4f66bad238f11eaf8d7b955a4a56e18f9cf518e3 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Thu, 21 Sep 2023 16:57:38 +0800 Subject: [PATCH 24/42] [Feature] [File Connector]add junit: #5443 --- .../engine/e2e/{TextHeaderT.java => TextHeaderIT.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/{TextHeaderT.java => TextHeaderIT.java} (99%) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java similarity index 99% rename from seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java index ed8696502e8..a9a9cb8e2b5 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java @@ -53,7 +53,7 @@ * capability in case of cluster node failure */ @Slf4j -public class TextHeaderT { +public class TextHeaderIT { private String FILE_FORMAT_TYPE = "file_format_type"; private String ENABLE_HEADER_WRITE = "enable_header_write"; From ae1e34725b88b6c13c2742604efa99c1f33e5bf4 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 22 Sep 2023 14:04:07 +0800 Subject: [PATCH 25/42] [Feature] [File Connector]remove scala: #5443 --- .../seatunnel/engine/e2e/TextHeaderIT.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java index a9a9cb8e2b5..b244a0bb2f8 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java @@ -38,12 +38,15 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import com.hazelcast.jet.datamodel.Tuple3; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; -import scala.Tuple3; import java.io.File; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -61,14 +64,14 @@ public class TextHeaderIT { @Test public void testEnableWriteHeader() { List lists = new ArrayList<>(); - lists.add(new Tuple3<>("text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age")); - lists.add(new Tuple3<>("text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age")); - lists.add(new Tuple3<>("csv", "true", "name,age")); - lists.add(new Tuple3<>("csv", "false", "name,age")); + lists.add(Tuple3.tuple3("text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add(Tuple3.tuple3("text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add(Tuple3.tuple3("csv", "true", "name,age")); + lists.add(Tuple3.tuple3("csv", "false", "name,age")); lists.forEach( t -> { try { - enableWriteHeader(t._1().toString(), t._2().toString(), t._3().toString()); + enableWriteHeader(t.toString(), t.toString(), t.toString()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } @@ -124,7 +127,7 @@ public void enableWriteHeader(String file_format_type, String headerWrite, Strin objectCompletableFuture.get())); }); File file = new File(testResources.getLeft()); - for (File targetFile : Objects.requireNonNull(file.listFiles())) { + for (File targetFile : file.listFiles()) { String[] texts = FileUtils.readFileToStr(targetFile.toPath()) .split(BaseSinkConfig.ROW_DELIMITER.defaultValue()); From 03c369f2426f915eb3e01b4fa2ead2a3e7da53d4 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 22 Sep 2023 14:37:23 +0800 Subject: [PATCH 26/42] [Feature] [File Connector]modify md style: #5443 --- docs/en/connector-v2/sink/LocalFile.md | 42 +++++++++++++------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index c8ea48d3f1c..90e80c6c372 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,27 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:don't write header,true:write header. | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | ### path [string] From bf7df66fd9f6d04c0a6072a46d50ec5bdf72db31 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Mon, 11 Sep 2023 09:47:54 +0800 Subject: [PATCH 27/42] [Feature] [File Connector] Supports writing column names when the output type is file (CSV) #5443 --- .../seatunnel/file/sink/writer/TextWriteStrategy.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index f309edb70f2..f2ee544fb50 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -133,15 +133,21 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); + fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(System.lineSeparator().getBytes()); break; case NONE: fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(System.lineSeparator().getBytes()); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(System.lineSeparator().getBytes()); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); From 301058e34a7b8e22f7d8564dcbedf4999136c1e6 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 13 Sep 2023 10:22:08 +0800 Subject: [PATCH 28/42] [Feature] [File Connector] fix code style and lineSeparator #5443 --- .../file/sink/writer/TextWriteStrategy.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index f2ee544fb50..5659e26bd1d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -133,21 +133,24 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); - fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(System.lineSeparator().getBytes()); + fsDataOutputStream.write( + String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); break; case NONE: fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(System.lineSeparator().getBytes()); + fsDataOutputStream.write( + String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write(String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(System.lineSeparator().getBytes()); + fsDataOutputStream.write( + String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); From 147302ae5abcabc437f6e4c224230818a9d157b6 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 13 Sep 2023 16:08:58 +0800 Subject: [PATCH 29/42] [Feature] [File Connector] add enable_header_write,false:dont write header,true:write header. #5443 --- docs/en/connector-v2/sink/LocalFile.md | 7 ++++- .../file/config/BaseFileSinkConfig.java | 5 ++++ .../seatunnel/file/config/BaseSinkConfig.java | 6 ++++ .../file/sink/writer/TextWriteStrategy.java | 30 +++++++++++++------ 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 8e2c1526e90..0d650cee7c6 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | +| name | type | required | default value | remarks | |----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| | path | string | yes | - | | | custom_filename | boolean | no | false | Whether you need custom the filename | @@ -50,6 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| enable_header_write | boolean | no | false | false:dont write header,true:write header. | ### path [string] @@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### enable_header_write [boolean] + +false:dont write header,true:write header. + ## Example For orc file format simple config diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java index 9a0ac6c678b..112ab9fa1c6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java @@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable { protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD; protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS; protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS; + protected Boolean enableHeaderWriter = false; public BaseFileSinkConfig(@NonNull Config config) { if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) { @@ -99,6 +100,10 @@ public BaseFileSinkConfig(@NonNull Config config) { timeFormat = TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key())); } + + if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) { + enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key()); + } } public BaseFileSinkConfig() {} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index 431a5d1daa5..4f4d09d75cd 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -232,4 +232,10 @@ public class BaseSinkConfig { .stringType() .noDefaultValue() .withDescription("To be written sheet name,only valid for excel files"); + + public static final Option ENABLE_HEADER_WRITE = + Options.key("enable_header_write") + .booleanType() + .defaultValue(false) + .withDescription("false:dont write header,true:write header"); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index 5659e26bd1d..83121f84c6c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.format.text.TextSerializationSchema; @@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy { private final DateUtils.Formatter dateFormat; private final DateTimeUtils.Formatter dateTimeFormat; private final TimeUtils.Formatter timeFormat; + private final FileFormat fileFormat; + private final Boolean enableHeaderWriter; private SerializationSchema serializationSchema; public TextWriteStrategy(FileSinkConfig fileSinkConfig) { @@ -58,6 +61,8 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) { this.dateFormat = fileSinkConfig.getDateFormat(); this.dateTimeFormat = fileSinkConfig.getDatetimeFormat(); this.timeFormat = fileSinkConfig.getTimeFormat(); + this.fileFormat = fileSinkConfig.getFileFormat(); + this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter(); } @Override @@ -133,24 +138,18 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); fsDataOutputStream = new FSDataOutputStream(out, null); - fsDataOutputStream.write( - String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(rowDelimiter.getBytes()); + enableWriteHeader(fsDataOutputStream); break; case NONE: fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write( - String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(rowDelimiter.getBytes()); + enableWriteHeader(fsDataOutputStream); break; default: log.warn( "Text file does not support this compress type: {}", compressFormat.getCompressCodec()); fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - fsDataOutputStream.write( - String.join(",", seaTunnelRowType.getFieldNames()).getBytes()); - fsDataOutputStream.write(rowDelimiter.getBytes()); + enableWriteHeader(fsDataOutputStream); break; } beingWrittenOutputStream.put(filePath, fsDataOutputStream); @@ -164,4 +163,17 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { } return fsDataOutputStream; } + + private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException { + if (enableHeaderWriter) { + fsDataOutputStream.write( + String.join( + FileFormat.CSV.equals(fileFormat) + ? "," + : fieldDelimiter, + seaTunnelRowType.getFieldNames()) + .getBytes()); + fsDataOutputStream.write(rowDelimiter.getBytes()); + } + } } From 9f7ea83267fb18a2632f679ed5b2abfdb1a6fad2 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 13 Sep 2023 16:10:54 +0800 Subject: [PATCH 30/42] [Feature] [File Connector] fix code style #5443 --- docs/en/connector-v2/sink/LocalFile.md | 2 +- .../seatunnel/file/sink/writer/TextWriteStrategy.java | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 0d650cee7c6..4180f1ee1f2 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | +| name | type | required | default value | remarks | |----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| | path | string | yes | - | | | custom_filename | boolean | no | false | Whether you need custom the filename | diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index 83121f84c6c..b4b7bdb9558 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -168,9 +168,7 @@ private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOE if (enableHeaderWriter) { fsDataOutputStream.write( String.join( - FileFormat.CSV.equals(fileFormat) - ? "," - : fieldDelimiter, + FileFormat.CSV.equals(fileFormat) ? "," : fieldDelimiter, seaTunnelRowType.getFieldNames()) .getBytes()); fsDataOutputStream.write(rowDelimiter.getBytes()); From afb7ce543551b1a5482107de483930d2192ab9ac Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Thu, 14 Sep 2023 17:41:05 +0800 Subject: [PATCH 31/42] [Feature] [File Connector] add enable_header_write explain #5443 --- docs/en/connector-v2/sink/LocalFile.md | 44 +++++++++++++------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 4180f1ee1f2..d55c59992d5 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,27 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------| -| path | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| enable_header_write | boolean | no | false | false:dont write header,true:write header. | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------| +| path | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:dont write header,true:write header. | ### path [string] @@ -169,7 +169,7 @@ Writer the sheet of the workbook ### enable_header_write [boolean] -false:dont write header,true:write header. +Only used when file_format_type is text,csv.false:dont write header,true:write header. ## Example From c9b7b3854c8fbc9b9736c741901ddc7f10978b47 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 15 Sep 2023 09:25:16 +0800 Subject: [PATCH 32/42] [Feature] [File Connector]fix code style #5443 --- docs/en/connector-v2/sink/LocalFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index d55c59992d5..8fe57d1b9ba 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | +| name | type | required | default value | remarks | |----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------| | path | string | yes | - | | | custom_filename | boolean | no | false | Whether you need custom the filename | From e0885bb8982b8e3d77911aca489948e9ae58ca3a Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 15 Sep 2023 11:52:05 +0800 Subject: [PATCH 33/42] Update docs/en/connector-v2/sink/LocalFile.md --- docs/en/connector-v2/sink/LocalFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 8fe57d1b9ba..1b2ac24bf1f 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -50,7 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:dont write header,true:write header. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:don't write header,true:write header. | ### path [string] From 0f9987f3bfb6cd976300e19e2c6fc108044ad458 Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 15 Sep 2023 11:52:11 +0800 Subject: [PATCH 34/42] Update docs/en/connector-v2/sink/LocalFile.md --- docs/en/connector-v2/sink/LocalFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 1b2ac24bf1f..037db2feeee 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -169,7 +169,7 @@ Writer the sheet of the workbook ### enable_header_write [boolean] -Only used when file_format_type is text,csv.false:dont write header,true:write header. +Only used when file_format_type is text,csv.false:don't write header,true:write header. ## Example From 1f37a645219cc5bc3247661e3c1503bdfb6ce1c8 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 15 Sep 2023 14:11:35 +0800 Subject: [PATCH 35/42] [Feature] [File Connector]fix code style #5443 --- docs/en/connector-v2/sink/LocalFile.md | 40 +++++++++++++------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 037db2feeee..c8ea48d3f1c 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,26 +30,26 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------| -| path | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | | enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:don't write header,true:write header. | ### path [string] From 57612443a237141ff6dbfa5b5706d7f19b0f4e64 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Tue, 19 Sep 2023 10:27:54 +0800 Subject: [PATCH 36/42] [Feature] [File Connector]add junit test #5443 --- .../seatunnel/engine/e2e/TextHeaderT.java | 166 ++++++++++++++++++ .../batch_fakesource_to_file_header.conf | 30 ++++ 2 files changed, 196 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java new file mode 100644 index 00000000000..86163b7a68b --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair; + +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import scala.Tuple3; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * Cluster fault tolerance test. Test the job recovery capability and data consistency assurance + * capability in case of cluster node failure + */ +@Slf4j +public class TextHeaderT { + + private String FILE_FORMAT_TYPE = "file_format_type"; + private String ENABLE_HEADER_WRITE = "enable_header_write"; + + @Test + public void testEnableWriteHeader() { + List lists = new ArrayList<>(); + lists.add(new Tuple3<>("text", "true", 2)); + lists.add(new Tuple3<>("text", "false", 1)); + lists.add(new Tuple3<>("csv", "true", 2)); + lists.add(new Tuple3<>("csv", "false", 1)); + lists.forEach( + t -> { + try { + enableWriteHeader( + t._1().toString(), + t._2().toString(), + Integer.parseInt(t._3().toString())); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + public void enableWriteHeader(String file_format_type, String headerWrite, int lineNumber) + throws ExecutionException, InterruptedException { + String testClusterName = "ClusterFaultToleranceIT_EnableWriteHeaderNode"; + HazelcastInstanceImpl node1 = null; + SeaTunnelClient engineClient = null; + + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig + .getHazelcastConfig() + .setClusterName(TestUtils.getClusterName(testClusterName)); + + try { + node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + // waiting all node added to cluster + HazelcastInstanceImpl finalNode = node1; + Awaitility.await() + .atMost(10000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + 1, finalNode.getCluster().getMembers().size())); + + Common.setDeployMode(DeployMode.CLIENT); + ImmutablePair testResources = + createTestResources(headerWrite, file_format_type); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(headerWrite); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); + engineClient = new SeaTunnelClient(clientConfig); + JobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(testResources.getRight(), jobConfig); + ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + + CompletableFuture objectCompletableFuture = + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); + Awaitility.await() + .atMost(600000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Thread.sleep(2000); + Assertions.assertTrue( + objectCompletableFuture.isDone() + && JobStatus.FINISHED.equals( + objectCompletableFuture.get())); + }); + + Long fileLineNumberFromDir = + FileUtils.getFileLineNumberFromDir(testResources.getLeft()); + Assertions.assertEquals(lineNumber, fileLineNumberFromDir); + log.info("========================clean test resource===================="); + } finally { + if (engineClient != null) { + engineClient.shutdown(); + } + if (node1 != null) { + node1.shutdown(); + } + } + } + + private ImmutablePair createTestResources( + @NonNull String headerWrite, @NonNull String formatType) { + Map valueMap = new HashMap<>(); + valueMap.put(ENABLE_HEADER_WRITE, headerWrite); + valueMap.put(FILE_FORMAT_TYPE, formatType); + String targetDir = "/tmp/text"; + targetDir = targetDir.replace("/", File.separator); + // clear target dir before test + FileUtils.createNewDir(targetDir); + String targetConfigFilePath = + File.separator + + "tmp" + + File.separator + + "test_conf" + + File.separator + + headerWrite + + ".conf"; + TestUtils.createTestConfigFileFromTemplate( + "batch_fakesource_to_file_header.conf", valueMap, targetConfigFilePath); + return new ImmutablePair<>(targetDir, targetConfigFilePath); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf new file mode 100644 index 00000000000..561afab63ac --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf @@ -0,0 +1,30 @@ +# Set the basic configuration of the task to be performed +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +# Create a source to connect to Mongodb +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +sink { + +LocalFile { + path = "/tmp/text" + file_format_type="${file_format_type}" + enable_header_write="${enable_header_write}" +} +} From af3b5f25f9fff35c2992085395ae9613ed30ec8d Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Wed, 20 Sep 2023 14:44:24 +0800 Subject: [PATCH 37/42] [Feature] [File Connector]add license header: #5443 --- .../batch_fakesource_to_file_header.conf | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf index 561afab63ac..96ec46dc2ee 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf @@ -1,4 +1,23 @@ -# Set the basic configuration of the task to be performed +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + env { execution.parallelism = 1 job.mode = "BATCH" From 37f3a15f7b526be640953fcf612a562ab63736e9 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Thu, 21 Sep 2023 16:45:47 +0800 Subject: [PATCH 38/42] [Feature] [File Connector]add junit: #5443 --- .../seatunnel/engine/e2e/TextHeaderT.java | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java index 86163b7a68b..ed8696502e8 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; @@ -28,6 +29,7 @@ import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +import org.apache.seatunnel.format.text.constant.TextFormatConstant; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; @@ -41,10 +43,7 @@ import scala.Tuple3; import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -62,24 +61,21 @@ public class TextHeaderT { @Test public void testEnableWriteHeader() { List lists = new ArrayList<>(); - lists.add(new Tuple3<>("text", "true", 2)); - lists.add(new Tuple3<>("text", "false", 1)); - lists.add(new Tuple3<>("csv", "true", 2)); - lists.add(new Tuple3<>("csv", "false", 1)); + lists.add(new Tuple3<>("text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add(new Tuple3<>("text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add(new Tuple3<>("csv", "true", "name,age")); + lists.add(new Tuple3<>("csv", "false", "name,age")); lists.forEach( t -> { try { - enableWriteHeader( - t._1().toString(), - t._2().toString(), - Integer.parseInt(t._3().toString())); + enableWriteHeader(t._1().toString(), t._2().toString(), t._3().toString()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } }); } - public void enableWriteHeader(String file_format_type, String headerWrite, int lineNumber) + public void enableWriteHeader(String file_format_type, String headerWrite, String headerContent) throws ExecutionException, InterruptedException { String testClusterName = "ClusterFaultToleranceIT_EnableWriteHeaderNode"; HazelcastInstanceImpl node1 = null; @@ -127,10 +123,17 @@ public void enableWriteHeader(String file_format_type, String headerWrite, int l && JobStatus.FINISHED.equals( objectCompletableFuture.get())); }); - - Long fileLineNumberFromDir = - FileUtils.getFileLineNumberFromDir(testResources.getLeft()); - Assertions.assertEquals(lineNumber, fileLineNumberFromDir); + File file = new File(testResources.getLeft()); + for (File targetFile : Objects.requireNonNull(file.listFiles())) { + String[] texts = + FileUtils.readFileToStr(targetFile.toPath()) + .split(BaseSinkConfig.ROW_DELIMITER.defaultValue()); + if (headerWrite.equals("true")) { + Assertions.assertEquals(headerContent, texts[0]); + } else { + Assertions.assertNotEquals(headerContent, texts[0]); + } + } log.info("========================clean test resource===================="); } finally { if (engineClient != null) { From 3b809e7a182617c1be419eab78b2853a66c8f27c Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Thu, 21 Sep 2023 16:57:38 +0800 Subject: [PATCH 39/42] [Feature] [File Connector]add junit: #5443 --- .../engine/e2e/{TextHeaderT.java => TextHeaderIT.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/{TextHeaderT.java => TextHeaderIT.java} (99%) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java similarity index 99% rename from seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java index ed8696502e8..a9a9cb8e2b5 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java @@ -53,7 +53,7 @@ * capability in case of cluster node failure */ @Slf4j -public class TextHeaderT { +public class TextHeaderIT { private String FILE_FORMAT_TYPE = "file_format_type"; private String ENABLE_HEADER_WRITE = "enable_header_write"; From 738e36a4328682e5f7d3a7c5a6fc70f1dfd64dcd Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 22 Sep 2023 14:04:07 +0800 Subject: [PATCH 40/42] [Feature] [File Connector]remove scala: #5443 --- .../seatunnel/engine/e2e/TextHeaderIT.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java index a9a9cb8e2b5..b244a0bb2f8 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java @@ -38,12 +38,15 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import com.hazelcast.jet.datamodel.Tuple3; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; -import scala.Tuple3; import java.io.File; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -61,14 +64,14 @@ public class TextHeaderIT { @Test public void testEnableWriteHeader() { List lists = new ArrayList<>(); - lists.add(new Tuple3<>("text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age")); - lists.add(new Tuple3<>("text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age")); - lists.add(new Tuple3<>("csv", "true", "name,age")); - lists.add(new Tuple3<>("csv", "false", "name,age")); + lists.add(Tuple3.tuple3("text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add(Tuple3.tuple3("text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add(Tuple3.tuple3("csv", "true", "name,age")); + lists.add(Tuple3.tuple3("csv", "false", "name,age")); lists.forEach( t -> { try { - enableWriteHeader(t._1().toString(), t._2().toString(), t._3().toString()); + enableWriteHeader(t.toString(), t.toString(), t.toString()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } @@ -124,7 +127,7 @@ public void enableWriteHeader(String file_format_type, String headerWrite, Strin objectCompletableFuture.get())); }); File file = new File(testResources.getLeft()); - for (File targetFile : Objects.requireNonNull(file.listFiles())) { + for (File targetFile : file.listFiles()) { String[] texts = FileUtils.readFileToStr(targetFile.toPath()) .split(BaseSinkConfig.ROW_DELIMITER.defaultValue()); From bc5a7e076f0cbb2ad44d4667acd1e252cbb29279 Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Fri, 22 Sep 2023 14:37:23 +0800 Subject: [PATCH 41/42] [Feature] [File Connector]modify md style: #5443 --- docs/en/connector-v2/sink/LocalFile.md | 42 +++++++++++++------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index c8ea48d3f1c..90e80c6c372 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -30,27 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv. false:don't write header,true:write header. | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | ### path [string] From 2bc2fa5aaa91af5be54aee9bb044fdf850d8b29d Mon Sep 17 00:00:00 2001 From: zck <573693104@qq.com> Date: Mon, 25 Sep 2023 11:30:43 +0800 Subject: [PATCH 42/42] [Feature] [File Connector]junit modify: #5443 --- .../seatunnel/engine/e2e/TextHeaderIT.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java index b244a0bb2f8..7d60fadcde9 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java @@ -38,8 +38,9 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.instance.impl.HazelcastInstanceImpl; -import com.hazelcast.jet.datamodel.Tuple3; +import lombok.Getter; import lombok.NonNull; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.io.File; @@ -61,17 +62,36 @@ public class TextHeaderIT { private String FILE_FORMAT_TYPE = "file_format_type"; private String ENABLE_HEADER_WRITE = "enable_header_write"; + @Getter + @Setter + static class ContentHeader { + private String fileStyle; + private String enableWriteHeader; + private String headerName; + + public ContentHeader(String fileStyle, String enableWriteHeader, String headerName) { + this.fileStyle = fileStyle; + this.enableWriteHeader = enableWriteHeader; + this.headerName = headerName; + } + } + @Test public void testEnableWriteHeader() { - List lists = new ArrayList<>(); - lists.add(Tuple3.tuple3("text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age")); - lists.add(Tuple3.tuple3("text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age")); - lists.add(Tuple3.tuple3("csv", "true", "name,age")); - lists.add(Tuple3.tuple3("csv", "false", "name,age")); + List lists = new ArrayList<>(); + lists.add( + new ContentHeader( + "text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add( + new ContentHeader( + "text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age")); + lists.add(new ContentHeader("csv", "true", "name,age")); + lists.add(new ContentHeader("csv", "false", "name,age")); lists.forEach( t -> { try { - enableWriteHeader(t.toString(), t.toString(), t.toString()); + enableWriteHeader( + t.getFileStyle(), t.getEnableWriteHeader(), t.getHeaderName()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); }