[Feature][Connector-V2] Add Huawei Cloud OBS connector #4578
Merged
Commits (26)
cf01a17  [Feature][Connector-V2] Add huawei cloud obs connector #4577 (kim-up)
a0d2923  [Fix][Merge] merge from dev (kim-up)
2c6746a  [Fix][plugin-mapping] update plugin-mapping.properties (kim-up)
66d6eca  [Connector-V2][Obs] Add license for pom.xml (kim-up)
b19a433  [Connector-V2][Obs] Update dist pom.xml (kim-up)
15a8f2a  [Fix][Merge] merge and fix conflicts (kim-up)
f6174c0  [Fix][dependency] add huawei cloud repository (kim-up)
35a25c3  [Doc][obs] Unified format (kim-up)
d4da29a  Merge branch 'dev' into connector-file-obs (kim-up)
43564f5  Merge branch 'apache:dev' into connector-file-obs (kim-up)
993ea6a  [Connector-v2][obs] Add e2e (kim-up)
9eb8464  Merge branch 'dev' into connector-file-obs (kim-up)
c49aca9  [Connector-v2][obs] Update e2e (kim-up)
e01d92a  Merge branch 'connector-file-obs' of https://github.com/kim-up/incuba… (kim-up)
dca5836  Merge branch 'dev' into connector-file-obs (kim-up)
38f2465  Merge branch 'dev' into connector-file-obs (EricJoy2048)
5a6e9d9  [Connector-V2][file-obs] Merge dev and fix conflicts (kim-up)
8b45ef9  [connector-V2][file-obs] Merge dev
17f27e3  Merge branch 'dev' into connector-file-obs (Hisoka-X)
6b5158a  Merge branch 'connector-file-obs' of https://github.com/kim-up/incuba… (kim-up)
802ca97  [Connector-V2][file] fix pom (kim-up)
05aa579  [Connector-V2][file] fix pom (kim-up)
479aefd  Merge remote-tracking branch 'origin' into connector-file-obs (kim-up)
cecda1a  Merge branch 'dev' into connector-file-obs (hailin0)
3683454  Merge branch 'dev' into pr/4578 (hailin0)
f33c57b  fix (hailin0)
@@ -0,0 +1,287 @@
# ObsFile

> OBS file sink connector

## Supported Engines

> Spark
>
> Flink
>
> SeaTunnel Zeta

## Key features

- [x] [exactly-once](../../concept/connector-v2-features.md)

By default, we use two-phase commit (2PC) to ensure `exactly-once` semantics.

- [x] file format type
  - [x] text
  - [x] csv
  - [x] parquet
  - [x] orc
  - [x] json
  - [x] excel

## Description

Output data to the Huawei Cloud OBS file system.

If you use Spark or Flink, you must ensure that your cluster is already integrated with Hadoop before using this connector. The tested Hadoop version is 2.x.

If you use SeaTunnel Engine, the Hadoop jar is integrated automatically when you download and install SeaTunnel Engine. You can check the jar packages under ${SEATUNNEL_HOME}/lib to confirm this.

To support more file types, we made some trade-offs: the connector accesses OBS internally through the HDFS protocol, so it needs some Hadoop dependencies.
It only supports Hadoop version **2.9.X+**.

## Required Jar List

| jar                | supported versions          | maven                                                                                                           |
|--------------------|-----------------------------|-----------------------------------------------------------------------------------------------------------------|
| hadoop-huaweicloud | support version >= 3.1.1.29 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/hadoop/hadoop-huaweicloud/)  |
| esdk-obs-java      | support version >= 3.19.7.3 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/com/huawei/storage/esdk-obs-java/)      |
| okhttp             | support version >= 3.11.0   | [Download](https://repo1.maven.org/maven2/com/squareup/okhttp3/okhttp/)                                         |
| okio               | support version >= 1.14.0   | [Download](https://repo1.maven.org/maven2/com/squareup/okio/okio/)                                              |

> Please download the jars listed above from the 'maven' links and copy them to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory.
>
> Also copy all of these jars to $SEATUNNEL_HOME/lib/.

## Options

| name                             | type    | required | default                                    | description                                                                                                                                 |
|----------------------------------|---------|----------|--------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------|
| path                             | string  | yes      | -                                          | The target dir path.                                                                                                                        |
| bucket                           | string  | yes      | -                                          | The bucket address of the obs file system, for example: `obs://obs-bucket-name`.                                                           |
| access_key                       | string  | yes      | -                                          | The access key of the obs file system.                                                                                                     |
| access_secret                    | string  | yes      | -                                          | The access secret of the obs file system.                                                                                                  |
| endpoint                         | string  | yes      | -                                          | The endpoint of the obs file system.                                                                                                       |
| custom_filename                  | boolean | no       | false                                      | Whether you need to customize the filename.                                                                                                |
| file_name_expression             | string  | no       | "${transactionId}"                         | Describes the file expression which will be created into the `path`. Only used when custom_filename is true. [Tips](#file_name_expression) |
| filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Specifies the time format of the `path`. Only used when custom_filename is true. [Tips](#filename_time_format)                             |
| file_format_type                 | string  | no       | "csv"                                      | Supported file types. [Tips](#file_format_type)                                                                                            |
| field_delimiter                  | string  | no       | '\001'                                     | The separator between columns in a row of data. Only used when file_format is text.                                                        |
| row_delimiter                    | string  | no       | "\n"                                       | The separator between rows in a file. Only needed by the `text` file format.                                                               |
| have_partition                   | boolean | no       | false                                      | Whether you need to process partitions.                                                                                                    |
| partition_by                     | array   | no       | -                                          | Partition data based on selected fields. Only used when have_partition is true.                                                            |
| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true. [Tips](#partition_dir_expression)                                                                   |
| is_partition_field_write_in_file | boolean | no       | false                                      | Only used when have_partition is true. [Tips](#is_partition_field_write_in_file)                                                           |
| sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns. [Tips](#sink_columns)                                                           |
| is_enable_transaction            | boolean | no       | true                                       | [Tips](#is_enable_transaction)                                                                                                              |
| batch_size                       | int     | no       | 1000000                                    | [Tips](#batch_size)                                                                                                                         |
| compress_codec                   | string  | no       | none                                       | [Tips](#compress_codec)                                                                                                                     |
| common-options                   | object  | no       | -                                          | [Tips](#common_options)                                                                                                                     |
| max_rows_in_memory               | int     | no       | -                                          | The maximum number of data items that can be cached in memory. Only used when file_format is excel.                                        |
| sheet_name                       | string  | no       | Sheet${Random number}                      | The sheet of the workbook to write. Only used when file_format is excel.                                                                   |

### Tips

#### <span id="file_name_expression"> file_name_expression </span>

> Only used when `custom_filename` is `true`.
>
> `file_name_expression` describes the file expression which will be created into the `path`.
>
> We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`.
>
> `${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.

Please note that if `is_enable_transaction` is `true`, we will automatically prepend `${transactionId}_` to the file name.

#### <span id="filename_time_format"> filename_time_format </span> | ||
|
||
> Only used when `custom_filename` is `true` | ||
> | ||
> When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows: | ||
|
||
| Symbol | Description | | ||
|--------|--------------------| | ||
| y | Year | | ||
| M | Month | | ||
| d | Day of month | | ||
| H | Hour in day (0-23) | | ||
| m | Minute in hour | | ||
| s | Second in minute | | ||
|
||
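As a concrete illustration of these two options together, here is a minimal sink sketch (the bucket, credentials, and the `output_` prefix are placeholder values, not taken from this page's examples):

```hocon
ObsFile {
    path = "/seatunnel/text"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxxx.myhuaweicloud.com"
    file_format_type = "text"
    custom_filename = true
    # ${now} is expanded using filename_time_format, e.g. "output_2023.04.20"
    file_name_expression = "output_${now}"
    filename_time_format = "yyyy.MM.dd"
}
```

Since `is_enable_transaction` defaults to `true`, the generated name would additionally carry the `${transactionId}_` prefix described above, plus the format suffix (`.txt` here).
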
#### <span id="file_format_type"> file_format_type </span> | ||
|
||
> We supported as the following file types: | ||
> | ||
> `text` `json` `csv` `orc` `parquet` `excel` | ||
|
||
Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. | ||
|
||
#### <span id="partition_dir_expression"> partition_dir_expression </span> | ||
|
||
> Only used when `have_partition` is `true`. | ||
> | ||
> If the `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory. | ||
> | ||
> Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field. | ||
|
||
#### <span id="is_partition_field_write_in_file"> is_partition_field_write_in_file </span> | ||
|
||
> Only used when `have_partition` is `true`. | ||
> | ||
> If `is_partition_field_write_in_file` is `true`, the partition field and the value of it will be write into data file. | ||
> | ||
> For example, if you want to write a Hive Data File, Its value should be `false`. | ||
|
||
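Putting the three partition options together, the following sketch (placeholder bucket and credentials) would write a row with `age = 20` under a directory like `/seatunnel/text/age=20/`, keeping the `age` column out of the data file as in a Hive-style layout:

```hocon
ObsFile {
    path = "/seatunnel/text"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxxx.myhuaweicloud.com"
    file_format_type = "text"
    have_partition = true
    partition_by = ["age"]
    # ${k0} -> "age", ${v0} -> the row's age value, producing e.g. "age=20/"
    partition_dir_expression = "${k0}=${v0}"
    # false keeps the partition column out of the data file (Hive-style)
    is_partition_field_write_in_file = false
}
```
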
#### <span id="sink_columns"> sink_columns </span> | ||
|
||
> Which columns need be written to file, default value is all the columns get from `Transform` or `Source`. | ||
> The order of the fields determines the order in which the file is actually written. | ||
|
||
#### <span id="is_enable_transaction"> is_enable_transaction </span> | ||
|
||
> If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory. | ||
> | ||
> Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file. Only support `true` now. | ||
|
||
#### <span id="batch_size"> batch_size </span> | ||
|
||
> The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. | ||
|
||
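To make the interplay concrete, here is a sketch for SeaTunnel Zeta (the `env` values are illustrative assumptions): with a one-minute checkpoint interval, a file is rolled either when a checkpoint fires or once it reaches `batch_size` rows, whichever comes first.

```hocon
env {
    job.mode = "BATCH"
    # checkpoint.interval is in milliseconds
    checkpoint.interval = 60000
}

sink {
    ObsFile {
        path = "/seatunnel/text"
        bucket = "obs://obs-bucket-name"
        access_key = "xxxxxxxxxxx"
        access_secret = "xxxxxxxxxxx"
        endpoint = "obs.xxxxxx.myhuaweicloud.com"
        file_format_type = "text"
        # roll to a new file after 100000 rows, unless a checkpoint fires first
        batch_size = 100000
    }
}
```
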
#### <span id="compress_codec"> compress_codec </span> | ||
|
||
> The compress codec of files and the details that supported as the following shown: | ||
> | ||
> - txt: `lzo` `none` | ||
> - json: `lzo` `none` | ||
> - csv: `lzo` `none` | ||
> - orc: `lzo` `snappy` `lz4` `zlib` `none` | ||
> - parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` | ||
|
||
Please note that excel type does not support any compression format | ||
|
||
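For example, an orc sink using `snappy` (one of the codecs listed above for orc) would look like this minimal sketch with placeholder credentials:

```hocon
ObsFile {
    path = "/seatunnel/orc"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxx.myhuaweicloud.com"
    file_format_type = "orc"
    # must be one of the codecs supported by the chosen file_format_type
    compress_codec = "snappy"
}
```
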
#### <span id="common_options"> common options </span> | ||
|
||
> Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. | ||
|
||
## Task Example

### text file

> For the text file format with `have_partition`, `custom_filename`, and `sink_columns`

```hocon
ObsFile {
    path = "/seatunnel/text"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxxx.myhuaweicloud.com"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction = true
}
```

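The examples in this section show only the sink block. For context, a runnable job wraps the sink in the usual `env`/`source`/`sink` sections; the sketch below assumes SeaTunnel's built-in `FakeSource` as a stand-in for a real source:

```hocon
env {
    parallelism = 1
    job.mode = "BATCH"
}

source {
    # FakeSource generates rows matching the declared schema; stand-in only
    FakeSource {
        result_table_name = "fake"
        schema = {
            fields {
                name = "string"
                age = "int"
            }
        }
    }
}

sink {
    ObsFile {
        path = "/seatunnel/text"
        bucket = "obs://obs-bucket-name"
        access_key = "xxxxxxxxxxx"
        access_secret = "xxxxxxxxxxx"
        endpoint = "obs.xxxxxx.myhuaweicloud.com"
        file_format_type = "text"
    }
}
```
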
### parquet file

> For the parquet file format with `have_partition` and `sink_columns`

```hocon
ObsFile {
    path = "/seatunnel/parquet"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxxxxxxxx"
    endpoint = "obs.xxxxxx.myhuaweicloud.com"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    file_format_type = "parquet"
    sink_columns = ["name","age"]
}
```

### orc file

> For the orc file format, a simple config

```hocon
ObsFile {
    path = "/seatunnel/orc"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxx.myhuaweicloud.com"
    file_format_type = "orc"
}
```

### json file

> For the json file format, a simple config

```hocon
ObsFile {
    path = "/seatunnel/json"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxx.myhuaweicloud.com"
    file_format_type = "json"
}
```

### excel file

> For the excel file format, a simple config

```hocon
ObsFile {
    path = "/seatunnel/excel"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxx.myhuaweicloud.com"
    file_format_type = "excel"
}
```

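If you need to control the sheet name or bound the rows cached in memory, the two excel-only options from the table above can be added; a sketch (the sheet name and row cap are arbitrary example values):

```hocon
ObsFile {
    path = "/seatunnel/excel"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxx.myhuaweicloud.com"
    file_format_type = "excel"
    # excel-only options: name the sheet and cap rows cached in memory
    sheet_name = "result"
    max_rows_in_memory = 1024
}
```
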
### csv file

> For the csv file format, a simple config

```hocon
ObsFile {
    path = "/seatunnel/csv"
    bucket = "obs://obs-bucket-name"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "obs.xxxxx.myhuaweicloud.com"
    file_format_type = "csv"
}
```

## Changelog

### next version

- Add Obs Sink Connector
Review comment:

> After discussion in the community, we have optimized the format requirements of the document, which you can refer to:
> https://github.com/apache/incubator-seatunnel/issues/4544

Reply:

> Ok