Merge branch 'apache:dev' into dev
happyboy1024 authored Nov 29, 2023
2 parents d1dc354 + fc91475 commit 3df3e1a
Showing 62 changed files with 1,254 additions and 50 deletions.
bin/install-plugin.sh (5 changes: 3 additions & 2 deletions)
@@ -42,10 +42,11 @@ fi
 while read line; do
     first_char=$(echo "$line" | cut -c 1)

-    if [ "$first_char" != "-" ] && [ "$first_char" != "#" ]
+    if [ "$first_char" != "-" ] && [ "$first_char" != "#" ] && [ ! -z $first_char ]
     then
         echo "install connector : " $line
         ${SEATUNNEL_HOME}/mvnw dependency:get -DgroupId=org.apache.seatunnel -DartifactId=${line} -Dversion=${version} -Ddest=${SEATUNNEL_HOME}/connectors
     fi

-done < ${SEATUNNEL_HOME}/config/plugin_config
+done < ${SEATUNNEL_HOME}/config/plugin_config
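
For context, `${SEATUNNEL_HOME}/config/plugin_config` is a plain list of connector artifact ids: lines starting with `-` are separators, lines starting with `#` are comments, and with the new check blank lines are now skipped too. A hypothetical file exercising all three skipped cases:

```
# connectors to be fetched by install-plugin.sh (comment line, skipped)
--connectors-v2--
connector-jdbc

connector-starrocks
```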

docs/en/concept/JobEnvConfig.md (2 changes: 2 additions & 0 deletions)
@@ -18,6 +18,8 @@ You can configure whether the task is in batch mode or stream mode through `job.

Gets the interval in which checkpoints are periodically scheduled.

+In `STREAMING` mode, checkpointing is required; if you do not set this parameter, the interval is taken from the application configuration file `seatunnel.yaml`. In `BATCH` mode, you can disable checkpointing by leaving this parameter unset.
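
As a quick illustration (not part of the diff), the interval is set in the `env` block of a job config; both keys below also appear elsewhere in this commit, and the value is arbitrary:

```
env {
  job.mode = "STREAMING"
  # schedule a checkpoint every 10 seconds; in BATCH mode this line can simply be omitted
  checkpoint.interval = 10000
}
```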

## parallelism

This parameter configures the parallelism of source and sink.
docs/en/connector-v2/sink/StarRocks.md (1 change: 1 addition & 0 deletions)
@@ -37,6 +37,7 @@ The internal implementation of StarRocks sink connector is cached and imported b
| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete, only supports PrimaryKey model. |
| save_mode_create_template | string | no | see below | see below |
| starrocks.config | map | no | - | The parameter of the stream load `data_desc` |
+| http_socket_timeout_ms | int | no | 180000 | The HTTP socket timeout in milliseconds; defaults to 180000 (3 minutes). |
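
For illustration only, the new option sits next to the connector's other options in the sink block; everything here except `http_socket_timeout_ms` is elided or hypothetical:

```
sink {
  StarRocks {
    # ...connection and table options elided...
    # raise the HTTP socket timeout from the 3-minute default to 5 minutes
    http_socket_timeout_ms = 300000
  }
}
```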

### save_mode_create_template

seatunnel-api/src/test/resources/conf/getCatalogTable.conf (1 change: 0 additions & 1 deletion)
@@ -17,7 +17,6 @@

 env {
   job.mode = "BATCH"
-  checkpoint.interval = 5000
 }

 source {
@@ -91,6 +91,12 @@ private List<AssertFieldRule.AssertRule> assembleFieldValueRules(
     }

     private SeaTunnelDataType<?> getFieldType(String fieldTypeStr) {
+        if (fieldTypeStr.toLowerCase().startsWith("decimal(")) {
+            String lengthAndScale =
+                    fieldTypeStr.toLowerCase().replace("decimal(", "").replace(")", "");
+            String[] split = lengthAndScale.split(",");
+            return new DecimalType(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
+        }
         return TYPES.get(fieldTypeStr.toLowerCase());
     }

@@ -110,6 +116,5 @@ private SeaTunnelDataType<?> getFieldType(String fieldTypeStr) {
TYPES.put("datetime", LocalTimeType.LOCAL_DATE_TIME_TYPE);
TYPES.put("date", LocalTimeType.LOCAL_DATE_TYPE);
TYPES.put("time", LocalTimeType.LOCAL_TIME_TYPE);
TYPES.put("decimal", new DecimalType(38, 18));
}
}
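
The effect of this change: an assert rule can now declare an exact precision and scale, where previously every `decimal` column was forced through the removed catch-all `DecimalType(38, 18)` mapping. A standalone sketch of the new parsing branch (plain Java, outside the connector):

```java
public class DecimalRuleSketch {
    public static void main(String[] args) {
        String fieldTypeStr = "decimal(10,2)";
        if (fieldTypeStr.toLowerCase().startsWith("decimal(")) {
            String lengthAndScale =
                    fieldTypeStr.toLowerCase().replace("decimal(", "").replace(")", "");
            String[] split = lengthAndScale.split(",");
            // prints "10 2"; note that a space after the comma would break
            // Integer.valueOf, so the declared type must be written without spaces
            System.out.println(Integer.valueOf(split[0]) + " " + Integer.valueOf(split[1]));
        }
    }
}
```
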
@@ -55,11 +55,11 @@ public class MultiTableSinkWriter
     public MultiTableSinkWriter(
             Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters, int queueSize) {
         this.sinkWriters = sinkWriters;
+        AtomicInteger cnt = new AtomicInteger(0);
         executorService =
                 Executors.newFixedThreadPool(
                         queueSize,
                         runnable -> {
-                            AtomicInteger cnt = new AtomicInteger(0);
                             Thread thread = new Thread(runnable);
                             thread.setDaemon(true);
                             thread.setName(
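
The moved line is the entire fix: a `ThreadFactory` lambda runs once per created thread, so a counter constructed inside it restarts at zero each time and every worker thread would receive the same index. Hoisting the counter into the constructor makes it shared across factory calls. A self-contained sketch of the corrected pattern (the thread-name prefix is hypothetical, since the diff truncates the real `setName` argument):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadPoolSketch {
    public static ExecutorService newNamedPool(int queueSize) {
        AtomicInteger cnt = new AtomicInteger(0); // created once, shared by all factory calls
        return Executors.newFixedThreadPool(
                queueSize,
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setDaemon(true);
                    // now distinct per thread: -0, -1, -2, ...
                    thread.setName("multi-table-sink-writer-" + cnt.getAndIncrement());
                    return thread;
                });
    }
}
```
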
@@ -62,6 +62,7 @@ public void finishAndCloseFile() {
                     }
                     needMoveFiles.put(k, getTargetLocation(k));
                 });
+        beingWrittenWriter.clear();
     }

     private ExcelGenerator getOrCreateExcelGenerator(@NonNull String filePath) {
@@ -103,6 +103,8 @@ public void finishAndCloseFile() {
                     }
                     needMoveFiles.put(key, getTargetLocation(key));
                 });
+        beingWrittenOutputStream.clear();
+        isFirstWrite.clear();
     }

     private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
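
Both write strategies follow the same shape: per-file handles are cached in a map, and once `finishAndCloseFile` has flushed them and recorded the finished files in `needMoveFiles`, the caches must be emptied so the next checkpoint cycle starts clean instead of revisiting stale handles; that is all the added `clear()` calls do. A minimal sketch of the pattern (generic names, not the connector's API):

```java
import java.util.HashMap;
import java.util.Map;

public class FinishAndClearSketch {
    private final Map<String, AutoCloseable> beingWritten = new HashMap<>();
    private final Map<String, String> needMoveFiles = new HashMap<>();

    public void finishAndCloseFile() {
        beingWritten.forEach(
                (path, handle) -> {
                    try {
                        handle.close(); // flush and close the per-file writer
                    } catch (Exception e) {
                        throw new RuntimeException("failed to close " + path, e);
                    }
                    needMoveFiles.put(path, path + ".committed"); // register for the move phase
                });
        // without this, the next checkpoint would re-process already-closed handles
        beingWritten.clear();
    }
}
```
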
@@ -37,6 +37,7 @@
 import org.apache.avro.data.TimeConversions;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -132,7 +133,16 @@ private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
         switch (fieldType.getSqlType()) {
             case ARRAY:
                 ArrayList<Object> origArray = new ArrayList<>();
-                ((GenericData.Array<?>) field).iterator().forEachRemaining(origArray::add);
+                ((GenericData.Array<?>) field)
+                        .iterator()
+                        .forEachRemaining(
+                                ele -> {
+                                    if (ele instanceof Utf8) {
+                                        origArray.add(ele.toString());
+                                    } else {
+                                        origArray.add(ele);
+                                    }
+                                });
                 SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
                 switch (elementType.getSqlType()) {
                     case STRING:
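
The new branch exists because Avro's generic reader materializes string values as `org.apache.avro.util.Utf8` rather than `java.lang.String`, so copying array elements through unchanged would later break the cast to `String[]` exercised by the new test below. A tiny sketch of the failure mode:

```java
import org.apache.avro.util.Utf8;

public class Utf8CastSketch {
    public static void main(String[] args) {
        Object element = new Utf8("Java"); // what the Avro reader hands back
        // String s = (String) element;    // would throw ClassCastException
        String s = element.toString();     // the per-element conversion the diff applies
        System.out.println(s);
    }
}
```
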
@@ -21,17 +21,32 @@
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;

+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;

 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;

 import lombok.extern.slf4j.Slf4j;

+import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Paths;
 import java.time.LocalDateTime;
@@ -157,6 +172,29 @@ public void testParquetReadProjection2() throws Exception {
         parquetReadStrategy.read(path, "", testCollector);
     }

+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetReadArray() throws Exception {
+        AutoGenerateParquetData.generateTestData();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                parquetReadStrategy.getSeaTunnelRowTypeInfo(
+                        localConf, AutoGenerateParquetData.DATA_FILE_PATH);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        Assertions.assertEquals(seaTunnelRowTypeInfo.getFieldType(3).getClass(), ArrayType.class);
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(AutoGenerateParquetData.DATA_FILE_PATH, "1", testCollector);
+        List<SeaTunnelRow> rows = testCollector.getRows();
+        SeaTunnelRow seaTunnelRow = rows.get(0);
+        Assertions.assertEquals(seaTunnelRow.getField(1).toString(), "Alice");
+        String[] arrayData = (String[]) seaTunnelRow.getField(3);
+        Assertions.assertEquals(arrayData.length, 2);
+        Assertions.assertEquals(arrayData[0], "Java");
+        AutoGenerateParquetData.deleteFile();
+    }
+
     public static class TestCollector implements Collector<SeaTunnelRow> {

         private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -195,4 +233,58 @@ public String getSchema() {
             return SCHEMA;
         }
     }
+
+    public static class AutoGenerateParquetData {
+
+        public static final String DATA_FILE_PATH = "/tmp/data.parquet";
+
+        public static void generateTestData() throws IOException {
+            deleteFile();
+            String schemaString =
+                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"double\"},{\"name\":\"skills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
+            Schema schema = new Schema.Parser().parse(schemaString);
+
+            Configuration conf = new Configuration();
+
+            Path file = new Path(DATA_FILE_PATH);
+
+            ParquetWriter<GenericRecord> writer =
+                    AvroParquetWriter.<GenericRecord>builder(file)
+                            .withSchema(schema)
+                            .withConf(conf)
+                            .withCompressionCodec(CompressionCodecName.SNAPPY)
+                            .build();
+
+            GenericRecord record1 = new GenericData.Record(schema);
+            record1.put("id", 1);
+            record1.put("name", "Alice");
+            record1.put("salary", 50000.0);
+            GenericArray<Utf8> skills1 =
+                    new GenericData.Array<>(2, schema.getField("skills").schema());
+            skills1.add(new Utf8("Java"));
+            skills1.add(new Utf8("Python"));
+            record1.put("skills", skills1);
+            writer.write(record1);
+
+            GenericRecord record2 = new GenericData.Record(schema);
+            record2.put("id", 2);
+            record2.put("name", "Bob");
+            record2.put("salary", 60000.0);
+            GenericArray<Utf8> skills2 =
+                    new GenericData.Array<>(2, schema.getField("skills").schema());
+            skills2.add(new Utf8("C++"));
+            skills2.add(new Utf8("Go"));
+            record2.put("skills", skills2);
+            writer.write(record2);
+
+            writer.close();
+        }
+
+        public static void deleteFile() {
+            File parquetFile = new File(DATA_FILE_PATH);
+            if (parquetFile.exists()) {
+                parquetFile.delete();
+            }
+        }
+    }
 }
@@ -51,6 +51,8 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;

 import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
@@ -128,7 +130,16 @@ public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> restoreEn

     private List<Integer> initColumnsIndex(InfluxDB influxdb) {
         // query one row to get column info
-        String query = sourceConfig.getSql() + QUERY_LIMIT;
+        String sql = sourceConfig.getSql();
+        String query = sql + QUERY_LIMIT;
+        // if the sql contains tz(), QUERY_LIMIT cannot simply be appended at the end; see bug #4231
+        int start = containTzFunction(sql.toLowerCase());
+        if (start > 0) {
+            StringBuilder tmpSql = new StringBuilder(sql);
+            tmpSql.insert(start - 1, QUERY_LIMIT).append(" ");
+            query = tmpSql.toString();
+        }
+
         try {
             QueryResult queryResult = influxdb.query(new Query(query, sourceConfig.getDatabase()));
@@ -145,4 +156,14 @@ private List<Integer> initColumnsIndex(InfluxDB influxdb) {
                     e);
         }
     }
+
+    private static int containTzFunction(String sql) {
+        Pattern pattern = Pattern.compile("tz\\(.*\\)");
+        Matcher matcher = pattern.matcher(sql);
+        if (matcher.find()) {
+            int start = matcher.start();
+            return start;
+        }
+        return -1;
+    }
 }
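
Concretely, InfluxQL requires `tz(...)` to be the last clause of a query, so the one-row probe cannot just append its limit; it has to be spliced in ahead of the time-zone call. A runnable sketch of the resulting behavior (` limit 1` is an assumption, since the diff does not show `QUERY_LIMIT`'s value):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TzLimitSketch {
    private static final String QUERY_LIMIT = " limit 1"; // assumed value

    static String buildProbeQuery(String sql) {
        Pattern pattern = Pattern.compile("tz\\(.*\\)");
        Matcher matcher = pattern.matcher(sql.toLowerCase());
        if (matcher.find() && matcher.start() > 0) {
            // splice the limit clause in just before the trailing tz(...) call
            return new StringBuilder(sql)
                    .insert(matcher.start() - 1, QUERY_LIMIT)
                    .append(" ")
                    .toString();
        }
        return sql + QUERY_LIMIT;
    }

    public static void main(String[] args) {
        System.out.println(buildProbeQuery("select * from cpu"));
        // -> select * from cpu limit 1
        System.out.println(buildProbeQuery("select * from cpu tz('UTC')"));
        // -> select * from cpu limit 1 tz('UTC')
    }
}
```
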
@@ -283,15 +283,14 @@ private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(
                 rowConverter);
     }

-    private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
+    static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
         return row -> {
             Object[] fields = new Object[pkFields.length];
             for (int i = 0; i < pkFields.length; i++) {
                 fields[i] = row.getField(pkFields[i]);
             }
             SeaTunnelRow newRow = new SeaTunnelRow(fields);
             newRow.setTableId(row.getTableId());
-            newRow.setRowKind(row.getRowKind());
             return newRow;
         };
     }
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.function.Function;

public class JdbcOutputFormatBuilderTest {

    @Test
    public void testKeyExtractor() {
        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"id", "name", "age"},
                        new SeaTunnelDataType[] {
                            BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
                        });
        SeaTunnelRowType pkType =
                new SeaTunnelRowType(
                        new String[] {"id"}, new SeaTunnelDataType[] {BasicType.INT_TYPE});
        int[] pkFields = Arrays.stream(pkType.getFieldNames()).mapToInt(rowType::indexOf).toArray();

        SeaTunnelRow insertRow = new SeaTunnelRow(new Object[] {1, "a", 60});
        insertRow.setTableId("test");
        insertRow.setRowKind(RowKind.INSERT);
        SeaTunnelRow updateBefore = new SeaTunnelRow(new Object[] {1, "a"});
        updateBefore.setTableId("test");
        updateBefore.setRowKind(RowKind.UPDATE_BEFORE);
        SeaTunnelRow updateAfter = new SeaTunnelRow(new Object[] {1, "b"});
        updateAfter.setTableId("test");
        updateAfter.setRowKind(RowKind.UPDATE_AFTER);
        SeaTunnelRow deleteRow = new SeaTunnelRow(new Object[] {1});
        deleteRow.setTableId("test");
        deleteRow.setRowKind(RowKind.DELETE);

        Function<SeaTunnelRow, SeaTunnelRow> keyExtractor =
                JdbcOutputFormatBuilder.createKeyExtractor(pkFields);
        keyExtractor.apply(insertRow);

        Assertions.assertEquals(keyExtractor.apply(insertRow), keyExtractor.apply(insertRow));
        Assertions.assertEquals(keyExtractor.apply(insertRow), keyExtractor.apply(updateBefore));
        Assertions.assertEquals(keyExtractor.apply(insertRow), keyExtractor.apply(updateAfter));
        Assertions.assertEquals(keyExtractor.apply(insertRow), keyExtractor.apply(deleteRow));

        updateBefore.setTableId("test1");
        Assertions.assertNotEquals(keyExtractor.apply(insertRow), keyExtractor.apply(updateBefore));
        updateAfter.setField(0, "2");
        Assertions.assertNotEquals(keyExtractor.apply(insertRow), keyExtractor.apply(updateAfter));
    }
}
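
The deleted `setRowKind` call is what makes these assertions hold: `SeaTunnelRow` equality evidently takes the row kind into account (which is exactly why copying it onto the key broke matching), so a key extracted from a DELETE could never equal the key of the INSERT it should cancel. A stand-alone illustration with a hypothetical `Key` stand-in, not SeaTunnel's class:

```java
import java.util.Arrays;
import java.util.Objects;

public class KeyEqualitySketch {
    record Key(String tableId, String rowKind, Object[] fields) {
        @Override
        public boolean equals(Object o) {
            return o instanceof Key k
                    && tableId.equals(k.tableId)
                    && rowKind.equals(k.rowKind)
                    && Arrays.equals(fields, k.fields);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tableId, rowKind, Arrays.hashCode(fields));
        }
    }

    public static void main(String[] args) {
        Key insertKey = new Key("test", "INSERT", new Object[] {1});
        // before the fix, the extracted key kept the source row's kind:
        Key deleteKeyOld = new Key("test", "DELETE", new Object[] {1});
        System.out.println(insertKey.equals(deleteKeyOld)); // false: delete misses its insert
        // after the fix, every key carries the default kind:
        Key deleteKeyNew = new Key("test", "INSERT", new Object[] {1});
        System.out.println(insertKey.equals(deleteKeyNew)); // true
    }
}
```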