Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Aug 15, 2023
1 parent 4ae3982 commit ace8a2d
Show file tree
Hide file tree
Showing 32 changed files with 1,007 additions and 183 deletions.
255 changes: 242 additions & 13 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
distributedTableParams);
static void CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams);
static void ConvertCitusLocalTableToTableType(Oid relationId,
CitusTableType tableType,
DistributedTableParams *
distributedTableParams);
static uint32 SingleShardTableColocationNodeId(uint32 colocationId);
static uint32 SingleShardTableGetNodeId(Oid relationId);
static int64 NoneDistTableGetShardId(Oid relationId);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
Expand Down Expand Up @@ -1095,23 +1102,36 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
}

/*
* EnsureTableNotDistributed errors out when relation is a citus table but
* we don't want to ask user to first undistribute their citus local tables
* when creating reference or distributed tables from them.
* For this reason, here we undistribute citus local tables beforehand.
* But since UndistributeTable does not support undistributing relations
* involved in foreign key relationships, we first drop foreign keys that
* given relation is involved, then we undistribute the relation and finally
* we re-create dropped foreign keys at the end of this function.
* EnsureTableNotDistributed errors out when relation is a Citus table.
*
* For this reason, we either undistribute the Citus Local table first
* and then follow the usual code-path to create distributed table; or
* we simply move / replicate its shard to create a single-shard table /
* reference table, and then we update the metadata accordingly.
*
* If we're about it to undistribute it (because we will create a distributed
* table soon), then we first drop foreign keys that given relation is
* involved because UndistributeTable does not support undistributing
* relations involved in foreign key relationships. At the end of this
* function, we then re-create the dropped foreign keys.
*/
List *originalForeignKeyRecreationCommands = NIL;
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
/* store foreign key creation commands that relation is involved */
originalForeignKeyRecreationCommands =
GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
INCLUDE_ALL_TABLE_TYPES);
relationId = DropFKeysAndUndistributeTable(relationId);
if (tableType == REFERENCE_TABLE || tableType == SINGLE_SHARD_DISTRIBUTED)
{
ConvertCitusLocalTableToTableType(relationId, tableType,
distributedTableParams);
return;
}
else
{
/* store foreign key creation commands that relation is involved */
originalForeignKeyRecreationCommands =
GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
INCLUDE_ALL_TABLE_TYPES);
relationId = DropFKeysAndUndistributeTable(relationId);
}
}
/*
* To support foreign keys between reference tables and local tables,
Expand Down Expand Up @@ -1319,6 +1339,215 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
}


/*
* ConvertCitusLocalTableToTableType converts given Citus local table to
* given table type.
*
* This only supports converting Citus local tables to reference tables
* (by replicating the shard to workers) and single-shard distributed
* tables (by moving the shard to appropriate worker).
*/
static void
ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams)
{
if (!IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errmsg("table is not a local table added to metadata")));
}

if (tableType != REFERENCE_TABLE && tableType != SINGLE_SHARD_DISTRIBUTED)
{
ereport(ERROR, (errmsg("table type is not supported for conversion")));
}

LockRelationOid(relationId, ExclusiveLock);

Var *distributionColumn = NULL;
CitusTableParams citusTableParams = DecideCitusTableParams(tableType,
distributedTableParams);

uint32 colocationId = INVALID_COLOCATION_ID;
if (distributedTableParams &&
distributedTableParams->colocationParam.colocationParamType ==
COLOCATE_WITH_COLOCATION_ID)
{
colocationId = distributedTableParams->colocationParam.colocationId;
}
else
{
colocationId = ColocationIdForNewTable(relationId, tableType,
distributedTableParams,
distributionColumn);
}

/* check constraints etc. on table based on new distribution params */
EnsureRelationCanBeDistributed(relationId, distributionColumn,
citusTableParams.distributionMethod,
colocationId, citusTableParams.replicationModel);

