Skip to content

Commit

Permalink
unified
Browse files Browse the repository at this point in the history
Signed-off-by: Jiao Mingye <[email protected]>
  • Loading branch information
mxdzs0612 committed Jun 27, 2024
1 parent d92e732 commit 0348d6b
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 17 deletions.
27 changes: 21 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,27 @@
import org.apache.paimon.types.DataField;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR;


public class PaimonTable extends Table {
private final String catalogName;
private final String databaseName;
private final String tableName;
private final org.apache.paimon.table.Table paimonNativeTable;
private final List<String> partColumnNames;
private final List<String> paimonFieldNames;
private String catalogName;
private String databaseName;
private String tableName;
private org.apache.paimon.table.Table paimonNativeTable;
private List<String> partColumnNames;
private List<String> paimonFieldNames;
private Map<String, String> properties;

public PaimonTable() {
super(TableType.PAIMON);
}

public PaimonTable(String catalogName, String dbName, String tblName, List<Column> schema,
org.apache.paimon.table.Table paimonNativeTable, long createTime) {
Expand Down Expand Up @@ -85,6 +92,14 @@ public String getTableLocation() {
}
}

@Override
public Map<String, String> getProperties() {
if (properties == null) {
this.properties = new HashMap<>();
}
return properties;
}

@Override
public List<String> getPartitionColumnNames() {
return partColumnNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;

public class PaimonConnector implements Connector {
private static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
private static final String PAIMON_CATALOG_WAREHOUSE = "paimon.catalog.warehouse";
public static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
public static final String PAIMON_CATALOG_WAREHOUSE = "paimon.catalog.warehouse";
private static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
private static final String DLF_CATGALOG_ID = "dlf.catalog.id";
private final HdfsEnvironment hdfsEnvironment;
Expand Down Expand Up @@ -82,7 +82,9 @@ public PaimonConnector(ConnectorContext context) {
&& !catalogType.equalsIgnoreCase("dlf")) {
throw new StarRocksConnectorException("The property %s must be set.", PAIMON_CATALOG_WAREHOUSE);
}
paimonOptions.setString(WAREHOUSE.key(), warehousePath);
if (!Strings.isNullOrEmpty(warehousePath)) {
paimonOptions.setString(WAREHOUSE.key(), warehousePath);
}
initFsOption(cloudConfiguration);
String keyPrefix = "paimon.option.";
Set<String> optionKeys = properties.keySet().stream().filter(k -> k.startsWith(keyPrefix)).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.starrocks.connector.hudi.HudiConnector;
import com.starrocks.connector.iceberg.IcebergConnector;
import com.starrocks.connector.kudu.KuduConnector;
import com.starrocks.connector.paimon.PaimonConnector;
import com.starrocks.sql.analyzer.SemanticException;

import java.util.HashMap;
Expand All @@ -37,14 +38,17 @@
import static com.starrocks.catalog.Table.TableType.HUDI;
import static com.starrocks.catalog.Table.TableType.ICEBERG;
import static com.starrocks.catalog.Table.TableType.KUDU;
import static com.starrocks.catalog.Table.TableType.PAIMON;
import static com.starrocks.connector.hive.HiveConnector.HIVE_METASTORE_TYPE;
import static com.starrocks.connector.iceberg.IcebergCatalogProperties.ICEBERG_CATALOG_TYPE;
import static com.starrocks.connector.kudu.KuduConnector.KUDU_CATALOG_TYPE;
import static com.starrocks.connector.paimon.PaimonConnector.PAIMON_CATALOG_TYPE;
import static com.starrocks.connector.paimon.PaimonConnector.PAIMON_CATALOG_WAREHOUSE;

public class UnifiedConnector implements Connector {
public static final String UNIFIED_METASTORE_TYPE = "unified.metastore.type";
public static final List<String> SUPPORTED_METASTORE_TYPE = ImmutableList.of("hive", "glue");
private final Map<Table.TableType, Connector> connectorMap;
private Map<Table.TableType, Connector> connectorMap;

public UnifiedConnector(ConnectorContext context) {
String metastoreType = context.getProperties().get(UNIFIED_METASTORE_TYPE);
Expand All @@ -56,18 +60,24 @@ public UnifiedConnector(ConnectorContext context) {
derivedProperties.putAll(context.getProperties());
derivedProperties.put(HIVE_METASTORE_TYPE, metastoreType);
derivedProperties.put(ICEBERG_CATALOG_TYPE, metastoreType);
derivedProperties.put(PAIMON_CATALOG_TYPE, metastoreType);
derivedProperties.put(KUDU_CATALOG_TYPE, metastoreType);

ConnectorContext derivedContext = new ConnectorContext(context.getCatalogName(), context.getType(),
derivedProperties.build());

connectorMap = ImmutableMap.of(
HIVE, new HiveConnector(derivedContext),
ICEBERG, new IcebergConnector(derivedContext),
HUDI, new HudiConnector(derivedContext),
DELTALAKE, new DeltaLakeConnector(derivedContext),
KUDU, new KuduConnector(derivedContext)
);
boolean shouldCreatePaimonConnector = null != context.getProperties().get(PAIMON_CATALOG_WAREHOUSE);
ImmutableMap.Builder<Table.TableType, Connector> builder = ImmutableMap.builder();

builder.put(HIVE, new HiveConnector(derivedContext))
.put(ICEBERG, new IcebergConnector(derivedContext))
.put(HUDI, new HudiConnector(derivedContext))
.put(DELTALAKE, new DeltaLakeConnector(derivedContext))
.put(KUDU, new KuduConnector(derivedContext));
if (shouldCreatePaimonConnector) {
builder.put(PAIMON, new PaimonConnector(derivedContext));
}
connectorMap = builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static com.starrocks.catalog.Table.TableType.HUDI;
import static com.starrocks.catalog.Table.TableType.ICEBERG;
import static com.starrocks.catalog.Table.TableType.KUDU;
import static com.starrocks.catalog.Table.TableType.PAIMON;
import static java.util.Objects.requireNonNull;

public class UnifiedMetadata implements ConnectorMetadata {
Expand Down Expand Up @@ -94,6 +95,9 @@ private Table.TableType getTableType(String dbName, String tblName) {
if (isDeltaLakeTable(table.getProperties())) {
return DELTALAKE;
}
if (isPaimonTable(table.getProperties())) {
return PAIMON;
}
if (table.isKuduTable()) {
return KUDU;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PaimonTableTest {

Expand Down Expand Up @@ -63,6 +64,8 @@ public void testPartitionKeys(@Mocked FileStoreTable paimonNativeTable) {
};
PaimonTable paimonTable = new PaimonTable("testCatalog", "testDB", "testTable", fullSchema,
paimonNativeTable, 100L);
Map<String, String> properties = paimonTable.getProperties();
Assert.assertEquals(0, properties.size());
List<Column> partitionColumns = paimonTable.getPartitionColumns();
Assertions.assertThat(partitionColumns).hasSameElementsAs(partitionSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import com.starrocks.connector.ConnectorFactory;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.paimon.PaimonConnector;
import com.starrocks.connector.paimon.PaimonMetadata;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
Expand All @@ -28,13 +32,22 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

public class UnifiedConnectorTest {
@Mocked private PaimonConnector paimonConnector;
@Mocked private PaimonMetadata paimonMetadata;

@Test
public void testCreateUnifiedConnectorFromConnectorFactory() throws StarRocksConnectorException {
new Expectations() {
{
paimonConnector.getMetadata();
result = paimonMetadata;
}
};
Map<String, String> properties = new HashMap<>();
properties.put("type", "unified");
properties.put("unified.metastore.type", "hive");
properties.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
properties.put("paimon.catalog.warehouse", "file:///tmp/");
properties.put("kudu.master", "localhost:7051");
properties.put("kudu.schema-emulation.enabled", "true");
properties.put("kudu.schema-emulation.prefix", "impala::");
Expand All @@ -47,10 +60,17 @@ public void testCreateUnifiedConnectorFromConnectorFactory() throws StarRocksCon

@Test
public void testCreateUnifiedConnector() {
new Expectations() {
{
paimonConnector.getMetadata();
result = paimonMetadata;
}
};
Map<String, String> properties = new HashMap<>();
properties.put("type", "unified");
properties.put("unified.metastore.type", "hive");
properties.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
properties.put("paimon.catalog.warehouse", "file:///tmp/");
properties.put("kudu.master", "localhost:7051");
properties.put("kudu.schema-emulation.enabled", "true");
properties.put("kudu.schema-emulation.prefix", "impala::");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.starrocks.connector.iceberg.IcebergMetaSpec;
import com.starrocks.connector.iceberg.IcebergMetadata;
import com.starrocks.connector.kudu.KuduMetadata;
import com.starrocks.connector.paimon.PaimonMetadata;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudType;
import com.starrocks.sql.ast.CreateTableStmt;
Expand All @@ -51,6 +52,7 @@
import static com.starrocks.catalog.Table.TableType.HUDI;
import static com.starrocks.catalog.Table.TableType.ICEBERG;
import static com.starrocks.catalog.Table.TableType.KUDU;
import static com.starrocks.catalog.Table.TableType.PAIMON;
import static com.starrocks.connector.unified.UnifiedMetadata.DELTA_LAKE_PROVIDER;
import static com.starrocks.connector.unified.UnifiedMetadata.ICEBERG_TABLE_TYPE_NAME;
import static com.starrocks.connector.unified.UnifiedMetadata.ICEBERG_TABLE_TYPE_VALUE;
Expand All @@ -63,6 +65,7 @@ public class UnifiedMetadataTest {
@Mocked private IcebergMetadata icebergMetadata;
@Mocked private HudiMetadata hudiMetadata;
@Mocked private DeltaLakeMetadata deltaLakeMetadata;
@Mocked private PaimonMetadata paimonMetadata;
@Mocked private KuduMetadata kuduMetadata;
private final CreateTableStmt createTableStmt = new CreateTableStmt(false, true,
new TableName("test_db", "test_tbl"), ImmutableList.of(), "hive",
Expand All @@ -77,6 +80,7 @@ public void setUp() {
ICEBERG, icebergMetadata,
HUDI, hudiMetadata,
DELTALAKE, deltaLakeMetadata,
PAIMON, paimonMetadata,
KUDU, kuduMetadata
)
);
Expand Down

0 comments on commit 0348d6b

Please sign in to comment.