From 02a35c3979d905fe612c97f003876122b410a671 Mon Sep 17 00:00:00 2001 From: limin Date: Wed, 13 Nov 2024 18:45:50 +0800 Subject: [PATCH] [Feature] [Connector-Redis] Redis connector support delete data (#7994) Co-authored-by: limin --- .../seatunnel/redis/client/RedisClient.java | 23 +++-- .../redis/client/RedisClusterClient.java | 46 +++++++--- .../redis/client/RedisSingleClient.java | 73 +++++++++++----- .../seatunnel/redis/config/RedisDataType.java | 35 ++++++++ .../seatunnel/redis/sink/RedisSinkWriter.java | 20 +++-- .../redis/RedisTestCaseTemplateIT.java | 59 +++++++++++++ .../fake-to-redis-test-delete-hash.conf | 87 +++++++++++++++++++ .../fake-to-redis-test-delete-key.conf | 87 +++++++++++++++++++ .../fake-to-redis-test-delete-list.conf | 86 ++++++++++++++++++ .../fake-to-redis-test-delete-set.conf | 86 ++++++++++++++++++ .../fake-to-redis-test-delete-zset.conf | 86 ++++++++++++++++++ 11 files changed, 648 insertions(+), 40 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java index 109c51f78a3..af7894795d8 100644 --- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.redis.client; +import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; @@ -92,17 +93,29 @@ private ScanResult scanOnRedis5( public abstract List> batchGetZset(List keys); public abstract void batchWriteString( - List keys, List values, long expireSeconds); + List rowKinds, List keys, List values, long expireSeconds); public abstract void batchWriteList( - List keyBuffer, List valueBuffer, long expireSeconds); + List rowKinds, + List keyBuffer, + List valueBuffer, + long expireSeconds); public abstract void batchWriteSet( - List keyBuffer, List valueBuffer, long expireSeconds); + List rowKinds, + List keyBuffer, + List valueBuffer, + long expireSeconds); public abstract void batchWriteHash( - List keyBuffer, List valueBuffer, long expireSeconds); + List rowKinds, + List keyBuffer, + List valueBuffer, + long expireSeconds); public abstract void batchWriteZset( - List keyBuffer, List valueBuffer, long expireSeconds); + List rowKinds, + List keyBuffer, + List valueBuffer, + long expireSeconds); } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java index bd687e6c9b9..485499476c6 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java +++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.redis.client; +import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; @@ -97,42 +98,67 @@ public List> batchGetZset(List keys) { } @Override - public void batchWriteString(List keys, List values, long expireSeconds) { + public void batchWriteString( + List rowKinds, List keys, List values, long expireSeconds) { int size = keys.size(); for (int i = 0; i < size; i++) { - RedisDataType.STRING.set(this, keys.get(i), values.get(i), expireSeconds); + if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) { + RedisDataType.STRING.del(this, keys.get(i), values.get(i)); + } else { + RedisDataType.STRING.set(this, keys.get(i), values.get(i), expireSeconds); + } } } @Override - public void batchWriteList(List keys, List values, long expireSeconds) { + public void batchWriteList( + List rowKinds, List keys, List values, long expireSeconds) { int size = keys.size(); for (int i = 0; i < size; i++) { - RedisDataType.LIST.set(this, keys.get(i), values.get(i), expireSeconds); + if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) { + RedisDataType.LIST.del(this, keys.get(i), values.get(i)); + } else { + RedisDataType.LIST.set(this, keys.get(i), values.get(i), expireSeconds); + } } } @Override - public void batchWriteSet(List keys, List values, long expireSeconds) { + public void batchWriteSet( + List rowKinds, List keys, List values, long expireSeconds) { int size = keys.size(); for (int i = 0; i < size; i++) { - RedisDataType.SET.set(this, keys.get(i), values.get(i), expireSeconds); + if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) { + 
RedisDataType.SET.del(this, keys.get(i), values.get(i)); + } else { + RedisDataType.SET.set(this, keys.get(i), values.get(i), expireSeconds); + } } } @Override - public void batchWriteHash(List keys, List values, long expireSeconds) { + public void batchWriteHash( + List rowKinds, List keys, List values, long expireSeconds) { int size = keys.size(); for (int i = 0; i < size; i++) { - RedisDataType.HASH.set(this, keys.get(i), values.get(i), expireSeconds); + if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) { + RedisDataType.HASH.del(this, keys.get(i), values.get(i)); + } else { + RedisDataType.HASH.set(this, keys.get(i), values.get(i), expireSeconds); + } } } @Override - public void batchWriteZset(List keys, List values, long expireSeconds) { + public void batchWriteZset( + List rowKinds, List keys, List values, long expireSeconds) { int size = keys.size(); for (int i = 0; i < size; i++) { - RedisDataType.ZSET.set(this, keys.get(i), values.get(i), expireSeconds); + if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == RowKind.UPDATE_BEFORE) { + RedisDataType.ZSET.del(this, keys.get(i), values.get(i)); + } else { + RedisDataType.ZSET.set(this, keys.get(i), values.get(i), expireSeconds); + } } } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java index f79aa46e98c..c9d3ba6788f 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.redis.client; +import org.apache.seatunnel.api.table.type.RowKind; import 
org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; @@ -139,76 +140,108 @@ public List> batchGetZset(List keys) { } @Override - public void batchWriteString(List keys, List values, long expireSeconds) { + public void batchWriteString( + List rowKinds, List keys, List values, long expireSeconds) { Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { + RowKind rowKind = rowKinds.get(i); String key = keys.get(i); String value = values.get(i); - pipelined.set(key, value); - if (expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { + pipelined.del(key); + } else { + pipelined.set(key, value); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } } } pipelined.sync(); } @Override - public void batchWriteList(List keys, List values, long expireSeconds) { + public void batchWriteList( + List rowKinds, List keys, List values, long expireSeconds) { Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { + RowKind rowKind = rowKinds.get(i); String key = keys.get(i); String value = values.get(i); - pipelined.lpush(key, value); - if (expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { + pipelined.lrem(key, 1, value); + } else { + pipelined.lpush(key, value); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } } } pipelined.sync(); } @Override - public void batchWriteSet(List keys, List values, long expireSeconds) { + public void batchWriteSet( + List rowKinds, List keys, List values, long expireSeconds) { Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { + RowKind rowKind = rowKinds.get(i); String key = keys.get(i); String value = values.get(i); - pipelined.sadd(key, value); - if 
(expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { + pipelined.srem(key, value); + } else { + pipelined.sadd(key, value); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } } } pipelined.sync(); } @Override - public void batchWriteHash(List keys, List values, long expireSeconds) { + public void batchWriteHash( + List rowKinds, List keys, List values, long expireSeconds) { Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { + RowKind rowKind = rowKinds.get(i); String key = keys.get(i); String value = values.get(i); Map fieldsMap = JsonUtils.toMap(value); - pipelined.hset(key, fieldsMap); - if (expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { + for (Map.Entry entry : fieldsMap.entrySet()) { + pipelined.hdel(key, entry.getKey()); + } + } else { + pipelined.hset(key, fieldsMap); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } } } pipelined.sync(); } @Override - public void batchWriteZset(List keys, List values, long expireSeconds) { + public void batchWriteZset( + List rowKinds, List keys, List values, long expireSeconds) { Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { + RowKind rowKind = rowKinds.get(i); String key = keys.get(i); String value = values.get(i); - pipelined.zadd(key, 1, value); - if (expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { + pipelined.zrem(key, value); + } else { + pipelined.zadd(key, 1, value); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } } } pipelined.sync(); diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java index aac874254d2..7929e8181d2 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java @@ -39,6 +39,11 @@ public void set(Jedis jedis, String key, String value, long expire) { public List get(Jedis jedis, String key) { return Collections.singletonList(jedis.get(key)); } + + @Override + public void del(Jedis jedis, String key, String value) { + jedis.del(key); + } }, STRING { @Override @@ -51,6 +56,11 @@ public void set(Jedis jedis, String key, String value, long expire) { public List get(Jedis jedis, String key) { return Collections.singletonList(jedis.get(key)); } + + @Override + public void del(Jedis jedis, String key, String value) { + jedis.del(key); + } }, HASH { @Override @@ -65,6 +75,12 @@ public List get(Jedis jedis, String key) { Map kvMap = jedis.hgetAll(key); return Collections.singletonList(JsonUtils.toJsonString(kvMap)); } + + @Override + public void del(Jedis jedis, String key, String value) { + Map fieldsMap = JsonUtils.toMap(value); + fieldsMap.forEach((k, v) -> jedis.hdel(key, k)); + } }, LIST { @Override @@ -77,6 +93,11 @@ public void set(Jedis jedis, String key, String value, long expire) { public List get(Jedis jedis, String key) { return jedis.lrange(key, 0, -1); } + + @Override + public void del(Jedis jedis, String key, String value) { + jedis.lrem(key, 1, value); + } }, SET { @Override @@ -90,6 +111,11 @@ public List get(Jedis jedis, String key) { Set members = jedis.smembers(key); return new ArrayList<>(members); } + + @Override + public void del(Jedis jedis, String key, String value) { + jedis.srem(key, value); + } }, ZSET { @Override @@ -102,6 +128,11 @@ public void set(Jedis jedis, String key, 
String value, long expire) { public List get(Jedis jedis, String key) { return jedis.zrange(key, 0, -1); } + + @Override + public void del(Jedis jedis, String key, String value) { + jedis.zrem(key, value); + } }; public List get(Jedis jedis, String key) { @@ -117,4 +148,8 @@ private static void expire(Jedis jedis, String key, long expire) { public void set(Jedis jedis, String key, String value, long expire) { // do nothing } + + public void del(Jedis jedis, String key, String value) { + // do nothing + } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index 9d5c73df2c8..b42fc6107bf 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.serialization.SerializationSchema; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCode; @@ -51,6 +52,7 @@ public class RedisSinkWriter extends AbstractSinkWriter private final int batchSize; + private final List rowKinds; private final List keyBuffer; private final List valueBuffer; @@ -62,12 +64,14 @@ public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters redisP this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType); this.redisClient = redisParameters.buildRedisClient(); this.batchSize = redisParameters.getBatchSize(); + this.rowKinds = new ArrayList<>(batchSize); 
this.keyBuffer = new ArrayList<>(batchSize); this.valueBuffer = new ArrayList<>(batchSize); } @Override public void write(SeaTunnelRow element) throws IOException { + rowKinds.add(element.getRowKind()); List fields = Arrays.asList(seaTunnelRowType.getFieldNames()); String key = getKey(element, fields); keyBuffer.add(key); @@ -173,6 +177,7 @@ private String handleOtherTypes(SeaTunnelRow element, List fields) { } private void clearBuffer() { + rowKinds.clear(); keyBuffer.clear(); valueBuffer.clear(); } @@ -180,23 +185,28 @@ private void clearBuffer() { private void doBatchWrite() { RedisDataType redisDataType = redisParameters.getRedisDataType(); if (RedisDataType.KEY.equals(redisDataType) || RedisDataType.STRING.equals(redisDataType)) { - redisClient.batchWriteString(keyBuffer, valueBuffer, redisParameters.getExpire()); + redisClient.batchWriteString( + rowKinds, keyBuffer, valueBuffer, redisParameters.getExpire()); return; } if (RedisDataType.LIST.equals(redisDataType)) { - redisClient.batchWriteList(keyBuffer, valueBuffer, redisParameters.getExpire()); + redisClient.batchWriteList( + rowKinds, keyBuffer, valueBuffer, redisParameters.getExpire()); return; } if (RedisDataType.SET.equals(redisDataType)) { - redisClient.batchWriteSet(keyBuffer, valueBuffer, redisParameters.getExpire()); + redisClient.batchWriteSet( + rowKinds, keyBuffer, valueBuffer, redisParameters.getExpire()); return; } if (RedisDataType.HASH.equals(redisDataType)) { - redisClient.batchWriteHash(keyBuffer, valueBuffer, redisParameters.getExpire()); + redisClient.batchWriteHash( + rowKinds, keyBuffer, valueBuffer, redisParameters.getExpire()); return; } if (RedisDataType.ZSET.equals(redisDataType)) { - redisClient.batchWriteZset(keyBuffer, valueBuffer, redisParameters.getExpire()); + redisClient.batchWriteZset( + rowKinds, keyBuffer, valueBuffer, redisParameters.getExpire()); return; } throw new RedisConnectorException( diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java index 0f67575ea4e..efd9d8df442 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java @@ -442,5 +442,64 @@ public void testCustomHashKeyAndValueWriteRedis(TestContainer container) jedis.del("custom-hash-check"); } + @TestTemplate + public void testFakeToRedisDeleteHashTest(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/fake-to-redis-test-delete-hash.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(2, jedis.hlen("hash_check")); + jedis.del("hash_check"); + } + + @TestTemplate + public void testFakeToRedisDeleteKeyTest(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/fake-to-redis-test-delete-key.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + int count = 0; + for (int i = 1; i <= 3; i++) { + String data = jedis.get("key_check:" + i); + if (data != null) { + count++; + } + } + Assertions.assertEquals(2, count); + for (int i = 1; i <= 3; i++) { + jedis.del("key_check:" + i); + } + } + + @TestTemplate + public void testFakeToRedisDeleteListTest(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/fake-to-redis-test-delete-list.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(2, 
jedis.llen("list_check")); + jedis.del("list_check"); + } + + @TestTemplate + public void testFakeToRedisDeleteSetTest(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/fake-to-redis-test-delete-set.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(2, jedis.scard("set_check")); + jedis.del("set_check"); + } + + @TestTemplate + public void testFakeToRedisDeleteZSetTest(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/fake-to-redis-test-delete-zset.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(2, jedis.zcard("zset_check")); + jedis.del("zset_check"); + } + public abstract RedisContainerInfo getRedisContainerInfo(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf new file mode 100644 index 00000000000..cffd866916b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_BEFORE + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_AFTER + fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = DELETE + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "hash_check" + data_type = hash + hash_key_field = "id" + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf new file mode 100644 index 00000000000..5be915889e0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_BEFORE + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_AFTER + fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = DELETE + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "key_check:{id}" + data_type = key + support_custom_key = true + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf new file mode 100644 index 00000000000..55deb187549 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_BEFORE + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_AFTER + fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = DELETE + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "list_check" + data_type = list + 
batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf new file mode 100644 index 00000000000..bd1c71128e5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_BEFORE + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_AFTER + fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = DELETE + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "set_check" + data_type = set + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf new file mode 100644 index 00000000000..cf80d3b00cc --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_BEFORE + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_AFTER + fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = DELETE + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "zset_check" + data_type = zset + 
batch_size = 33 + } +} \ No newline at end of file