Skip to content

Commit

Permalink
[Feature] [File Connector] add enable_header_write,false:dont write h…
Browse files Browse the repository at this point in the history
…eader,true:write header. #5443
  • Loading branch information
zck committed Sep 13, 2023
1 parent de2859b commit 4acdda8
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 10 deletions.
7 changes: 6 additions & 1 deletion docs/en/connector-v2/sink/LocalFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once`

## Options

| name | type | required | default value | remarks |
| name | type | required | default value | remarks |
|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------|
| path | string | yes | - | |
| custom_filename | boolean | no | false | Whether you need custom the filename |
Expand All @@ -50,6 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| common-options | object | no | - | |
| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. |
| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. |
| enable_header_write | boolean | no | false | false:dont write header,true:write header. |

### path [string]

Expand Down Expand Up @@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items that can be cached in

Writer the sheet of the workbook

### enable_header_write [boolean]

false:dont write header,true:write header.

## Example

For orc file format simple config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
protected Boolean enableHeaderWriter = false;

public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
Expand Down Expand Up @@ -99,6 +100,10 @@ public BaseFileSinkConfig(@NonNull Config config) {
timeFormat =
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
}

if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
}
}

public BaseFileSinkConfig() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,10 @@ public class BaseSinkConfig {
.stringType()
.noDefaultValue()
.withDescription("To be written sheet name,only valid for excel files");

public static final Option<Boolean> ENABLE_HEADER_WRITE =
Options.key("enable_header_write")
.booleanType()
.defaultValue(false)
.withDescription("false:dont write header,true:write header");
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.text.TextSerializationSchema;
Expand All @@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
private final DateUtils.Formatter dateFormat;
private final DateTimeUtils.Formatter dateTimeFormat;
private final TimeUtils.Formatter timeFormat;
private final FileFormat fileFormat;
private final Boolean enableHeaderWriter;
private SerializationSchema serializationSchema;

public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
Expand All @@ -58,6 +61,8 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
this.dateFormat = fileSinkConfig.getDateFormat();
this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
this.timeFormat = fileSinkConfig.getTimeFormat();
this.fileFormat = fileSinkConfig.getFileFormat();
this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter();
}

@Override
Expand Down Expand Up @@ -133,24 +138,18 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
OutputStream out =
lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
fsDataOutputStream = new FSDataOutputStream(out, null);
fsDataOutputStream.write(
String.join(",", seaTunnelRowType.getFieldNames()).getBytes());
fsDataOutputStream.write(rowDelimiter.getBytes());
enableWriteHeader(fsDataOutputStream);
break;
case NONE:
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
fsDataOutputStream.write(
String.join(",", seaTunnelRowType.getFieldNames()).getBytes());
fsDataOutputStream.write(rowDelimiter.getBytes());
enableWriteHeader(fsDataOutputStream);
break;
default:
log.warn(
"Text file does not support this compress type: {}",
compressFormat.getCompressCodec());
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
fsDataOutputStream.write(
String.join(",", seaTunnelRowType.getFieldNames()).getBytes());
fsDataOutputStream.write(rowDelimiter.getBytes());
enableWriteHeader(fsDataOutputStream);
break;
}
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
Expand All @@ -164,4 +163,17 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
}
return fsDataOutputStream;
}

private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException {
if (enableHeaderWriter) {
fsDataOutputStream.write(
String.join(
FileFormat.CSV.equals(fileFormat)
? ","
: fieldDelimiter,
seaTunnelRowType.getFieldNames())
.getBytes());
fsDataOutputStream.write(rowDelimiter.getBytes());
}
}
}

0 comments on commit 4acdda8

Please sign in to comment.