[Enhancement] Unified Catalog support Paimon (StarRocks#47398)
Make the unified catalog support Paimon data sources.

Example:
```sql
CREATE EXTERNAL CATALOG unified
PROPERTIES
(
    "type" = "unified",
    "unified.metastore.type" = "hive",
    "hive.metastore.uris" = "thrift://localhost:9083",
    "paimon.catalog.warehouse" = "hdfs://127.0.0.1:9000/paimon",
);
```
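
Once the catalog is created, Paimon tables are exposed through the unified catalog alongside Hive, Iceberg, Hudi, and Delta Lake tables. A minimal usage sketch follows; the database and table names are placeholders, not part of this change:

```sql
-- Hypothetical objects: `paimon_db` and `orders` stand in for whatever
-- already exists in the Paimon warehouse configured above.
SHOW DATABASES FROM unified;
SELECT * FROM unified.paimon_db.orders LIMIT 10;
```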

Signed-off-by: Jiao Mingye <[email protected]>

(cherry picked from commit 2e50513)
mxdzs0612 committed Nov 8, 2024
1 parent 5dc0002 commit 5cc8087
Showing 7 changed files with 86 additions and 22 deletions.
27 changes: 21 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java
@@ -25,22 +25,29 @@
import org.apache.paimon.types.DataField;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR;


public class PaimonTable extends Table {
private final String catalogName;
private final String databaseName;
private final String tableName;
private final org.apache.paimon.table.Table paimonNativeTable;
private final List<String> partColumnNames;
private final List<String> paimonFieldNames;
private String catalogName;
private String databaseName;
private String tableName;
private org.apache.paimon.table.Table paimonNativeTable;
private List<String> partColumnNames;
private List<String> paimonFieldNames;
private Map<String, String> properties;
private long latestSnapshotId;

public PaimonTable() {
super(TableType.PAIMON);
}

public PaimonTable(String catalogName, String dbName, String tblName, List<Column> schema,
org.apache.paimon.table.Table paimonNativeTable, long createTime) {
super(CONNECTOR_ID_GENERATOR.getNextId().asInt(), tblName, TableType.PAIMON, schema);
@@ -86,6 +93,14 @@ public String getTableLocation() {
}
}

@Override
public Map<String, String> getProperties() {
if (properties == null) {
this.properties = new HashMap<>();
}
return properties;
}

@Override
public List<String> getPartitionColumnNames() {
return partColumnNames;
@@ -42,8 +42,8 @@
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;

public class PaimonConnector implements Connector {
private static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
private static final String PAIMON_CATALOG_WAREHOUSE = "paimon.catalog.warehouse";
public static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
public static final String PAIMON_CATALOG_WAREHOUSE = "paimon.catalog.warehouse";
private static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
private static final String DLF_CATGALOG_ID = "dlf.catalog.id";
private final HdfsEnvironment hdfsEnvironment;
@@ -83,7 +83,9 @@ public PaimonConnector(ConnectorContext context) {
&& !catalogType.equalsIgnoreCase("dlf")) {
throw new StarRocksConnectorException("The property %s must be set.", PAIMON_CATALOG_WAREHOUSE);
}
paimonOptions.setString(WAREHOUSE.key(), warehousePath);
if (!Strings.isNullOrEmpty(warehousePath)) {
paimonOptions.setString(WAREHOUSE.key(), warehousePath);
}
initFsOption(cloudConfiguration);
String keyPrefix = "paimon.option.";
Set<String> optionKeys = properties.keySet().stream().filter(k -> k.startsWith(keyPrefix)).collect(Collectors.toSet());
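
For context, the guard above only writes the `WAREHOUSE` option when a warehouse path is actually supplied, which suggests the path can be omitted for metastore-backed catalog types such as hive or dlf (see the exception check above). A hypothetical standalone Paimon catalog of that shape, with placeholder values not taken from this change:

```sql
-- Illustrative only: a Hive-metastore-backed Paimon catalog without
-- "paimon.catalog.warehouse", the case the new null/empty check guards.
CREATE EXTERNAL CATALOG paimon_hms
PROPERTIES
(
    "type" = "paimon",
    "paimon.catalog.type" = "hive",
    "hive.metastore.uris" = "thrift://localhost:9083"
);
```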
@@ -25,6 +25,7 @@
import com.starrocks.connector.hive.HiveConnector;
import com.starrocks.connector.hudi.HudiConnector;
import com.starrocks.connector.iceberg.IcebergConnector;
import com.starrocks.connector.paimon.PaimonConnector;
import com.starrocks.sql.analyzer.SemanticException;

import java.util.HashMap;
@@ -35,13 +36,16 @@
import static com.starrocks.catalog.Table.TableType.HIVE;
import static com.starrocks.catalog.Table.TableType.HUDI;
import static com.starrocks.catalog.Table.TableType.ICEBERG;
import static com.starrocks.catalog.Table.TableType.PAIMON;
import static com.starrocks.connector.hive.HiveConnector.HIVE_METASTORE_TYPE;
import static com.starrocks.connector.iceberg.IcebergConnector.ICEBERG_CATALOG_TYPE;
import static com.starrocks.connector.paimon.PaimonConnector.PAIMON_CATALOG_TYPE;
import static com.starrocks.connector.paimon.PaimonConnector.PAIMON_CATALOG_WAREHOUSE;

public class UnifiedConnector implements Connector {
public static final String UNIFIED_METASTORE_TYPE = "unified.metastore.type";
public static final List<String> SUPPORTED_METASTORE_TYPE = ImmutableList.of("hive", "glue");
private final Map<Table.TableType, Connector> connectorMap;
private Map<Table.TableType, Connector> connectorMap;

public UnifiedConnector(ConnectorContext context) {
String metastoreType = context.getProperties().get(UNIFIED_METASTORE_TYPE);
@@ -53,16 +57,22 @@ public UnifiedConnector(ConnectorContext context) {
derivedProperties.putAll(context.getProperties());
derivedProperties.put(HIVE_METASTORE_TYPE, metastoreType);
derivedProperties.put(ICEBERG_CATALOG_TYPE, metastoreType);
derivedProperties.put(PAIMON_CATALOG_TYPE, metastoreType);

ConnectorContext derivedContext = new ConnectorContext(context.getCatalogName(), context.getType(),
derivedProperties.build());

connectorMap = ImmutableMap.of(
HIVE, new HiveConnector(derivedContext),
ICEBERG, new IcebergConnector(derivedContext),
HUDI, new HudiConnector(derivedContext),
DELTALAKE, new DeltaLakeConnector(derivedContext)
);
ImmutableMap.Builder<Table.TableType, Connector> builder = ImmutableMap.builder();

builder.put(HIVE, new HiveConnector(derivedContext))
.put(ICEBERG, new IcebergConnector(derivedContext))
.put(HUDI, new HudiConnector(derivedContext))
.put(DELTALAKE, new DeltaLakeConnector(derivedContext));
boolean containsPaimon = null != context.getProperties().get(PAIMON_CATALOG_WAREHOUSE);
if (containsPaimon) {
builder.put(PAIMON, new PaimonConnector(derivedContext));
}
connectorMap = builder.build();
}

@Override
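
Note that the builder above registers a PaimonConnector only when `paimon.catalog.warehouse` is present, so unified catalogs created without that property keep their previous behavior. For illustration (property values are placeholders):

```sql
-- No "paimon.catalog.warehouse" here, so no Paimon connector is created and
-- the unified catalog routes only Hive, Iceberg, Hudi, and Delta Lake tables.
CREATE EXTERNAL CATALOG unified_no_paimon
PROPERTIES
(
    "type" = "unified",
    "unified.metastore.type" = "hive",
    "hive.metastore.uris" = "thrift://localhost:9083"
);
```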
@@ -43,6 +43,7 @@
import static com.starrocks.catalog.Table.TableType.HIVE;
import static com.starrocks.catalog.Table.TableType.HUDI;
import static com.starrocks.catalog.Table.TableType.ICEBERG;
import static com.starrocks.catalog.Table.TableType.PAIMON;
import static java.util.Objects.requireNonNull;

public class UnifiedMetadata implements ConnectorMetadata {
@@ -89,6 +90,9 @@ private Table.TableType getTableType(String dbName, String tblName) {
if (isDeltaLakeTable(table.getProperties())) {
return DELTALAKE;
}
if (isPaimonTable(table.getProperties())) {
return PAIMON;
}
return HIVE;
}

@@ -30,6 +30,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PaimonTableTest {

@@ -63,6 +64,8 @@ public void testPartitionKeys(@Mocked FileStoreTable paimonNativeTable) {
};
PaimonTable paimonTable = new PaimonTable("testCatalog", "testDB", "testTable", fullSchema,
paimonNativeTable, 100L);
Map<String, String> properties = paimonTable.getProperties();
Assert.assertEquals(0, properties.size());
List<Column> partitionColumns = paimonTable.getPartitionColumns();
Assertions.assertThat(partitionColumns).hasSameElementsAs(partitionSchema);
}
@@ -19,6 +19,11 @@
import com.starrocks.connector.ConnectorContext;
import com.starrocks.connector.ConnectorFactory;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.paimon.PaimonConnector;
import com.starrocks.connector.paimon.PaimonMetadata;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
@@ -27,13 +32,22 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

public class UnifiedConnectorTest {
@Mocked private PaimonConnector paimonConnector;
@Mocked private PaimonMetadata paimonMetadata;

@Test
public void testCreateUnifiedConnectorFromConnectorFactory() {
public void testCreateUnifiedConnectorFromConnectorFactory() throws StarRocksConnectorException {
new Expectations() {
{
paimonConnector.getMetadata();
result = paimonMetadata;
}
};
Map<String, String> properties = new HashMap<>();
properties.put("type", "unified");
properties.put("unified.metastore.type", "hive");
properties.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
properties.put("paimon.catalog.warehouse", "file:///tmp/");
ConnectorContext context = new ConnectorContext("unified_catalog", "unified", properties);
CatalogConnector catalogConnector = ConnectorFactory.createConnector(context, false);
ConnectorMetadata metadata = catalogConnector.getMetadata();
@@ -43,10 +57,17 @@ public void testCreateUnifiedConnectorFromConnectorFactory() {

@Test
public void testCreateUnifiedConnector() {
new Expectations() {
{
paimonConnector.getMetadata();
result = paimonMetadata;
}
};
Map<String, String> properties = new HashMap<>();
properties.put("type", "unified");
properties.put("unified.metastore.type", "hive");
properties.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
properties.put("paimon.catalog.warehouse", "file:///tmp/");
ConnectorContext context = new ConnectorContext("unified_catalog", "unified", properties);
UnifiedConnector unifiedConnector = new UnifiedConnector(context);
ConnectorMetadata metadata = unifiedConnector.getMetadata();
@@ -31,6 +31,7 @@
import com.starrocks.connector.hive.HiveMetadata;
import com.starrocks.connector.hudi.HudiMetadata;
import com.starrocks.connector.iceberg.IcebergMetadata;
import com.starrocks.connector.paimon.PaimonMetadata;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudType;
import com.starrocks.sql.ast.CreateTableStmt;
@@ -46,6 +47,7 @@
import static com.starrocks.catalog.Table.TableType.HIVE;
import static com.starrocks.catalog.Table.TableType.HUDI;
import static com.starrocks.catalog.Table.TableType.ICEBERG;
import static com.starrocks.catalog.Table.TableType.PAIMON;
import static com.starrocks.connector.unified.UnifiedMetadata.DELTA_LAKE_PROVIDER;
import static com.starrocks.connector.unified.UnifiedMetadata.ICEBERG_TABLE_TYPE_NAME;
import static com.starrocks.connector.unified.UnifiedMetadata.ICEBERG_TABLE_TYPE_VALUE;
@@ -54,10 +56,16 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

public class UnifiedMetadataTest {
@Mocked private HiveMetadata hiveMetadata;
@Mocked private IcebergMetadata icebergMetadata;
@Mocked private HudiMetadata hudiMetadata;
@Mocked private DeltaLakeMetadata deltaLakeMetadata;
@Mocked
private HiveMetadata hiveMetadata;
@Mocked
private IcebergMetadata icebergMetadata;
@Mocked
private HudiMetadata hudiMetadata;
@Mocked
private DeltaLakeMetadata deltaLakeMetadata;
@Mocked
private PaimonMetadata paimonMetadata;
private final CreateTableStmt createTableStmt = new CreateTableStmt(false, true,
new TableName("test_db", "test_tbl"), ImmutableList.of(), "hive",
null, null, null, null, null, null);
@@ -70,7 +78,8 @@ public void setUp() {
HIVE, hiveMetadata,
ICEBERG, icebergMetadata,
HUDI, hudiMetadata,
DELTALAKE, deltaLakeMetadata
DELTALAKE, deltaLakeMetadata,
PAIMON, paimonMetadata
)
);
}
