From 76ce8a6a1fcd04c5f778add9f0415d9d33f954fb Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Thu, 11 Jul 2024 16:58:52 +0800 Subject: [PATCH 01/17] Support multitable sink for hbase. --- .../seatunnel/hbase/sink/HbaseSink.java | 13 +- .../hbase/sink/HbaseSinkFactory.java | 9 ++ .../seatunnel/hbase/sink/HbaseSinkWriter.java | 4 +- .../fake_to_hbase_with_multipletable.conf | 55 ++++++++ .../hbase-to-assert-with-multipletable.conf | 129 ++++++++++++++++++ 5 files changed, 208 insertions(+), 2 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_multipletable.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java index 81452eb989b..c70d6cf7eea 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java @@ -23,6 +23,8 @@ import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; @@ -45,7 +47,8 @@ import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM; @AutoService(SeaTunnelSink.class) -public class HbaseSink extends AbstractSimpleSink { +public class HbaseSink extends AbstractSimpleSink + implements SupportMultiTableSink { private Config pluginConfig; @@ -56,6 +59,14 @@ public class HbaseSink extends AbstractSimpleSink { private List rowkeyColumnIndexes = new ArrayList<>(); private int versionColumnIndex = -1; + private CatalogTable catalogTable; + + public HbaseSink(Config config, CatalogTable catalogTable) { + this.pluginConfig = config; + this.catalogTable = catalogTable; + this.prepare(pluginConfig); + this.setTypeInfo(catalogTable.getSeaTunnelRowType()); + } @Override public String getPluginName() { diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java index 3038473c4ed..b747ee58934 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java @@ -18,8 +18,11 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import com.google.auto.service.AutoService; @@ -59,4 +62,10 @@ public OptionRule optionRule() { HBASE_EXTRA_CONFIG) .build(); } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new HbaseSink(context.getOptions().toConfig(), catalogTable); + } } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 72722e582e3..982908bfcad 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.sink; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -44,7 +45,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -public class HbaseSinkWriter extends AbstractSinkWriter { +public class HbaseSinkWriter extends AbstractSinkWriter + implements SupportMultiTableSinkWriter { private static final String ALL_COLUMNS = "all_columns"; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_multipletable.conf new file mode 100644 index 00000000000..393ef575d84 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_multipletable.conf @@ -0,0 +1,55 @@ +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + row.num = 5 + schema { + table = "hbase_sink1" + fields { + name = string + age = int + c_tinyint = tinyint + c_smallint = smallint + c_bigint = bigint + c_float = float + c_double = double + c_boolean = boolean + } + } + }, + { + row.num = 6 + schema { + table = "hbase_sink2" + fields { + name = string + age = int + c_tinyint = tinyint + c_smallint = smallint + c_bigint = bigint + c_float = float + c_double = double + c_boolean = boolean + } + } + } + ] + } +} + + +sink { + Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "seatunnel_test" + rowkey_column = ["name"] + family_name { + all_columns = info + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf new file mode 100644 index 00000000000..6dc7530b4bd --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf @@ -0,0 +1,129 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "seatunnel_test" + query_columns=["rowkey", "info:age", "info:c_double", "info:c_boolean","info:c_bigint","info:c_smallint","info:c_tinyint","info:c_float"] + schema = { + columns = [ + { + name = rowkey + type = string + }, + { + name = "info:age" + type = int + }, + { + name = "info:c_double" + type = double + }, + { + name = "info:c_boolean" + type = boolean + }, + { + name = "info:c_bigint" + type = bigint + }, + { + name = "info:c_smallint" + type = smallint + }, + { + name = "info:c_tinyint" + type = tinyint + }, + { + name = "info:c_float" + type = float + } + ] + } + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 11 + }, + { + rule_type = MIN_ROW + rule_value = 11 + } + ], + field_rules = [ + { + field_name = rowkey + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = "info:c_boolean" + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = "info:c_double" + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = "info:c_bigint" + field_type = bigint + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = "info:age" + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file From f095db103663ec21b1a8f7daf2bc058f9caadab3 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Thu, 11 Jul 2024 17:16:54 +0800 Subject: [PATCH 02/17] Support multitable sink for hbase. --- .../e2e/connector/hbase/HbaseIT.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index d3cd57b326f..1aa49806bd0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -133,6 +133,27 @@ public void testHbaseSinkWithArray(TestContainer container) scanner.close(); } + @TestTemplate + public void testHbaseSinkWithMultipleTable(TestContainer container) + throws IOException, InterruptedException { + deleteData(table); + Container.ExecResult sinkExecResult = + container.executeJob("/fake_to_hbase_with_multipletable.conf"); + Assertions.assertEquals(0, sinkExecResult.getExitCode()); + Table hbaseTable = hbaseConnection.getTable(table); + Scan scan = new Scan(); + ResultScanner scanner = hbaseTable.getScanner(scan); + ArrayList results = new ArrayList<>(); + for (Result result : scanner) { + results.add(result); + } + Assertions.assertEquals(results.size(), 5); + scanner.close(); + Container.ExecResult sourceExecResult = + container.executeJob("/hbase-to-assert-with-multipletable.conf"); + Assertions.assertEquals(0, sourceExecResult.getExitCode()); + } + private void deleteData(TableName table) throws IOException { Table hbaseTable = hbaseConnection.getTable(table); Scan scan = new Scan(); From 904c33c6b798aa95eba05c56a1c19efa99623b00 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Thu, 11 Jul 2024 17:21:34 +0800 Subject: [PATCH 03/17] Support multitable sink for hbase. --- .../java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index 1aa49806bd0..141bf524acc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -147,7 +147,7 @@ public void testHbaseSinkWithMultipleTable(TestContainer container) for (Result result : scanner) { results.add(result); } - Assertions.assertEquals(results.size(), 5); + Assertions.assertEquals(results.size(), 11); scanner.close(); Container.ExecResult sourceExecResult = container.executeJob("/hbase-to-assert-with-multipletable.conf"); From 711e7e650d9c6daf1097826f06ce4aaffe95193f Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Thu, 11 Jul 2024 17:39:25 +0800 Subject: [PATCH 04/17] resolve conflict --- .../java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index ab0433408d9..01b1fcff11e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -76,7 +76,9 @@ public void startUp() throws Exception { // Create table for hbase sink test log.info("initial"); hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME)); + hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1", "cf2")); table = TableName.valueOf(TABLE_NAME); + tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME); } @AfterAll From a91f7f0adb5ff9e91deb4bf6025c22eca0c84114 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Mon, 12 Aug 2024 15:11:42 +0800 Subject: [PATCH 05/17] add multi-table-sink --- docs/en/connector-v2/sink/Hbase.md | 82 +++++++++++--- .../hbase/config/HbaseParameters.java | 56 +++------- .../seatunnel/hbase/sink/HbaseSink.java | 56 +--------- .../hbase/sink/HbaseSinkFactory.java | 51 ++++++++- .../seatunnel/hbase/sink/HbaseSinkWriter.java | 18 +--- .../e2e/connector/hbase/HbaseIT.java | 100 ++++++------------ .../fake-to-hbase-with-multipletable.conf | 86 +++++++++++++++ .../fake_to_hbase_with_multipletable.conf | 73 ------------- 8 files changed, 260 insertions(+), 262 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_multipletable.conf diff --git a/docs/en/connector-v2/sink/Hbase.md b/docs/en/connector-v2/sink/Hbase.md index dd75d21f0be..f2c4bcae24e 100644 --- a/docs/en/connector-v2/sink/Hbase.md +++ b/docs/en/connector-v2/sink/Hbase.md @@ -15,7 +15,7 @@ Output data to Hbase | name | type | required | default value | |--------------------|---------|----------|-----------------| | zookeeper_quorum | string | yes | - | -| table | string | yes | - | +| table | string | no | - | | rowkey_column | list | yes | - | | family_name | config | yes | - | | rowkey_delimiter | string | no | "" | @@ -26,7 +26,6 @@ Output data to Hbase | encoding | string | no | utf8 | | hbase_extra_config | string | no | - | | common-options | | no | - | -| ttl | long | no | - | ### zookeeper_quorum [string] @@ -96,10 +95,6 @@ The encoding of string field, support [`utf8`, `gbk`], default `utf8` The extra configuration of hbase -### ttl [long] - -Hbase writes data TTL time, the default is based on the TTL set in the table, unit: milliseconds - ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details @@ -116,18 +111,76 @@ Hbase { all_columns = seatunnel } } + ``` -## Writes To The Specified Column Family +### Multiple Table ```hocon -Hbase { - zookeeper_quorum = "hbase_e2e:2181" - table = "assign_cf_table" - rowkey_column = ["id"] - family_name { - c_double = "cf1" - c_bigint = "cf2" +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "hbase_sink_1" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356] + } + ] + }, + { + schema = { + table = "hbase_sink_2" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357] + } + ] + } + ] + } +} + +sink { + Hbase { + zookeeper_quorum = "hbase:2181" + rowkey_column = ["name"] + family_name { + all_columns = info + } } } ``` @@ -137,4 +190,3 @@ Hbase { ### next version - Add hbase sink connector ([4049](https://github.com/apache/seatunnel/pull/4049)) - diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java index 490e2481070..1255fbc1b6c 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.common.config.TypesafeConfigUtils; import lombok.Builder; @@ -31,7 +32,6 @@ import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS; @@ -60,8 +60,6 @@ public class HbaseParameters implements Serializable { private Map hbaseExtraConfig; - @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue(); - @Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue(); @Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue(); @@ -72,44 +70,24 @@ public class HbaseParameters implements Serializable { @Builder.Default private HbaseConfig.EnCoding enCoding = ENCODING.defaultValue(); - public static HbaseParameters buildWithConfig(Config pluginConfig) { + public static HbaseParameters buildWithConfig(ReadonlyConfig config) { HbaseParametersBuilder builder = HbaseParameters.builder(); // required parameters - builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key())); - builder.table(pluginConfig.getString(TABLE.key())); - builder.rowkeyColumns(pluginConfig.getStringList(ROWKEY_COLUMNS.key())); - builder.familyNames( - TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key()))); - - // optional parameters - if (pluginConfig.hasPath(HBASE_TTL_CONFIG.key())) { - builder.ttl(pluginConfig.getLong(HBASE_TTL_CONFIG.key())); - } - if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) { - builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key())); - } - if (pluginConfig.hasPath(VERSION_COLUMN.key())) { - builder.versionColumn(pluginConfig.getString(VERSION_COLUMN.key())); - } - if (pluginConfig.hasPath(NULL_MODE.key())) { - String nullMode = pluginConfig.getString(NULL_MODE.key()); - builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase())); - } - if (pluginConfig.hasPath(WAL_WRITE.key())) { - builder.walWrite(pluginConfig.getBoolean(WAL_WRITE.key())); - } - if (pluginConfig.hasPath(WRITE_BUFFER_SIZE.key())) { - builder.writeBufferSize(pluginConfig.getInt(WRITE_BUFFER_SIZE.key())); - } - if (pluginConfig.hasPath(ENCODING.key())) { - String encoding = pluginConfig.getString(ENCODING.key()); - builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase())); - } - if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) { - Config extraConfig = pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key()); - builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig)); - } + builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM)); + builder.rowkeyColumns(config.get(ROWKEY_COLUMNS)); + builder.familyNames(config.get(FAMILY_NAME)); + + builder.table(config.get(TABLE)); + builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER)); + builder.versionColumn(config.get(VERSION_COLUMN)); + String nullMode = String.valueOf(config.get(NULL_MODE)); + builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase())); + builder.walWrite(config.get(WAL_WRITE)); + builder.writeBufferSize(config.get(WRITE_BUFFER_SIZE)); + String encoding = String.valueOf(config.get(ENCODING)); + builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase())); + builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG)); return builder.build(); } @@ -127,4 +105,4 @@ public static HbaseParameters buildWithSinkConfig(Config pluginConfig) { } return builder.build(); } -} +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java index 5c5c37d39d5..bdc44424751 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java @@ -19,34 +19,19 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; -import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException; - -import com.google.auto.service.AutoService; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM; - -@AutoService(SeaTunnelSink.class) public class HbaseSink extends AbstractSimpleSink implements SupportMultiTableSink { @@ -59,48 +44,15 @@ public class HbaseSink extends AbstractSimpleSink private List rowkeyColumnIndexes = new ArrayList<>(); private int versionColumnIndex = -1; - private CatalogTable catalogTable; - - public HbaseSink(Config config, CatalogTable catalogTable) { - this.pluginConfig = config; - this.catalogTable = catalogTable; - this.prepare(pluginConfig); - this.setTypeInfo(catalogTable.getSeaTunnelRowType()); - } @Override public String getPluginName() { return HbaseSinkFactory.IDENTIFIER; } - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - this.pluginConfig = pluginConfig; - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, - ZOOKEEPER_QUORUM.key(), - TABLE.key(), - ROWKEY_COLUMNS.key(), - FAMILY_NAME.key()); - if (!result.isSuccess()) { - throw new HbaseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - this.hbaseParameters = HbaseParameters.buildWithConfig(pluginConfig); - if (hbaseParameters.getFamilyNames().size() == 0) { - throw new HbaseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - "The corresponding field options should be configured and should not be empty Refer to the hbase sink document"); - } - } - - @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + public HbaseSink(HbaseParameters hbaseParameters, CatalogTable catalogTable) { + this.hbaseParameters = hbaseParameters; + this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); for (String rowkeyColumn : hbaseParameters.getRowkeyColumns()) { this.rowkeyColumnIndexes.add(seaTunnelRowType.indexOf(rowkeyColumn)); } @@ -115,4 +67,4 @@ public AbstractSinkWriter createWriter(SinkWriter.Context co return new HbaseSinkWriter( seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, versionColumnIndex); } -} +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java index b747ee58934..18f8c165543 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java @@ -17,15 +17,22 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; import com.google.auto.service.AutoService; +import java.util.HashMap; +import java.util.Map; + import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG; @@ -65,7 +72,47 @@ public OptionRule optionRule() { @Override public TableSink createSink(TableSinkFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); CatalogTable catalogTable = context.getCatalogTable(); - return () -> new HbaseSink(context.getOptions().toConfig(), catalogTable); + ReadonlyConfig finalReadonlyConfig = + generateCurrentReadonlyConfig(readonlyConfig, catalogTable); + HbaseParameters hbaseParameters = HbaseParameters.buildWithConfig(finalReadonlyConfig); + return () -> new HbaseSink(hbaseParameters, catalogTable); + } + + private ReadonlyConfig generateCurrentReadonlyConfig( + ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + Map configMap = readonlyConfig.toMap(); + + readonlyConfig + .getOptional(TABLE) + .ifPresent( + tableName -> { + String replacedPath = + replaceCatalogTableInPath(tableName, catalogTable); + configMap.put(TABLE.key(), replacedPath); + }); + + return ReadonlyConfig.fromMap(new HashMap<>(configMap)); + } + + private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) { + String tableName = originTableName; + TableIdentifier tableIdentifier = catalogTable.getTableId(); + if (tableIdentifier != null) { + if (tableIdentifier.getSchemaName() != null) { + tableName = + tableName.replace( + SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, + tableIdentifier.getSchemaName()); + } + if (tableIdentifier.getTableName() != null) { + tableName = + tableName.replace( + SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY, + tableIdentifier.getTableName()); + } + } + return tableName; } -} +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 1c468c991dc..a00f442dd0e 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -42,7 +42,6 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -65,7 +64,7 @@ public class HbaseSinkWriter extends AbstractSinkWriter private final int versionColumnIndex; - private String writeAllColumnFamily; + private String defaultFamilyName = "value"; public HbaseSinkWriter( SeaTunnelRowType seaTunnelRowType, @@ -79,7 +78,7 @@ public HbaseSinkWriter( this.versionColumnIndex = versionColumnIndex; if (hbaseParameters.getFamilyNames().size() == 1) { - this.writeAllColumnFamily = hbaseParameters.getFamilyNames().get(ALL_COLUMNS); + defaultFamilyName = hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, "value"); } // initialize hbase configuration @@ -120,9 +119,6 @@ private Put convertRowToPut(SeaTunnelRow row) { timestamp = (Long) row.getField(versionColumnIndex); } Put put = new Put(rowkey, timestamp); - if (hbaseParameters.getTtl() != -1 && hbaseParameters.getTtl() > 0) { - put.setTTL(hbaseParameters.getTtl()); - } if (!hbaseParameters.isWalWrite()) { put.setDurability(Durability.SKIP_WAL); } @@ -134,14 +130,8 @@ private Put convertRowToPut(SeaTunnelRow row) { .collect(Collectors.toList()); for (Integer writeColumnIndex : writeColumnIndexes) { String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex); - // This is the family of columns that we define to be written through the.conf file - Map configurationFamilyNames = hbaseParameters.getFamilyNames(); String familyName = - configurationFamilyNames.getOrDefault(fieldName, writeAllColumnFamily); - if (!configurationFamilyNames.containsKey(ALL_COLUMNS) - && !configurationFamilyNames.containsKey(fieldName)) { - continue; - } + hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName); byte[] bytes = convertColumnToBytes(row, writeColumnIndex); if (bytes != null) { put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(fieldName), bytes); @@ -207,4 +197,4 @@ private byte[] convertColumnToBytes(SeaTunnelRow row, int index) { CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg); } } -} +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index 01b1fcff11e..4899825e7d5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -56,7 +56,9 @@ public class HbaseIT extends TestSuiteBase implements TestResource { private static final String TABLE_NAME = "seatunnel_test"; - private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table"; + private static final String MULTI_TABLE_ONE_NAME = "hbase_sink_1"; + + private static final String MULTI_TABLE_TWO_NAME = "hbase_sink_2"; private static final String FAMILY_NAME = "info"; @@ -65,7 +67,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource { private Admin admin; private TableName table; - private TableName tableAssign; + private HbaseCluster hbaseCluster; @BeforeAll @@ -76,9 +78,10 @@ public void startUp() throws Exception { // Create table for hbase sink test log.info("initial"); hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME)); - hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1", "cf2")); table = TableName.valueOf(TABLE_NAME); - tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME); + // Create table for hbase multi-table sink test + hbaseCluster.createTable(MULTI_TABLE_ONE_NAME, Arrays.asList(FAMILY_NAME)); + hbaseCluster.createTable(MULTI_TABLE_TWO_NAME, Arrays.asList(FAMILY_NAME)); } @AfterAll @@ -95,15 +98,8 @@ public void testHbaseSink(TestContainer container) throws IOException, Interrupt deleteData(table); Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase.conf"); Assertions.assertEquals(0, sinkExecResult.getExitCode()); - Table hbaseTable = hbaseConnection.getTable(table); - Scan scan = new Scan(); - ResultScanner scanner = hbaseTable.getScanner(scan); - ArrayList results = new ArrayList<>(); - for (Result result : scanner) { - results.add(result); - } + ArrayList results = readData(table); Assertions.assertEquals(results.size(), 5); - scanner.close(); Container.ExecResult sourceExecResult = container.executeJob("/hbase-to-assert.conf"); Assertions.assertEquals(0, sourceExecResult.getExitCode()); } @@ -136,53 +132,37 @@ public void testHbaseSinkWithArray(TestContainer container) scanner.close(); } - @TestTemplate - public void testHbaseSinkAssignCfSink(TestContainer container) + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK/FLINK do not support multiple table write") + public void testHbaseMultiTableSink(TestContainer container) throws IOException, InterruptedException { - deleteData(tableAssign); - - Container.ExecResult sinkExecResult = container.executeJob("/fake-to-assign-cf-hbase.conf"); + TableName multiTable1 = TableName.valueOf(MULTI_TABLE_ONE_NAME); + TableName multiTable2 = TableName.valueOf(MULTI_TABLE_TWO_NAME); + deleteData(multiTable1); + deleteData(multiTable2); + Container.ExecResult sinkExecResult = + container.executeJob("/fake-to-hbase-with-multipletable.conf"); Assertions.assertEquals(0, sinkExecResult.getExitCode()); + ArrayList results = readData(multiTable1); + Assertions.assertEquals(results.size(), 1); + results = readData(multiTable2); + Assertions.assertEquals(results.size(), 1); + } - Table hbaseTable = hbaseConnection.getTable(tableAssign); + private void deleteData(TableName table) throws IOException { + Table hbaseTable = hbaseConnection.getTable(table); Scan scan = new Scan(); ResultScanner scanner = hbaseTable.getScanner(scan); - ArrayList results = new ArrayList<>(); - for (Result result : scanner) { - results.add(result); - } - - Assertions.assertEquals(results.size(), 5); - - if (scanner != null) { - scanner.close(); - } - int cf1Count = 0; - int cf2Count = 0; - - for (Result result : results) { - for (Cell cell : result.listCells()) { - String family = Bytes.toString(CellUtil.cloneFamily(cell)); - if ("cf1".equals(family)) { - cf1Count++; - } - if ("cf2".equals(family)) { - cf2Count++; - } - } + // Delete the data generated by the test + for (Result result = scanner.next(); result != null; result = scanner.next()) { + Delete deleteRow = new Delete(result.getRow()); + hbaseTable.delete(deleteRow); } - // check cf1 and cf2 - Assertions.assertEquals(cf1Count, 5); - Assertions.assertEquals(cf2Count, 5); } - @TestTemplate - public void testHbaseSinkWithMultipleTable(TestContainer container) - throws IOException, InterruptedException { - deleteData(table); - Container.ExecResult sinkExecResult = - container.executeJob("/fake_to_hbase_with_multipletable.conf"); - Assertions.assertEquals(0, sinkExecResult.getExitCode()); + public ArrayList readData(TableName table) throws IOException { Table hbaseTable = hbaseConnection.getTable(table); Scan scan = new Scan(); ResultScanner scanner = hbaseTable.getScanner(scan); @@ -190,21 +170,7 @@ public void testHbaseSinkWithMultipleTable(TestContainer container) for (Result result : scanner) { results.add(result); } - Assertions.assertEquals(results.size(), 11); scanner.close(); - Container.ExecResult sourceExecResult = - container.executeJob("/hbase-to-assert-with-multipletable.conf"); - Assertions.assertEquals(0, sourceExecResult.getExitCode()); - } - - private void deleteData(TableName table) throws IOException { - Table hbaseTable = hbaseConnection.getTable(table); - Scan scan = new Scan(); - ResultScanner scanner = hbaseTable.getScanner(scan); - // Delete the data generated by the test - for (Result result = scanner.next(); result != null; result = scanner.next()) { - Delete deleteRow = new Delete(result.getRow()); - hbaseTable.delete(deleteRow); - } + return results; } -} +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf new file mode 100644 index 00000000000..8972bf13249 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "hbase_sink_1" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356] + } + ] + }, + { + schema = { + table = "hbase_sink_2" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + } + } + rows = [ + { + kind = INSERT + fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true] + } + ] + } + ] + } +} + +sink { + Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "${table_name}" + rowkey_column = ["name"] + family_name { + all_columns = info + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_multipletable.conf deleted file mode 100644 index 0b57ab40ef5..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_multipletable.conf +++ /dev/null @@ -1,73 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -env { - parallelism = 1 - job.mode = "BATCH" -} - -source { - FakeSource { - tables_configs = [ - { - row.num = 5 - schema { - table = "hbase_sink1" - fields { - name = string - age = int - c_tinyint = tinyint - c_smallint = smallint - c_bigint = bigint - c_float = float - c_double = double - c_boolean = boolean - } - } - }, - { - row.num = 6 - schema { - table = "hbase_sink2" - fields { - name = string - age = int - c_tinyint = tinyint - c_smallint = smallint - c_bigint = bigint - c_float = float - c_double = double - c_boolean = boolean - } - } - } - ] - } -} - - -sink { - Hbase { - zookeeper_quorum = "hbase_e2e:2181" - table = "seatunnel_test" - rowkey_column = ["name"] - family_name { - all_columns = info - } - } -} \ No newline at end of file From 945b883e1d23b4db73b3d4fa8a16a0a7606fcee9 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Mon, 12 Aug 2024 16:08:52 +0800 Subject: [PATCH 06/17] fix ttl and doc --- docs/en/connector-v2/sink/Hbase.md | 20 +++++ docs/zh/connector-v2/sink/Hbase.md | 74 ++++++++++++++++++- .../hbase/config/HbaseParameters.java | 4 + .../seatunnel/hbase/sink/HbaseSinkWriter.java | 3 + 4 files changed, 100 insertions(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/Hbase.md b/docs/en/connector-v2/sink/Hbase.md index f2c4bcae24e..6efe11bfa4e 100644 --- a/docs/en/connector-v2/sink/Hbase.md +++ b/docs/en/connector-v2/sink/Hbase.md @@ -26,6 +26,7 @@ Output data to Hbase | encoding | string | no | utf8 | | hbase_extra_config | string | no | - | | common-options | | no | - | +| ttl | long | no | - | ### zookeeper_quorum [string] @@ -95,6 +96,10 @@ The encoding of string field, support [`utf8`, `gbk`], default `utf8` The extra configuration of hbase +### ttl [long] + +Hbase writes data TTL time, the default is based on the TTL set in the table, unit: milliseconds + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details @@ -114,6 +119,7 @@ Hbase { ``` + ### Multiple Table ```hocon @@ -185,6 +191,20 @@ sink { } ``` +## Writes To The Specified Column Family + +```hocon +Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "assign_cf_table" + rowkey_column = ["id"] + family_name { + c_double = "cf1" + c_bigint = "cf2" + } +} +``` + ## Changelog ### next version diff --git a/docs/zh/connector-v2/sink/Hbase.md b/docs/zh/connector-v2/sink/Hbase.md index 871cad206c6..c46d83f4fb4 100644 --- a/docs/zh/connector-v2/sink/Hbase.md +++ b/docs/zh/connector-v2/sink/Hbase.md @@ -15,7 +15,7 @@ | 名称 | 类型 | 是否必须 | 默认值 | |--------------------|---------|------|-----------------| | zookeeper_quorum | string | yes | - | -| table | string | yes | - | +| table | string | no | - | | rowkey_column | list | yes | - | | family_name | config | yes | - | | rowkey_delimiter | string | no | "" | @@ -119,6 +119,78 @@ Hbase { ``` + +### 写入多表 + +```hocon +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "hbase_sink_1" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356] + } + ] + }, + { + schema = { + table = "hbase_sink_2" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357] + } + ] + } + ] + } +} + +sink { + Hbase { + zookeeper_quorum = "hbase:2181" + rowkey_column = ["name"] + family_name { + all_columns = info + } + } +} +``` + ## 写入指定列族 ```hocon diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java index 1255fbc1b6c..84188f47a91 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java @@ -32,6 +32,7 @@ import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG; +import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS; @@ -60,6 +61,8 @@ public class HbaseParameters implements Serializable { private Map hbaseExtraConfig; + @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue(); + @Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue(); @Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue(); @@ -88,6 +91,7 @@ public static HbaseParameters buildWithConfig(ReadonlyConfig config) { String encoding = String.valueOf(config.get(ENCODING)); builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase())); builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG)); + builder.ttl(config.get(HBASE_TTL_CONFIG)); return builder.build(); } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index a00f442dd0e..638c3570ad7 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -119,6 +119,9 @@ private Put convertRowToPut(SeaTunnelRow row) { timestamp = (Long) row.getField(versionColumnIndex); } Put put = new Put(rowkey, timestamp); + if (hbaseParameters.getTtl() != -1 && hbaseParameters.getTtl() > 0) { + put.setTTL(hbaseParameters.getTtl()); + } if (!hbaseParameters.isWalWrite()) { put.setDurability(Durability.SKIP_WAL); } From 642e8b1a66af995b8b29a78e01f577ed65647e7f Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Mon, 12 Aug 2024 19:21:23 +0800 Subject: [PATCH 07/17] fix code style. --- .../connectors/seatunnel/hbase/config/HbaseParameters.java | 2 +- .../seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java | 2 +- .../connectors/seatunnel/hbase/sink/HbaseSinkFactory.java | 2 +- .../connectors/seatunnel/hbase/sink/HbaseSinkWriter.java | 2 +- .../java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java index ffed2dd7bfb..4d020700ad6 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java @@ -125,4 +125,4 @@ public static HbaseParameters buildWithSourceConfig(Config pluginConfig) { } return builder.build(); } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java index bdc44424751..22c992f208f 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java @@ -67,4 +67,4 @@ public AbstractSinkWriter createWriter(SinkWriter.Context co return new HbaseSinkWriter( seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, versionColumnIndex); } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java index 18f8c165543..af9564fbe8d 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java @@ -115,4 +115,4 @@ private String replaceCatalogTableInPath(String originTableName, CatalogTable ca } return tableName; } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 638c3570ad7..982908bfcad 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -200,4 +200,4 @@ private byte[] convertColumnToBytes(SeaTunnelRow row, int index) { CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg); } } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index 0b31b4a0185..cb6849eb516 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -197,4 +197,4 @@ public ArrayList readData(TableName table) throws IOException { scanner.close(); return results; } -} \ No newline at end of file +} From b16640a40364493f7081c116fc4a2ff36b95a9a1 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Mon, 12 Aug 2024 19:46:44 +0800 Subject: [PATCH 08/17] fix code style. --- docs/en/connector-v2/sink/Hbase.md | 2 +- docs/zh/connector-v2/sink/Hbase.md | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/en/connector-v2/sink/Hbase.md b/docs/en/connector-v2/sink/Hbase.md index 6efe11bfa4e..b1df968e0ca 100644 --- a/docs/en/connector-v2/sink/Hbase.md +++ b/docs/en/connector-v2/sink/Hbase.md @@ -119,7 +119,6 @@ Hbase { ``` - ### Multiple Table ```hocon @@ -210,3 +209,4 @@ Hbase { ### next version - Add hbase sink connector ([4049](https://github.com/apache/seatunnel/pull/4049)) + diff --git a/docs/zh/connector-v2/sink/Hbase.md b/docs/zh/connector-v2/sink/Hbase.md index c46d83f4fb4..aa2defdbc78 100644 --- a/docs/zh/connector-v2/sink/Hbase.md +++ b/docs/zh/connector-v2/sink/Hbase.md @@ -119,7 +119,6 @@ Hbase { ``` - ### 写入多表 ```hocon From fe12ad2a8384fd3b425e5cf95da8c0fc0ff0d802 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Tue, 13 Aug 2024 11:03:15 +0800 Subject: [PATCH 09/17] add MULTI_TABLE_SINK_REPLICA for HbaseSinkFactory. --- .../connectors/seatunnel/hbase/sink/HbaseSinkFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java index af9564fbe8d..87cd15aaad1 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; @@ -60,6 +61,7 @@ public OptionRule optionRule() { return OptionRule.builder() .required(ZOOKEEPER_QUORUM, TABLE, ROWKEY_COLUMNS, FAMILY_NAME) .optional( + SinkCommonOptions.MULTI_TABLE_SINK_REPLICA, ROWKEY_DELIMITER, VERSION_COLUMN, NULL_MODE, From b3f1243aeb6da846f7863d0205835dd3de47f6da Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Wed, 14 Aug 2024 17:58:11 +0800 Subject: [PATCH 10/17] fix return type for createWriter --- .../seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java index 22c992f208f..16cf4981948 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java @@ -62,7 +62,7 @@ public HbaseSink(HbaseParameters hbaseParameters, CatalogTable catalogTable) { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) + public HbaseSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new HbaseSinkWriter( seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, versionColumnIndex); From 227ce2bbae25f811e032ae1c9b0572a501862142 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Wed, 14 Aug 2024 18:47:15 +0800 Subject: [PATCH 11/17] fix code style --- .../seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java index 16cf4981948..0c592dd65a0 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java @@ -25,7 +25,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; import java.io.IOException; @@ -62,8 +61,7 @@ public HbaseSink(HbaseParameters hbaseParameters, CatalogTable catalogTable) { } @Override - public HbaseSinkWriter createWriter(SinkWriter.Context context) - throws IOException { + public HbaseSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new HbaseSinkWriter( seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, versionColumnIndex); } From b2102def3c6e3bf2400555f57ef43984cdbd18f5 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Wed, 14 Aug 2024 19:47:46 +0800 Subject: [PATCH 12/17] fix defaultFamilyName. --- .../connectors/seatunnel/hbase/sink/HbaseSinkWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 982908bfcad..04640c395c2 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -78,7 +78,7 @@ public HbaseSinkWriter( this.versionColumnIndex = versionColumnIndex; if (hbaseParameters.getFamilyNames().size() == 1) { - defaultFamilyName = hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, "value"); + defaultFamilyName = hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, defaultFamilyName); } // initialize hbase configuration From aff8d4e44cdf30a1a952c34d61ab5cff8b3fe948 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Wed, 14 Aug 2024 20:17:09 +0800 Subject: [PATCH 13/17] fix code style. --- .../connectors/seatunnel/hbase/sink/HbaseSinkWriter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 04640c395c2..e1e312d3057 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -78,7 +78,8 @@ public HbaseSinkWriter( this.versionColumnIndex = versionColumnIndex; if (hbaseParameters.getFamilyNames().size() == 1) { - defaultFamilyName = hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, defaultFamilyName); + defaultFamilyName = + hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, defaultFamilyName); } // initialize hbase configuration From c9b8006ee238b7c251c7ccf407ff7dd1c9e2325f Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Mon, 26 Aug 2024 19:38:58 +0800 Subject: [PATCH 14/17] fix IT. --- .../e2e/connector/hbase/HbaseIT.java | 52 ++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index cb6849eb516..fe736f965ef 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -56,6 +56,9 @@ public class HbaseIT extends TestSuiteBase implements TestResource { private static final String TABLE_NAME = "seatunnel_test"; + + private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table"; + private static final String MULTI_TABLE_ONE_NAME = "hbase_sink_1"; private static final String MULTI_TABLE_TWO_NAME = "hbase_sink_2"; @@ -67,6 +70,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource { private Admin admin; private TableName table; + private TableName tableAssign; private HbaseCluster hbaseCluster; @@ -78,7 +82,11 @@ public void startUp() throws Exception { // Create table for hbase sink test log.info("initial"); hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME)); + // Create table for hbase assign cf table sink test + hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1", "cf2")); table = TableName.valueOf(TABLE_NAME); + tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME); + // Create table for hbase multi-table sink test hbaseCluster.createTable(MULTI_TABLE_ONE_NAME, Arrays.asList(FAMILY_NAME)); hbaseCluster.createTable(MULTI_TABLE_TWO_NAME, Arrays.asList(FAMILY_NAME)); @@ -132,10 +140,50 @@ public void testHbaseSinkWithArray(TestContainer container) scanner.close(); } + @TestTemplate + public void testHbaseSinkAssignCfSink(TestContainer container) + throws IOException, InterruptedException { + deleteData(tableAssign); + + Container.ExecResult sinkExecResult = container.executeJob("/fake-to-assign-cf-hbase.conf"); + Assertions.assertEquals(0, sinkExecResult.getExitCode()); + + Table hbaseTable = hbaseConnection.getTable(tableAssign); + Scan scan = new Scan(); + ResultScanner scanner = hbaseTable.getScanner(scan); + ArrayList results = new ArrayList<>(); + for (Result result : scanner) { + results.add(result); + } + + Assertions.assertEquals(results.size(), 5); + + if (scanner != null) { + scanner.close(); + } + int cf1Count = 0; + int cf2Count = 0; + + for (Result result : results) { + for (Cell cell : result.listCells()) { + String family = Bytes.toString(CellUtil.cloneFamily(cell)); + if ("cf1".equals(family)) { + cf1Count++; + } + if ("cf2".equals(family)) { + cf2Count++; + } + } + } + // check cf1 and cf2 + Assertions.assertEquals(cf1Count, 5); + Assertions.assertEquals(cf2Count, 5); + } + @DisabledOnContainer( value = {}, - type = {EngineType.SPARK, EngineType.FLINK}, - disabledReason = "Currently SPARK/FLINK do not support multiple table write") + type = {EngineType.FLINK}, + disabledReason = "Currently FLINK does not support multiple table write") public void testHbaseMultiTableSink(TestContainer container) throws IOException, InterruptedException { TableName multiTable1 = TableName.valueOf(MULTI_TABLE_ONE_NAME); From 4b112ae0e7e0095c9fffc489ab8e8363268b6bd0 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Wed, 28 Aug 2024 11:04:51 +0800 Subject: [PATCH 15/17] fix function replacePlaceholderAndCreate. --- .../hbase/sink/HbaseSinkFactory.java | 52 +++---------------- 1 file changed, 7 insertions(+), 45 deletions(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java index 87cd15aaad1..f030013750f 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java @@ -17,12 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.sink; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.sink.SinkCommonOptions; -import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -31,8 +28,7 @@ import com.google.auto.service.AutoService; -import java.util.HashMap; -import java.util.Map; +import java.util.Collections; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; @@ -74,47 +70,13 @@ public OptionRule optionRule() { @Override public TableSink createSink(TableSinkFactoryContext context) { - ReadonlyConfig readonlyConfig = context.getOptions(); + TableSinkFactoryContext.replacePlaceholderAndCreate( + context.getCatalogTable(), + context.getOptions(), + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); + HbaseParameters hbaseParameters = HbaseParameters.buildWithConfig(context.getOptions()); CatalogTable catalogTable = context.getCatalogTable(); - ReadonlyConfig finalReadonlyConfig = - generateCurrentReadonlyConfig(readonlyConfig, catalogTable); - HbaseParameters hbaseParameters = HbaseParameters.buildWithConfig(finalReadonlyConfig); return () -> new HbaseSink(hbaseParameters, catalogTable); } - - private ReadonlyConfig generateCurrentReadonlyConfig( - ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { - Map configMap = readonlyConfig.toMap(); - - readonlyConfig - .getOptional(TABLE) - .ifPresent( - tableName -> { - String replacedPath = - replaceCatalogTableInPath(tableName, catalogTable); - configMap.put(TABLE.key(), replacedPath); - }); - - return ReadonlyConfig.fromMap(new HashMap<>(configMap)); - } - - private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) { - String tableName = originTableName; - TableIdentifier tableIdentifier = catalogTable.getTableId(); - if (tableIdentifier != null) { - if (tableIdentifier.getSchemaName() != null) { - tableName = - tableName.replace( - SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, - tableIdentifier.getSchemaName()); - } - if (tableIdentifier.getTableName() != null) { - tableName = - tableName.replace( - SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY, - tableIdentifier.getTableName()); - } - } - return tableName; - } } From f50654f07cf940bccb8d7f7f0b863b7f7565de42 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Fri, 30 Aug 2024 15:28:14 +0800 Subject: [PATCH 16/17] remove replaceTablePlaceholder. --- .../connectors/seatunnel/hbase/sink/HbaseSinkFactory.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java index f030013750f..1bbeb43f4e3 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java @@ -28,8 +28,6 @@ import com.google.auto.service.AutoService; -import java.util.Collections; - import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG; @@ -70,11 +68,6 @@ public OptionRule optionRule() { @Override public TableSink createSink(TableSinkFactoryContext context) { - TableSinkFactoryContext.replacePlaceholderAndCreate( - context.getCatalogTable(), - context.getOptions(), - Thread.currentThread().getContextClassLoader(), - Collections.emptyList()); HbaseParameters hbaseParameters = HbaseParameters.buildWithConfig(context.getOptions()); CatalogTable catalogTable = context.getCatalogTable(); return () -> new HbaseSink(hbaseParameters, catalogTable); From ed77367d73446ae918a4aa81ca8366df994941b0 Mon Sep 17 00:00:00 2001 From: "wenjie.wang01" Date: Fri, 30 Aug 2024 16:50:43 +0800 Subject: [PATCH 17/17] fix table option. --- docs/en/connector-v2/sink/Hbase.md | 5 +++-- docs/zh/connector-v2/sink/Hbase.md | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/en/connector-v2/sink/Hbase.md b/docs/en/connector-v2/sink/Hbase.md index 63646e3ff14..3ceba0982d4 100644 --- a/docs/en/connector-v2/sink/Hbase.md +++ b/docs/en/connector-v2/sink/Hbase.md @@ -15,7 +15,7 @@ Output data to Hbase | name | type | required | default value | |--------------------|---------|----------|-----------------| | zookeeper_quorum | string | yes | - | -| table | string | no | - | +| table | string | yes | - | | rowkey_column | list | yes | - | | family_name | config | yes | - | | rowkey_delimiter | string | no | "" | @@ -181,7 +181,8 @@ source { sink { Hbase { - zookeeper_quorum = "hbase:2181" + zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181" + table = "${table_name}" rowkey_column = ["name"] family_name { all_columns = info diff --git a/docs/zh/connector-v2/sink/Hbase.md b/docs/zh/connector-v2/sink/Hbase.md index 704e6da05b6..f028a8c93ee 100644 --- a/docs/zh/connector-v2/sink/Hbase.md +++ b/docs/zh/connector-v2/sink/Hbase.md @@ -15,7 +15,7 @@ | 名称 | 类型 | 是否必须 | 默认值 | |--------------------|---------|------|-----------------| | zookeeper_quorum | string | yes | - | -| table | string | no | - | +| table | string | yes | - | | rowkey_column | list | yes | - | | family_name | config | yes | - | | rowkey_delimiter | string | no | "" | @@ -181,7 +181,8 @@ source { sink { Hbase { - zookeeper_quorum = "hbase:2181" + zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181" + table = "${table_name}" rowkey_column = ["name"] family_name { all_columns = info