Skip to content

Commit

Permalink
[Feature] [Connector-Redis] Redis connector support delete data (#7994)
Browse files Browse the repository at this point in the history
Co-authored-by: limin <[email protected]>
  • Loading branch information
lm-ylj and limin authored Nov 13, 2024
1 parent af4fd8b commit 02a35c3
Show file tree
Hide file tree
Showing 11 changed files with 648 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.redis.client;

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;

Expand Down Expand Up @@ -92,17 +93,29 @@ private ScanResult<String> scanOnRedis5(
public abstract List<List<String>> batchGetZset(List<String> keys);

public abstract void batchWriteString(
List<String> keys, List<String> values, long expireSeconds);
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds);

public abstract void batchWriteList(
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);
List<RowKind> rowKinds,
List<String> keyBuffer,
List<String> valueBuffer,
long expireSeconds);

public abstract void batchWriteSet(
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);
List<RowKind> rowKinds,
List<String> keyBuffer,
List<String> valueBuffer,
long expireSeconds);

public abstract void batchWriteHash(
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);
List<RowKind> rowKinds,
List<String> keyBuffer,
List<String> valueBuffer,
long expireSeconds);

public abstract void batchWriteZset(
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);
List<RowKind> rowKinds,
List<String> keyBuffer,
List<String> valueBuffer,
long expireSeconds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.redis.client;

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;

Expand Down Expand Up @@ -97,42 +98,67 @@ public List<List<String>> batchGetZset(List<String> keys) {
}

@Override
public void batchWriteString(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteString(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.STRING.set(this, keys.get(i), values.get(i), expireSeconds);
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) {
RedisDataType.STRING.del(this, keys.get(i), values.get(i));
} else {
RedisDataType.STRING.set(this, keys.get(i), values.get(i), expireSeconds);
}
}
}

@Override
public void batchWriteList(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteList(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.LIST.set(this, keys.get(i), values.get(i), expireSeconds);
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) {
RedisDataType.LIST.del(this, keys.get(i), values.get(i));
} else {
RedisDataType.LIST.set(this, keys.get(i), values.get(i), expireSeconds);
}
}
}

@Override
public void batchWriteSet(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteSet(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.SET.set(this, keys.get(i), values.get(i), expireSeconds);
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) {
RedisDataType.SET.del(this, keys.get(i), values.get(i));
} else {
RedisDataType.SET.set(this, keys.get(i), values.get(i), expireSeconds);
}
}
}

@Override
public void batchWriteHash(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteHash(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.HASH.set(this, keys.get(i), values.get(i), expireSeconds);
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) {
RedisDataType.HASH.del(this, keys.get(i), values.get(i));
} else {
RedisDataType.HASH.set(this, keys.get(i), values.get(i), expireSeconds);
}
}
}

@Override
public void batchWriteZset(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteZset(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.ZSET.set(this, keys.get(i), values.get(i), expireSeconds);
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) {
RedisDataType.ZSET.del(this, keys.get(i), values.get(i));
} else {
RedisDataType.ZSET.set(this, keys.get(i), values.get(i), expireSeconds);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.redis.client;

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;

Expand Down Expand Up @@ -139,76 +140,108 @@ public List<List<String>> batchGetZset(List<String> keys) {
}

@Override
public void batchWriteString(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteString(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
pipelined.set(key, value);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
pipelined.del(key);
} else {
pipelined.set(key, value);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
}
}
}
pipelined.sync();
}

@Override
public void batchWriteList(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteList(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
pipelined.lpush(key, value);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
pipelined.lrem(key, 1, value);
} else {
pipelined.lpush(key, value);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
}
}
}
pipelined.sync();
}

@Override
public void batchWriteSet(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteSet(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
pipelined.sadd(key, value);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
pipelined.srem(key, value);
} else {
pipelined.sadd(key, value);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
}
}
}
pipelined.sync();
}

@Override
public void batchWriteHash(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteHash(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
Map<String, String> fieldsMap = JsonUtils.toMap(value);
pipelined.hset(key, fieldsMap);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
pipelined.hdel(key, entry.getKey());
}
} else {
pipelined.hset(key, fieldsMap);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
}
}
}
pipelined.sync();
}

@Override
public void batchWriteZset(List<String> keys, List<String> values, long expireSeconds) {
public void batchWriteZset(
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
RowKind rowKind = rowKinds.get(i);
String key = keys.get(i);
String value = values.get(i);
pipelined.zadd(key, 1, value);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
pipelined.zrem(key, value);
} else {
pipelined.zadd(key, 1, value);
if (expireSeconds > 0) {
pipelined.expire(key, expireSeconds);
}
}
}
pipelined.sync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public void set(Jedis jedis, String key, String value, long expire) {
public List<String> get(Jedis jedis, String key) {
return Collections.singletonList(jedis.get(key));
}

@Override
public void del(Jedis jedis, String key, String value) {
jedis.del(key);
}
},
STRING {
@Override
Expand All @@ -51,6 +56,11 @@ public void set(Jedis jedis, String key, String value, long expire) {
public List<String> get(Jedis jedis, String key) {
return Collections.singletonList(jedis.get(key));
}

@Override
public void del(Jedis jedis, String key, String value) {
jedis.del(key);
}
},
HASH {
@Override
Expand All @@ -65,6 +75,12 @@ public List<String> get(Jedis jedis, String key) {
Map<String, String> kvMap = jedis.hgetAll(key);
return Collections.singletonList(JsonUtils.toJsonString(kvMap));
}

@Override
public void del(Jedis jedis, String key, String value) {
Map<String, String> fieldsMap = JsonUtils.toMap(value);
fieldsMap.forEach((k, v) -> jedis.hdel(key, k));
}
},
LIST {
@Override
Expand All @@ -77,6 +93,11 @@ public void set(Jedis jedis, String key, String value, long expire) {
public List<String> get(Jedis jedis, String key) {
return jedis.lrange(key, 0, -1);
}

@Override
public void del(Jedis jedis, String key, String value) {
jedis.lrem(key, 1, value);
}
},
SET {
@Override
Expand All @@ -90,6 +111,11 @@ public List<String> get(Jedis jedis, String key) {
Set<String> members = jedis.smembers(key);
return new ArrayList<>(members);
}

@Override
public void del(Jedis jedis, String key, String value) {
jedis.srem(key, value);
}
},
ZSET {
@Override
Expand All @@ -102,6 +128,11 @@ public void set(Jedis jedis, String key, String value, long expire) {
public List<String> get(Jedis jedis, String key) {
return jedis.zrange(key, 0, -1);
}

@Override
public void del(Jedis jedis, String key, String value) {
jedis.zrem(key, value);
}
};

public List<String> get(Jedis jedis, String key) {
Expand All @@ -117,4 +148,8 @@ private static void expire(Jedis jedis, String key, long expire) {
public void set(Jedis jedis, String key, String value, long expire) {
// do nothing
}

public void del(Jedis jedis, String key, String value) {
// do nothing
}
}
Loading

0 comments on commit 02a35c3

Please sign in to comment.