Skip to content

Commit 8081c88

Browse files
committed
[BitSail#106][Connector] Migrate hadoop source connector to v1 interface & support more InputFormat.
1 parent a3c2ffe commit 8081c88

File tree

25 files changed

+707
-183
lines changed

25 files changed

+707
-183
lines changed

bitsail-connectors/connector-hadoop/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,26 @@
148148
<scope>provided</scope>
149149
</dependency>
150150

151+
<dependency>
152+
<groupId>com.bytedance.bitsail</groupId>
153+
<artifactId>bitsail-shaded-hive</artifactId>
154+
<version>${revision}</version>
155+
<exclusions>
156+
<exclusion>
157+
<groupId>org.apache.ant</groupId>
158+
<artifactId>ant</artifactId>
159+
</exclusion>
160+
<exclusion>
161+
<artifactId>log4j</artifactId>
162+
<groupId>log4j</groupId>
163+
</exclusion>
164+
<exclusion>
165+
<artifactId>commons-net</artifactId>
166+
<groupId>commons-net</groupId>
167+
</exclusion>
168+
</exclusions>
169+
</dependency>
170+
151171
<dependency>
152172
<groupId>com.bytedance.bitsail</groupId>
153173
<artifactId>bitsail-connector-test</artifactId>

bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,4 @@
1818

1919
/**
 * Shared constants for the Hadoop connector.
 */
public class HadoopConstants {
  // final: the connector name is a fixed identifier and must not be reassigned at runtime.
  public static final String HADOOP_CONNECTOR_NAME = "hadoop";
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.hadoop.format;
18+
19+
import com.bytedance.bitsail.base.format.DeserializationSchema;
20+
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
21+
import com.bytedance.bitsail.common.row.Row;
22+
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
23+
24+
import org.apache.hadoop.hive.serde2.io.DateWritable;
25+
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
26+
import org.apache.hadoop.hive.serde2.io.ShortWritable;
27+
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
28+
import org.apache.hadoop.io.BooleanWritable;
29+
import org.apache.hadoop.io.ByteWritable;
30+
import org.apache.hadoop.io.BytesWritable;
31+
import org.apache.hadoop.io.DoubleWritable;
32+
import org.apache.hadoop.io.FloatWritable;
33+
import org.apache.hadoop.io.IntWritable;
34+
import org.apache.hadoop.io.LongWritable;
35+
import org.apache.hadoop.io.Text;
36+
import org.apache.hadoop.io.Writable;
37+
38+
public abstract class AbstractInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {
39+
protected final BitSailConfiguration deserializationConfiguration;
40+
protected final TypeInfo<?>[] typeInfos;
41+
protected final String[] fieldNames;
42+
43+
public AbstractInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
44+
TypeInfo<?>[] typeInfos,
45+
String[] fieldNames) {
46+
47+
this.deserializationConfiguration = deserializationConfiguration;
48+
this.typeInfos = typeInfos;
49+
this.fieldNames = fieldNames;
50+
}
51+
52+
protected Object getWritableValue(Object writable) {
53+
Object ret;
54+
55+
if (writable == null) {
56+
ret = null;
57+
} else if (writable instanceof IntWritable) {
58+
ret = ((IntWritable) writable).get();
59+
} else if (writable instanceof Text) {
60+
ret = writable.toString();
61+
} else if (writable instanceof LongWritable) {
62+
ret = ((LongWritable) writable).get();
63+
} else if (writable instanceof ByteWritable) {
64+
ret = ((ByteWritable) writable).get();
65+
} else if (writable instanceof DateWritable) {
66+
ret = ((DateWritable) writable).get();
67+
} else if (writable instanceof DoubleWritable) {
68+
ret = ((DoubleWritable) writable).get();
69+
} else if (writable instanceof TimestampWritable) {
70+
ret = ((TimestampWritable) writable).getTimestamp();
71+
} else if (writable instanceof FloatWritable) {
72+
ret = ((FloatWritable) writable).get();
73+
} else if (writable instanceof BooleanWritable) {
74+
ret = ((BooleanWritable) writable).get();
75+
} else if (writable instanceof BytesWritable) {
76+
BytesWritable bytesWritable = (BytesWritable) writable;
77+
byte[] bytes = bytesWritable.getBytes();
78+
ret = new byte[bytesWritable.getLength()];
79+
System.arraycopy(bytes, 0, ret, 0, bytesWritable.getLength());
80+
} else if (writable instanceof HiveDecimalWritable) {
81+
ret = ((HiveDecimalWritable) writable).getHiveDecimal().bigDecimalValue();
82+
} else if (writable instanceof ShortWritable) {
83+
ret = ((ShortWritable) writable).get();
84+
} else {
85+
ret = writable.toString();
86+
}
87+
return ret;
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.hadoop.format;
18+
19+
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
20+
import com.bytedance.bitsail.common.row.Row;
21+
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
22+
23+
import org.apache.hadoop.io.ArrayWritable;
24+
import org.apache.hadoop.io.Writable;
25+
26+
public class MapredParquetInputFormatDeserializationSchema extends AbstractInputFormatDeserializationSchema {
27+
public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
28+
TypeInfo<?>[] typeInfos,
29+
String[] fieldNames) {
30+
super(deserializationConfiguration, typeInfos, fieldNames);
31+
}
32+
33+
@Override
34+
public Row deserialize(Writable message) {
35+
int arity = fieldNames.length;
36+
Row row = new Row(arity);
37+
Writable[] writableDataArray = ((ArrayWritable) message).get();
38+
for (int i = 0; i < arity; ++i) {
39+
row.setField(i, getWritableValue(writableDataArray[i]));
40+
}
41+
return row;
42+
}
43+
44+
@Override
45+
public boolean isEndOfStream(Row nextElement) {
46+
return false;
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.hadoop.format;
18+
19+
import com.bytedance.bitsail.common.BitSailException;
20+
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
21+
import com.bytedance.bitsail.common.model.ColumnInfo;
22+
import com.bytedance.bitsail.common.row.Row;
23+
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
24+
import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
25+
import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;
26+
27+
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
28+
import org.apache.hadoop.hive.serde2.SerDeException;
29+
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
30+
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
31+
import org.apache.hadoop.io.Writable;
32+
import org.apache.hadoop.mapred.JobConf;
33+
34+
import java.util.List;
35+
import java.util.Properties;
36+
import java.util.stream.Collectors;
37+
38+
public class OrcInputFormatDeserializationSchema extends AbstractInputFormatDeserializationSchema {
39+
StructObjectInspector inspector;
40+
public OrcInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
41+
TypeInfo<?>[] typeInfos,
42+
String[] fieldNames) {
43+
44+
super(deserializationConfiguration, typeInfos, fieldNames);
45+
46+
List<ColumnInfo> columnInfos = deserializationConfiguration.get(HadoopReaderOptions.COLUMNS);
47+
Properties p = new Properties();
48+
OrcSerde serde = new OrcSerde();
49+
String columns = columnInfos.stream().map(ColumnInfo::getName).collect(Collectors.joining(","));
50+
String columnsTypes = columnInfos.stream().map(ColumnInfo::getType).collect(Collectors.joining(":"));
51+
p.setProperty("columns", columns);
52+
p.setProperty("columns.types", columnsTypes);
53+
serde.initialize(new JobConf(), p);
54+
try {
55+
this.inspector = (StructObjectInspector) serde.getObjectInspector();
56+
} catch (SerDeException e) {
57+
throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported orc type");
58+
}
59+
}
60+
61+
@Override
62+
public Row deserialize(Writable message) {
63+
int arity = fieldNames.length;
64+
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
65+
Row row = new Row(arity);
66+
for (int i = 0; i < arity; ++i) {
67+
Object writableData = inspector.getStructFieldData(message, fields.get(i));
68+
row.setField(i, getWritableValue(writableData));
69+
}
70+
return row;
71+
}
72+
73+
@Override
74+
public boolean isEndOfStream(Row nextElement) {
75+
return false;
76+
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.hadoop.format;
18+
19+
import com.bytedance.bitsail.base.enumerate.ContentType;
import com.bytedance.bitsail.base.format.DeserializationSchema;
import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema;
import com.bytedance.bitsail.component.format.json.JsonDeserializationSchema;
import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;

import org.apache.hadoop.io.Writable;

import java.nio.charset.StandardCharsets;
31+
32+
public class TextInputFormatDeserializationSchema extends AbstractInputFormatDeserializationSchema {
33+
private transient DeserializationSchema<byte[], Row> deserializationSchema;
34+
35+
public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
36+
TypeInfo<?>[] typeInfos,
37+
String[] fieldNames) {
38+
super(deserializationConfiguration, typeInfos, fieldNames);
39+
ContentType contentType = ContentType.valueOf(
40+
deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
41+
switch (contentType) {
42+
case CSV:
43+
this.deserializationSchema =
44+
new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
45+
break;
46+
case JSON:
47+
this.deserializationSchema =
48+
new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
49+
break;
50+
default:
51+
throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType);
52+
}
53+
}
54+
55+
@Override
56+
public Row deserialize(Writable message) {
57+
return deserializationSchema.deserialize((message.toString()).getBytes());
58+
}
59+
60+
@Override
61+
public boolean isEndOfStream(Row nextElement) {
62+
return false;
63+
}
64+
}

bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,10 @@
2525

2626
public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions {
2727
@Essential
28-
ConfigOption<String> DEFAULT_FS =
29-
key(READER_PREFIX + "defaultFS")
30-
.noDefaultValue(String.class);
31-
@Essential
3228
ConfigOption<String> PATH_LIST =
3329
key(READER_PREFIX + "path_list")
3430
.noDefaultValue(String.class);
3531

36-
@Essential
3732
ConfigOption<String> CONTENT_TYPE =
3833
key(READER_PREFIX + "content_type")
3934
.noDefaultValue(String.class);
@@ -45,4 +40,8 @@ public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions {
4540
ConfigOption<Integer> DEFAULT_HADOOP_PARALLELISM_THRESHOLD =
4641
key(READER_PREFIX + "default_hadoop_parallelism_threshold")
4742
.defaultValue(2);
43+
44+
ConfigOption<String> HADOOP_INPUT_FORMAT_CLASS =
45+
key(READER_PREFIX + "hadoop_inputformat_class")
46+
.defaultValue("org.apache.hadoop.mapred.TextInputFormat");
4847
}

bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import java.util.Arrays;
4343
import java.util.List;
4444

45-
public class HadoopSource implements Source<Row, HadoopSourceSplit, EmptyState>, ParallelismComputable {
45+
public class HadoopSource<K, V> implements Source<Row, HadoopSourceSplit, EmptyState>, ParallelismComputable {
4646
private static final Logger LOG = LoggerFactory.getLogger(HadoopSource.class);
4747

4848
private BitSailConfiguration readerConfiguration;
@@ -61,12 +61,12 @@ public Boundedness getSourceBoundedness() {
6161

6262
@Override
6363
public SourceReader<Row, HadoopSourceSplit> createReader(SourceReader.Context readerContext) {
64-
return new HadoopSourceReader(readerConfiguration, readerContext);
64+
return new HadoopSourceReader<K, V>(readerConfiguration, readerContext, hadoopPathList);
6565
}
6666

6767
@Override
6868
public SourceSplitCoordinator<HadoopSourceSplit, EmptyState> createSplitCoordinator(SourceSplitCoordinator.Context<HadoopSourceSplit, EmptyState> coordinatorContext) {
69-
return new HadoopSourceSplitCoordinator(readerConfiguration, coordinatorContext, hadoopPathList);
69+
return new HadoopSourceSplitCoordinator<K, V>(readerConfiguration, coordinatorContext, hadoopPathList);
7070
}
7171

7272
@Override

0 commit comments

Comments
 (0)