Test branch #7128

Closed · wants to merge 49 commits

Commits
9d46cd8
Adds initial code
gurkanindibay Jul 25, 2023
304a00e
Fixed unit test problems
gurkanindibay Jul 28, 2023
7ccbb84
Adds unit tests
gurkanindibay Jul 28, 2023
b178a75
Fixes static code analysis issues
gurkanindibay Jul 28, 2023
20ae610
Removes unnecessary changes
gurkanindibay Jul 28, 2023
46ab6f1
Removes unnecessary changes
gurkanindibay Jul 28, 2023
8928c0f
Adds multi_extension.out output
gurkanindibay Jul 28, 2023
6da0baa
Adds citus_pause_node into test files
gurkanindibay Jul 28, 2023
afa7bf6
Fixes upgrade_list_citus_objects diff
gurkanindibay Jul 28, 2023
515627e
Adds code to debug
gurkanindibay Jul 28, 2023
d42f557
Adds node id detection with shard_id
gurkanindibay Jul 28, 2023
4ed78f1
Fixes errors in code
gurkanindibay Jul 28, 2023
24380f8
Fixes unit tests
gurkanindibay Jul 28, 2023
28cda81
Fixes coverage issue
gurkanindibay Jul 28, 2023
ed40dfe
Give details for exception message
gurkanindibay Jul 28, 2023
63311e5
Fixes some review notes
gurkanindibay Jul 29, 2023
1c05eeb
Updates udf name
gurkanindibay Jul 29, 2023
339a47a
Fixes review comments
gurkanindibay Jul 29, 2023
4c3341e
Fixes indentation
gurkanindibay Jul 29, 2023
b69c36a
Fixes static code analysis issues
gurkanindibay Jul 29, 2023
c41f93e
Fixes indentation
gurkanindibay Jul 29, 2023
1a1b633
Fixes multi extension tests
gurkanindibay Jul 29, 2023
3220bd9
Fixes test errors after rebase
gurkanindibay Jul 29, 2023
6f2ddf4
Removes empty line
gurkanindibay Jul 29, 2023
cc403bf
Fixes upgrade tests
gurkanindibay Jul 29, 2023
a05d5fc
Fixes upgrade tests
gurkanindibay Jul 29, 2023
29c5b0c
Adds comments to methods
gurkanindibay Jul 29, 2023
9e79cd6
Fixes indent issues
gurkanindibay Jul 29, 2023
997a5d7
Merge branch 'main' into citus_pause_node
gurkanindibay Jul 31, 2023
bb62b84
Parameterizes node id in no node test
gurkanindibay Jul 28, 2023
3edefdc
Merge branch 'main' into citus_pause_node
gurkanindibay Aug 1, 2023
b471bb0
Merge branch 'citus_pause_node' of https://github.com/citusdata/citus…
gurkanindibay Jul 29, 2023
88695c6
Merge branch 'main' into citus_pause_node
gurkanindibay Aug 2, 2023
dd72cf0
Merge branch 'citus_pause_node' of https://github.com/citusdata/citus…
gurkanindibay Jul 29, 2023
18c55a4
Merge branch 'main' into citus_pause_node
gurkanindibay Aug 4, 2023
86e0831
Merge branch 'citus_pause_node' of https://github.com/citusdata/citus…
gurkanindibay Jul 29, 2023
be2e653
Merge branch 'main' into citus_pause_node
gurkanindibay Aug 4, 2023
d10eb05
Merge branch 'main' into citus_pause_node
gurkanindibay Aug 6, 2023
de83b01
Update src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql
gurkanindibay Aug 7, 2023
3fbe5e4
Update src/backend/distributed/sql/citus--12.0-1--12.1-1.sql
gurkanindibay Aug 7, 2023
1a5cf9d
Merge branch 'main' into citus_pause_node
gurkanindibay Aug 8, 2023
d9cecba
Merge branch 'main' into citus_pause_node
gurkanindibay Aug 14, 2023
48a5450
Fixes review comments
gurkanindibay Aug 17, 2023
458edd8
Fixes code changes
gurkanindibay Aug 17, 2023
eda5539
Merge branch 'main' into citus_pause_node
gurkanindibay Aug 17, 2023
7ac5f21
Adds a new empty space
gurkanindibay Aug 17, 2023
f641860
Fixes test issues
gurkanindibay Aug 17, 2023
3384677
Merge branch 'main' into test_branch
gurkanindibay Aug 17, 2023
c2c1b2b
Merge branch 'test_branch' of https://github.com/citusdata/citus into…
gurkanindibay Aug 17, 2023
205 changes: 137 additions & 68 deletions src/backend/distributed/metadata/node_metadata.c
@@ -9,7 +9,6 @@
#include "funcapi.h"
#include "utils/plancache.h"


#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
@@ -102,8 +101,8 @@ static HeapTuple GetNodeByNodeId(int32 nodeId);
static int32 GetNextGroupId(void);
static int GetNextNodeId(void);
static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport,
NodeMetadata *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@@ -134,6 +133,12 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context,
static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid);
static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly);
static void EnsureTransactionalMetadataSyncMode(void);
static void LockShardsInWorkerPlacementList(WorkerNode *workerNode,
LOCKMODE lockMode);
static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown);
static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode,
bool force, int32 lock_cooldown);

/* Function definitions go here */

/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
@@ -152,6 +157,7 @@ PG_FUNCTION_INFO_V1(master_disable_node);
PG_FUNCTION_INFO_V1(citus_activate_node);
PG_FUNCTION_INFO_V1(master_activate_node);
PG_FUNCTION_INFO_V1(citus_update_node);
PG_FUNCTION_INFO_V1(citus_pause_node_within_txn);
PG_FUNCTION_INFO_V1(master_update_node);
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid);
@@ -160,7 +166,6 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
PG_FUNCTION_INFO_V1(citus_is_coordinator);
PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);


/*
* DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
* sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK.
@@ -544,7 +549,8 @@ citus_disable_node(PG_FUNCTION_ARGS)
"metadata is not allowed"),
errhint("You can force disabling node, SELECT "
"citus_disable_node('%s', %d, "
"synchronous:=true);", workerNode->workerName,
"synchronous:=true);",
workerNode->workerName,
nodePort),
errdetail("Citus uses the first worker node in the "
"metadata for certain internal operations when "
@@ -693,8 +699,7 @@ citus_set_node_property(PG_FUNCTION_ARGS)
else
{
ereport(ERROR, (errmsg(
"only the 'shouldhaveshards' property can be set using this function"
)));
"only the 'shouldhaveshards' property can be set using this function")));
}

TransactionModifiedNodeMetadata = true;
@@ -1017,6 +1022,7 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly)
BoolGetDatum(true));

updatedActivatedNodeList = lappend(updatedActivatedNodeList, node);

}

/* reset activated nodes inside metadataSyncContext after local update */
@@ -1160,6 +1166,96 @@ ActivateNodeList(MetadataSyncContext *context)
}


/*
* Acquires shard metadata locks on all shards residing on the given worker node.
*
* TODO: This function is not compatible with the query-from-any-node feature.
* To ensure proper behavior, locks must be acquired on placements across all
* nodes rather than only on the coordinator (or whichever node this function
* is called from).
*/
void
LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode)
{
List *placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
LockShardsInPlacementListMetadata(placementList, lockMode);
}
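
For illustration, the placement set this function locks is roughly what AllShardPlacementsOnNodeGroup reads from the pg_dist_placement catalog; a sketch, assuming the worker belongs to group 2:

-- Shard placements covered by the metadata locks for a worker in group 2
-- (the group id is illustrative).
SELECT placementid, shardid
FROM pg_catalog.pg_dist_placement
WHERE groupid = 2;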


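/*
* CheckBackgroundWorkerToObtainLocks starts a background worker that kills
* backends holding locks that conflict with this backend. If the worker cannot
* be started, we fall back to waiting for the locks ourselves, bounded by a
* lock_timeout of lock_cooldown milliseconds.
*/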
BackgroundWorkerHandle *
CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown)
{
BackgroundWorkerHandle *handle = NULL;
handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
if (!handle)
{
/*
* We failed to start a background worker, which probably means that we exceeded
* max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
* to repeatedly throw an error because if citus_update_node is called to complete a
* failover then finishing is the only way to bring the cluster back up. Therefore we
* give up on killing other backends and simply wait for the lock. We do set
* lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
*/
SetLockTimeoutLocally(lock_cooldown);
ereport(WARNING, (errmsg(
"could not start background worker to kill backends with conflicting"
" locks to force the update. Degrading to acquiring locks "
"with a lock time out."),
errhint(
"Increasing max_worker_processes might help.")));
}
return handle;
}
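
The fallback path above is roughly equivalent to bounding the lock wait in SQL; a sketch, assuming a lock_cooldown of 10000 ms:

-- Approximate effect of SetLockTimeoutLocally(10000): wait for conflicting
-- locks, but give up after the cooldown rather than waiting forever.
SET LOCAL lock_timeout = '10s';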


/*
* LockPlacementsWithBackgroundWorkersInPrimaryNode locks shard placements on a
* primary node. If force is true, we start a background worker to kill backends
* holding conflicting locks with this backend.
*
* If the node is a primary node we block reads and writes.
*
* This lock has two purposes:
*
* - Ensure buggy code in Citus doesn't cause failures when the
* nodename/nodeport of a node changes mid-query
*
* - Provide fencing during failover, after this function returns all
* connections will use the new node location.
*
* Drawback:
*
* - This function blocks until all previous queries have finished. This
* means that long-running queries will prevent failover.
*
* In case of node failure said long-running queries will fail in the end
* anyway as they will be unable to commit successfully on the failed
* machine. To cause quick failure of these queries use force => true
* during the invocation of citus_update_node to terminate conflicting
* backends proactively.
*
* It might be worth blocking reads to a secondary for the same reasons,
* though we currently only query secondaries on follower clusters
* where these locks will have no effect.
*/
BackgroundWorkerHandle *
LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode, bool force,
int32 lock_cooldown)
{
BackgroundWorkerHandle *handle = NULL;

if (NodeIsPrimary(workerNode))
{
if (force)
{
handle = CheckBackgroundWorkerToObtainLocks(lock_cooldown);
}
LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
}
return handle;
}
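
As the comment above notes, conflicting backends can be terminated proactively by passing force => true to citus_update_node; a hedged sketch (the node id, host name, and cooldown are illustrative):

-- Force a node update during failover: backends holding conflicting locks are
-- terminated after at most lock_cooldown milliseconds.
SELECT citus_update_node(123, 'new-host.example.com', 5432,
force => true, lock_cooldown => 10000);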


/*
* citus_update_node moves the requested node to a different nodename and nodeport. It
* locks to ensure no queries are running concurrently; and is intended for customers who
@@ -1188,7 +1284,6 @@ citus_update_node(PG_FUNCTION_ARGS)
int32 lock_cooldown = PG_GETARG_INT32(4);

char *newNodeNameString = text_to_cstring(newNodeName);
List *placementList = NIL;
BackgroundWorkerHandle *handle = NULL;

WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
@@ -1226,64 +1321,8 @@ citus_update_node(PG_FUNCTION_ARGS)
EnsureTransactionalMetadataSyncMode();
}

/*
* If the node is a primary node we block reads and writes.
*
* This lock has two purposes:
*
* - Ensure buggy code in Citus doesn't cause failures when the
* nodename/nodeport of a node changes mid-query
*
* - Provide fencing during failover, after this function returns all
* connections will use the new node location.
*
* Drawback:
*
* - This function blocks until all previous queries have finished. This
* means that long-running queries will prevent failover.
*
* In case of node failure said long-running queries will fail in the end
* anyway as they will be unable to commit successfully on the failed
* machine. To cause quick failure of these queries use force => true
* during the invocation of citus_update_node to terminate conflicting
* backends proactively.
*
* It might be worth blocking reads to a secondary for the same reasons,
* though we currently only query secondaries on follower clusters
* where these locks will have no effect.
*/
if (NodeIsPrimary(workerNode))
{
/*
* before acquiring the locks check if we want a background worker to help us to
* aggressively obtain the locks.
*/
if (force)
{
handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
if (!handle)
{
/*
* We failed to start a background worker, which probably means that we exceeded
* max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
* to repeatedly throw an error because if citus_update_node is called to complete a
* failover then finishing is the only way to bring the cluster back up. Therefore we
* give up on killing other backends and simply wait for the lock. We do set
* lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
*/
SetLockTimeoutLocally(lock_cooldown);
ereport(WARNING, (errmsg(
"could not start background worker to kill backends with conflicting"
" locks to force the update. Degrading to acquiring locks "
"with a lock time out."),
errhint(
"Increasing max_worker_processes might help.")));
}
}

placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
}
handle = LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force,
lock_cooldown);

/*
* if we have planned statements such as prepared statements, we should clear the cache so that
@@ -1330,6 +1369,34 @@
}


/*
* citus_pause_node_within_txn obtains locks on all shard placements of the given
* worker node. The locks are released automatically when the transaction commits
* or aborts, so this function must be invoked within a transaction. It is useful
* for temporarily blocking writes to a specific node.
*/
Datum
citus_pause_node_within_txn(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

int32 nodeId = PG_GETARG_INT32(0);
bool force = PG_GETARG_BOOL(1);
int32 lock_cooldown = PG_GETARG_INT32(2);

WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId);
if (workerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
errmsg("node %u not found", nodeId)));
}

LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown);

PG_RETURN_VOID();
}
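
A minimal usage sketch for the new UDF (the node id and lock_cooldown value are illustrative):

BEGIN;
-- Pause node 2: its shard placements are locked, blocking reads and writes on
-- a primary until the transaction ends; wait up to 10 seconds for the locks.
SELECT citus_pause_node_within_txn(2, false, 10000);
-- ... perform maintenance while the node is paused ...
COMMIT; -- the locks are released here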


/*
* master_update_node is a wrapper function for old UDF name.
*/
@@ -1947,7 +2014,8 @@ ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode)
ereport(ERROR, (errmsg("cannot remove or disable the node "
"%s:%d because because it contains "
"the only shard placement for "
"shard " UINT64_FORMAT, workerNode->workerName,
"shard " UINT64_FORMAT,
workerNode->workerName,
workerNode->workerPort, placement->shardId),
errdetail("One of the table(s) that prevents the operation "
"complete successfully is %s",
@@ -2499,7 +2567,8 @@ ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *fi
if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
{
ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
"coordinator node", field)));
"coordinator node",
field)));
}
}

1 change: 1 addition & 0 deletions src/backend/distributed/sql/citus--12.0-1--12.1-1.sql
@@ -2,5 +2,6 @@

-- bump version to 12.1-1

#include "udfs/citus_pause_node_within_txn/12.1-1.sql"
#include "udfs/citus_prepare_pg_upgrade/12.1-1.sql"
#include "udfs/citus_finish_pg_upgrade/12.1-1.sql"
src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql
@@ -1,5 +1,10 @@
-- citus--12.1-1--12.0-1


DROP FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int);


-- we have modified the relevant upgrade script to include the any_value changes,
-- so we don't need to update this downgrade path for any_value:
-- if we are doing a Citus downgrade rather than a PG downgrade, it is a no-op.

src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql
@@ -0,0 +1,11 @@
CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int, force bool, lock_cooldown int)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$;

COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,
force bool,
lock_cooldown int)
IS 'pauses the node with the given id by locking its shard placements, preventing queries from running on that node';

REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int) FROM PUBLIC;
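
Because the definition above revokes execution from PUBLIC, access has to be granted explicitly; a sketch with a hypothetical role name:

-- Allow a dedicated maintenance role to pause nodes; "maintenance_role" is
-- illustrative and not part of this PR.
GRANT EXECUTE ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int)
TO maintenance_role;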