diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java index 7f34d85..fd3c1bf 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java @@ -85,6 +85,10 @@ public enum PushDirection { public static final String PUSH_DIRECTION_DOC = "List push direction: " + PushDirection.LEFT + " (LPUSH) or " + PushDirection.RIGHT + " (RPUSH)"; + public static final String KEY_SET_EXPIRE_CONFIG = "redis.set.expire.timeout"; + public static final String KEY_SET_EXPIRE_DEFAULT = "0"; + public static final String KEY_SET_EXPIRE_DOC = "Key expiration timeout in millis for SET command."; + private final Charset charset; private final DataType type; private final String keyspace; @@ -93,6 +97,7 @@ public enum PushDirection { private final boolean multiexec; private final int waitReplicas; private final long waitTimeout; + private final long keySetExpireTimeout; public RedisSinkConfig(Map originals) { super(new RedisSinkConfigDef(), originals); @@ -105,6 +110,7 @@ public RedisSinkConfig(Map originals) { multiexec = Boolean.TRUE.equals(getBoolean(MULTIEXEC_CONFIG)); waitReplicas = getInt(WAIT_REPLICAS_CONFIG); waitTimeout = getLong(WAIT_TIMEOUT_CONFIG); + keySetExpireTimeout = getLong(KEY_SET_EXPIRE_CONFIG); } public Charset getCharset() { @@ -139,6 +145,10 @@ public long getWaitTimeout() { return waitTimeout; } + public long getKeySetExpireTimeout() { + return keySetExpireTimeout; + } + public static class RedisSinkConfigDef extends RedisConfigDef { public RedisSinkConfigDef() { @@ -168,6 +178,8 @@ private void define() { .defaultValue(WAIT_REPLICAS_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build()); define(ConfigKeyBuilder.of(WAIT_TIMEOUT_CONFIG, ConfigDef.Type.LONG).documentation(WAIT_TIMEOUT_DOC) .defaultValue(WAIT_TIMEOUT_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build()); + define(ConfigKeyBuilder.of(KEY_SET_EXPIRE_CONFIG, ConfigDef.Type.LONG).documentation(KEY_SET_EXPIRE_DOC) + .defaultValue(KEY_SET_EXPIRE_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build()); } @Override diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java index 3fe8913..5e30780 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import io.lettuce.core.SetArgs; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; @@ -81,6 +82,7 @@ public class RedisSinkTask extends SinkTask { private RedisItemWriter writer; private Converter jsonConverter; private GenericObjectPool> pool; + private SetArgs setArgs; @Override public String version() { @@ -97,6 +99,7 @@ public void start(final Map props) { jsonConverter.configure(Collections.singletonMap("schemas.enable", "false"), false); writer = writer(client).options(config.writerOptions()).operation(operation()); writer.open(new ExecutionContext()); + setArgs = getSetArgs(config); final java.util.Set assignment = this.context.assignment(); if (!assignment.isEmpty()) { Map partitionOffsets = new HashMap<>(assignment.size()); @@ -146,7 +149,7 @@ private Operation operation() { case JSON: return JsonSet.key(this::key).value(this::jsonValue).del(this::isDelete).build(); case STRING: - return Set.key(this::key).value(this::value).del(this::isDelete).build(); + return Set.key(this::key).value(this::value).args(setArgs).del(this::isDelete).build(); case STREAM: return Xadd.key(this::collectionKey).body(this::map).build(); case LIST: @@ -213,6 +216,16 @@ private boolean isDelete(SinkRecord sinkRecord) { return sinkRecord.value() == null; } + private SetArgs getSetArgs(RedisSinkConfig config) { + SetArgs setArgs = new SetArgs(); + + if (config.getKeySetExpireTimeout() > 0) { + setArgs.ex(config.getKeySetExpireTimeout()); + } + + return setArgs; + } + private byte[] key(SinkRecord sinkRecord) { if (config.getKeyspace().isEmpty()) { return bytes("key", sinkRecord.key());