/*
* Regarding the foreign key relationships that given relation is involved,
* EnsureRelationCanBeDistributed() only checks the ones where the relation is
* the referencing table.
*
* And given that the table at hand is a Citus local table, right now it may
* only be referenced by a reference table or a Citus local table.
*
* However, given that neither of those two cases are not applicable for a
* distributed table, here we throw an error assuming that the referencing
* relation is a reference table or a Citus local table.
*
* While doing so, we use the same error message used in
* ErrorIfUnsupportedForeignConstraintExists(), which is eventually called
* by EnsureRelationCanBeDistributed().
*
* Note that we don't need to check the same if we're creating a reference
* table from a Citus local table because all the foreign keys referencing
* Citus local tables are supported by reference tables.
*/
if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
int fkeyFlags = (INCLUDE_REFERENCED_CONSTRAINTS | EXCLUDE_SELF_REFERENCES |
INCLUDE_ALL_TABLE_TYPES);
List *externalReferencedFkeyIds = GetForeignKeyOids(relationId, fkeyFlags);
if (list_length(externalReferencedFkeyIds) != 0)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint "
"since foreign keys from reference tables "
"and local tables to distributed tables "
"are not supported"),
errdetail("Reference tables and local tables "
"can only have foreign keys to reference "
"tables and local tables")));
}
}

EnsureReferenceTablesExistOnAllNodes();

LockColocationId(colocationId, ShareLock);

int64 shardId = NoneDistTableGetShardId(relationId);
WorkerNode *sourceNode = CoordinatorNodeIfAddedAsWorkerOrError();

if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
uint32 targetNodeId = SingleShardTableColocationNodeId(colocationId);
if (targetNodeId != sourceNode->nodeId)
{
bool missingOk = false;
WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);

TransferCitusLocalTableShardInXact(shardId, sourceNode->workerName,
sourceNode->workerPort,
targetNode->workerName,
targetNode->workerPort,
SHARD_TRANSFER_MOVE);
}
}
else if (tableType == REFERENCE_TABLE)
{
List *nodeList = ActivePrimaryNonCoordinatorNodeList(ShareLock);
nodeList = SortList(nodeList, CompareWorkerNodes);

WorkerNode *targetNode = NULL;
foreach_ptr(targetNode, nodeList)
{
TransferCitusLocalTableShardInXact(shardId, sourceNode->workerName,
sourceNode->workerPort,
targetNode->workerName,
targetNode->workerPort,
SHARD_TRANSFER_COPY);
}
}

bool autoConverted = false;
UpdateNoneDistTableMetadataGlobally(
relationId, citusTableParams.replicationModel,
colocationId, autoConverted);

/*
* TransferCitusLocalTableShardInXact() moves / copies partition shards
* to the target node too, but we still need to update the metadata
* for them.
*/
if (PartitionedTable(relationId))
{
Oid partitionRelationId = InvalidOid;
List *partitionList = PartitionList(relationId);
foreach_oid(partitionRelationId, partitionList)
{
UpdateNoneDistTableMetadataGlobally(
partitionRelationId, citusTableParams.replicationModel,
colocationId, autoConverted);
}
}
}


/*
* SingleShardTableColocationNodeId takes a colocation id that is known to be
* used / going be used to colocate a set of single-shard tables and returns
* id of the node that should store the shards of those tables.
*/
static uint32
SingleShardTableColocationNodeId(uint32 colocationId)
{
List *tablesInColocationGroup = ColocationGroupTableList(colocationId, 1);
if (list_length(tablesInColocationGroup) == 0)
{
int workerNodeIndex =
EmptySingleShardTableColocationDecideNodeId(colocationId);
List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock);
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex);

return workerNode->nodeId;
}
else
{
Oid colocatedTableId = linitial_oid(tablesInColocationGroup);
return SingleShardTableGetNodeId(colocatedTableId);
}
}


