Skip to content

Commit

Permalink
[Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory o…
Browse files Browse the repository at this point in the history
…n redis (apache#5901)
  • Loading branch information
jackyyyyyssss authored and DESKTOP-GHPCOV0\dingaolong committed Dec 19, 2023
1 parent ae0ad11 commit 56eae49
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public HttpSink(Config pluginConfig, SeaTunnelRowType rowType) {

@Override
public String getPluginName() {
return "Http";
return HttpConfig.CONNECTOR_IDENTITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.List;

public class RedisConfig {

public static final String CONNECTOR_IDENTITY = "Redis";

public enum RedisMode {
SINGLE,
CLUSTER;
Expand All @@ -38,8 +42,8 @@ public enum HashKeyParseMode {
.noDefaultValue()
.withDescription("redis hostname or ip");

public static final Option<String> PORT =
Options.key("port").stringType().noDefaultValue().withDescription("redis port");
public static final Option<Integer> PORT =
Options.key("port").intType().noDefaultValue().withDescription("redis port");

public static final Option<String> AUTH =
Options.key("auth")
Expand Down Expand Up @@ -75,9 +79,9 @@ public enum HashKeyParseMode {
.noDefaultValue()
.withDescription("The value of key you want to write to redis.");

public static final Option<String> DATA_TYPE =
public static final Option<RedisDataType> DATA_TYPE =
Options.key("data_type")
.stringType()
.enumType(RedisDataType.class)
.noDefaultValue()
.withDescription("redis data types, support key hash list set zset.");

Expand All @@ -95,9 +99,9 @@ public enum HashKeyParseMode {
.withDescription(
"redis mode, support single or cluster, default value is single");

public static final Option<String> NODES =
public static final Option<List<String>> NODES =
Options.key("nodes")
.stringType()
.listType()
.noDefaultValue()
.withDescription(
"redis nodes information, used in cluster mode, must like as the following format: [host1:port1, host2:port2]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.redis.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;

Expand Down Expand Up @@ -50,64 +49,41 @@ public class RedisParameters implements Serializable {
private List<String> redisNodes = Collections.emptyList();
private long expire = RedisConfig.EXPIRE.defaultValue();

public void buildWithConfig(Config config) {
public void buildWithConfig(ReadonlyConfig config) {
// set host
this.host = config.getString(RedisConfig.HOST.key());
this.host = config.get(RedisConfig.HOST);
// set port
this.port = config.getInt(RedisConfig.PORT.key());
// set auth
if (config.hasPath(RedisConfig.AUTH.key())) {
this.auth = config.getString(RedisConfig.AUTH.key());
}
this.port = config.get(RedisConfig.PORT);
// set db_num
if (config.hasPath(RedisConfig.DB_NUM.key())) {
this.dbNum = config.getInt(RedisConfig.DB_NUM.key());
this.dbNum = config.get(RedisConfig.DB_NUM);
// set hash key mode
this.hashKeyParseMode = config.get(RedisConfig.HASH_KEY_PARSE_MODE);
// set expire
this.expire = config.get(RedisConfig.EXPIRE);
// set auth
if (config.getOptional(RedisConfig.AUTH).isPresent()) {
this.auth = config.get(RedisConfig.AUTH);
}
// set user
if (config.hasPath(RedisConfig.USER.key())) {
this.user = config.getString(RedisConfig.USER.key());
if (config.getOptional(RedisConfig.USER).isPresent()) {
this.user = config.get(RedisConfig.USER);
}
// set mode
if (config.hasPath(RedisConfig.MODE.key())) {
this.mode =
RedisConfig.RedisMode.valueOf(
config.getString(RedisConfig.MODE.key()).toUpperCase());
} else {
this.mode = RedisConfig.MODE.defaultValue();
}
// set hash key mode
if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE.key())) {
this.hashKeyParseMode =
RedisConfig.HashKeyParseMode.valueOf(
config.getString(RedisConfig.HASH_KEY_PARSE_MODE.key()).toUpperCase());
} else {
this.hashKeyParseMode = RedisConfig.HASH_KEY_PARSE_MODE.defaultValue();
}
this.mode = config.get(RedisConfig.MODE);
// set redis nodes information
if (config.hasPath(RedisConfig.NODES.key())) {
this.redisNodes = config.getStringList(RedisConfig.NODES.key());
if (config.getOptional(RedisConfig.NODES).isPresent()) {
this.redisNodes = config.get(RedisConfig.NODES);
}
// set key
if (config.hasPath(RedisConfig.KEY.key())) {
this.keyField = config.getString(RedisConfig.KEY.key());
if (config.getOptional(RedisConfig.KEY).isPresent()) {
this.keyField = config.get(RedisConfig.KEY);
}
// set keysPattern
if (config.hasPath(RedisConfig.KEY_PATTERN.key())) {
this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key());
}
if (config.hasPath(RedisConfig.EXPIRE.key())) {
this.expire = config.getLong(RedisConfig.EXPIRE.key());
}
// set redis data type
try {
String dataType = config.getString(RedisConfig.DATA_TYPE.key());
this.redisDataType = RedisDataType.valueOf(dataType.toUpperCase());
} catch (IllegalArgumentException e) {
throw new RedisConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"Redis source connector only support these data types [key, hash, list, set, zset]",
e);
if (config.getOptional(RedisConfig.KEY_PATTERN).isPresent()) {
this.keysPattern = config.get(RedisConfig.KEY_PATTERN);
}
// set redis data type verification factory createAndPrepareSource
this.redisDataType = config.get(RedisConfig.DATA_TYPE);
}

public Jedis buildJedis() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,34 @@

package org.apache.seatunnel.connectors.seatunnel.redis.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;

import com.google.auto.service.AutoService;

import java.io.IOException;

@AutoService(SeaTunnelSink.class)
public class RedisSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private final RedisParameters redisParameters = new RedisParameters();
private SeaTunnelRowType seaTunnelRowType;
private Config pluginConfig;

@Override
public String getPluginName() {
return "Redis";
private ReadonlyConfig readonlyConfig;
private CatalogTable catalogTable;

public RedisSink(ReadonlyConfig config, CatalogTable table) {
this.readonlyConfig = config;
this.catalogTable = table;
this.redisParameters.buildWithConfig(config);
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig,
RedisConfig.HOST.key(),
RedisConfig.PORT.key(),
RedisConfig.KEY.key(),
RedisConfig.DATA_TYPE.key());
if (!result.isSuccess()) {
throw new RedisConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, result.getMsg()));
}
this.redisParameters.buildWithConfig(pluginConfig);
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
public String getPluginName() {
return RedisConfig.CONNECTOR_IDENTITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.seatunnel.connectors.seatunnel.redis.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;

import com.google.auto.service.AutoService;
Expand All @@ -31,6 +34,12 @@ public String factoryIdentifier() {
return "Redis";
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
CatalogTable catalogTable = context.getCatalogTable();
return () -> new RedisSink(context.getOptions(), catalogTable);
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@

package org.apache.seatunnel.connectors.seatunnel.redis.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
Expand All @@ -40,40 +35,29 @@
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import com.google.auto.service.AutoService;
import com.google.common.collect.Lists;

import java.util.List;

@AutoService(SeaTunnelSource.class)
public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private final RedisParameters redisParameters = new RedisParameters();
private SeaTunnelRowType seaTunnelRowType;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;

private CatalogTable catalogTable;

@Override
public String getPluginName() {
return "Redis";
return RedisConfig.CONNECTOR_IDENTITY;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig,
RedisConfig.HOST.key(),
RedisConfig.PORT.key(),
RedisConfig.KEY_PATTERN.key(),
RedisConfig.DATA_TYPE.key());
if (!result.isSuccess()) {
throw new RedisConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
this.redisParameters.buildWithConfig(pluginConfig);
public RedisSource(ReadonlyConfig readonlyConfig) {

this.redisParameters.buildWithConfig(readonlyConfig);
// TODO: use format SPI
// default use json format
if (pluginConfig.hasPath(RedisConfig.FORMAT.key())) {
if (!pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
if (readonlyConfig.getOptional(RedisConfig.FORMAT).isPresent()) {
if (!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
throw new RedisConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
Expand All @@ -83,17 +67,16 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
"Must config schema when format parameter been config"));
}

RedisConfig.Format format =
RedisConfig.Format.valueOf(
pluginConfig.getString(RedisConfig.FORMAT.key()).toUpperCase());
RedisConfig.Format format = readonlyConfig.get(RedisConfig.FORMAT);
if (RedisConfig.Format.JSON.equals(format)) {
this.seaTunnelRowType =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
this.catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.deserializationSchema =
new JsonDeserializationSchema(false, false, seaTunnelRowType);
}
} else {
this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
this.catalogTable = CatalogTableUtil.buildSimpleTextTable();
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.deserializationSchema = null;
}
}
Expand All @@ -104,8 +87,8 @@ public Boundedness getBoundedness() {
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return seaTunnelRowType;
public List<CatalogTable> getProducedCatalogTables() {
return Lists.newArrayList(catalogTable);
}

@Override
Expand Down
Loading

0 comments on commit 56eae49

Please sign in to comment.