diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 8810e6db90d..52c7856d171 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -139,6 +139,13 @@ static CitusTableParams DecideCitusTableParams(CitusTableType tableType, distributedTableParams); static void CreateCitusTable(Oid relationId, CitusTableType tableType, DistributedTableParams *distributedTableParams); +static void ConvertCitusLocalTableToTableType(Oid relationId, + CitusTableType tableType, + DistributedTableParams * + distributedTableParams); +static uint32 SingleShardTableColocationNodeId(uint32 colocationId); +static uint32 SingleShardTableGetNodeId(Oid relationId); +static int64 NoneDistTableGetShardId(Oid relationId); static void CreateHashDistributedTableShards(Oid relationId, int shardCount, Oid colocatedTableId, bool localTableEmpty); static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId, @@ -1095,23 +1102,36 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, } /* - * EnsureTableNotDistributed errors out when relation is a citus table but - * we don't want to ask user to first undistribute their citus local tables - * when creating reference or distributed tables from them. - * For this reason, here we undistribute citus local tables beforehand. - * But since UndistributeTable does not support undistributing relations - * involved in foreign key relationships, we first drop foreign keys that - * given relation is involved, then we undistribute the relation and finally - * we re-create dropped foreign keys at the end of this function. + * EnsureTableNotDistributed errors out when the relation is a Citus table. + * + * For this reason, we either undistribute the Citus local table first + * and then follow the usual code path to create a distributed table; or + * we simply move / replicate its shard to create a single-shard table / + * reference table, and then we update the metadata accordingly. + * + * If we're about to undistribute it (because we will create a distributed + * table soon), then we first drop the foreign keys that the given relation + * is involved in, because UndistributeTable does not support undistributing + * relations involved in foreign key relationships. At the end of this + * function, we then re-create the dropped foreign keys. */ List *originalForeignKeyRecreationCommands = NIL; if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) { - /* store foreign key creation commands that relation is involved */ - originalForeignKeyRecreationCommands = - GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId, - INCLUDE_ALL_TABLE_TYPES); - relationId = DropFKeysAndUndistributeTable(relationId); + if (tableType == REFERENCE_TABLE || tableType == SINGLE_SHARD_DISTRIBUTED) + { + ConvertCitusLocalTableToTableType(relationId, tableType, + distributedTableParams); + return; + } + else + { + /* store foreign key creation commands that relation is involved */ + originalForeignKeyRecreationCommands = + GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId, + INCLUDE_ALL_TABLE_TYPES); + relationId = DropFKeysAndUndistributeTable(relationId); + } } /* * To support foreign keys between reference tables and local tables, @@ -1319,6 +1339,215 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, } + +/* + * ConvertCitusLocalTableToTableType converts the given Citus local table to + * the given table type.
+ * + * This only supports converting Citus local tables to reference tables + * (by replicating the shard to workers) and single-shard distributed + * tables (by moving the shard to the appropriate worker). + */ +static void +ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, + DistributedTableParams *distributedTableParams) +{ + if (!IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + ereport(ERROR, (errmsg("table is not a local table added to metadata"))); + } + + if (tableType != REFERENCE_TABLE && tableType != SINGLE_SHARD_DISTRIBUTED) + { + ereport(ERROR, (errmsg("table type is not supported for conversion"))); + } + + LockRelationOid(relationId, ExclusiveLock); + + Var *distributionColumn = NULL; + CitusTableParams citusTableParams = DecideCitusTableParams(tableType, + distributedTableParams); + + uint32 colocationId = INVALID_COLOCATION_ID; + if (distributedTableParams && + distributedTableParams->colocationParam.colocationParamType == + COLOCATE_WITH_COLOCATION_ID) + { + colocationId = distributedTableParams->colocationParam.colocationId; + } + else + { + colocationId = ColocationIdForNewTable(relationId, tableType, + distributedTableParams, + distributionColumn); + } + + /* check constraints etc. on table based on new distribution params */ + EnsureRelationCanBeDistributed(relationId, distributionColumn, + citusTableParams.distributionMethod, + colocationId, citusTableParams.replicationModel); + + /* + * Regarding the foreign key relationships that the given relation is involved + * in, EnsureRelationCanBeDistributed() only checks the ones where the relation + * is the referencing table. + * + * And given that the table at hand is a Citus local table, right now it may + * only be referenced by a reference table or a Citus local table. + * + * However, given that neither of those two cases is supported for a + * distributed table, here we throw an error assuming that the referencing + * relation is a reference table or a Citus local table. + * + * While doing so, we use the same error message used in + * ErrorIfUnsupportedForeignConstraintExists(), which is eventually called + * by EnsureRelationCanBeDistributed(). + * + * Note that we don't need to perform the same check if we're creating a + * reference table from a Citus local table because all the foreign keys + * referencing Citus local tables are also supported by reference tables.
+ */ + if (tableType == SINGLE_SHARD_DISTRIBUTED) + { + int fkeyFlags = (INCLUDE_REFERENCED_CONSTRAINTS | EXCLUDE_SELF_REFERENCES | + INCLUDE_ALL_TABLE_TYPES); + List *externalReferencedFkeyIds = GetForeignKeyOids(relationId, fkeyFlags); + if (list_length(externalReferencedFkeyIds) != 0) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create foreign key constraint " + "since foreign keys from reference tables " + "and local tables to distributed tables " + "are not supported"), + errdetail("Reference tables and local tables " + "can only have foreign keys to reference " + "tables and local tables"))); + } + } + + EnsureReferenceTablesExistOnAllNodes(); + + LockColocationId(colocationId, ShareLock); + + int64 shardId = NoneDistTableGetShardId(relationId); + WorkerNode *sourceNode = CoordinatorNodeIfAddedAsWorkerOrError(); + + if (tableType == SINGLE_SHARD_DISTRIBUTED) + { + uint32 targetNodeId = SingleShardTableColocationNodeId(colocationId); + if (targetNodeId != sourceNode->nodeId) + { + bool missingOk = false; + WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); + + TransferCitusLocalTableShardInXact(shardId, sourceNode->workerName, + sourceNode->workerPort, + targetNode->workerName, + targetNode->workerPort, + SHARD_TRANSFER_MOVE); + } + } + else if (tableType == REFERENCE_TABLE) + { + List *nodeList = ActivePrimaryNonCoordinatorNodeList(ShareLock); + nodeList = SortList(nodeList, CompareWorkerNodes); + + WorkerNode *targetNode = NULL; + foreach_ptr(targetNode, nodeList) + { + TransferCitusLocalTableShardInXact(shardId, sourceNode->workerName, + sourceNode->workerPort, + targetNode->workerName, + targetNode->workerPort, + SHARD_TRANSFER_COPY); + } + } + + bool autoConverted = false; + UpdateNoneDistTableMetadataGlobally( + relationId, citusTableParams.replicationModel, + colocationId, autoConverted); + + /* + * TransferCitusLocalTableShardInXact() moves / copies partition shards + * to the target node too, but we still need to update the metadata + * for them. + */ + if (PartitionedTable(relationId)) + { + Oid partitionRelationId = InvalidOid; + List *partitionList = PartitionList(relationId); + foreach_oid(partitionRelationId, partitionList) + { + UpdateNoneDistTableMetadataGlobally( + partitionRelationId, citusTableParams.replicationModel, + colocationId, autoConverted); + } + } +} + + +/* + * SingleShardTableColocationNodeId takes a colocation id that is known to be + * used / going be used to colocate a set of single-shard tables and returns + * id of the node that should store the shards of those tables. + */ +static uint32 +SingleShardTableColocationNodeId(uint32 colocationId) +{ + List *tablesInColocationGroup = ColocationGroupTableList(colocationId, 1); + if (list_length(tablesInColocationGroup) == 0) + { + int workerNodeIndex = + EmptySingleShardTableColocationDecideNodeId(colocationId); + List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock); + WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex); + + return workerNode->nodeId; + } + else + { + Oid colocatedTableId = linitial_oid(tablesInColocationGroup); + return SingleShardTableGetNodeId(colocatedTableId); + } +} + + +/* + * SingleShardTableGetNodeId returns id of the node that stores shard of + * given single-shard table. 
+ */ +static uint32 +SingleShardTableGetNodeId(Oid relationId) +{ + int64 shardId = NoneDistTableGetShardId(relationId); + + List *shardPlacementList = ShardPlacementList(shardId); + if (list_length(shardPlacementList) != 1) + { + ereport(ERROR, (errmsg("table shard does not have a single shard placement"))); + } + + return ((ShardPlacement *) linitial(shardPlacementList))->nodeId; +} + + +/* + * NoneDistTableGetShardId returns the shard id of the given table, which is + * known to be a none-distributed table. + */ +static int64 +NoneDistTableGetShardId(Oid relationId) +{ + if (HasDistributionKey(relationId)) + { + ereport(ERROR, (errmsg("table is not a none-distributed table"))); + } + + List *shardIntervalList = LoadShardIntervalList(relationId); + return ((ShardInterval *) linitial(shardIntervalList))->shardId; +} + + /* * DecideCitusTableParams decides CitusTableParams based on given CitusTableType * and DistributedTableParams if it's a distributed table. diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f724deca0a0..dc8196ff612 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -150,6 +150,7 @@ static char * RemoteSchemaIdExpressionById(Oid schemaId); static char * RemoteSchemaIdExpressionByName(char *schemaName); static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); +static char * RemoteTableIdExpression(Oid relationId); PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes); @@ -176,6 +177,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema); +PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata); static bool got_SIGTERM = false; @@ -3836,6 +3838,33 @@ citus_internal_delete_tenant_schema(PG_FUNCTION_ARGS) } +/* + * citus_internal_update_none_dist_table_metadata is an internal UDF to + * update a row in pg_dist_partition that belongs to the given none-distributed + * table. + */ +Datum +citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Oid relationId = PG_GETARG_OID(0); + char replicationModel = PG_GETARG_CHAR(1); + uint32 colocationId = PG_GETARG_INT32(2); + bool autoConverted = PG_GETARG_BOOL(3); + + if (!ShouldSkipMetadataChecks()) + { + EnsureCoordinatorInitiatedOperation(); + } + + UpdateNoneDistTableMetadata(relationId, replicationModel, + colocationId, autoConverted); + + PG_RETURN_VOID(); +} + + /* * SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker. */ @@ -4017,6 +4046,24 @@ TenantSchemaDeleteCommand(char *schemaName) } +/* + * UpdateNoneDistTableMetadataCommand returns a command to call + * citus_internal_update_none_dist_table_metadata(). + */ +char * +UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel, + uint32 colocationId, bool autoConverted) +{ + StringInfo command = makeStringInfo(); + appendStringInfo(command, + "SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(%s, '%c', %u, %s)", + RemoteTableIdExpression(relationId), replicationModel, colocationId, + autoConverted ?
"true" : "false"); + + return command->data; +} + + /* * RemoteSchemaIdExpressionById returns an expression in text form that * can be used to obtain the OID of the schema with given schema id on a @@ -4051,6 +4098,22 @@ RemoteSchemaIdExpressionByName(char *schemaName) } +/* + * RemoteTableIdExpression returns an expression in text form that + * can be used to obtain the OID of the table with given name on a + * different node when included in a query string. + */ +static char * +RemoteTableIdExpression(Oid relationId) +{ + StringInfo regnamespaceExpr = makeStringInfo(); + appendStringInfo(regnamespaceExpr, "%s::regclass", + quote_literal_cstr(generate_qualified_relation_name(relationId))); + + return regnamespaceExpr->data; +} + + /* * SetMetadataSyncNodesFromNodeList sets list of nodes that needs to be metadata * synced among given node list into metadataSyncContext. diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index afe49b1ab66..bef01ad604e 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2243,6 +2243,93 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut } +/* + * UpdateNoneDistTableMetadataGlobally globally updates pg_dist_partition for + * given none-distributed table. + */ +void +UpdateNoneDistTableMetadataGlobally(Oid relationId, char replicationModel, + uint32 colocationId, bool autoConverted) +{ + UpdateNoneDistTableMetadata(relationId, replicationModel, + colocationId, autoConverted); + + if (ShouldSyncTableMetadata(relationId)) + { + char *metadataCommand = + UpdateNoneDistTableMetadataCommand(relationId, + replicationModel, + colocationId, + autoConverted); + SendCommandToWorkersWithMetadata(metadataCommand); + } +} + + +/* + * UpdateNoneDistTableMetadata locally updates pg_dist_partition for given + * none-distributed table. 
+ */ +void +UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 colocationId, + bool autoConverted) +{ + if (HasDistributionKey(relationId)) + { + ereport(ERROR, (errmsg("cannot update metadata for a distributed " + "table that has a distribution column"))); + } + + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + Datum values[Natts_pg_dist_partition]; + bool isnull[Natts_pg_dist_partition]; + bool replace[Natts_pg_dist_partition]; + + Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition, + DistPartitionLogicalRelidIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for Citus table with oid: %u", + relationId))); + } + + memset(replace, 0, sizeof(replace)); + + values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); + isnull[Anum_pg_dist_partition_colocationid - 1] = false; + replace[Anum_pg_dist_partition_colocationid - 1] = true; + + values[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); + isnull[Anum_pg_dist_partition_repmodel - 1] = false; + replace[Anum_pg_dist_partition_repmodel - 1] = true; + + values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); + isnull[Anum_pg_dist_partition_autoconverted - 1] = false; + replace[Anum_pg_dist_partition_autoconverted - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + + CatalogTupleUpdate(pgDistPartition, &heapTuple->t_self, heapTuple); + + CitusInvalidateRelcacheByRelid(relationId); + CommandCounterIncrement(); + + systable_endscan(scanDescriptor); + table_close(pgDistPartition, NoLock); +} + + /* * Check that the current user has `mode` permissions on relationId, error out * if not. Superusers always have such permissions. 
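For illustration, the command that UpdateNoneDistTableMetadataCommand() builds, and that UpdateNoneDistTableMetadataGlobally() then sends to the workers with metadata, would look roughly like the following SQL; the table name, replication model and colocation id shown here are hypothetical:

SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(
    'public.my_citus_local_table'::regclass, -- relation_id, resolved on the worker via RemoteTableIdExpression()
    's',                                     -- replication_model
    1234,                                    -- colocation_id
    false);                                  -- auto_converted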
diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 358927a0992..cd8d73b0312 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -400,13 +400,8 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); - int32 workerNodeCount = list_length(workerNodeList); - if (workerNodeCount == 0) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("couldn't find any worker nodes"), - errhint("Add more worker nodes"))); - } + int roundRobinNodeIdx = + EmptySingleShardTableColocationDecideNodeId(colocationId); char shardStorageType = ShardStorageType(relationId); text *minHashTokenText = NULL; @@ -415,9 +410,6 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio InsertShardRow(relationId, shardId, shardStorageType, minHashTokenText, maxHashTokenText); - /* determine the node index based on colocation id */ - int roundRobinNodeIdx = colocationId % workerNodeCount; - int replicationFactor = 1; List *insertedShardPlacements = InsertShardPlacementRows( relationId, @@ -438,6 +430,30 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio } +/* + * EmptySingleShardTableColocationDecideNodeId returns index of the node + * that first shard to be created in given colocation group should be + * placed on. + * + * This is determined by modulo of the colocation id by the length of the + * list returned by DistributedTablePlacementNodeList(). + */ +int +EmptySingleShardTableColocationDecideNodeId(uint32 colocationId) +{ + List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock); + int32 workerNodeCount = list_length(workerNodeList); + if (workerNodeCount == 0) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("couldn't find any worker nodes"), + errhint("Add more worker nodes"))); + } + + return colocationId % workerNodeCount; +} + + /* * CheckHashPartitionedTable looks up the partition information for the given * tableId and checks if the table is hash partitioned. 
If not, the function diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index abaa00251cf..1cdedd9cee3 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -115,7 +115,11 @@ static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort); + char *targetNodeName, int32 targetNodePort, + bool transactional); +static Task * CreateRebalancerTask(WorkerNode *targetNode, List *commands); +static void RunRebalancerTaskListInXact(List *taskList); +static void LocallyEnableManualChangesToShard(void); static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); @@ -126,7 +130,8 @@ static void ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList); static void DropShardPlacementsFromMetadata(List *shardList, char *nodeName, int32 nodePort); -static void UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId, +static char * DropShardCommand(ShardInterval *shard); +static void UpdateColocatedShardPlacementMetadataOnWorkers(List *colocatedShardList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, @@ -163,7 +168,8 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval, int32 sourceNodePort); static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList); -static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode); +static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode, + bool exclusiveConnection); /* declarations for dynamic loading */ @@ -547,9 +553,9 @@ TransferShards(int64 shardId, char *sourceNodeName, DropShardPlacementsFromMetadata(colocatedShardList, sourceNodeName, sourceNodePort); - UpdateColocatedShardPlacementMetadataOnWorkers(shardId, sourceNodeName, - sourceNodePort, targetNodeName, - targetNodePort); + UpdateColocatedShardPlacementMetadataOnWorkers(colocatedShardList, + sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); } UpdatePlacementUpdateStatusForShardIntervalList( @@ -562,6 +568,164 @@ TransferShards(int64 shardId, char *sourceNodeName, } +/* + * TransferCitusLocalTableShardInXact transfers shard of given Citus local + * table within the current transaction. + */ +void +TransferCitusLocalTableShardInXact(int64 shardId, + char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort, + ShardTransferType transferType) +{ + ShardInterval *shardInterval = LoadShardInterval(shardId); + + if (!IsCitusTableType(shardInterval->relationId, CITUS_LOCAL_TABLE)) + { + ereport(ERROR, (errmsg("table is not a local table added to metadata"))); + } + + Oid citusLocalTableId = shardInterval->relationId; + + /* make sure that we're not trying to transfer the shard to the same node */ + const char *operationName = ShardTransferTypeNames[transferType]; + ErrorIfSameNode(sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + operationName); + + ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType); + + /* + * Citus local tables can be involved in foreign key relationships only with + * Citus local tables and reference tables. 
While having a foreign key with + * a reference table is not a problem when moving the shard, having a + * foreign key with another Citus local table is a problem. This is because, + * we don't consider them colocated and we don't move the other Citus + * local table's shard. + * + * Caller should've already verified that but we do it again here to be sure. + */ + if (transferType == SHARD_TRANSFER_MOVE) + { + EnsureNoFKeyFromTableType(citusLocalTableId, INCLUDE_CITUS_LOCAL_TABLES); + EnsureNoFKeyToTableType(citusLocalTableId, INCLUDE_CITUS_LOCAL_TABLES); + } + + /* + * In practice, Citus local tables are not colocated with any other tables, + * except that they are colocated with their partitions if they are partitioned. + */ + List *colocatedTableList = list_make1_oid(citusLocalTableId); + List *colocatedShardList = NIL; + if (PartitionedTable(citusLocalTableId)) + { + colocatedTableList = list_concat( + colocatedTableList, + PartitionList(citusLocalTableId) + ); + + Oid partitionTableId = InvalidOid; + foreach_oid(partitionTableId, colocatedTableList) + { + colocatedShardList = list_concat( + colocatedShardList, + LoadShardIntervalList(partitionTableId) + ); + } + } + else + { + colocatedShardList = list_make1(shardInterval); + } + + /* avoid deadlocks */ + colocatedTableList = SortList(colocatedTableList, CompareOids); + colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + + EnsureTableListOwner(colocatedTableList); + + if (transferType == SHARD_TRANSFER_MOVE) + { + LockColocatedRelationsForMove(colocatedTableList); + } + + EnsureAllShardsCanBeCopied(colocatedShardList, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); + + BlockWritesToShardList(colocatedShardList); + + bool transactional = true; + CopyShardTablesViaBlockWrites(colocatedShardList, + sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + transactional); + + /* + * Insert the placements to pg_dist_placement and sync it to the + * metadata workers. + */ + uint32 targetGroupId = GroupForNode(targetNodeName, targetNodePort); + ShardInterval *colocatedShard = NULL; + foreach_ptr(colocatedShard, colocatedShardList) + { + uint64 placementId = GetNextPlacementId(); + + InsertShardPlacementRow(colocatedShard->shardId, placementId, + ShardLength(colocatedShard->shardId), + targetGroupId); + + if (transferType == SHARD_TRANSFER_COPY) + { + Assert(ShouldSyncTableMetadata(colocatedShard->relationId)); + + char *placementCommand = PlacementUpsertCommand(colocatedShard->shardId, + placementId, + 0, targetGroupId); + + /* use superuser connection to write to pg_dist_placement */ + SendCommandToWorkersWithMetadataViaSuperUser(placementCommand); + } + } + + if (transferType == SHARD_TRANSFER_MOVE) + { + /* + * Since this is move operation, we remove the placements from the metadata + * for the source node after copy. + */ + DropShardPlacementsFromMetadata(colocatedShardList, + sourceNodeName, sourceNodePort); + + UpdateColocatedShardPlacementMetadataOnWorkers(colocatedShardList, + sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); + + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + + /* and drop local shard placements */ + colocatedShard = NULL; + foreach_ptr(colocatedShard, colocatedShardList) + { + /* + * We undistribute Citus local tables that are not chained with any reference + * tables via foreign keys at the end of the utility hook. 
+ * Here we temporarily set the related GUC to off to disable the logic for + * internally executed DDL's that might invoke this mechanism unnecessarily. + */ + int save_nestlevel = NewGUCNestLevel(); + + SetLocalEnableLocalReferenceForeignKeys(false); + + char *dropShardCommand = DropShardCommand(colocatedShard); + Task *task = CreateRebalancerTask(sourceNode, list_make1(dropShardCommand)); + RunRebalancerTaskListInXact(list_make1(task)); + + AtEOXact_GUC(true, save_nestlevel); + } + } +} + + /* * Insert deferred cleanup records. * The shards will be dropped by background cleaner later. @@ -1114,8 +1278,15 @@ BlockWritesToShardList(List *shardList) * We need to lock the referenced reference table metadata to avoid * asynchronous shard copy in case of cascading DML operations. */ - LockReferencedReferenceShardDistributionMetadata(shard->shardId, - ExclusiveLock); + LockReferencedShardDistributionMetadata(shard->shardId, REFERENCE_TABLE, + ExclusiveLock); + + /* + * Table might have a foreign key reference to a Citus local table only + * if it is a reference table. Otherwise, below would be a no-op. + */ + LockReferencedShardDistributionMetadata(shard->shardId, CITUS_LOCAL_TABLE, + ExclusiveLock); LockShardDistributionMetadata(shard->shardId, ExclusiveLock); } @@ -1335,8 +1506,9 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP } else { + bool transactional = false; CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + targetNodeName, targetNodePort, transactional); } /* @@ -1416,20 +1588,24 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList) * CopyShardTablesViaBlockWrites copies a shard along with its co-located shards * from a source node to target node via COPY command. While the command is in * progress, the modifications on the source node is blocked. + * + * If transactional is true, then the function runs in the current transaction. + * Otherwise, it uses outside-transaction connections to perform shard table + * creation, data copy and etc. 
*/ static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, bool transactional) { + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaBlockWrites", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); - WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); - WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - /* iterate through the colocated shards and copy each */ ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, shardIntervalList) @@ -1446,42 +1622,76 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, */ List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - /* drop the shard we created on the target, in case of failure */ - InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, - ConstructQualifiedShardName(shardInterval), - GroupForNode(targetNodeName, targetNodePort), - CLEANUP_ON_FAILURE); + /* RecreateShardDDLCommandList cannot return an empty list */ + Assert(list_length(ddlCommandList) > 0); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); + if (transactional) + { + Task *task = CreateRebalancerTask(targetNode, ddlCommandList); + RunRebalancerTaskListInXact(list_make1(task)); + } + else + { + /* drop the shard we created on the target, in case of failure */ + InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, + ConstructQualifiedShardName( + shardInterval), + GroupForNode(targetNodeName, + targetNodePort), + CLEANUP_ON_FAILURE); + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + } } - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COPYING_DATA); + if (!transactional) + { + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COPYING_DATA); - ConflictWithIsolationTestingBeforeCopy(); - CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); - ConflictWithIsolationTestingAfterCopy(); + ConflictWithIsolationTestingBeforeCopy(); + } - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); + CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL, transactional); + + if (!transactional) + { + ConflictWithIsolationTestingAfterCopy(); + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); + } foreach_ptr(shardInterval, shardIntervalList) { List *ddlCommandList = PostLoadShardCreationCommandList(shardInterval, sourceNodeName, sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); + + if (list_length(ddlCommandList) == 0) + { + continue; + } + + if 
(transactional) + { + Task *task = CreateRebalancerTask(targetNode, ddlCommandList); + RunRebalancerTaskListInXact(list_make1(task)); + } + else + { + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + } MemoryContextReset(localContext); } @@ -1506,11 +1716,14 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, } } - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); + if (!transactional) + { + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); + } /* * Iterate through the colocated shards and create DDL commamnds @@ -1537,32 +1750,117 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, ShardCommandList *shardCommandList = NULL; foreach_ptr(shardCommandList, shardIntervalWithDDCommandsList) { - char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardCommandList->ddlCommandList); + if (list_length(shardCommandList->ddlCommandList) == 0) + { + continue; + } + + if (transactional) + { + Task *task = CreateRebalancerTask(targetNode, + shardCommandList->ddlCommandList); + RunRebalancerTaskListInXact(list_make1(task)); + } + else + { + char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, + shardCommandList->ddlCommandList); + } } - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COMPLETING); + if (!transactional) + { + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COMPLETING); + } MemoryContextReset(localContext); MemoryContextSwitchTo(oldContext); + + pfree(sourceNode); + pfree(targetNode); +} + + +/* + * CreateRebalancerTask creates a DDL task to execute given list of tasks + * on target node. + */ +static Task * +CreateRebalancerTask(WorkerNode *targetNode, List *commands) +{ + Task *task = CitusMakeNode(Task); + task->jobId = INVALID_JOB_ID; + task->taskId = INVALID_TASK_ID; + task->taskType = READ_TASK; + task->replicationModel = REPLICATION_MODEL_INVALID; + SetTaskQueryStringList(task, commands); + + ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(targetPlacement, targetNode); + + task->taskPlacementList = list_make1(targetPlacement); + + return task; +} + + +/* + * RunRebalancerTaskListInXact executes given list of tasks in current + * transaction via adaptive executor. + * + * It temporarily disables citus.enable_manual_changes_to_shards GUC to allow + * given list of commands to modify shards. + */ +static void +RunRebalancerTaskListInXact(List *taskList) +{ + int save_nestlevel = NewGUCNestLevel(); + + LocallyEnableManualChangesToShard(); + + bool localExecutionSupported = true; + ExecuteUtilityTaskList(taskList, localExecutionSupported); + + AtEOXact_GUC(true, save_nestlevel); +} + + +/* + * LocallyEnableManualChangesToShard locally enables + * citus.enable_manual_changes_to_shards GUC. 
+ */ +static void +LocallyEnableManualChangesToShard(void) +{ + set_config_option("citus.enable_manual_changes_to_shards", "on", + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); } /* * CopyShardsToNode copies the list of shards from the source to the target. * When snapshotName is not NULL it will do the COPY using this snapshot name. + * + * Note that "snapshotName" can only be provided when "transactional" is provided + * as false. */ void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardIntervalList, - char *snapshotName) + char *snapshotName, bool transactional) { - int taskId = 0; + if (transactional && snapshotName != NULL) + { + ereport(ERROR, (errmsg("cannot specify the snapshot when copying the " + "shards in the current transaction"))); + } + List *copyTaskList = NIL; ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, shardIntervalList) @@ -1579,57 +1877,60 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte List *ddlCommandList = NIL; - /* - * This uses repeatable read because we want to read the table in - * the state exactly as it was when the snapshot was created. This - * is needed when using this code for the initial data copy when - * using logical replication. The logical replication catchup might - * fail otherwise, because some of the updates that it needs to do - * have already been applied on the target. - */ - StringInfo beginTransaction = makeStringInfo(); - appendStringInfo(beginTransaction, - "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;"); - ddlCommandList = lappend(ddlCommandList, beginTransaction->data); - - /* Set snapshot for non-blocking shard split. */ - if (snapshotName != NULL) + if (!transactional) { - StringInfo snapShotString = makeStringInfo(); - appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;", - quote_literal_cstr( - snapshotName)); - ddlCommandList = lappend(ddlCommandList, snapShotString->data); + /* + * This uses repeatable read because we want to read the table in + * the state exactly as it was when the snapshot was created. This + * is needed when using this code for the initial data copy when + * using logical replication. The logical replication catchup might + * fail otherwise, because some of the updates that it needs to do + * have already been applied on the target. + */ + StringInfo beginTransaction = makeStringInfo(); + appendStringInfo(beginTransaction, + "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;"); + ddlCommandList = lappend(ddlCommandList, beginTransaction->data); + + /* Set snapshot for non-blocking shard split. 
*/ + if (snapshotName != NULL) + { + StringInfo snapShotString = makeStringInfo(); + appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;", + quote_literal_cstr( + snapshotName)); + ddlCommandList = lappend(ddlCommandList, snapShotString->data); + } } + bool exclusiveConnection = !transactional; char *copyCommand = CreateShardCopyCommand( - shardInterval, targetNode); - + shardInterval, targetNode, exclusiveConnection); ddlCommandList = lappend(ddlCommandList, copyCommand); - StringInfo commitCommand = makeStringInfo(); - appendStringInfo(commitCommand, "COMMIT;"); - ddlCommandList = lappend(ddlCommandList, commitCommand->data); - - Task *task = CitusMakeNode(Task); - task->jobId = shardInterval->shardId; - task->taskId = taskId; - task->taskType = READ_TASK; - task->replicationModel = REPLICATION_MODEL_INVALID; - SetTaskQueryStringList(task, ddlCommandList); - - ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); - SetPlacementNodeMetadata(taskPlacement, sourceNode); - - task->taskPlacementList = list_make1(taskPlacement); + if (!transactional) + { + StringInfo commitCommand = makeStringInfo(); + appendStringInfo(commitCommand, "COMMIT;"); + ddlCommandList = lappend(ddlCommandList, commitCommand->data); + } - copyTaskList = lappend(copyTaskList, task); - taskId++; + copyTaskList = lappend( + copyTaskList, + CreateRebalancerTask(sourceNode, ddlCommandList) + ); } - ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList, - MaxAdaptiveExecutorPoolSize, - NULL /* jobIdList (ignored by API implementation) */); + if (transactional) + { + RunRebalancerTaskListInXact(copyTaskList); + } + else + { + ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList, + MaxAdaptiveExecutorPoolSize, + NULL /* jobIdList (ignored by API implementation) */); + } } @@ -1640,14 +1941,16 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte */ static char * CreateShardCopyCommand(ShardInterval *shard, - WorkerNode *targetNode) + WorkerNode *targetNode, + bool exclusiveConnection) { char *shardName = ConstructQualifiedShardName(shard); StringInfo query = makeStringInfo(); appendStringInfo(query, - "SELECT pg_catalog.worker_copy_table_to_node(%s::regclass, %u);", + "SELECT pg_catalog.worker_copy_table_to_node(%s::regclass, %u, %s);", quote_literal_cstr(shardName), - targetNode->nodeId); + targetNode->nodeId, + exclusiveConnection ? "true" : "false"); return query->data; } @@ -1996,19 +2299,39 @@ DropShardPlacementsFromMetadata(List *shardList, } +/* + * DropShardCommand returns the command to drop given shard. + */ +static char * +DropShardCommand(ShardInterval *shard) +{ + StringInfo dropShardCommand = makeStringInfo(); + appendStringInfo(dropShardCommand, DROP_REGULAR_TABLE_COMMAND, + ConstructQualifiedShardName(shard)); + return dropShardCommand->data; +} + + /* * UpdateColocatedShardPlacementMetadataOnWorkers updates the metadata about the - * placements of the given shard and its colocated shards by changing the nodename and + * placements of given --presumably-- colocated shards by changing the nodename and * nodeport of the shards from the source nodename/port to target nodename/port. * - * Note that the function does nothing if the given shard belongs to a non-mx table. + * Note that the function does nothing if the first shard in given list belongs to + * a non-mx table. 
*/ static void -UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId, +UpdateColocatedShardPlacementMetadataOnWorkers(List *colocatedShardList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort) { - ShardInterval *shardInterval = LoadShardInterval(shardId); + if (list_length(colocatedShardList) == 0) + { + ereport(ERROR, (errmsg("unexpectedly got an empty list of colocated " + "shards"))); + } + + ShardInterval *shardInterval = linitial(colocatedShardList); ListCell *colocatedShardCell = NULL; bool shouldSyncMetadata = ShouldSyncTableMetadata(shardInterval->relationId); @@ -2020,8 +2343,6 @@ UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId, uint32 sourceGroupId = GroupForNode(sourceNodeName, sourceNodePort); uint32 targetGroupId = GroupForNode(targetNodeName, targetNodePort); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - /* iterate through the colocated shards and copy each */ foreach(colocatedShardCell, colocatedShardList) { diff --git a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c index f0f83744de5..5c9bf67efdd 100644 --- a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c +++ b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c @@ -30,7 +30,8 @@ PG_FUNCTION_INFO_V1(worker_copy_table_to_node); * * worker_copy_table_to_node( * source_table regclass, - * target_node_id integer + * target_node_id integer, + * exclusive_connection boolean * ) RETURNS VOID */ Datum @@ -38,6 +39,7 @@ worker_copy_table_to_node(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); uint32_t targetNodeId = PG_GETARG_INT32(1); + bool exclusiveConnection = PG_GETARG_BOOL(2); Oid schemaOid = get_rel_namespace(relationId); char *relationSchemaName = get_namespace_name(schemaOid); @@ -50,7 +52,8 @@ worker_copy_table_to_node(PG_FUNCTION_ARGS) DestReceiver *destReceiver = CreateShardCopyDestReceiver( executor, list_make2(relationSchemaName, relationName), - targetNodeId); + targetNodeId, + exclusiveConnection); StringInfo selectShardQueryForCopy = makeStringInfo(); diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index ba65635a7fb..811255a63df 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -64,6 +64,20 @@ typedef struct ShardCopyDestReceiver * Connection for destination shard (NULL if useLocalCopy is true) */ MultiConnection *connection; + + /* + * Should use an exclusive connection? + * + * When set to true, the connection is claimed exclusively for the COPY + * operation and cannot be used for any other purpose. At the end of + * the COPY operation, the connection is closed. Moreover, COPY command + * is executed outside of current transaction. + * + * Otherwise; connection is not forced to be exclusive, it can be used + * for other purposes and is not closed at the end of the COPY operation. + * Also, COPY command is executed inside of current transaction. + */ + bool exclusiveConnection; } ShardCopyDestReceiver; static bool ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); @@ -94,7 +108,7 @@ CanUseLocalCopy(uint32_t destinationNodeId) static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) { - int connectionFlags = OUTSIDE_TRANSACTION; + int connectionFlags = copyDest->exclusiveConnection ? 
OUTSIDE_TRANSACTION : 0; char *currentUser = CurrentUserName(); WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, false /* missingOk */); @@ -103,8 +117,10 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) workerNode->workerPort, currentUser, NULL /* database (current) */); - ClaimConnectionExclusively(copyDest->connection); - + if (copyDest->exclusiveConnection) + { + ClaimConnectionExclusively(copyDest->connection); + } RemoteTransactionBeginIfNecessary(copyDest->connection); @@ -139,7 +155,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) DestReceiver * CreateShardCopyDestReceiver(EState *executorState, List *destinationShardFullyQualifiedName, - uint32_t destinationNodeId) + uint32_t destinationNodeId, + bool exclusiveConnection) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0( sizeof(ShardCopyDestReceiver)); @@ -157,6 +174,7 @@ CreateShardCopyDestReceiver(EState *executorState, copyDest->tuplesSent = 0; copyDest->connection = NULL; copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId); + copyDest->exclusiveConnection = exclusiveConnection; return (DestReceiver *) copyDest; } @@ -336,7 +354,10 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) ResetReplicationOriginRemoteSession(copyDest->connection); - CloseConnection(copyDest->connection); + if (copyDest->exclusiveConnection) + { + CloseConnection(copyDest->connection); + } } } diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index c154ac040d4..3b223c89778 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -274,10 +274,12 @@ CreateShardCopyDestReceivers(EState *estate, ShardInterval *shardIntervalToSplit char *destinationShardNameCopy = pstrdup(sourceShardNamePrefix); AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId); + bool exclusiveConnection = true; DestReceiver *shardCopyDest = CreateShardCopyDestReceiver( estate, list_make2(destinationShardSchemaName, destinationShardNameCopy), - splitCopyInfo->destinationShardNodeId); + splitCopyInfo->destinationShardNodeId, + exclusiveConnection); shardCopyDests[index] = shardCopyDest; index++; diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 55009587558..46c91d458d5 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -240,7 +240,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo sourceNodePort, PLACEMENT_UPDATE_STATUS_COPYING_DATA); - CopyShardsToNode(sourceNode, targetNode, shardList, snapshot); + bool transactional = false; + CopyShardsToNode(sourceNode, targetNode, shardList, snapshot, transactional); /* * We can close this connection now, because we're done copying the diff --git a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql index 082883883ac..cfa2c854606 100644 --- a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql +++ b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql @@ -4,3 +4,6 @@ #include "udfs/citus_prepare_pg_upgrade/12.1-1.sql" #include "udfs/citus_finish_pg_upgrade/12.1-1.sql" + +#include "udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql" +#include 
"udfs/worker_copy_table_to_node/12.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql index cfd5892f1fa..f1befabf567 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql @@ -3,3 +3,16 @@ -- we have modified the relevant upgrade script to include any_value changes -- we don't need to upgrade this downgrade path for any_value changes -- since if we are doing a Citus downgrade, not PG downgrade, then it would be no-op. + +DROP FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata( + relation_id oid, replication_model "char", colocation_id bigint, + auto_converted boolean +); + +DROP FUNCTION pg_catalog.worker_copy_table_to_node( + source_table regclass, + target_node_id integer, + exclusive_connection boolean +); + +#include "../udfs/worker_copy_table_to_node/11.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql new file mode 100644 index 00000000000..1beea2918cc --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql @@ -0,0 +1,11 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata( + relation_id oid, + replication_model "char", + colocation_id bigint, + auto_converted boolean) +RETURNS void +LANGUAGE C +VOLATILE +AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(oid, "char", bigint, boolean) + IS 'Update pg_dist_node metadata for given none-distributed table, to convert it to another type of none-distributed table.'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_update_none_dist_table_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_update_none_dist_table_metadata/latest.sql new file mode 100644 index 00000000000..1beea2918cc --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_update_none_dist_table_metadata/latest.sql @@ -0,0 +1,11 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata( + relation_id oid, + replication_model "char", + colocation_id bigint, + auto_converted boolean) +RETURNS void +LANGUAGE C +VOLATILE +AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(oid, "char", bigint, boolean) + IS 'Update pg_dist_node metadata for given none-distributed table, to convert it to another type of none-distributed table.'; diff --git a/src/backend/distributed/sql/udfs/worker_copy_table_to_node/12.1-1.sql b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/12.1-1.sql new file mode 100644 index 00000000000..2d0b32a2439 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/12.1-1.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION pg_catalog.worker_copy_table_to_node( + source_table regclass, + target_node_id integer, + exclusive_connection boolean) +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_copy_table_to_node$$; +COMMENT ON FUNCTION pg_catalog.worker_copy_table_to_node(regclass, integer, boolean) + IS 'Perform copy of a shard'; diff --git a/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql index 
ebe093dee33..2d0b32a2439 100644 --- a/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_copy_table_to_node/latest.sql @@ -1,8 +1,9 @@ CREATE OR REPLACE FUNCTION pg_catalog.worker_copy_table_to_node( source_table regclass, - target_node_id integer) + target_node_id integer, + exclusive_connection boolean) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_copy_table_to_node$$; -COMMENT ON FUNCTION pg_catalog.worker_copy_table_to_node(regclass, integer) +COMMENT ON FUNCTION pg_catalog.worker_copy_table_to_node(regclass, integer, boolean) IS 'Perform copy of a shard'; diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index c76830c1d34..58a7068118a 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -505,23 +505,26 @@ SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag) /* - * LockReferencedReferenceShardDistributionMetadata acquires shard distribution - * metadata locks with the given lock mode on the reference tables which has a + * LockReferencedShardDistributionMetadata acquires shard distribution + * metadata locks with the given lock mode on tables with given type which has a * foreign key from the given relation. * * It also gets metadata locks on worker nodes to prevent concurrent write * operations on reference tables from metadata nodes. */ void -LockReferencedReferenceShardDistributionMetadata(uint64 shardId, LOCKMODE lockMode) +LockReferencedShardDistributionMetadata(uint64 shardId, CitusTableType tableType, + LOCKMODE lockMode) { Oid relationId = RelationIdForShard(shardId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey; - List *shardIntervalList = GetSortedReferenceShardIntervals(referencedRelationList); + List *shardIntervalList = + GetSortedReferenceShardIntervals(referencedRelationList, tableType); - if (list_length(shardIntervalList) > 0 && ClusterHasKnownMetadataWorkers()) + if (tableType == REFERENCE_TABLE && + list_length(shardIntervalList) > 0 && ClusterHasKnownMetadataWorkers()) { LockShardListMetadataOnWorkers(lockMode, shardIntervalList); } @@ -555,7 +558,7 @@ LockReferencedReferenceShardResources(uint64 shardId, LOCKMODE lockMode) */ List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey; List *referencedShardIntervalList = - GetSortedReferenceShardIntervals(referencedRelationList); + GetSortedReferenceShardIntervals(referencedRelationList, REFERENCE_TABLE); if (list_length(referencedShardIntervalList) > 0 && ClusterHasKnownMetadataWorkers() && @@ -580,17 +583,18 @@ LockReferencedReferenceShardResources(uint64 shardId, LOCKMODE lockMode) /* * GetSortedReferenceShardIntervals iterates through the given relation list, - * lists the shards of reference tables, and returns the list after sorting. + * lists the shards of tables that are classified as the given table type and + * returns the list after sorting. 
*/ List * -GetSortedReferenceShardIntervals(List *relationList) +GetSortedReferenceShardIntervals(List *relationList, CitusTableType tableType) { List *shardIntervalList = NIL; Oid relationId = InvalidOid; foreach_oid(relationId, relationList) { - if (!IsCitusTableType(relationId, REFERENCE_TABLE)) + if (!IsCitusTableType(relationId, tableType)) { continue; } diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 7f90eaddaed..cf99ecbc948 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -264,6 +264,7 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, extern void CreateReferenceTableShard(Oid distributedTableId); extern void CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocationId); +extern int EmptySingleShardTableColocationDecideNodeId(uint32 colocationId); extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList, List *foreignConstraintCommandList); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 728224fdefb..f9543d74a37 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -139,6 +139,10 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); extern char * TenantSchemaInsertCommand(Oid schemaId, uint32 colocationId); extern char * TenantSchemaDeleteCommand(char *schemaName); +extern char * UpdateNoneDistTableMetadataCommand(Oid relationId, + char replicationModel, + uint32 colocationId, + bool autoConverted); extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, bool collectCommands, diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 6536e89bc3a..6ce09710665 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -360,6 +360,10 @@ extern void UpdateDistributionColumnGlobally(Oid relationId, char distributionMe extern void UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distributionColumn, int colocationId); extern void DeletePartitionRow(Oid distributedRelationId); +extern void UpdateNoneDistTableMetadataGlobally(Oid relationId, char replicationModel, + uint32 colocationId, bool autoConverted); +extern void UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, + uint32 colocationId, bool autoConverted); extern void DeleteShardRow(uint64 shardId); extern void UpdatePlacementGroupId(uint64 placementId, int groupId); extern void DeleteShardPlacementRow(uint64 placementId); diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 9efa1b7672c..5165fd294e9 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -169,8 +169,9 @@ extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardInterva extern void BlockWritesToShardList(List *shardList); /* Lock shard/relation metadata of the referenced reference table if exists */ -extern void LockReferencedReferenceShardDistributionMetadata(uint64 shardId, - LOCKMODE lock); +extern void LockReferencedShardDistributionMetadata(uint64 shardId, + CitusTableType tableType, + LOCKMODE lock); /* Lock shard data, for DML commands or remote fetches */ extern void 
LockShardResource(uint64 shardId, LOCKMODE lockmode); @@ -189,7 +190,8 @@ extern void LockTransactionRecovery(LOCKMODE lockMode); extern void SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode); extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode); -extern List * GetSortedReferenceShardIntervals(List *relationList); +extern List * GetSortedReferenceShardIntervals(List *relationList, + CitusTableType tableType); void AcquireCreateDistributedTableConcurrentlyLock(Oid relationId); diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index a37e5abdb8f..b290290e800 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -23,11 +23,16 @@ extern void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, char shardReplicationMode, ShardTransferType transferType); +extern void TransferCitusLocalTableShardInXact(int64 shardId, + char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort, + ShardTransferType transferType); extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName, uint32 workerNodePort); extern void ErrorIfMoveUnsupportedTableType(Oid relationId); extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, - List *shardIntervalList, char *snapshotName); + List *shardIntervalList, char *snapshotName, + bool transactional); extern void VerifyTablesHaveReplicaIdentity(List *colocatedTableList); extern bool RelationCanPublishAllModifications(Oid relationId); extern void UpdatePlacementUpdateStatusForShardIntervalList(List *shardIntervalList, diff --git a/src/include/distributed/worker_shard_copy.h b/src/include/distributed/worker_shard_copy.h index 77f57c761ec..66db21f73d8 100644 --- a/src/include/distributed/worker_shard_copy.h +++ b/src/include/distributed/worker_shard_copy.h @@ -17,7 +17,8 @@ extern bool EnableBinaryProtocol; extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState, List *destinationShardFullyQualifiedName, - uint32_t destinationNodeId); + uint32_t destinationNodeId, + bool exclusiveConnection); extern const char * CopyableColumnNamesFromRelationName(const char *schemaName, const char *relationName); diff --git a/src/test/regress/expected/arbitrary_configs_truncate_cascade.out b/src/test/regress/expected/arbitrary_configs_truncate_cascade.out index adf8a3cfcd0..4bf8edf7191 100644 --- a/src/test/regress/expected/arbitrary_configs_truncate_cascade.out +++ b/src/test/regress/expected/arbitrary_configs_truncate_cascade.out @@ -1,9 +1,11 @@ SET search_path TO truncate_cascade_tests_schema; +-- Hide detail of truncate error because it might either reference +-- table_with_fk_1 or table_with_fk_2 in the error message. +\set VERBOSITY TERSE -- Test truncate error on table with dependencies TRUNCATE table_with_pk; ERROR: cannot truncate a table referenced in a foreign key constraint -DETAIL: Table "table_with_fk_1" references "table_with_pk". -HINT: Truncate table "table_with_fk_1" at the same time, or use TRUNCATE ... CASCADE. 
+\set VERBOSITY DEFAULT -- Test truncate rollback on table with dependencies SELECT COUNT(*) FROM table_with_fk_1; count diff --git a/src/test/regress/expected/auto_undist_citus_local.out b/src/test/regress/expected/auto_undist_citus_local.out index 0eaec17e510..9e6c6e014a9 100644 --- a/src/test/regress/expected/auto_undist_citus_local.out +++ b/src/test/regress/expected/auto_undist_citus_local.out @@ -1201,9 +1201,9 @@ ALTER TABLE reference_table_1 OWNER TO another_user; SELECT run_command_on_placements('reference_table_1', 'ALTER TABLE %s OWNER TO another_user'); run_command_on_placements --------------------------------------------------------------------- - (localhost,57636,1810093,t,"ALTER TABLE") - (localhost,57637,1810093,t,"ALTER TABLE") - (localhost,57638,1810093,t,"ALTER TABLE") + (localhost,57636,1810092,t,"ALTER TABLE") + (localhost,57637,1810092,t,"ALTER TABLE") + (localhost,57638,1810092,t,"ALTER TABLE") (3 rows) BEGIN; diff --git a/src/test/regress/expected/create_single_shard_table.out b/src/test/regress/expected/create_single_shard_table.out index 248f196ff44..b6f4ad84c94 100644 --- a/src/test/regress/expected/create_single_shard_table.out +++ b/src/test/regress/expected/create_single_shard_table.out @@ -614,11 +614,11 @@ INSERT INTO "Table?!.1Table" VALUES (10, 15, (150, row_to_json(row(4,8)))::int_j INSERT INTO "Table?!.1Table" VALUES (5, 5, (5, row_to_json(row(5,5)))::int_jsonb_type, row_to_json(row(5,5), true)); -- tuples that are supposed to violate different data type / check constraints INSERT INTO "Table?!.1Table"(id, jsondata, name) VALUES (101, '{"a": 1}', 'text_1'); -ERROR: conflicting key value violates exclusion constraint "Table?!.1Table_name_excl_1730043" +ERROR: conflicting key value violates exclusion constraint "Table?!.1Table_name_excl_1730042" DETAIL: Key (name)=(text_1) conflicts with existing key (name)=(text_1). CONTEXT: while executing command on localhost:xxxxx INSERT INTO "Table?!.1Table"(id, jsondata, price) VALUES (101, '{"a": 1}', -1); -ERROR: new row for relation "Table?!.1Table_1730043" violates check constraint "Table?!.1Table_price_check" +ERROR: new row for relation "Table?!.1Table_1730042" violates check constraint "Table?!.1Table_price_check" DETAIL: Failing row contains (101, null, null, {"a": 1}, null, -1, 0, null, 5, 14, 74). CONTEXT: while executing command on localhost:xxxxx INSERT INTO "Table?!.1Table"(id, jsondata, age_with_default_col) VALUES (101, '{"a": 1}', -1); @@ -863,7 +863,7 @@ CREATE INDEX "my!Index2New" ON "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901 CREATE UNIQUE INDEX uniqueIndex2New ON "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789"(id); -- error out for already existing, because of the unique index INSERT INTO "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789" VALUES (1, 1, row_to_json(row(1,1), true)); -ERROR: duplicate key value violates unique constraint "partition1_nullKeyTable.1!?!901234567890123456_bf4a8ac1_1730056" +ERROR: duplicate key value violates unique constraint "partition1_nullKeyTable.1!?!901234567890123456_bf4a8ac1_1730054" DETAIL: Key (id)=(X) already exists. 
CONTEXT: while executing command on localhost:xxxxx -- verify all 4 shard indexes are created on the same node @@ -895,8 +895,8 @@ DETAIL: Reference tables and local tables can only have foreign keys to referen ROLLBACK; -- errors out because of foreign key violation INSERT INTO "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789" VALUES (100, 1, row_to_json(row(1,1), true)); -ERROR: insert or update on table "partition100_nullKeyTable.1!?!9012345678901234_0aba0bf3_1730058" violates foreign key constraint "fkey_to_dummy_ref_1730055" -DETAIL: Key (id)=(X) is not present in table "dummy_reference_table_1730059". +ERROR: insert or update on table "partition100_nullKeyTable.1!?!9012345678901234_0aba0bf3_1730056" violates foreign key constraint "fkey_to_dummy_ref_1730053" +DETAIL: Key (id)=(X) is not present in table "dummy_reference_table_1730057". CONTEXT: while executing command on localhost:xxxxx -- now inserts successfully INSERT INTO dummy_reference_table VALUES (100); @@ -1163,7 +1163,7 @@ BEGIN; INSERT INTO referencing_table VALUES (1, 2); -- fails INSERT INTO referencing_table VALUES (2, 2); -ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730100" +ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730098" DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx". CONTEXT: while executing command on localhost:xxxxx ROLLBACK; @@ -1209,7 +1209,7 @@ BEGIN; INSERT INTO referencing_table VALUES (1, 2); -- fails INSERT INTO referencing_table VALUES (2, 2); -ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730136" +ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730133" DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx". CONTEXT: while executing command on localhost:xxxxx ROLLBACK; @@ -1327,8 +1327,8 @@ SELECT result, success FROM run_command_on_workers($$ $$); result | success --------------------------------------------------------------------- - ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730153" | f - ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730153" | f + ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730146" | f + ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730146" | f (2 rows) DROP TABLE referencing_table, referenced_table; @@ -1343,8 +1343,8 @@ SELECT create_distributed_table('self_fkey_test', NULL, distribution_type=>null) INSERT INTO self_fkey_test VALUES (1, 1); -- ok INSERT INTO self_fkey_test VALUES (2, 3); -- fails -ERROR: insert or update on table "self_fkey_test_1730154" violates foreign key constraint "self_fkey_test_b_fkey_1730154" -DETAIL: Key (b)=(3) is not present in table "self_fkey_test_1730154". +ERROR: insert or update on table "self_fkey_test_1730147" violates foreign key constraint "self_fkey_test_b_fkey_1730147" +DETAIL: Key (b)=(3) is not present in table "self_fkey_test_1730147". 
CONTEXT: while executing command on localhost:xxxxx -- similar foreign key tests but this time create the referencing table later on -- referencing table is a single-shard table @@ -1368,7 +1368,7 @@ BEGIN; INSERT INTO referencing_table VALUES (1, 2); -- fails INSERT INTO referencing_table VALUES (2, 2); -ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730156" +ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730149" DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx". CONTEXT: while executing command on localhost:xxxxx ROLLBACK; @@ -1391,7 +1391,7 @@ BEGIN; INSERT INTO referencing_table VALUES (2, 1); -- fails INSERT INTO referencing_table VALUES (1, 2); -ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_b_fkey_1730158" +ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_b_fkey_1730151" DETAIL: Key (a, b)=(1, 2) is not present in table "referenced_table_xxxxxxx". CONTEXT: while executing command on localhost:xxxxx ROLLBACK; @@ -1498,7 +1498,7 @@ BEGIN; INSERT INTO referencing_table VALUES (1, 2); -- fails INSERT INTO referencing_table VALUES (2, 2); -ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730199" +ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730191" DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx". CONTEXT: while executing command on localhost:xxxxx ROLLBACK; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 72b35496652..37d200de219 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1399,9 +1399,11 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 12.1-1 ALTER EXTENSION citus UPDATE TO '12.1-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- -(0 rows) + | function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void + | function worker_copy_table_to_node(regclass,integer,boolean) void +(2 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_foreign_key.out b/src/test/regress/expected/multi_foreign_key.out index 832be27405d..e206a6fb63e 100644 --- a/src/test/regress/expected/multi_foreign_key.out +++ b/src/test/regress/expected/multi_foreign_key.out @@ -856,7 +856,7 @@ SELECT create_reference_table('reference_table_second'); CREATE TABLE referenced_local_table(id int PRIMARY KEY, other_column int); DROP TABLE reference_table CASCADE; NOTICE: drop cascades to constraint reference_table_second_referencing_column_fkey on table reference_table_second -NOTICE: drop cascades to constraint reference_table_second_referencing_column_fkey_1350654 on table public.reference_table_second_1350654 +NOTICE: drop cascades to constraint reference_table_second_referencing_column_fkey_1350653 on table public.reference_table_second_1350653 CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, 
v_obj.object_name, drop_shards_metadata_only := false)" PL/pgSQL function citus_drop_trigger() line XX at PERFORM CREATE TABLE reference_table(id int, referencing_column int REFERENCES referenced_local_table(id)); @@ -917,7 +917,7 @@ DROP TABLE reference_table CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to constraint fk on table references_to_reference_table drop cascades to constraint fk on table reference_table_second -NOTICE: drop cascades to constraint fk_1350663 on table public.reference_table_second_1350663 +NOTICE: drop cascades to constraint fk_1350662 on table public.reference_table_second_1350662 CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false)" PL/pgSQL function citus_drop_trigger() line XX at PERFORM CREATE TABLE reference_table(id int PRIMARY KEY, referencing_column int); @@ -1277,6 +1277,6 @@ ERROR: cannot create foreign key constraint since Citus does not support ON DEL -- we no longer need those tables DROP TABLE referenced_by_reference_table, references_to_reference_table, reference_table, reference_table_second, referenced_local_table, self_referencing_reference_table, dropfkeytest2, set_on_default_test_referenced, set_on_default_test_referencing; -NOTICE: drop cascades to constraint fk_1350664 on table public.reference_table_1350664 +NOTICE: drop cascades to constraint fk_1350663 on table public.reference_table_1350663 CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false)" PL/pgSQL function citus_drop_trigger() line XX at PERFORM diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 3d8d8a787a8..4b72a439b11 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -793,7 +793,7 @@ WHERE ORDER BY 1,4,5; shardid | shardstate | shardlength | nodename | nodeport --------------------------------------------------------------------- - 1370021 | 1 | 0 | localhost | 57637 + 1370019 | 1 | 0 | localhost | 57637 (1 row) -- we should see the two shard placements after activation @@ -818,7 +818,7 @@ WHERE ORDER BY 1,4,5; shardid | shardstate | shardlength | nodename | nodeport --------------------------------------------------------------------- - 1370021 | 1 | 0 | localhost | 57637 + 1370019 | 1 | 0 | localhost | 57637 (1 row) SELECT 1 FROM master_remove_node('localhost', :worker_2_port); diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index cb346ff5730..12d5c5e9f95 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -871,8 +871,8 @@ SELECT create_reference_table('FKTABLE'); SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'fktable'::regclass::oid ORDER BY oid; pg_get_constraintdef --------------------------------------------------------------------- - FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES pktable(tid, id) ON DELETE SET DEFAULT (fk_id_del_set_default) FOREIGN KEY (tid, fk_id_del_set_null) REFERENCES pktable(tid, id) ON DELETE SET NULL (fk_id_del_set_null) + FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES pktable(tid, id) ON DELETE SET DEFAULT (fk_id_del_set_default) (2 rows) \c - - - :worker_1_port @@ -881,8 +881,8 @@ SET search_path TO pg15; SELECT pg_get_constraintdef(oid) FROM pg_constraint 
WHERE conrelid = 'fktable'::regclass::oid ORDER BY oid; pg_get_constraintdef --------------------------------------------------------------------- - FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES pktable(tid, id) ON DELETE SET DEFAULT (fk_id_del_set_default) FOREIGN KEY (tid, fk_id_del_set_null) REFERENCES pktable(tid, id) ON DELETE SET NULL (fk_id_del_set_null) + FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES pktable(tid, id) ON DELETE SET DEFAULT (fk_id_del_set_default) (2 rows) -- also, make sure that it works as expected diff --git a/src/test/regress/expected/schema_based_sharding.out b/src/test/regress/expected/schema_based_sharding.out index 4493f96141a..408ef4460e2 100644 --- a/src/test/regress/expected/schema_based_sharding.out +++ b/src/test/regress/expected/schema_based_sharding.out @@ -391,8 +391,8 @@ SELECT EXISTS( (1 row) INSERT INTO tenant_4.another_partitioned_table VALUES (1, 'a'); -ERROR: insert or update on table "another_partitioned_table_child_1920090" violates foreign key constraint "another_partitioned_table_a_fkey_1920089" -DETAIL: Key (a)=(1) is not present in table "partitioned_table_1920087". +ERROR: insert or update on table "another_partitioned_table_child_1920088" violates foreign key constraint "another_partitioned_table_a_fkey_1920087" +DETAIL: Key (a)=(1) is not present in table "partitioned_table_1920085". CONTEXT: while executing command on localhost:xxxxx INSERT INTO tenant_4.partitioned_table VALUES (1, 'a'); INSERT INTO tenant_4.another_partitioned_table VALUES (1, 'a'); diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index d04c6e668be..fcd8f684cd5 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -82,6 +82,7 @@ ORDER BY 1; function citus_internal_start_replication_origin_tracking() function citus_internal_stop_replication_origin_tracking() function citus_internal_unregister_tenant_schema_globally(oid,text) + function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) function citus_internal_update_placement_metadata(bigint,integer,integer) function citus_internal_update_relation_colocation(oid,integer) function citus_is_clock_after(cluster_clock,cluster_clock) @@ -252,6 +253,7 @@ ORDER BY 1; function worker_apply_shard_ddl_command(bigint,text,text) function worker_change_sequence_dependency(regclass,regclass,regclass) function worker_copy_table_to_node(regclass,integer) + function worker_copy_table_to_node(regclass,integer,boolean) function worker_create_or_alter_role(text,text,text) function worker_create_or_replace_object(text) function worker_create_or_replace_object(text[]) @@ -338,5 +340,5 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(328 rows) +(330 rows) diff --git a/src/test/regress/sql/arbitrary_configs_truncate_cascade.sql b/src/test/regress/sql/arbitrary_configs_truncate_cascade.sql index 50f4d2318fd..2b805b6a41a 100644 --- a/src/test/regress/sql/arbitrary_configs_truncate_cascade.sql +++ b/src/test/regress/sql/arbitrary_configs_truncate_cascade.sql @@ -1,8 +1,14 @@ SET search_path TO truncate_cascade_tests_schema; +-- Hide detail of truncate error because it might either reference +-- table_with_fk_1 or table_with_fk_2 in the error message. 
+\set VERBOSITY TERSE + -- Test truncate error on table with dependencies TRUNCATE table_with_pk; +\set VERBOSITY DEFAULT + -- Test truncate rollback on table with dependencies SELECT COUNT(*) FROM table_with_fk_1; SELECT COUNT(*) FROM table_with_fk_2;
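
A minimal usage sketch of the new SQL surface introduced above; it is illustrative only and not part of the patch. The relation name and node id are hypothetical, and the behaviour of the third argument is inferred from its declared name (exclusive_connection) rather than from this diff.

-- Copy a table to the node with id 18, requesting the new
-- exclusive_connection behaviour via the added third argument
-- (hypothetical relation and node id):
SELECT pg_catalog.worker_copy_table_to_node(
           'copy_test.some_table_102008'::regclass,  -- source_table
           18,                                       -- target_node_id
           true);                                    -- exclusive_connection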