/*
* SingleShardTableGetNodeId returns id of the node that stores shard of
* given single-shard table.
*/
static uint32
SingleShardTableGetNodeId(Oid relationId)
{
int64 shardId = NoneDistTableGetShardId(relationId);

List *shardPlacementList = ShardPlacementList(shardId);
if (list_length(shardPlacementList) != 1)
{
ereport(ERROR, (errmsg("table shard does not have a single shard placement")));
}

return ((ShardPlacement *) linitial(shardPlacementList))->nodeId;
}


/*
* NoneDistTableGetShardId returns shard id of given table that is known
* to be a none-distriubted table.
*/
static int64
NoneDistTableGetShardId(Oid relationId)
{
if (HasDistributionKey(relationId))
{
ereport(ERROR, (errmsg("table is not a none-distributed table")));
}

List *shardIntervalList = LoadShardIntervalList(relationId);
return ((ShardInterval *) linitial(shardIntervalList))->shardId;
}


/*
* DecideCitusTableParams decides CitusTableParams based on given CitusTableType
* and DistributedTableParams if it's a distributed table.
Expand Down
63 changes: 63 additions & 0 deletions src/backend/distributed/metadata/metadata_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ static char * RemoteSchemaIdExpressionById(Oid schemaId);
static char * RemoteSchemaIdExpressionByName(char *schemaName);
static char * RemoteTypeIdExpression(Oid typeId);
static char * RemoteCollationIdExpression(Oid colocationId);
static char * RemoteTableIdExpression(Oid relationId);


PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
Expand All @@ -176,6 +177,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);


static bool got_SIGTERM = false;
Expand Down Expand Up @@ -3836,6 +3838,33 @@ citus_internal_delete_tenant_schema(PG_FUNCTION_ARGS)
}


/*
* citus_internal_update_none_dist_table_metadata is an internal UDF to
* update a row in pg_dist_partition that belongs to given none-distributed
* table.
*/
Datum
citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

Oid relationId = PG_GETARG_OID(0);
char replicationModel = PG_GETARG_CHAR(1);
uint32 colocationId = PG_GETARG_INT32(2);
bool autoConverted = PG_GETARG_BOOL(3);

if (!ShouldSkipMetadataChecks())
{
EnsureCoordinatorInitiatedOperation();
}

UpdateNoneDistTableMetadata(relationId, replicationModel,
colocationId, autoConverted);

PG_RETURN_VOID();
}


/*
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
*/
Expand Down Expand Up @@ -4017,6 +4046,24 @@ TenantSchemaDeleteCommand(char *schemaName)
}


/*
* UpdateNoneDistTableMetadataCommand returns a command to call
* citus_internal_update_none_dist_table_metadata().
*/
char *
UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
uint32 colocationId, bool autoConverted)
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(%s, '%c', %u, %s)",
RemoteTableIdExpression(relationId), replicationModel, colocationId,
autoConverted ? "true" : "false");

return command->data;
}


/*
* RemoteSchemaIdExpressionById returns an expression in text form that
* can be used to obtain the OID of the schema with given schema id on a
Expand Down Expand Up @@ -4051,6 +4098,22 @@ RemoteSchemaIdExpressionByName(char *schemaName)
}


/*
* RemoteTableIdExpression returns an expression in text form that
* can be used to obtain the OID of the table with given name on a
* different node when included in a query string.
*/
static char *
RemoteTableIdExpression(Oid relationId)
{
StringInfo regnamespaceExpr = makeStringInfo();
appendStringInfo(regnamespaceExpr, "%s::regclass",
quote_literal_cstr(generate_qualified_relation_name(relationId)));

return regnamespaceExpr->data;
}


/*
* SetMetadataSyncNodesFromNodeList sets list of nodes that needs to be metadata
* synced among given node list into metadataSyncContext.
Expand Down
Loading

0 comments on commit ace8a2d

Please sign in to comment.