Skip to content

Commit

Permalink
Revert "[fix][hive-source][bug] fix An error occurred reading an empt…
Browse files Browse the repository at this point in the history
…y directory (#5427)" (#5487)
  • Loading branch information
EricJoy2048 authored Sep 15, 2023
1 parent d308e27 commit 0939010
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -151,9 +153,15 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
}
}
}
if (this.fileNames.isEmpty()) {
log.error("The current directory is empty " + path);

if (fileNames.isEmpty()) {
throw new FileConnectorException(
FileConnectorErrorCode.FILE_LIST_EMPTY,
"The target file list is empty,"
+ "SeaTunnel will not be able to sync empty table, "
+ "please check the configuration parameters such as: [file_filter_pattern]");
}

return fileNames;
}

Expand Down Expand Up @@ -188,12 +196,10 @@ public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {

protected Map<String, String> parsePartitionsByPath(String path) {
LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
if (path != null && !path.isEmpty()) {
Arrays.stream(path.split("/", -1))
.filter(split -> split.contains("="))
.map(split -> split.split("=", -1))
.forEach(kv -> partitions.put(kv[0], kv[1]));
}
Arrays.stream(path.split("/", -1))
.filter(split -> split.contains("="))
.map(split -> split.split("=", -1))
.forEach(kv -> partitions.put(kv[0], kv[1]));
return partitions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
"Schmea information is not set or incorrect schmea settings");
}
SeaTunnelRowType userDefinedRowTypeWithPartition =
mergePartitionTypes(
fileNames.size() > 0 ? fileNames.get(0) : null, seaTunnelRowType);
mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
// column projection
if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
// get the read column index from user-defined row type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String pa
@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
SeaTunnelRowType userDefinedRowTypeWithPartition =
mergePartitionTypes(
fileNames.size() > 0 ? fileNames.get(0) : null, seaTunnelRowType);
mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER.key())) {
fieldDelimiter = pluginConfig.getString(BaseSourceConfig.DELIMITER.key());
} else {
Expand Down

0 comments on commit 0939010

Please sign in to comment.