diff --git a/docs/en/connector-v2/sink/ObsFile.md b/docs/en/connector-v2/sink/ObsFile.md
new file mode 100644
index 00000000000..cfb1ec8c55e
--- /dev/null
+++ b/docs/en/connector-v2/sink/ObsFile.md
@@ -0,0 +1,287 @@
+# ObsFile
+
+> Obs file sink connector
+
+## Support those engines
+
+> Spark
+>
+> Flink
+>
+> Seatunnel Zeta
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use 2PC commit to ensure `exactly-once`
+
+- [x] file format type
+ - [x] text
+ - [x] csv
+ - [x] parquet
+ - [x] orc
+ - [x] json
+ - [x] excel
+
+## Description
+
+Output data to huawei cloud obs file system.
+
+If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x.
+
+If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this.
+
+We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OBS and this connector need some hadoop dependencies.
+It only supports hadoop version **2.9.X+**.
+
+## Required Jar List
+
+| jar | supported versions | maven |
+|--------------------|-----------------------------|----------------------------------------------------------------------------------------------------------------|
+| hadoop-huaweicloud | support version >= 3.1.1.29 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/hadoop/hadoop-huaweicloud/) |
+| esdk-obs-java | support version >= 3.19.7.3 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/com/huawei/storage/esdk-obs-java/) |
+| okhttp | support version >= 3.11.0 | [Download](https://repo1.maven.org/maven2/com/squareup/okhttp3/okhttp/) |
+| okio | support version >= 1.14.0 | [Download](https://repo1.maven.org/maven2/com/squareup/okio/okio/) |
+
+> Please download the support list corresponding to 'Maven' and copy them to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory.
+>
+> And copy all jars to $SEATNUNNEL_HOME/lib/
+
+## Options
+
+| name | type | required | default | description |
+|----------------------------------|---------|----------|--------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|
+| path | string | yes | - | The target dir path. |
+| bucket | string | yes | - | The bucket address of obs file system, for example: `obs://obs-bucket-name`. |
+| access_key | string | yes | - | The access key of obs file system. |
+| access_secret | string | yes | - | The access secret of obs file system. |
+| endpoint | string | yes | - | The endpoint of obs file system. |
+| custom_filename | boolean | no | false | Whether you need custom the filename. |
+| file_name_expression | string | no | "${transactionId}" | Describes the file expression which will be created into the `path`. Only used when custom_filename is true. [Tips](#file_name_expression) |
+| filename_time_format | string | no | "yyyy.MM.dd" | Specify the time format of the `path`. Only used when custom_filename is true. [Tips](#filename_time_format) |
+| file_format_type | string | no | "csv" | Supported file types. [Tips](#file_format_type) |
+| field_delimiter | string | no | '\001' | The separator between columns in a row of data.Only used when file_format is text. |
+| row_delimiter | string | no | "\n" | The separator between rows in a file. Only needed by `text` file format. |
+| have_partition | boolean | no | false | Whether you need processing partitions. |
+| partition_by | array | no | - | Partition data based on selected fields. Only used then have_partition is true. |
+| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true.[Tips](#partition_dir_expression) |
+| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true.[Tips](#is_partition_field_write_in_file) |
+| sink_columns | array | no | | When this parameter is empty, all fields are sink columns.[Tips](#sink_columns) |
+| is_enable_transaction | boolean | no | true | [Tips](#is_enable_transaction) |
+| batch_size | int | no | 1000000 | [Tips](#batch_size) |
+| compress_codec | string | no | none | [Tips](#compress_codec) |
+| common-options | object | no | - | [Tips](#common_options) |
+| max_rows_in_memory | int | no | - | When File Format is Excel,The maximum number of data items that can be cached in the memory.Only used when file_format is excel. |
+| sheet_name | string | no | Sheet${Random number} | Writer the sheet of the workbook. Only used when file_format is excel. |
+
+### Tips
+
+#### file_name_expression
+
+> Only used when `custom_filename` is `true`
+>
+> `file_name_expression` describes the file expression which will be created into the `path`.
+>
+> We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`,
+>
+> `${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.
+
+Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file.
+
+#### filename_time_format
+
+> Only used when `custom_filename` is `true`
+>
+> When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows:
+
+| Symbol | Description |
+|--------|--------------------|
+| y | Year |
+| M | Month |
+| d | Day of month |
+| H | Hour in day (0-23) |
+| m | Minute in hour |
+| s | Second in minute |
+
+#### file_format_type
+
+> We supported as the following file types:
+>
+> `text` `json` `csv` `orc` `parquet` `excel`
+
+Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`.
+
+#### partition_dir_expression
+
+> Only used when `have_partition` is `true`.
+>
+> If the `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.
+>
+> Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
+
+#### is_partition_field_write_in_file
+
+> Only used when `have_partition` is `true`.
+>
+> If `is_partition_field_write_in_file` is `true`, the partition field and the value of it will be write into data file.
+>
+> For example, if you want to write a Hive Data File, Its value should be `false`.
+
+#### sink_columns
+
+> Which columns need be written to file, default value is all the columns get from `Transform` or `Source`.
+> The order of the fields determines the order in which the file is actually written.
+
+#### is_enable_transaction
+
+> If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory.
+>
+> Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file. Only support `true` now.
+
+#### batch_size
+
+> The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger.
+
+#### compress_codec
+
+> The compress codec of files and the details that supported as the following shown:
+>
+> - txt: `lzo` `none`
+> - json: `lzo` `none`
+> - csv: `lzo` `none`
+> - orc: `lzo` `snappy` `lz4` `zlib` `none`
+> - parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
+Please note that excel type does not support any compression format
+
+#### common options
+
+> Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Task Example
+
+### text file
+
+> For text file format with `have_partition` and `custom_filename` and `sink_columns`
+
+```hocon
+
+ ObsFile {
+ path="/seatunnel/text"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxx"
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "text"
+ field_delimiter = "\t"
+ row_delimiter = "\n"
+ have_partition = true
+ partition_by = ["age"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ custom_filename = true
+ file_name_expression = "${transactionId}_${now}"
+ filename_time_format = "yyyy.MM.dd"
+ sink_columns = ["name","age"]
+ is_enable_transaction = true
+ }
+
+```
+
+### parquet file
+
+> For parquet file format with `have_partition` and `sink_columns`
+
+```hocon
+
+ ObsFile {
+ path = "/seatunnel/parquet"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxx"
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ have_partition = true
+ partition_by = ["age"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_format_type = "parquet"
+ sink_columns = ["name","age"]
+ }
+
+```
+
+### orc file
+
+> For orc file format simple config
+
+```hocon
+
+ ObsFile {
+ path="/seatunnel/orc"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxx"
+ endpoint = "obs.xxxxx.myhuaweicloud.com"
+ file_format_type = "orc"
+ }
+
+```
+
+### json file
+
+> For json file format simple config
+
+```hcocn
+
+ ObsFile {
+ path = "/seatunnel/json"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxx"
+ endpoint = "obs.xxxxx.myhuaweicloud.com"
+ file_format_type = "json"
+ }
+
+```
+
+### excel file
+
+> For excel file format simple config
+
+```hcocn
+
+ ObsFile {
+ path = "/seatunnel/excel"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxx"
+ endpoint = "obs.xxxxx.myhuaweicloud.com"
+ file_format_type = "excel"
+ }
+
+```
+
+### csv file
+
+> For csv file format simple config
+
+```hcocn
+
+ ObsFile {
+ path = "/seatunnel/csv"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxx"
+ endpoint = "obs.xxxxx.myhuaweicloud.com"
+ file_format_type = "csv"
+ }
+
+```
+
+## Changelog
+
+### next version
+
+- Add Obs Sink Connector
+
diff --git a/docs/en/connector-v2/source/ObsFile.md b/docs/en/connector-v2/source/ObsFile.md
new file mode 100644
index 00000000000..b5363d77173
--- /dev/null
+++ b/docs/en/connector-v2/source/ObsFile.md
@@ -0,0 +1,350 @@
+# ObsFile
+
+> Obs file source connector
+
+## Support those engines
+
+> Spark
+>
+> Flink
+>
+> Seatunnel Zeta
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+Read all the data in a split in a pollNext call. What splits are read will be saved in snapshot.
+
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [x] file format type
+ - [x] text
+ - [x] csv
+ - [x] parquet
+ - [x] orc
+ - [x] json
+ - [x] excel
+
+## Description
+
+Read data from huawei cloud obs file system.
+
+If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x.
+
+If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this.
+
+We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OBS and this connector need some hadoop dependencies.
+It only supports hadoop version **2.9.X+**.
+
+## Required Jar List
+
+| jar | supported versions | maven |
+|--------------------|-----------------------------|----------------------------------------------------------------------------------------------------------------|
+| hadoop-huaweicloud | support version >= 3.1.1.29 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/hadoop/hadoop-huaweicloud/) |
+| esdk-obs-java | support version >= 3.19.7.3 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/com/huawei/storage/esdk-obs-java/) |
+| okhttp | support version >= 3.11.0 | [Download](https://repo1.maven.org/maven2/com/squareup/okhttp3/okhttp/) |
+| okio | support version >= 1.14.0 | [Download](https://repo1.maven.org/maven2/com/squareup/okio/okio/) |
+
+> Please download the support list corresponding to 'Maven' and copy them to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory.
+>
+> And copy all jars to $SEATNUNNEL_HOME/lib/
+
+## Options
+
+| name | type | required | default | description |
+|---------------------------|---------|----------|---------------------|--------------------------------------------------------------------------------------------------------------|
+| path | string | yes | - | The target dir path |
+| file_format_type | string | yes | - | File type.[Tips](#file_format_type) |
+| bucket | string | yes | - | The bucket address of obs file system, for example: `obs://obs-bucket-name` |
+| access_key | string | yes | - | The access key of obs file system |
+| access_secret | string | yes | - | The access secret of obs file system |
+| endpoint | string | yes | - | The endpoint of obs file system |
+| read_columns | list | yes | - | The read column list of the data source, user can use it to implement field projection.[Tips](#read_columns) |
+| delimiter | string | no | \001 | Field delimiter, used to tell connector how to slice and dice fields when reading text files |
+| parse_partition_from_path | boolean | no | true | Control whether parse the partition keys and values from file path. [Tips](#parse_partition_from_path) |
+| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv. |
+| date_format | string | no | yyyy-MM-dd | Date type format, used to tell the connector how to convert string to date.[Tips](#date_format) |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell the connector how to convert string to datetime.[Tips](#datetime_format) |
+| time_format | string | no | HH:mm:ss | Time type format, used to tell the connector how to convert string to time.[Tips](#time_format) |
+| schema | config | no | - | [Tips](#schema) |
+| common-options | | no | - | [Tips](#common_options) |
+| sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. |
+
+### Tips
+
+#### parse_partition_from_path
+
+> Control whether parse the partition keys and values from file path
+>
+> For example if you read a file from path `obs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+>
+> Every record data from the file will be added these two fields:
+
+| name | age |
+|---------------|-----|
+| tyrantlucifer | 26 |
+
+> Do not define partition fields in schema option
+
+#### date_format
+
+> Date type format, used to tell the connector how to convert string to date, supported as the following formats:
+>
+> `yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+>
+> default `yyyy-MM-dd`
+
+### datetime_format
+
+> Datetime type format, used to tell the connector how to convert string to datetime, supported as the following formats:
+>
+> `yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+>
+> default `yyyy-MM-dd HH:mm:ss`
+
+### time_format
+
+> Time type format, used to tell the connector how to convert string to time, supported as the following formats:
+>
+> `HH:mm:ss` `HH:mm:ss.SSS`
+>
+> default `HH:mm:ss`
+
+###
+
+> Skip the first few lines, but only for the txt and csv.
+>
+> For example, set like following:
+>
+> `skip_header_row_number = 2`
+>
+> Then Seatunnel will skip the first 2 lines from source files
+
+### file_format_type
+
+> File type, supported as the following file types:
+>
+> `text` `csv` `parquet` `orc` `json` `excel`
+>
+> If you assign file type to `json`, you should also assign schema option to tell the connector how to parse data to the row you want.
+>
+> For example,upstream data is the following:
+>
+> ```json
+>
+> ```
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+> You can also save multiple pieces of data in one file and split them by one newline:
+
+```json lines
+
+{"code": 200, "data": "get success", "success": true}
+{"code": 300, "data": "get failed", "success": false}
+
+```
+
+> you should assign schema as the following:
+
+```hocon
+
+schema {
+ fields {
+ code = int
+ data = string
+ success = boolean
+ }
+}
+
+```
+
+> connector will generate data as the following:
+
+| code | data | success |
+|------|-------------|---------|
+| 200 | get success | true |
+
+> If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically.
+>
+> If you assign file type to `text` `csv`, you can choose to specify the schema information or not.
+>
+> For example, upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
+
+```
+
+> If you do not assign data schema connector will treat the upstream data as the following:
+
+| content |
+|-----------------------|
+| tyrantlucifer#26#male |
+
+> If you assign data schema, you should also assign the option `delimiter` too except CSV file type
+>
+> you should assign schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+ fields {
+ name = string
+ age = int
+ gender = string
+ }
+}
+
+```
+
+> connector will generate data as the following:
+
+| name | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26 | male |
+
+#### schema
+
+##### fields
+
+> The schema of upstream data.
+
+#### read_columns
+
+> The read column list of the data source, user can use it to implement field projection.
+>
+> The file type supported column projection as the following shown:
+
+- text
+- json
+- csv
+- orc
+- parquet
+- excel
+
+> If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured
+
+#### common options
+
+> Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+
+## Task Example
+
+### text file
+
+> For text file format simple config
+
+```hocon
+
+ ObsFile {
+ path = "/seatunnel/text"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "text"
+ }
+
+```
+
+### parquet file
+
+> For parquet file format simple config
+
+```hocon
+
+ ObsFile {
+ path = "/seatunnel/parquet"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "parquet"
+ }
+
+```
+
+### orc file
+
+> For orc file format simple config
+
+```hocon
+
+ ObsFile {
+ path = "/seatunnel/orc"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "orc"
+ }
+
+```
+
+### json file
+
+> For json file format simple config
+
+```hocon
+
+ ObsFile {
+ path = "/seatunnel/json"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "json"
+ }
+
+```
+
+### excel file
+
+> For excel file format simple config
+
+```hocon
+
+ ObsFile {
+ path = "/seatunnel/excel"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "excel"
+ }
+
+```
+
+### csv file
+
+> For csv file format simple config
+
+```hocon
+
+ ObsFile {
+ path = "/seatunnel/csv"
+ bucket = "obs://obs-bucket-name"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "csv"
+ delimiter = ","
+ }
+
+```
+
+## Changelog
+
+### next version
+
+- Add Obs File Source Connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 314d453ffc7..411e42b8808 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -124,3 +124,5 @@ seatunnel.sink.Easysearch = connector-easysearch
seatunnel.source.Postgres-CDC = connector-cdc-postgres
seatunnel.source.Oracle-CDC = connector-cdc-oracle
seatunnel.sink.Pulsar = connector-pulsar
+seatunnel.source.ObsFile = connector-file-obs
+seatunnel.sink.ObsFile = connector-file-obs
diff --git a/release-note.md b/release-note.md
index b799df78f74..840648fe649 100644
--- a/release-note.md
+++ b/release-note.md
@@ -177,6 +177,7 @@
- [Connector-V2] [Kafka] Kafka source supports data deserialization failure skipping (#4364)
- [Connector-V2] [Jdbc] [TiDB] Add TiDB catalog (#4438)
- [Connector-V2] [File] Add file excel sink and source (#4164)
+- [Connector-V2] [FILE-OBS] Add Huawei Cloud OBS connector (#4577)
- [Connector-v2] [Snowflake] Add Snowflake Source&Sink connector (#4470)
- [Connector-V2] [Pular] support read format for pulsar (#4111)
- [Connector-V2] [Paimon] Introduce paimon connector (#4178)
@@ -193,6 +194,7 @@
- [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)
- [Connector-V2] [Assert] Support field type assert and field value equality assert for full data types (#6275)
- [Connector-V2] [Iceberg] Support iceberg sink #6198
+- [Connector-V2] [FILE-OBS] Add Huawei Cloud OBS connector #4578
### Zeta(ST-Engine)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
index 3d3965b7c3f..c7a5c26aec7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
@@ -27,7 +27,8 @@ public enum FileSystemType implements Serializable {
COS("CosFile"),
FTP("FtpFile"),
SFTP("SftpFile"),
- S3("S3File");
+ S3("S3File"),
+ OBS("ObsFile");
private final String fileSystemPluginName;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml
new file mode 100644
index 00000000000..00676916fda
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml
@@ -0,0 +1,80 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ connector-file
+ ${revision}
+
+
+ connector-file-obs
+ SeaTunnel : Connectors V2 : File : Obs
+
+
+ 3.1.1.29
+ 3.19.7.3
+
+
+
+
+
+ org.apache.seatunnel
+ connector-file-base
+ ${project.version}
+
+
+ org.apache.flink
+ flink-shaded-hadoop-2
+ provided
+
+
+ org.apache.avro
+ avro
+
+
+
+
+ org.apache.hadoop
+ hadoop-huaweicloud
+ ${hadoop-huaweicloud.version}
+
+
+ com.huawei.storage
+ esdk-obs-java
+
+
+
+
+ com.huawei.storage
+ esdk-obs-java
+ ${esdk.version}
+
+
+
+
+
+
+ huaweiCloud
+ https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/
+
+
+
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
new file mode 100644
index 00000000000..714e7ede178
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.apache.hadoop.fs.obs.Constants;
+
+import java.util.HashMap;
+
+public class ObsConf extends HadoopConf {
+ private static final String HDFS_IMPL = "org.apache.hadoop.fs.obs.OBSFileSystem";
+ private static final String SCHEMA = "obs";
+
+ @Override
+ public String getFsHdfsImpl() {
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
+ }
+
+ public ObsConf(String hdfsNameKey) {
+ super(hdfsNameKey);
+ }
+
+ public static HadoopConf buildWithConfig(Config config) {
+ HadoopConf hadoopConf = new ObsConf(config.getString(ObsConfig.BUCKET.key()));
+ HashMap ossOptions = new HashMap<>();
+ ossOptions.put(Constants.ACCESS_KEY, config.getString(ObsConfig.ACCESS_KEY.key()));
+ ossOptions.put(Constants.SECRET_KEY, config.getString(ObsConfig.ACCESS_SECRET.key()));
+ ossOptions.put(Constants.ENDPOINT, config.getString(ObsConfig.ENDPOINT.key()));
+ hadoopConf.setExtraOptions(ossOptions);
+ return hadoopConf;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java
new file mode 100644
index 00000000000..a4893f6c153
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+
+public class ObsConfig extends BaseSourceConfigOptions {
+ public static final Option ACCESS_KEY =
+ Options.key("access_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("OBS bucket access key");
+ public static final Option ACCESS_SECRET =
+ Options.key("access_secret")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("OBS bucket access secret");
+ public static final Option ENDPOINT =
+ Options.key("endpoint").stringType().noDefaultValue().withDescription("OBS endpoint");
+ public static final Option BUCKET =
+ Options.key("bucket").stringType().noDefaultValue().withDescription("OBS bucket");
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
new file mode 100644
index 00000000000..8f303b6a457
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConf;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class ObsFileSink extends BaseFileSink {
+ @Override
+ public String getPluginName() {
+ return FileSystemType.OBS.getFileSystemPluginName();
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ super.prepare(pluginConfig);
+ CheckResult result =
+ CheckConfigUtil.checkAllExists(
+ pluginConfig,
+ ObsConfig.FILE_PATH.key(),
+ ObsConfig.BUCKET.key(),
+ ObsConfig.ACCESS_KEY.key(),
+ ObsConfig.ACCESS_SECRET.key(),
+ ObsConfig.BUCKET.key());
+ if (!result.isSuccess()) {
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SINK, result.getMsg()));
+ }
+ hadoopConf = ObsConf.buildWithConfig(pluginConfig);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
new file mode 100644
index 00000000000..8f1c221e076
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ObsFileSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return FileSystemType.OBS.getFileSystemPluginName();
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(ObsConfig.FILE_PATH)
+ .required(ObsConfig.BUCKET)
+ .required(ObsConfig.ACCESS_KEY)
+ .required(ObsConfig.ACCESS_SECRET)
+ .required(ObsConfig.ENDPOINT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .conditional(
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSinkConfig.ROW_DELIMITER,
+ BaseSinkConfig.FIELD_DELIMITER,
+ BaseSinkConfig.TXT_COMPRESS)
+ .conditional(
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.CSV,
+ BaseSinkConfig.TXT_COMPRESS)
+ .conditional(
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.JSON,
+ BaseSinkConfig.TXT_COMPRESS)
+ .conditional(
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.ORC,
+ BaseSinkConfig.ORC_COMPRESS)
+ .conditional(
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.PARQUET,
+ BaseSinkConfig.PARQUET_COMPRESS)
+ .optional(BaseSinkConfig.CUSTOM_FILENAME)
+ .conditional(
+ BaseSinkConfig.CUSTOM_FILENAME,
+ true,
+ BaseSinkConfig.FILE_NAME_EXPRESSION,
+ BaseSinkConfig.FILENAME_TIME_FORMAT)
+ .optional(BaseSinkConfig.HAVE_PARTITION)
+ .conditional(
+ BaseSinkConfig.HAVE_PARTITION,
+ true,
+ BaseSinkConfig.PARTITION_BY,
+ BaseSinkConfig.PARTITION_DIR_EXPRESSION,
+ BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE)
+ .optional(BaseSinkConfig.SINK_COLUMNS)
+ .optional(BaseSinkConfig.IS_ENABLE_TRANSACTION)
+ .optional(BaseSinkConfig.DATE_FORMAT)
+ .optional(BaseSinkConfig.DATETIME_FORMAT)
+ .optional(BaseSinkConfig.TIME_FORMAT)
+ .build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
new file mode 100644
index 00000000000..cf3061a44a3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConf;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSource.class)
+public class ObsFileSource extends BaseFileSource {
+ @Override
+ public String getPluginName() {
+ return FileSystemType.OBS.getFileSystemPluginName();
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult result =
+ CheckConfigUtil.checkAllExists(
+ pluginConfig,
+ ObsConfig.FILE_PATH.key(),
+ ObsConfig.FILE_FORMAT_TYPE.key(),
+ ObsConfig.ENDPOINT.key(),
+ ObsConfig.ACCESS_KEY.key(),
+ ObsConfig.ACCESS_SECRET.key(),
+ ObsConfig.BUCKET.key());
+ if (!result.isSuccess()) {
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SOURCE, result.getMsg()));
+ }
+ readStrategy =
+ ReadStrategyFactory.of(pluginConfig.getString(ObsConfig.FILE_FORMAT_TYPE.key()));
+ readStrategy.setPluginConfig(pluginConfig);
+ hadoopConf = ObsConf.buildWithConfig(pluginConfig);
+ readStrategy.init(hadoopConf);
+ String path = pluginConfig.getString(ObsConfig.FILE_PATH.key());
+ try {
+ filePaths = readStrategy.getFileNamesByPath(path);
+ } catch (IOException e) {
+ String errorMsg = String.format("Get file list from this path [%s] failed", path);
+ throw new FileConnectorException(
+ FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
+ }
+ // support user-defined schema
+ FileFormat fileFormat =
+ FileFormat.valueOf(
+ pluginConfig.getString(ObsConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
+ // only json text csv type support user-defined schema now
+ if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
+ switch (fileFormat) {
+ case CSV:
+ case TEXT:
+ case JSON:
+ case EXCEL:
+ SeaTunnelRowType userDefinedSchema =
+ CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+ readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
+ break;
+ case ORC:
+ case PARQUET:
+ throw new FileConnectorException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ "SeaTunnel does not support user-defined schema for [parquet, orc] files");
+ default:
+ // never got in there
+ throw new FileConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ "SeaTunnel does not supported this file format");
+ }
+ } else {
+ if (filePaths.isEmpty()) {
+ // When the directory is empty, distribute default behavior schema
+ rowType = CatalogTableUtil.buildSimpleTextSchema();
+ return;
+ }
+ try {
+ rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
+ } catch (FileConnectorException e) {
+ String errorMsg =
+ String.format("Get table schema from file [%s] failed", filePaths.get(0));
+ throw new FileConnectorException(
+ CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
+ }
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
new file mode 100644
index 00000000000..e1cd0ee97ba
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Arrays;
+
+@AutoService(Factory.class)
+public class ObsFileSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return FileSystemType.OBS.getFileSystemPluginName();
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(ObsConfig.FILE_PATH)
+ .required(ObsConfig.BUCKET)
+ .required(ObsConfig.ACCESS_KEY)
+ .required(ObsConfig.ACCESS_SECRET)
+ .required(ObsConfig.ENDPOINT)
+ .required(BaseSourceConfigOptions.FILE_FORMAT_TYPE)
+ .conditional(
+ BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSourceConfigOptions.FIELD_DELIMITER)
+ .conditional(
+ BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+ Arrays.asList(
+ FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+ TableSchemaOptions.SCHEMA)
+ .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
+ .optional(BaseSourceConfigOptions.DATE_FORMAT)
+ .optional(BaseSourceConfigOptions.DATETIME_FORMAT)
+ .optional(BaseSourceConfigOptions.TIME_FORMAT)
+ .build();
+ }
+
+ @Override
+ public Class extends SeaTunnelSource> getSourceClass() {
+ return ObsFileSource.class;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 00000000000..75590871402
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.fs.obs.OBSFileSystem
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java
new file mode 100644
index 00000000000..dc7c34525a5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs;
+
+import org.apache.seatunnel.connectors.seatunnel.file.obs.sink.ObsFileSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.source.ObsFileSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ObsFileFactoryTest {
+
+ @Test
+ void optionRule() {
+ Assertions.assertNotNull((new ObsFileSourceFactory()).optionRule());
+ Assertions.assertNotNull((new ObsFileSinkFactory()).optionRule());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index 4bdfa981cee..efb32ab4445 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -38,6 +38,7 @@
connector-file-base-hadoop
connector-file-sftp
connector-file-s3
+ connector-file-obs
connector-file-jindo-oss
connector-file-cos
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 28b987b4016..59ce6122303 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -541,6 +541,13 @@
provided
+
+ org.apache.seatunnel
+ connector-file-obs
+ ${project.version}
+ provided
+
+
org.apache.seatunnel
connector-paimon
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml
new file mode 100644
index 00000000000..9ee7fd4f147
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-connector-v2-e2e
+ ${revision}
+
+ connector-file-obs-e2e
+ SeaTunnel : E2E : Connector V2 : File Obs
+
+
+
+ org.apache.seatunnel
+ connector-fake
+ ${project.version}
+
+
+ org.apache.seatunnel
+ connector-file-obs
+ ${project.version}
+
+
+ org.apache.seatunnel
+ connector-assert
+ ${project.version}
+
+
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java
new file mode 100644
index 00000000000..c5a87959d60
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.file.obs;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.flink.Flink13Container;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Disabled("Please testing it in your local environment with obs account conf")
+public class ObsFileIT extends TestSuiteBase {
+
+ @TestTemplate
+ public void testLocalFileReadAndWrite(TestContainer container)
+ throws IOException, InterruptedException {
+ if (container instanceof Flink13Container) {
+ return;
+ }
+ // test write obs csv file
+ Container.ExecResult csvWriteResult = container.executeJob("/csv/fake_to_obs_csv.conf");
+ Assertions.assertEquals(0, csvWriteResult.getExitCode(), csvWriteResult.getStderr());
+ // test read obs csv file
+ Container.ExecResult csvReadResult = container.executeJob("/csv/obs_csv_to_assert.conf");
+ Assertions.assertEquals(0, csvReadResult.getExitCode(), csvReadResult.getStderr());
+ // test read obs csv file with projection
+ Container.ExecResult csvProjectionReadResult =
+ container.executeJob("/csv/obs_csv_projection_to_assert.conf");
+ Assertions.assertEquals(
+ 0, csvProjectionReadResult.getExitCode(), csvProjectionReadResult.getStderr());
+ // test write obs excel file
+ Container.ExecResult excelWriteResult =
+ container.executeJob("/excel/fake_to_obs_excel.conf");
+ Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr());
+ // test read obs excel file
+ Container.ExecResult excelReadResult =
+ container.executeJob("/excel/obs_excel_to_assert.conf");
+ Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr());
+ // test read obs excel file with projection
+ Container.ExecResult excelProjectionReadResult =
+ container.executeJob("/excel/obs_excel_projection_to_assert.conf");
+ Assertions.assertEquals(
+ 0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr());
+ // test write obs text file
+ Container.ExecResult textWriteResult =
+ container.executeJob("/text/fake_to_obs_file_text.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ // test read skip header
+ Container.ExecResult textWriteAndSkipResult =
+ container.executeJob("/text/obs_file_text_skip_headers.conf");
+ Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+ // test read obs text file
+ Container.ExecResult textReadResult =
+ container.executeJob("/text/obs_file_text_to_assert.conf");
+ Assertions.assertEquals(0, textReadResult.getExitCode());
+ // test read obs text file with projection
+ Container.ExecResult textProjectionResult =
+ container.executeJob("/text/obs_file_text_projection_to_assert.conf");
+ Assertions.assertEquals(0, textProjectionResult.getExitCode());
+ // test write obs json file
+ Container.ExecResult jsonWriteResult =
+ container.executeJob("/json/fake_to_obs_file_json.conf");
+ Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+ // test read obs json file
+ Container.ExecResult jsonReadResult =
+ container.executeJob("/json/obs_file_json_to_assert.conf");
+ Assertions.assertEquals(0, jsonReadResult.getExitCode());
+ // test write obs orc file
+ Container.ExecResult orcWriteResult =
+ container.executeJob("/orc/fake_to_obs_file_orc.conf");
+ Assertions.assertEquals(0, orcWriteResult.getExitCode());
+ // test read obs orc file
+ Container.ExecResult orcReadResult =
+ container.executeJob("/orc/obs_file_orc_to_assert.conf");
+ Assertions.assertEquals(0, orcReadResult.getExitCode());
+ // test read obs orc file with projection
+ Container.ExecResult orcProjectionResult =
+ container.executeJob("/orc/obs_file_orc_projection_to_assert.conf");
+ Assertions.assertEquals(0, orcProjectionResult.getExitCode());
+ // test write obs parquet file
+ Container.ExecResult parquetWriteResult =
+ container.executeJob("/parquet/fake_to_obs_file_parquet.conf");
+ Assertions.assertEquals(0, parquetWriteResult.getExitCode());
+ // test read obs parquet file
+ Container.ExecResult parquetReadResult =
+ container.executeJob("/parquet/obs_file_parquet_to_assert.conf");
+ Assertions.assertEquals(0, parquetReadResult.getExitCode());
+ // test read obs parquet file with projection
+ Container.ExecResult parquetProjectionResult =
+ container.executeJob("/parquet/obs_file_parquet_projection_to_assert.conf");
+ Assertions.assertEquals(0, parquetProjectionResult.getExitCode());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf
new file mode 100644
index 00000000000..8ed1e64fce0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ ObsFile {
+ path="/seatunnel/csv"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format_type="csv"
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf
new file mode 100644
index 00000000000..da22e3e90a2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+source {
+ ObsFile {
+ path="/seatunnel/csv"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ result_table_name = "fake"
+ file_format_type = csv
+ delimiter = ","
+ read_columns = [c_string, c_boolean]
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf
new file mode 100644
index 00000000000..52bbcf5ab95
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path="/seatunnel/csv"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ result_table_name = "fake"
+ file_format_type = csv
+ delimiter = ","
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf
new file mode 100644
index 00000000000..79ff16eb1af
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ ObsFile {
+ path="/seatunnel/excel"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format_type="excel"
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf
new file mode 100644
index 00000000000..4ae33021fc5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path="/seatunnel/excel"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ result_table_name = "fake"
+ file_format_type = excel
+ delimiter = ;
+ read_columns = [c_string, c_boolean]
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf
new file mode 100644
index 00000000000..45144959b0f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path="/seatunnel/excel"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ result_table_name = "fake"
+ file_format_type = excel
+ delimiter = ;
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf
new file mode 100644
index 00000000000..1cd92373f3a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ ObsFile {
+ path = "/seatunnel/json"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "json"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf
new file mode 100644
index 00000000000..76f746bcb2b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path = "/seatunnel/json"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "json"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf
new file mode 100644
index 00000000000..bb531a3c13c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ ObsFile {
+ path = "/seatunnel/orc"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "orc"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ compress_codec = "zlib"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf
new file mode 100644
index 00000000000..b89bed9a49d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path = "/seatunnel/orc"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "orc"
+ read_columns = [c_string, c_boolean, c_double]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf
new file mode 100644
index 00000000000..4d5ab63f5ec
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path = "/seatunnel/orc"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "parquet"
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf
new file mode 100644
index 00000000000..bf696c24946
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ ObsFile {
+ path = "/seatunnel/parquet"
+ bucket = "obs://dc-for-test/seatunnel-test"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "parquet"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ compress_codec = "gzip"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf
new file mode 100644
index 00000000000..3ca1c801222
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path = "/seatunnel/parquet"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "parquet"
+ read_columns = [c_string, c_boolean, c_double]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf
new file mode 100644
index 00000000000..67b0146efb9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path = "/seatunnel/parquet"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "parquet"
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf
new file mode 100644
index 00000000000..4b78f77d476
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ ObsFile {
+ path = "/seatunnel/text"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "text"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ compress_codec = "lzo"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf
new file mode 100644
index 00000000000..09853ce0672
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path = "/seatunnel/text"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "text"
+ read_columns = [c_string, c_boolean, c_double]
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf
new file mode 100644
index 00000000000..452fb79fd8e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path = "/seatunnel/text"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "text"
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 4
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf
new file mode 100644
index 00000000000..86742f67d42
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ ObsFile {
+ path = "/seatunnel/text"
+ bucket = "obs://obs-bucket-name"
+ access_key = ""
+ access_secret = ""
+ endpoint = "obs.xxxxxx.myhuaweicloud.com"
+ file_format_type = "text"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 9e236b74391..35a002fc4b4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -63,6 +63,7 @@
connector-maxcompute-e2e
connector-google-firestore-e2e
connector-rocketmq-e2e
+ connector-file-obs-e2e
connector-file-ftp-e2e
connector-pulsar-e2e
connector-paimon-e2e