Skip to content

[BitSail][Connector]add oss source connector #467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions bitsail-connectors/connector-oss/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bitsail-connectors</artifactId>
<groupId>com.bytedance.bitsail</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-oss</artifactId>

<properties>
<hadoop-aliyun.version>3.1.1</hadoop-aliyun.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop-aliyun.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-component-format-json</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-component-format-csv</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.config;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.oss.option.OssReaderOptions;

import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.Constants;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Data
public class OssConf implements Serializable {
private static final String HDFS_IMPL = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
private static final String SCHEMA = "oss";
protected Map<String, String> extraOptions = new HashMap<>();
protected String hdfsNameKey;
protected String hdfsSitePath;
protected String kerberosPrincipal;
protected String kerberosKeytabPath;

public String getHdfsImpl() {
return HDFS_IMPL;
}

public String getSchema() {
return SCHEMA;
}

public OssConf(String hdfsNameKey) {
this.hdfsNameKey = hdfsNameKey;
}

public static OssConf buildWithConfig(BitSailConfiguration config) {
OssConf hadoopConf = new OssConf(config.get(OssReaderOptions.BUCKET));
HashMap<String, String> ossOptions = new HashMap<>();
ossOptions.put(Constants.ACCESS_KEY_ID, config.get(OssReaderOptions.ACCESS_KEY));
ossOptions.put(
Constants.ACCESS_KEY_SECRET, config.get(OssReaderOptions.ACCESS_SECRET));
ossOptions.put(Constants.ENDPOINT_KEY, config.get(OssReaderOptions.ENDPOINT));
hadoopConf.setExtraOptions(ossOptions);
return hadoopConf;
}

public void setExtraOptionsForConfiguration(Configuration configuration) {
if (!extraOptions.isEmpty()) {
extraOptions.forEach(configuration::set);
}
if (hdfsSitePath != null) {
configuration.addResource(new Path(hdfsSitePath));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.config;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
import com.bytedance.bitsail.connector.oss.option.OssReaderOptions;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

@Getter
@Setter
public class OssConfig implements Serializable {
private String bucket;
private String accessKey;
private String accessSecret;
private String endpoint;
private ContentType contentType;
private Boolean skipFirstLine;
private String filePath;

public OssConfig() {
}

public enum ContentType {
CSV,
JSON
}

public OssConfig(BitSailConfiguration jobConf) {
this.bucket = jobConf.getNecessaryOption(OssReaderOptions.BUCKET, OssConnectorErrorCode.REQUIRED_VALUE);
this.accessKey = jobConf.getNecessaryOption(OssReaderOptions.ACCESS_KEY, OssConnectorErrorCode.REQUIRED_VALUE);
this.accessSecret = jobConf.getNecessaryOption(OssReaderOptions.ACCESS_SECRET, OssConnectorErrorCode.REQUIRED_VALUE);
this.endpoint = jobConf.getNecessaryOption(OssReaderOptions.ENDPOINT, OssConnectorErrorCode.REQUIRED_VALUE);
this.contentType = OssConfig.ContentType.valueOf(jobConf.getNecessaryOption(OssReaderOptions.CONTENT_TYPE, OssConnectorErrorCode.UNSUPPORTED_TYPE).toUpperCase());
this.skipFirstLine = jobConf.get(OssReaderOptions.SKIP_FIRST_LINE);
this.filePath = jobConf.get(OssReaderOptions.FILE_PATH);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.constant;

public class OssConstants {
public static String OSS_CONNECTOR_NAME = "oss";
public static final long OSS_SOURCE_SLEEP_MILL_SECS = 1000L;
public static final String OSS_SOURCE_IGNORE_FILENAME = "_SUCCESS";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.exception;

import com.bytedance.bitsail.common.exception.ErrorCode;

public enum OssConnectorErrorCode implements ErrorCode {
REQUIRED_VALUE("Oss-01", "You missed parameter which is required, please check your configuration."),
CONFIG_ERROR("Oss-02", "Config parameter is error."),
UNSUPPORTED_TYPE("Oss-07", "Content Type is not supported"),
FILE_OPERATION_FAILED("Oss-04", "File Operation Failed"),
SPLIT_ERROR("Oss-05", "Something wrong with creating splits.");
private final String code;
private final String description;

OssConnectorErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}

@Override
public String toString() {
return super.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.option;

import com.bytedance.bitsail.common.option.ConfigOption;
import com.bytedance.bitsail.common.option.ReaderOptions;

import static com.bytedance.bitsail.common.option.ConfigOptions.key;
import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX;

public interface OssReaderOptions extends ReaderOptions.BaseReaderOptions {

public static final ConfigOption<String> ACCESS_KEY =
key(READER_PREFIX + "access_key")
.noDefaultValue(String.class);

public static final ConfigOption<String> ACCESS_SECRET =
key(READER_PREFIX + "access_secret")
.noDefaultValue(String.class);

public static final ConfigOption<String> ENDPOINT =
key(READER_PREFIX + "endpoint")
.noDefaultValue(String.class);

public static final ConfigOption<String> BUCKET =
key(READER_PREFIX + "bucket")
.noDefaultValue(String.class);

ConfigOption<String> FILE_PATH =
key(READER_PREFIX + "file_path")
.noDefaultValue(String.class);

ConfigOption<String> CONTENT_TYPE =
key(READER_PREFIX + "content_type")
.defaultValue("csv");

/**
* CSV Format Options
*/
// whether to treat error column as null when parsing
ConfigOption<Boolean> CONVERT_ERROR_COLUMN_AS_NULL =
key(READER_PREFIX + "convert_error_column_as_null")
.defaultValue(false);

ConfigOption<String> CSV_DELIMITER =
key(READER_PREFIX + "csv_delimiter")
.defaultValue(",");

ConfigOption<Character> CSV_ESCAPE =
key(READER_PREFIX + "csv_escape")
.noDefaultValue(Character.class);

ConfigOption<Character> CSV_QUOTE =
key(READER_PREFIX + "csv_quote")
.noDefaultValue(Character.class);

ConfigOption<String> CSV_WITH_NULL_STRING =
key(READER_PREFIX + "csv_with_null_string")
.noDefaultValue(String.class);

ConfigOption<Character> CSV_MULTI_DELIMITER_REPLACER =
key(READER_PREFIX + "csv_multi_delimiter_replace_char")
.defaultValue('§');

ConfigOption<Boolean> SKIP_FIRST_LINE =
key(READER_PREFIX + "skip_first_line")
.defaultValue(false);

/**
* JSON Options
* <p>
* Tips:
* CONVERT_ERROR_COLUMN_AS_NULL is set above
*/
// whether to be insensitive to upper or lower case
ConfigOption<Boolean> CASE_INSENSITIVE =
key(READER_PREFIX + "case_insensitive")
.defaultValue(false);
}
Loading