
Commit 707e461

libailinzoudaokoulife authored and committed

[Feature-#1918][s3] Add support for reading all types of documents supported by Apache Tika, and read the Excel format
1 parent 33c14a5 commit 707e461

File tree

42 files changed: +6270 −21 lines

Some content is hidden: large commits do not display all of their changed files by default, so only a subset of the 42 files appears below.

chunjun-connectors/chunjun-connector-s3/pom.xml

+12
@@ -68,6 +68,18 @@
             <version>1.11-8</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.dtstack.chunjun</groupId>
+            <artifactId>chunjun-format-tika</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.dtstack.chunjun</groupId>
+            <artifactId>chunjun-format-excel</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java

+21
@@ -19,6 +19,8 @@
 package com.dtstack.chunjun.connector.s3.config;

 import com.dtstack.chunjun.config.CommonConfig;
+import com.dtstack.chunjun.format.excel.config.ExcelFormatConfig;
+import com.dtstack.chunjun.format.tika.config.TikaReadConfig;

 import com.amazonaws.regions.Regions;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -86,4 +88,23 @@ public class S3Config extends CommonConfig implements Serializable {

     /** Suffix of the generated file name */
     private String suffix;
+
+    /** Regex that object keys must match */
+    private String objectsRegex;
+
+    /** Whether to use a text qualifier around values */
+    private boolean useTextQualifier = true;
+
+    /** Whether to write each record as its own file */
+    private boolean enableWriteSingleRecordAsFile = false;
+
+    /** Keep the original filename */
+    private boolean keepOriginalFilename = false;
+
+    /** Disable injecting the bucket name into the endpoint prefix */
+    private boolean disableBucketNameInEndpoint = false;
+
+    private TikaReadConfig tikaReadConfig = new TikaReadConfig();
+
+    private ExcelFormatConfig excelFormatConfig = new ExcelFormatConfig();
 }
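For orientation, a minimal sketch of how a job could toggle the new options (not part of this commit; it assumes the Lombok-generated setters that the getter calls elsewhere in this diff imply, and the regex value is a made-up example):

    // Sketch only: wiring the new S3Config options (assumed Lombok setters)
    S3Config s3Config = new S3Config();
    s3Config.setObjectsRegex(".*\\.xlsx$");          // hypothetical: only read keys ending in .xlsx
    s3Config.setUseTextQualifier(false);             // write fields without quote qualifiers
    s3Config.setEnableWriteSingleRecordAsFile(true); // one output object per record
    s3Config.setKeepOriginalFilename(true);          // name each object after its source file
    s3Config.getExcelFormatConfig().setUseExcelFormat(true); // parse matched objects as Excel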

chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java

+1 −1
@@ -84,6 +84,6 @@ public DynamicTableSink copy() {

     @Override
     public String asSummaryString() {
-        return "StreamDynamicTableSink";
+        return S3DynamicTableSink.class.getName();
     }
 }

chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java

+62 −14
@@ -27,22 +27,28 @@
 import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
 import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
 import com.dtstack.chunjun.throwable.WriteRecordException;
+import com.dtstack.chunjun.util.GsonUtil;

 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.data.RowData;

 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.PartETag;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;

 import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.stream.Collectors;

+import static com.dtstack.chunjun.format.tika.config.TikaReadConfig.ORIGINAL_FILENAME;
+
 /** The OutputFormat Implementation which write data to Amazon S3. */
 @Slf4j
 public class S3OutputFormat extends BaseRichOutputFormat {
@@ -137,7 +143,8 @@ private void checkOutputDir() {
                             amazonS3,
                             s3Config.getBucket(),
                             s3Config.getObject(),
-                            s3Config.getFetchSize());
+                            s3Config.getFetchSize(),
+                            s3Config.getObjectsRegex());
         } else {
             subObjects =
                     S3Util.listObjectsByv1(
@@ -166,11 +173,17 @@ private void nextBlock() {
             sw = new StringWriter();
         }
         this.writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter());
+        if (!s3Config.isUseTextQualifier()) {
+            writerUtil.setUseTextQualifier(false);
+        }
         this.currentPartNumber = this.currentPartNumber + 1;
     }

     /** Create file multipart upload ID */
     private void createActionFinishedTag() {
+        if (s3Config.isEnableWriteSingleRecordAsFile()) {
+            return;
+        }
         if (!StringUtils.isNotBlank(currentUploadId)) {
             this.currentUploadId =
                     S3Util.initiateMultipartUploadAndGetId(
@@ -193,26 +206,35 @@ private void beforeWriteRecords() {
     }

     protected void flushDataInternal() {
+        if (sw == null) {
+            return;
+        }
         StringBuffer sb = sw.getBuffer();
-        if (sb.length() > MIN_SIZE || willClose) {
+        if (sb.length() > MIN_SIZE || willClose || s3Config.isEnableWriteSingleRecordAsFile()) {
             byte[] byteArray;
             try {
                 byteArray = sb.toString().getBytes(s3Config.getEncoding());
             } catch (UnsupportedEncodingException e) {
                 throw new ChunJunRuntimeException(e);
             }
             log.info("Upload part size:" + byteArray.length);
-            PartETag partETag =
-                    S3Util.uploadPart(
-                            amazonS3,
-                            s3Config.getBucket(),
-                            s3Config.getObject(),
-                            this.currentUploadId,
-                            this.currentPartNumber,
-                            byteArray);
-
-            MyPartETag myPartETag = new MyPartETag(partETag);
-            myPartETags.add(myPartETag);
+
+            if (s3Config.isEnableWriteSingleRecordAsFile()) {
+                S3Util.putStringObject(
+                        amazonS3, s3Config.getBucket(), s3Config.getObject(), sb.toString());
+            } else {
+                PartETag partETag =
+                        S3Util.uploadPart(
+                                amazonS3,
+                                s3Config.getBucket(),
+                                s3Config.getObject(),
+                                this.currentUploadId,
+                                this.currentPartNumber,
+                                byteArray);
+
+                MyPartETag myPartETag = new MyPartETag(partETag);
+                myPartETags.add(myPartETag);
+            }

             log.debug(
                     "task-{} upload etag:[{}]",
@@ -225,6 +247,9 @@ protected void flushDataInternal() {
     }

     private void completeMultipartUploadFile() {
+        if (s3Config.isEnableWriteSingleRecordAsFile()) {
+            return;
+        }
         if (this.currentPartNumber > 10000) {
             throw new IllegalArgumentException("part can not bigger than 10000");
         }
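The early returns above work together: in single-record-as-file mode every record is flushed immediately as its own object, so no multipart upload is ever started and completeMultipartUploadFile() has nothing to finish. A plain PutObject also sidesteps S3's multipart rule that every part except the last must be at least 5 MB, which small per-record documents could not satisfy. S3Util.putStringObject presumably wraps the AWS SDK v1 convenience overload; a sketch under that assumption (the helper's body is not part of this diff):

    // Hypothetical shape of S3Util.putStringObject
    public static void putStringObject(AmazonS3 amazonS3, String bucket, String key, String content) {
        // AmazonS3#putObject(String bucketName, String key, String content)
        // uploads the string as the object body in a single request
        amazonS3.putObject(bucket, key, content);
    }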
@@ -282,7 +307,11 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
         // convert row to string
         stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord);
         try {
-            for (int i = 0; i < columnNameList.size(); ++i) {
+            int columnSize = columnNameList.size();
+            if (s3Config.isEnableWriteSingleRecordAsFile()) {
+                columnSize = 1;
+            }
+            for (int i = 0; i < columnSize; ++i) {

                 String column = stringRecord[i];

@@ -292,6 +321,25 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
                 writerUtil.write(column);
             }
             writerUtil.endRecord();
+
+            if (s3Config.isEnableWriteSingleRecordAsFile()) {
+                Map<String, String> metadataMap =
+                        GsonUtil.GSON.fromJson(stringRecord[1], Map.class);
+                String key = FilenameUtils.getPath(s3Config.getObject());
+                // whether to keep the original filename
+                if (s3Config.isKeepOriginalFilename()) {
+                    key += metadataMap.get(ORIGINAL_FILENAME) + getExtension();
+                } else {
+                    key +=
+                            jobId
+                                    + "_"
+                                    + taskNumber
+                                    + "_"
+                                    + UUID.randomUUID().toString()
+                                    + getExtension();
+                }
+                s3Config.setObject(key);
+            }
             flushDataInternal();
         } catch (Exception ex) {
             String msg = "RowData2string error RowData(" + rowData + ")";
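A worked example of the key construction above, with hypothetical values (the code relies on stringRecord[1] carrying the Tika metadata map as JSON, and on getExtension() supplying the configured suffix):

    String object = "warehouse/docs/part-0";
    String path = FilenameUtils.getPath(object); // "warehouse/docs/" - commons-io drops the filename
    // keepOriginalFilename = true, metadata original filename "contract.docx", extension ".txt":
    //   key = "warehouse/docs/contract.docx.txt"
    // keepOriginalFilename = false, jobId "job0", taskNumber 1:
    //   key = "warehouse/docs/job0_1_<random-uuid>.txt"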

chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3DynamicTableSource.java

+7 −1
@@ -66,7 +66,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
             field.setName(column.getName());
             field.setType(
                     TypeConfig.fromString(column.getDataType().getLogicalType().asSummaryString()));
-            field.setIndex(i);
+            int index =
+                    s3Config.getExcelFormatConfig().getColumnIndex() != null
+                            ? s3Config.getExcelFormatConfig()
+                                    .getColumnIndex()
+                                    .get(columns.indexOf(column))
+                            : columns.indexOf(column);
+            field.setIndex(index);
             columnList.add(field);
         }
         s3Config.setColumn(columnList);
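In effect, columnIndex lets the declared column order differ from the physical column order in the sheet. A small illustration with made-up values:

    // DDL declares [name, age, city]; excelFormatConfig.columnIndex = [3, 0, 5]
    int i = columns.indexOf(column); // position of the field in the DDL
    List<Integer> columnIndex = s3Config.getExcelFormatConfig().getColumnIndex();
    field.setIndex(columnIndex != null ? columnIndex.get(i) : i);
    // "name" reads sheet column 3, "age" column 0, "city" column 5;
    // with columnIndex == null, each field keeps its DDL position

Note that columns.indexOf(column) assumes the declared columns are distinct; duplicate entries would all resolve to the first occurrence.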

chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java

+116 −2
@@ -18,12 +18,17 @@

 package com.dtstack.chunjun.connector.s3.source;

+import com.dtstack.chunjun.config.FieldConfig;
 import com.dtstack.chunjun.config.RestoreConfig;
 import com.dtstack.chunjun.connector.s3.config.S3Config;
 import com.dtstack.chunjun.connector.s3.enums.CompressType;
 import com.dtstack.chunjun.connector.s3.util.ReaderUtil;
 import com.dtstack.chunjun.connector.s3.util.S3SimpleObject;
 import com.dtstack.chunjun.connector.s3.util.S3Util;
+import com.dtstack.chunjun.format.excel.common.ExcelData;
+import com.dtstack.chunjun.format.excel.source.ExcelInputFormat;
+import com.dtstack.chunjun.format.tika.common.TikaData;
+import com.dtstack.chunjun.format.tika.source.TikaInputFormat;
 import com.dtstack.chunjun.restore.FormatState;
 import com.dtstack.chunjun.source.format.BaseRichInputFormat;
 import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
@@ -38,6 +43,8 @@
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;

 import java.io.IOException;
@@ -71,6 +78,12 @@ public class S3InputFormat extends BaseRichInputFormat {

     private RestoreConfig restoreConf;

+    private transient TikaData tikaData;
+    private TikaInputFormat tikaInputFormat;
+
+    private transient ExcelData excelData;
+    private ExcelInputFormat excelInputFormat;
+
     @Override
     public void openInputFormat() throws IOException {
         super.openInputFormat();
@@ -137,7 +150,31 @@ protected InputSplit[] createInputSplitsInternal(int minNumSplits) {
     protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
         String[] fields;
         try {
-            fields = readerUtil.getValues();
+            if (s3Config.getTikaReadConfig().isUseExtract() && tikaData != null) {
+                fields = tikaData.getData();
+            } else if (s3Config.getExcelFormatConfig().isUseExcelFormat() && excelData != null) {
+                fields = excelData.getData();
+            } else {
+                fields = readerUtil.getValues();
+            }
+            // handle fields that are configured with a corresponding column index
+            if (s3Config.getExcelFormatConfig().getColumnIndex() != null) {
+                List<FieldConfig> columns = s3Config.getColumn();
+                String[] fieldsData = new String[columns.size()];
+                for (int i = 0; i < CollectionUtils.size(columns); i++) {
+                    FieldConfig fieldConfig = columns.get(i);
+                    if (fieldConfig.getIndex() >= fields.length) {
+                        String errorMessage =
+                                String.format(
+                                        "The column index is greater than the data size."
+                                                + " The current column index is [%s], but the data size is [%s]. Data loss may occur.",
+                                        fieldConfig.getIndex(), fields.length);
+                        throw new IllegalArgumentException(errorMessage);
+                    }
+                    fieldsData[i] = fields[fieldConfig.getIndex()];
+                }
+                fields = fieldsData;
+            }
             rowData = rowConverter.toInternal(fields);
         } catch (IOException e) {
             throw new ChunJunRuntimeException(e);
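The remapping loop is a plain projection; a worked example with hypothetical data:

    // A parsed row and two configured fields whose indices are [3, 1]
    String[] fields = {"Shanghai", "42", "ignored", "Alice"};
    int[] configuredIndices = {3, 1};
    String[] fieldsData = new String[configuredIndices.length];
    for (int i = 0; i < configuredIndices.length; i++) {
        fieldsData[i] = fields[configuredIndices[i]]; // -> ["Alice", "42"]
    }

An index pointing past the end of the row fails fast with IllegalArgumentException rather than silently dropping data.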
@@ -164,9 +201,82 @@ protected void closeInternal() {

     @Override
     public boolean reachedEnd() throws IOException {
+        if (s3Config.getTikaReadConfig().isUseExtract()) {
+            tikaData = getTikaData();
+            return tikaData == null || tikaData.getData() == null;
+        } else if (s3Config.getExcelFormatConfig().isUseExcelFormat()) {
+            excelData = getExcelData();
+            return excelData == null || excelData.getData() == null;
+        }
         return reachedEndWithoutCheckState();
     }

+    public ExcelData getExcelData() {
+        if (excelInputFormat == null) {
+            nextExcelDataStream();
+        }
+        if (excelInputFormat != null) {
+            if (!excelInputFormat.hasNext()) {
+                excelInputFormat.close();
+                excelInputFormat = null;
+                return getExcelData();
+            }
+            String[] record = excelInputFormat.nextRecord();
+            return new ExcelData(record);
+        } else {
+            return null;
+        }
+    }
+
+    private void nextExcelDataStream() {
+        if (splits.hasNext()) {
+            currentObject = splits.next();
+            GetObjectRequest rangeObjectRequest =
+                    new GetObjectRequest(s3Config.getBucket(), currentObject);
+            log.info("Current read file {}", currentObject);
+            S3Object o = amazonS3.getObject(rangeObjectRequest);
+            S3ObjectInputStream s3is = o.getObjectContent();
+            excelInputFormat = new ExcelInputFormat();
+            excelInputFormat.open(s3is, s3Config.getExcelFormatConfig());
+        } else {
+            excelInputFormat = null;
+        }
+    }
+
+    public TikaData getTikaData() {
+        if (tikaInputFormat == null) {
+            nextTikaDataStream();
+        }
+        if (tikaInputFormat != null) {
+            if (!tikaInputFormat.hasNext()) {
+                tikaInputFormat.close();
+                tikaInputFormat = null;
+                return getTikaData();
+            }
+            String[] record = tikaInputFormat.nextRecord();
+            return new TikaData(record);
+        } else {
+            return null;
+        }
+    }
+
+    private void nextTikaDataStream() {
+        if (splits.hasNext()) {
+            currentObject = splits.next();
+            GetObjectRequest rangeObjectRequest =
+                    new GetObjectRequest(s3Config.getBucket(), currentObject);
+            log.info("Current read file {}", currentObject);
+            S3Object o = amazonS3.getObject(rangeObjectRequest);
+            S3ObjectInputStream s3is = o.getObjectContent();
+            tikaInputFormat =
+                    new TikaInputFormat(
+                            s3Config.getTikaReadConfig(), s3Config.getFieldNameList().size());
+            tikaInputFormat.open(s3is, FilenameUtils.getName(currentObject));
+        } else {
+            tikaInputFormat = null;
+        }
+    }
+
     public boolean reachedEndWithoutCheckState() throws IOException {
         // br is empty, indicating that a new file needs to be read
@@ -259,7 +369,11 @@ public List<S3SimpleObject> resolveObjects() {
         if (s3Config.isUseV2()) {
             subObjects =
                     S3Util.listObjectsKeyByPrefix(
-                            amazonS3, bucket, prefix, s3Config.getFetchSize());
+                            amazonS3,
+                            bucket,
+                            prefix,
+                            s3Config.getFetchSize(),
+                            s3Config.getObjectsRegex());
         } else {
             subObjects =
                     S3Util.listObjectsByv1(
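The extra objectsRegex argument presumably filters the listed keys client-side before they become input splits; a plausible shape of that filter (an assumption — the body of listObjectsKeyByPrefix is outside this excerpt):

    // Hypothetical reconstruction of the filtering inside the listing loop
    java.util.regex.Pattern pattern =
            objectsRegex != null ? java.util.regex.Pattern.compile(objectsRegex) : null;
    if (pattern == null || pattern.matcher(key).matches()) {
        keys.add(key); // keep only keys matching the configured pattern
    }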
