From 9d46cd89c8ba34245cea9bc81893cb780a99f96d Mon Sep 17 00:00:00 2001 From: gindibay Date: Tue, 25 Jul 2023 18:26:25 +0300 Subject: [PATCH 01/35] Adds initial code --- .gitignore | 1 + .../distributed/metadata/node_metadata.c | 27 +++++++++++++++++++ .../distributed/sql/citus--12.0-1--12.1-1.sql | 4 +++ .../sql/downgrades/citus--12.1-1--12.0-1.sql | 2 ++ .../sql/udfs/citus_pause_node/12.1-1.sql | 9 +++++++ .../sql/udfs/citus_pause_node/latest.sql | 9 +++++++ .../regress/sql/multi_cluster_management.sql | 3 +++ 7 files changed, 55 insertions(+) create mode 100644 src/backend/distributed/sql/udfs/citus_pause_node/12.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_pause_node/latest.sql diff --git a/.gitignore b/.gitignore index df447746a9d..44846b2241d 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,4 @@ lib*.pc # style related temporary outputs *.uncrustify .venv +test-cluster/* diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 60a5ab92bed..b00737add19 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -152,6 +152,7 @@ PG_FUNCTION_INFO_V1(master_disable_node); PG_FUNCTION_INFO_V1(citus_activate_node); PG_FUNCTION_INFO_V1(master_activate_node); PG_FUNCTION_INFO_V1(citus_update_node); +PG_FUNCTION_INFO_V1(citus_pause_node); PG_FUNCTION_INFO_V1(master_update_node); PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid); @@ -1329,6 +1330,32 @@ citus_update_node(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +Datum +citus_pause_node(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + int32 nodeId = PG_GETARG_INT32(0); + List *placementList = NIL; + + WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId); + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND), + errmsg("node %u not found", nodeId))); + } + + + if (NodeIsPrimary(workerNode)) + { + placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); + LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); + } + + PG_RETURN_VOID(); + +} + /* * master_update_node is a wrapper function for old UDF name. diff --git a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql index 685915ebd96..d314d0b90a2 100644 --- a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql +++ b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql @@ -1,3 +1,7 @@ -- citus--12.0-1--12.1-1 +#include "udfs/citus_pause_node/12.1-1.sql" + + + -- bump version to 12.1-1 diff --git a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql index c498a50593f..6088c967725 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql @@ -1,2 +1,4 @@ -- citus--12.1-1--12.0-1 -- this is an empty downgrade path since citus--12.0-1--12.1-1.sql is empty for now + +DROP FUNCTION pg_catalog.citus_pause_node(int); diff --git a/src/backend/distributed/sql/udfs/citus_pause_node/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_pause_node/12.1-1.sql new file mode 100644 index 00000000000..b3496149225 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pause_node/12.1-1.sql @@ -0,0 +1,9 @@ +CREATE FUNCTION pg_catalog.citus_pause_node(node_id int) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_pause_node$$; + +COMMENT ON FUNCTION pg_catalog.citus_pause_node(node_id int) + IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node(int) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_pause_node/latest.sql b/src/backend/distributed/sql/udfs/citus_pause_node/latest.sql new file mode 100644 index 00000000000..b3496149225 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pause_node/latest.sql @@ -0,0 +1,9 @@ +CREATE FUNCTION pg_catalog.citus_pause_node(node_id int) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_pause_node$$; + +COMMENT ON FUNCTION pg_catalog.citus_pause_node(node_id int) + IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node(int) FROM PUBLIC; diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 9ec0eb28e17..d36545cfa19 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -376,6 +376,9 @@ SELECT citus_update_node(:worker_1_node, 'localhost', 9992); SELECT citus_nodename_for_nodeid(:worker_1_node); SELECT citus_nodeport_for_nodeid(:worker_1_node); +--citus_pause_node allows pausing a node from the non-default cluster +--TODO add test cases here + SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset From 304a00ee08b5f3fcc126d873791221764363f2b7 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 14:13:28 +0300 Subject: [PATCH 02/35] Fixed unit test problems --- .../spec/isolation_node_pause_ops.spec | 173 ++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 src/test/regress/spec/isolation_node_pause_ops.spec diff --git a/src/test/regress/spec/isolation_node_pause_ops.spec b/src/test/regress/spec/isolation_node_pause_ops.spec new file mode 100644 index 00000000000..bb40918f7bb --- /dev/null +++ b/src/test/regress/spec/isolation_node_pause_ops.spec @@ -0,0 +1,173 @@ +setup +{ + SELECT citus_set_coordinator_host('localhost', 57636); + SET citus.shard_replication_factor TO 1; + CREATE SCHEMA tenant1; + CREATE TABLE tenant1.table1(id int PRIMARY KEY, name text, col bigint); + INSERT INTO tenant1.table1 SELECT i, 'asd', i*1000 FROM generate_series(11, 20) i; + + CREATE TABLE tenant1.table2(id int PRIMARY KEY, name text, col bigint); + + CREATE TABLE public.ref(id int PRIMARY KEY); + SELECT create_reference_table('public.ref'); + + CREATE TABLE tenant1.table3(id int PRIMARY KEY, name text, col1 int, col int REFERENCES public.ref(id)); + SELECT citus_add_local_table_to_metadata('tenant1.table3'); +} + +teardown +{ + DROP TABLE public.ref CASCADE; + DROP SCHEMA IF EXISTS tenant1, tenant2 CASCADE; + SELECT citus_remove_node('localhost', 57636); +} + +session "s1" + +step "s1-begin" +{ + BEGIN; + SET citus.shard_replication_factor TO 1; +} + +step "s1-schema-distribute" +{ + SELECT citus_schema_distribute('tenant1'); +} + +step "s1-schema-undistribute" +{ + SELECT citus_schema_undistribute('tenant1'); +} + +step "s1-verify-distributed-schema" +{ + SELECT logicalrelid, partmethod, partkey, (colocationid = (SELECT colocationid AS tenant_colocid FROM pg_dist_schema)) AS is_correct_colocid, repmodel, autoconverted FROM pg_dist_partition WHERE logicalrelid::text LIKE 'tenant%' ORDER BY logicalrelid; +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-drop-schema" +{ + DROP SCHEMA tenant1 CASCADE; +} + +step "s2-rename-schema" +{ + ALTER SCHEMA tenant1 RENAME TO tenant2; +} + +step "s2-add-table" +{ + SET citus.shard_replication_factor TO 1; + CREATE TABLE tenant1.table4(id int PRIMARY KEY, name text, col bigint); +} + +step "s2-drop-table" +{ + DROP TABLE tenant1.table3; +} + +step "s2-alter-col-type" +{ + ALTER TABLE tenant1.table3 ALTER COLUMN col1 TYPE text; +} + +step "s2-add-foreign-key" +{ + ALTER TABLE tenant1.table3 ADD CONSTRAINT table3_fk1 FOREIGN KEY (id) REFERENCES tenant1.table2 (id); +} + +step "s2-drop-foreign-key" +{ + ALTER TABLE tenant1.table3 DROP CONSTRAINT table3_col_fkey; +} + +step "s2-create-unique-index" +{ + CREATE UNIQUE INDEX idx_2 ON tenant1.table3 (col); +} + +step "s2-create-unique-index-concurrently" +{ + CREATE UNIQUE INDEX CONCURRENTLY idx_3 ON tenant1.table3 (col); +} + +step "s2-reindex-unique-concurrently" +{ + REINDEX INDEX CONCURRENTLY tenant1.idx_2; +} + +step "s2-insert" +{ + // we insert into public.ref table as well to prevent fkey violation + INSERT INTO public.ref SELECT i FROM generate_series(11, 20) i; + INSERT INTO tenant1.table3 SELECT i, 'asd', i*1000 FROM generate_series(11, 20) i; +} + +step "s2-delete" +{ + DELETE FROM tenant1.table3; +} + +step "s2-update" +{ + UPDATE tenant1.table3 SET col = 11 WHERE col = 11; +} + +// DROP SCHEMA +permutation "s1-begin" "s1-schema-distribute" "s2-drop-schema" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-drop-schema" "s1-commit" "s1-verify-distributed-schema" + +// RENAME SCHEMA +permutation "s1-begin" "s1-schema-distribute" "s2-rename-schema" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-rename-schema" "s1-commit" "s1-verify-distributed-schema" + +// CREATE TABLE +permutation "s1-begin" "s1-schema-distribute" "s2-add-table" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-add-table" "s1-commit" "s1-verify-distributed-schema" + +// DROP TABLE +permutation "s1-begin" "s1-schema-distribute" "s2-drop-table" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-drop-table" "s1-commit" "s1-verify-distributed-schema" + +// ALTER TABLE ALTER COLUMN TYPE +permutation "s1-begin" "s1-schema-distribute" "s2-alter-col-type" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-alter-col-type" "s1-commit" "s1-verify-distributed-schema" + +// ADD FOREIGN KEY +permutation "s1-begin" "s1-schema-distribute" "s2-add-foreign-key" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-add-foreign-key" "s1-commit" "s1-verify-distributed-schema" + +// DROP FOREIGN KEY +permutation "s1-begin" "s1-schema-distribute" "s2-drop-foreign-key" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-drop-foreign-key" "s1-commit" "s1-verify-distributed-schema" + +// CREATE UNIQUE INDEX +permutation "s1-begin" "s1-schema-distribute" "s2-create-unique-index" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-create-unique-index" "s1-commit" "s1-verify-distributed-schema" + +// CREATE UNIQUE INDEX CONCURRENTLY +permutation "s1-begin" "s1-schema-distribute" "s2-create-unique-index-concurrently" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-create-unique-index-concurrently" "s1-commit" "s1-verify-distributed-schema" + +// REINDEX CONCURRENTLY +permutation "s2-create-unique-index" "s1-begin" "s1-schema-distribute" "s2-reindex-unique-concurrently" "s1-commit" "s1-verify-distributed-schema" +permutation "s2-create-unique-index" "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-reindex-unique-concurrently" "s1-commit" "s1-verify-distributed-schema" + +// INSERT +permutation "s1-begin" "s1-schema-distribute" "s2-insert" "s1-commit" "s1-verify-distributed-schema" +permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-insert" "s1-commit" "s1-verify-distributed-schema" + +// UPDATE +permutation "s2-insert" "s1-begin" "s1-schema-distribute" "s2-update" "s1-commit" "s1-verify-distributed-schema" +permutation "s2-insert" "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-update" "s1-commit" "s1-verify-distributed-schema" + +// DELETE +permutation "s2-insert" "s1-begin" "s1-schema-distribute" "s2-delete" "s1-commit" "s1-verify-distributed-schema" +permutation "s2-insert" "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-delete" "s1-commit" "s1-verify-distributed-schema" From 7ccbb848b05321f38131966e1c5b9e897f60ee0c Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 14:25:56 +0300 Subject: [PATCH 03/35] Adds unit tests --- .../expected/isolation_citus_pause_node.out | 53 ++++++ src/test/regress/isolation_schedule | 2 + .../spec/isolation_citus_pause_node.spec | 104 +++++++++++ ...create_distributed_table_concurrently.spec | 2 +- .../spec/isolation_node_pause_ops.spec | 173 ------------------ 5 files changed, 160 insertions(+), 174 deletions(-) create mode 100644 src/test/regress/expected/isolation_citus_pause_node.out create mode 100644 src/test/regress/spec/isolation_citus_pause_node.spec delete mode 100644 src/test/regress/spec/isolation_node_pause_ops.spec diff --git a/src/test/regress/expected/isolation_citus_pause_node.out b/src/test/regress/expected/isolation_citus_pause_node.out new file mode 100644 index 00000000000..78a07c68ae8 --- /dev/null +++ b/src/test/regress/expected/isolation_citus_pause_node.out @@ -0,0 +1,53 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-pause-node s2-begin s2-insert s2-end s1-end +step s1-begin: + BEGIN; + +step s1-pause-node: + SELECT pg_catalog.citus_pause_node(2); + +citus_pause_node +--------------------------------------------------------------------- + +(1 row) + +step s2-begin: + BEGIN; + +step s2-insert: + -- Set statement_timeout for the session (in milliseconds) + SET statement_timeout = 1000; -- 1 seconds + SET client_min_messages = 'notice'; + -- Variable to track if the INSERT statement was successful + DO $$ + DECLARE + insert_successful BOOLEAN := FALSE; + BEGIN + -- Execute the INSERT statement + insert into employee values(11,'e11',3); + -- If we reach this point, the INSERT statement was successful + insert_successful := TRUE; + IF insert_successful THEN + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; + END IF; + -- You can add additional processing here if needed + EXCEPTION + WHEN query_canceled THEN + -- The INSERT statement was canceled due to timeout + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + WHEN OTHERS THEN + -- Any other exception raised during the INSERT statement + RAISE; + END; + $$ + LANGUAGE plpgsql; + +step s2-insert: <... completed> +s2: NOTICE: query_canceled exception raised. This means that citus_pause_node was able to get the lock. +step s2-end: + COMMIT; + +step s1-end: + COMMIT; + diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 1484c712fea..5fe9af7bc7e 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -77,8 +77,10 @@ test: isolation_global_pid test: isolation_citus_locks test: isolation_reference_table test: isolation_schema_based_sharding +test: isolation_citus_pause_node test: isolation_citus_schema_distribute_undistribute + # Rebalancer test: isolation_blocking_move_single_shard_commands test: isolation_blocking_move_multi_shard_commands diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec new file mode 100644 index 00000000000..632bcf90bdc --- /dev/null +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -0,0 +1,104 @@ +setup +{ + SET citus.shard_replication_factor to 1; + + CREATE TABLE company(id int primary key, name text); + select create_distributed_table('company', 'id'); + + create table employee(id int , name text, company_id int ); + alter table employee add constraint employee_pkey primary key (id,company_id); + + select create_distributed_table('employee', 'company_id'); + + insert into company values(1,'c1'); + insert into company values(2,'c2'); + insert into company values(3,'c3'); + + insert into employee values(1,'e1',1); + insert into employee values(2,'e2',1); + insert into employee values(3,'e3',1); + + insert into employee values(4,'e4',2); + insert into employee values(5,'e5',2); + insert into employee values(6,'e6',2); + + insert into employee values(7,'e7',3); + insert into employee values(8,'e8',3); + insert into employee values(9,'e9',3); + insert into employee values(10,'e10',3); + + +} + +teardown +{ + DROP TABLE company,employee; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-pause-node" +{ + SELECT pg_catalog.citus_pause_node(2); +} + +step "s1-end" +{ + COMMIT; +} + +session "s2" + + +step "s2-begin" +{ + BEGIN; +} + +step "s2-insert" +{ + -- Set statement_timeout for the session (in milliseconds) + SET statement_timeout = 1000; -- 1 seconds + SET client_min_messages = 'notice'; + + -- Variable to track if the INSERT statement was successful + DO $$ + DECLARE + insert_successful BOOLEAN := FALSE; + BEGIN + -- Execute the INSERT statement + insert into employee values(11,'e11',3); + + -- If we reach this point, the INSERT statement was successful + insert_successful := TRUE; + + IF insert_successful THEN + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; + END IF; + + + -- You can add additional processing here if needed + EXCEPTION + WHEN query_canceled THEN + -- The INSERT statement was canceled due to timeout + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + WHEN OTHERS THEN + -- Any other exception raised during the INSERT statement + RAISE; + END; + + $$ + LANGUAGE plpgsql; +} + +step "s2-end" +{ + COMMIT; +} + +permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-insert" "s2-end" "s1-end" diff --git a/src/test/regress/spec/isolation_create_distributed_table_concurrently.spec b/src/test/regress/spec/isolation_create_distributed_table_concurrently.spec index f6a83d309f1..da9b7393b05 100644 --- a/src/test/regress/spec/isolation_create_distributed_table_concurrently.spec +++ b/src/test/regress/spec/isolation_create_distributed_table_concurrently.spec @@ -140,7 +140,7 @@ step "s2-commit" session "s3" // this advisory lock with (almost) random values are only used -// for testing purposes. For details, check Citus' logical replication +// for testing purposes. For details, check Citus logical replication // source code step "s3-acquire-split-advisory-lock" { diff --git a/src/test/regress/spec/isolation_node_pause_ops.spec b/src/test/regress/spec/isolation_node_pause_ops.spec deleted file mode 100644 index bb40918f7bb..00000000000 --- a/src/test/regress/spec/isolation_node_pause_ops.spec +++ /dev/null @@ -1,173 +0,0 @@ -setup -{ - SELECT citus_set_coordinator_host('localhost', 57636); - SET citus.shard_replication_factor TO 1; - CREATE SCHEMA tenant1; - CREATE TABLE tenant1.table1(id int PRIMARY KEY, name text, col bigint); - INSERT INTO tenant1.table1 SELECT i, 'asd', i*1000 FROM generate_series(11, 20) i; - - CREATE TABLE tenant1.table2(id int PRIMARY KEY, name text, col bigint); - - CREATE TABLE public.ref(id int PRIMARY KEY); - SELECT create_reference_table('public.ref'); - - CREATE TABLE tenant1.table3(id int PRIMARY KEY, name text, col1 int, col int REFERENCES public.ref(id)); - SELECT citus_add_local_table_to_metadata('tenant1.table3'); -} - -teardown -{ - DROP TABLE public.ref CASCADE; - DROP SCHEMA IF EXISTS tenant1, tenant2 CASCADE; - SELECT citus_remove_node('localhost', 57636); -} - -session "s1" - -step "s1-begin" -{ - BEGIN; - SET citus.shard_replication_factor TO 1; -} - -step "s1-schema-distribute" -{ - SELECT citus_schema_distribute('tenant1'); -} - -step "s1-schema-undistribute" -{ - SELECT citus_schema_undistribute('tenant1'); -} - -step "s1-verify-distributed-schema" -{ - SELECT logicalrelid, partmethod, partkey, (colocationid = (SELECT colocationid AS tenant_colocid FROM pg_dist_schema)) AS is_correct_colocid, repmodel, autoconverted FROM pg_dist_partition WHERE logicalrelid::text LIKE 'tenant%' ORDER BY logicalrelid; -} - -step "s1-commit" -{ - COMMIT; -} - -session "s2" - -step "s2-drop-schema" -{ - DROP SCHEMA tenant1 CASCADE; -} - -step "s2-rename-schema" -{ - ALTER SCHEMA tenant1 RENAME TO tenant2; -} - -step "s2-add-table" -{ - SET citus.shard_replication_factor TO 1; - CREATE TABLE tenant1.table4(id int PRIMARY KEY, name text, col bigint); -} - -step "s2-drop-table" -{ - DROP TABLE tenant1.table3; -} - -step "s2-alter-col-type" -{ - ALTER TABLE tenant1.table3 ALTER COLUMN col1 TYPE text; -} - -step "s2-add-foreign-key" -{ - ALTER TABLE tenant1.table3 ADD CONSTRAINT table3_fk1 FOREIGN KEY (id) REFERENCES tenant1.table2 (id); -} - -step "s2-drop-foreign-key" -{ - ALTER TABLE tenant1.table3 DROP CONSTRAINT table3_col_fkey; -} - -step "s2-create-unique-index" -{ - CREATE UNIQUE INDEX idx_2 ON tenant1.table3 (col); -} - -step "s2-create-unique-index-concurrently" -{ - CREATE UNIQUE INDEX CONCURRENTLY idx_3 ON tenant1.table3 (col); -} - -step "s2-reindex-unique-concurrently" -{ - REINDEX INDEX CONCURRENTLY tenant1.idx_2; -} - -step "s2-insert" -{ - // we insert into public.ref table as well to prevent fkey violation - INSERT INTO public.ref SELECT i FROM generate_series(11, 20) i; - INSERT INTO tenant1.table3 SELECT i, 'asd', i*1000 FROM generate_series(11, 20) i; -} - -step "s2-delete" -{ - DELETE FROM tenant1.table3; -} - -step "s2-update" -{ - UPDATE tenant1.table3 SET col = 11 WHERE col = 11; -} - -// DROP SCHEMA -permutation "s1-begin" "s1-schema-distribute" "s2-drop-schema" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-drop-schema" "s1-commit" "s1-verify-distributed-schema" - -// RENAME SCHEMA -permutation "s1-begin" "s1-schema-distribute" "s2-rename-schema" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-rename-schema" "s1-commit" "s1-verify-distributed-schema" - -// CREATE TABLE -permutation "s1-begin" "s1-schema-distribute" "s2-add-table" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-add-table" "s1-commit" "s1-verify-distributed-schema" - -// DROP TABLE -permutation "s1-begin" "s1-schema-distribute" "s2-drop-table" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-drop-table" "s1-commit" "s1-verify-distributed-schema" - -// ALTER TABLE ALTER COLUMN TYPE -permutation "s1-begin" "s1-schema-distribute" "s2-alter-col-type" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-alter-col-type" "s1-commit" "s1-verify-distributed-schema" - -// ADD FOREIGN KEY -permutation "s1-begin" "s1-schema-distribute" "s2-add-foreign-key" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-add-foreign-key" "s1-commit" "s1-verify-distributed-schema" - -// DROP FOREIGN KEY -permutation "s1-begin" "s1-schema-distribute" "s2-drop-foreign-key" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-drop-foreign-key" "s1-commit" "s1-verify-distributed-schema" - -// CREATE UNIQUE INDEX -permutation "s1-begin" "s1-schema-distribute" "s2-create-unique-index" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-create-unique-index" "s1-commit" "s1-verify-distributed-schema" - -// CREATE UNIQUE INDEX CONCURRENTLY -permutation "s1-begin" "s1-schema-distribute" "s2-create-unique-index-concurrently" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-create-unique-index-concurrently" "s1-commit" "s1-verify-distributed-schema" - -// REINDEX CONCURRENTLY -permutation "s2-create-unique-index" "s1-begin" "s1-schema-distribute" "s2-reindex-unique-concurrently" "s1-commit" "s1-verify-distributed-schema" -permutation "s2-create-unique-index" "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-reindex-unique-concurrently" "s1-commit" "s1-verify-distributed-schema" - -// INSERT -permutation "s1-begin" "s1-schema-distribute" "s2-insert" "s1-commit" "s1-verify-distributed-schema" -permutation "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-insert" "s1-commit" "s1-verify-distributed-schema" - -// UPDATE -permutation "s2-insert" "s1-begin" "s1-schema-distribute" "s2-update" "s1-commit" "s1-verify-distributed-schema" -permutation "s2-insert" "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-update" "s1-commit" "s1-verify-distributed-schema" - -// DELETE -permutation "s2-insert" "s1-begin" "s1-schema-distribute" "s2-delete" "s1-commit" "s1-verify-distributed-schema" -permutation "s2-insert" "s1-schema-distribute" "s1-begin" "s1-schema-undistribute" "s2-delete" "s1-commit" "s1-verify-distributed-schema" From b178a7524c8e185a3a949f07e9d808971113b6d5 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 14:26:53 +0300 Subject: [PATCH 04/35] Fixes static code analysis issues --- src/backend/distributed/metadata/node_metadata.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index b00737add19..36f5c2a47e3 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1330,6 +1330,7 @@ citus_update_node(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + Datum citus_pause_node(PG_FUNCTION_ARGS) { @@ -1353,7 +1354,6 @@ citus_pause_node(PG_FUNCTION_ARGS) } PG_RETURN_VOID(); - } From 20ae610cdc5db18877381fc3ab6ab859367e50c2 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 15:57:56 +0300 Subject: [PATCH 05/35] Removes unnecessary changes --- src/test/regress/sql/multi_cluster_management.sql | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index d36545cfa19..9ec0eb28e17 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -376,9 +376,6 @@ SELECT citus_update_node(:worker_1_node, 'localhost', 9992); SELECT citus_nodename_for_nodeid(:worker_1_node); SELECT citus_nodeport_for_nodeid(:worker_1_node); ---citus_pause_node allows pausing a node from the non-default cluster ---TODO add test cases here - SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset From 46ab6f1c2bfb7d9e0ef5642a43ccf5f74e00b045 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 16:00:11 +0300 Subject: [PATCH 06/35] Removes unnecessary changes --- .../spec/isolation_create_distributed_table_concurrently.spec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/regress/spec/isolation_create_distributed_table_concurrently.spec b/src/test/regress/spec/isolation_create_distributed_table_concurrently.spec index da9b7393b05..f6a83d309f1 100644 --- a/src/test/regress/spec/isolation_create_distributed_table_concurrently.spec +++ b/src/test/regress/spec/isolation_create_distributed_table_concurrently.spec @@ -140,7 +140,7 @@ step "s2-commit" session "s3" // this advisory lock with (almost) random values are only used -// for testing purposes. For details, check Citus logical replication +// for testing purposes. For details, check Citus' logical replication // source code step "s3-acquire-split-advisory-lock" { From 8928c0ff88d0221f1075367665233a505f2fe640 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 17:09:07 +0300 Subject: [PATCH 07/35] Adds multi_extension.out output --- src/test/regress/expected/multi_extension.out | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index a1f6f8da95f..79e5a85ec7a 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1401,7 +1401,8 @@ ALTER EXTENSION citus UPDATE TO '12.1-1'; SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- -(0 rows) + | function citus_pause_node(integer) void +(1 row) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version From 6da0baada44b4a5b55f5502a1c0eea356c65ee6d Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 17:15:55 +0300 Subject: [PATCH 08/35] Adds citus_pause_node into test files --- src/test/regress/expected/upgrade_list_citus_objects.out | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 3e9698788fd..5a668571bb7 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -103,6 +103,7 @@ ORDER BY 1; function citus_nodeid_for_gpid(bigint) function citus_nodename_for_nodeid(integer) function citus_nodeport_for_nodeid(integer) + function citus_pause_node(integer) function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() From afa7bf640d143e406e8b3f4e56c322d750ebed71 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 17:21:11 +0300 Subject: [PATCH 09/35] Fixes upgrade_list_citus_objects diff --- src/test/regress/expected/upgrade_list_citus_objects.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 5a668571bb7..efc93f6519f 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -339,5 +339,5 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(330 rows) +(331 rows) From 515627e3cc442ec764da9433cc6f30999bacf0c7 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 17:51:42 +0300 Subject: [PATCH 10/35] Adds code to debug --- src/test/regress/spec/isolation_citus_pause_node.spec | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index 632bcf90bdc..f98b001fabb 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -40,6 +40,10 @@ session "s1" step "s1-begin" { BEGIN; + select * from pg_dist_node; + SELECT get_shard_id_for_distribution_column('employee', 3); + + } step "s1-pause-node" From d42f5579edb36fbcd06feb82898c5b22bf1049a9 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 19:19:48 +0300 Subject: [PATCH 11/35] Adds node id detection with shard_id --- .../spec/isolation_citus_pause_node.spec | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index f98b001fabb..084ace7b3ed 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -48,7 +48,32 @@ step "s1-begin" step "s1-pause-node" { - SELECT pg_catalog.citus_pause_node(2); + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into shard_id; + + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shard_id = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + raise notice 'node id is %',v_node_id; + + + -- Pause the node + SELECT pg_catalog.citus_pause_node(v_node_id); + END; + $$ + LANGUAGE plpgsql; } step "s1-end" @@ -73,15 +98,16 @@ step "s2-insert" -- Variable to track if the INSERT statement was successful DO $$ DECLARE - insert_successful BOOLEAN := FALSE; + v_insert_successful BOOLEAN := FALSE; BEGIN + -- Execute the INSERT statement insert into employee values(11,'e11',3); -- If we reach this point, the INSERT statement was successful - insert_successful := TRUE; + v_insert_successful := TRUE; - IF insert_successful THEN + IF v_insert_successful THEN RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; END IF; From 4ed78f148e8d491fd49cc974c7a39a77b9d08f63 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 19:26:25 +0300 Subject: [PATCH 12/35] Fixes errors in code --- src/test/regress/spec/isolation_citus_pause_node.spec | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index 084ace7b3ed..8e5eb885874 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -40,10 +40,6 @@ session "s1" step "s1-begin" { BEGIN; - select * from pg_dist_node; - SELECT get_shard_id_for_distribution_column('employee', 3); - - } step "s1-pause-node" @@ -57,10 +53,10 @@ step "s1-pause-node" v_node_port int; BEGIN -- Get the shard id for the distribution column - SELECT get_shard_id_for_distribution_column('employee', 3) into shard_id; + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; --Get the node id for the shard id - SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shard_id = v_shard_id limit 1; + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; raise notice 'node name is %',v_node_name; raise notice 'node port is %',v_node_port; @@ -70,7 +66,7 @@ step "s1-pause-node" -- Pause the node - SELECT pg_catalog.citus_pause_node(v_node_id); + perform pg_catalog.citus_pause_node(v_node_id) ; END; $$ LANGUAGE plpgsql; From 24380f80b982c2ccf476bd113625590be821710a Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 20:34:42 +0300 Subject: [PATCH 13/35] Fixes unit tests --- .../expected/isolation_citus_pause_node.out | 39 ++++++++++++++----- .../spec/isolation_citus_pause_node.spec | 6 ++- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/test/regress/expected/isolation_citus_pause_node.out b/src/test/regress/expected/isolation_citus_pause_node.out index 78a07c68ae8..0307bc3555b 100644 --- a/src/test/regress/expected/isolation_citus_pause_node.out +++ b/src/test/regress/expected/isolation_citus_pause_node.out @@ -4,14 +4,35 @@ starting permutation: s1-begin s1-pause-node s2-begin s2-insert s2-end s1-end step s1-begin: BEGIN; +s1: NOTICE: step s1-pause-node: - SELECT pg_catalog.citus_pause_node(2); - -citus_pause_node ---------------------------------------------------------------------- - -(1 row) +SET client_min_messages = 'notice'; +DO $$ +DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; +BEGIN +--The first message in the block is being printed on the top of the code block. So adding a dummy message +--to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node(v_node_id) ; +END; +$$ +LANGUAGE plpgsql; +s1: NOTICE: node name is localhost +s1: NOTICE: node port is 57638 step s2-begin: BEGIN; @@ -22,13 +43,13 @@ step s2-insert: -- Variable to track if the INSERT statement was successful DO $$ DECLARE - insert_successful BOOLEAN := FALSE; + v_insert_successful BOOLEAN := FALSE; BEGIN -- Execute the INSERT statement insert into employee values(11,'e11',3); -- If we reach this point, the INSERT statement was successful - insert_successful := TRUE; - IF insert_successful THEN + v_insert_successful := TRUE; + IF v_insert_successful THEN RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; END IF; -- You can add additional processing here if needed diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index 8e5eb885874..77af5dfd690 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -52,7 +52,10 @@ step "s1-pause-node" v_node_name text; v_node_port int; BEGIN - -- Get the shard id for the distribution column + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; --Get the node id for the shard id @@ -62,7 +65,6 @@ step "s1-pause-node" -- Get the node id for the shard id SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; - raise notice 'node id is %',v_node_id; -- Pause the node From 28cda815a3fe95aadc9f122119cf60543e1b4ff1 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 22:24:09 +0300 Subject: [PATCH 14/35] Fixes coverage issue --- .gitignore | 1 - .../expected/isolation_citus_pause_node.out | 20 +++++++++++++++++++ .../spec/isolation_citus_pause_node.spec | 17 +++++++++++++++- 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 44846b2241d..df447746a9d 100644 --- a/.gitignore +++ b/.gitignore @@ -55,4 +55,3 @@ lib*.pc # style related temporary outputs *.uncrustify .venv -test-cluster/* diff --git a/src/test/regress/expected/isolation_citus_pause_node.out b/src/test/regress/expected/isolation_citus_pause_node.out index 0307bc3555b..19f8551567a 100644 --- a/src/test/regress/expected/isolation_citus_pause_node.out +++ b/src/test/regress/expected/isolation_citus_pause_node.out @@ -72,3 +72,23 @@ step s2-end: step s1-end: COMMIT; + +starting permutation: s1-begin s1-node-not-found s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: Node not found. +step s1-node-not-found: +DO $$ +BEGIN + select citus_pause_node(25000); +EXCEPTION + WHEN SQLSTATE 'P0002' THEN + RAISE NOTICE 'Node not found.'; +END; +$$ +LANGUAGE plpgsql; + +step s1-end: + COMMIT; + diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index 77af5dfd690..72af10eef58 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -42,6 +42,20 @@ step "s1-begin" BEGIN; } +step "s1-node-not-found" +{ + DO $$ + BEGIN + select citus_pause_node(25000); + + EXCEPTION + WHEN SQLSTATE 'P0002' THEN + RAISE NOTICE 'Node not found.'; + END; + $$ + LANGUAGE plpgsql; +} + step "s1-pause-node" { SET client_min_messages = 'notice'; @@ -129,4 +143,5 @@ step "s2-end" COMMIT; } -permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-insert" "s2-end" "s1-end" +permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-insert" "s2-end" "s1-end" +permutation "s1-begin" "s1-node-not-found" "s1-end" From bb62b84ad74ab721466c05bb55d4e9bf300a65f3 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 22:57:10 +0300 Subject: [PATCH 15/35] Parameterizes node id in no node test --- .../expected/isolation_citus_pause_node.out | 19 ++++++++++++++++-- .../spec/isolation_citus_pause_node.spec | 20 ++++++++++++++++++- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/test/regress/expected/isolation_citus_pause_node.out b/src/test/regress/expected/isolation_citus_pause_node.out index 19f8551567a..ad2213bf45d 100644 --- a/src/test/regress/expected/isolation_citus_pause_node.out +++ b/src/test/regress/expected/isolation_citus_pause_node.out @@ -80,8 +80,23 @@ step s1-begin: s1: NOTICE: Node not found. step s1-node-not-found: DO $$ -BEGIN - select citus_pause_node(25000); +DECLARE + v_node_id int; + v_node_exists boolean := true; + v_count int :=-1; + BEGIN + -- Get a node-id that does not exist in the cluster + while v_node_exists loop + --get a random node id in the range of 1000 to 2000 + v_node_id := FLOOR(RANDOM()*(2000- 1000 + 1)) + 1000; + begin + select count(0) into v_count from pg_dist_node where nodeid = v_node_id; + if v_count = 0 then + v_node_exists := false; + end if; + end; + end loop; + select citus_pause_node(v_node_id); EXCEPTION WHEN SQLSTATE 'P0002' THEN RAISE NOTICE 'Node not found.'; diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index 72af10eef58..6a9fc9bbbfe 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -45,8 +45,26 @@ step "s1-begin" step "s1-node-not-found" { DO $$ + DECLARE + v_node_id int; + v_node_exists boolean := true; + v_count int :=-1; BEGIN - select citus_pause_node(25000); + + -- Get a node-id that does not exist in the cluster + + while v_node_exists loop + --get a random node id in the range of 1000 to 2000 + v_node_id := FLOOR(RANDOM()*(2000- 1000 + 1)) + 1000; + begin + select count(0) into v_count from pg_dist_node where nodeid = v_node_id; + if v_count = 0 then + v_node_exists := false; + end if; + end; + end loop; + select citus_pause_node(v_node_id); + EXCEPTION WHEN SQLSTATE 'P0002' THEN From ed40dfe1a7482679aa4bc2068db19cba3655f337 Mon Sep 17 00:00:00 2001 From: gindibay Date: Fri, 28 Jul 2023 23:35:14 +0300 Subject: [PATCH 16/35] Give details for exception message --- .../expected/isolation_citus_pause_node.out | 16 +++++++++++----- .../regress/spec/isolation_citus_pause_node.spec | 12 +++++++++--- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/test/regress/expected/isolation_citus_pause_node.out b/src/test/regress/expected/isolation_citus_pause_node.out index ad2213bf45d..85e03b2324d 100644 --- a/src/test/regress/expected/isolation_citus_pause_node.out +++ b/src/test/regress/expected/isolation_citus_pause_node.out @@ -77,13 +77,15 @@ starting permutation: s1-begin s1-node-not-found s1-end step s1-begin: BEGIN; -s1: NOTICE: Node not found. +s1: NOTICE: Node not found. step s1-node-not-found: DO $$ DECLARE - v_node_id int; - v_node_exists boolean := true; - v_count int :=-1; + v_node_id int := -1; + v_node_exists boolean := true; + v_count int := -1; + v_exception_message text; + v_expected_exception_message text := ''; BEGIN -- Get a node-id that does not exist in the cluster while v_node_exists loop @@ -99,7 +101,11 @@ DECLARE select citus_pause_node(v_node_id); EXCEPTION WHEN SQLSTATE 'P0002' THEN - RAISE NOTICE 'Node not found.'; + GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; + v_expected_exception_message := 'node ' || v_node_id || ' not found'; + if v_exception_message = v_expected_exception_message then + RAISE NOTICE 'Node not found.'; + end if; END; $$ LANGUAGE plpgsql; diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index 6a9fc9bbbfe..a25e3d2c23e 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -46,9 +46,11 @@ step "s1-node-not-found" { DO $$ DECLARE - v_node_id int; + v_node_id int := -1; v_node_exists boolean := true; - v_count int :=-1; + v_count int := -1; + v_exception_message text; + v_expected_exception_message text := ''; BEGIN -- Get a node-id that does not exist in the cluster @@ -68,7 +70,11 @@ step "s1-node-not-found" EXCEPTION WHEN SQLSTATE 'P0002' THEN - RAISE NOTICE 'Node not found.'; + GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; + v_expected_exception_message := 'node ' || v_node_id || ' not found'; + if v_exception_message = v_expected_exception_message then + RAISE NOTICE 'Node not found.'; + end if; END; $$ LANGUAGE plpgsql; From 63311e546ff9a80b6d6c33b6752908fffff96374 Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 17:35:15 +0300 Subject: [PATCH 17/35] Fixes some review notes --- .../expected/isolation_citus_pause_node.out | 282 +++++++++++++++++- .../spec/isolation_citus_pause_node.spec | 116 +++++-- 2 files changed, 357 insertions(+), 41 deletions(-) diff --git a/src/test/regress/expected/isolation_citus_pause_node.out b/src/test/regress/expected/isolation_citus_pause_node.out index 85e03b2324d..18371b724fd 100644 --- a/src/test/regress/expected/isolation_citus_pause_node.out +++ b/src/test/regress/expected/isolation_citus_pause_node.out @@ -1,6 +1,6 @@ Parsed test spec with 2 sessions -starting permutation: s1-begin s1-pause-node s2-begin s2-insert s2-end s1-end +starting permutation: s1-begin s1-pause-node s2-begin s2-insert-distributed s2-end s1-end step s1-begin: BEGIN; @@ -36,7 +36,7 @@ s1: NOTICE: node port is 57638 step s2-begin: BEGIN; -step s2-insert: +step s2-insert-distributed: -- Set statement_timeout for the session (in milliseconds) SET statement_timeout = 1000; -- 1 seconds SET client_min_messages = 'notice'; @@ -64,7 +64,7 @@ step s2-insert: $$ LANGUAGE plpgsql; -step s2-insert: <... completed> +step s2-insert-distributed: <... completed> s2: NOTICE: query_canceled exception raised. This means that citus_pause_node was able to get the lock. step s2-end: COMMIT; @@ -73,6 +73,267 @@ step s1-end: COMMIT; +starting permutation: s1-begin s1-pause-node s2-begin s2-delete-distributed s2-end s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: +SET client_min_messages = 'notice'; +DO $$ +DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; +BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node(v_node_id) ; +END; +$$ +LANGUAGE plpgsql; + +s1: NOTICE: node name is localhost +s1: NOTICE: node port is 57638 +step s2-begin: +BEGIN; + +step s2-delete-distributed: +-- Set statement_timeout for the session (in milliseconds) +SET statement_timeout = 1000; -- 1 seconds +SET client_min_messages = 'notice'; +-- Variable to track if the DELETE statement was successful +DO $$ +DECLARE + v_delete_successful BOOLEAN := FALSE; +BEGIN + -- Execute the DELETE statement + delete from employee where id = 9; + -- If we reach this point, the DELETE statement was successful + v_delete_successful := TRUE; + IF v_delete_successful THEN + RAISE NOTICE 'DELETE statement completed successfully. This means that citus_pause_node could not get the lock.'; + END IF; +-- You can add additional processing here if needed +EXCEPTION + WHEN query_canceled THEN + -- The INSERT statement was canceled due to timeout + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + WHEN OTHERS THEN + -- Any other exception raised during the INSERT statement + RAISE; +END; +$$ +LANGUAGE plpgsql; + +step s2-delete-distributed: <... completed> +s2: NOTICE: query_canceled exception raised. This means that citus_pause_node was able to get the lock. +step s2-end: +COMMIT; + +step s1-end: + COMMIT; + + +starting permutation: s1-begin s1-pause-node s2-begin s2-select-distributed s2-end s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: +SET client_min_messages = 'notice'; +DO $$ +DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; +BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node(v_node_id) ; +END; +$$ +LANGUAGE plpgsql; + +s1: NOTICE: node name is localhost +s1: NOTICE: node port is 57638 +step s2-begin: +BEGIN; + +step s2-select-distributed: +select * from employee where id = 10; + +id|name|company_id +--------------------------------------------------------------------- +10|e10 | 3 +(1 row) + +step s2-end: +COMMIT; + +step s1-end: + COMMIT; + + +starting permutation: s1-begin s1-pause-node s2-begin s2-insert-reference s2-end s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: +SET client_min_messages = 'notice'; +DO $$ +DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; +BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node(v_node_id) ; +END; +$$ +LANGUAGE plpgsql; + +s1: NOTICE: node name is localhost +s1: NOTICE: node port is 57638 +step s2-begin: +BEGIN; + +step s2-insert-reference: +-- Set statement_timeout for the session (in milliseconds) +SET statement_timeout = 1000; -- 1 seconds +SET client_min_messages = 'notice'; +-- Variable to track if the INSERT statement was successful +DO $$ +DECLARE + v_insert_successful BOOLEAN := FALSE; +BEGIN + -- Execute the INSERT statement + insert into city values(3,'city3'); + -- If we reach this point, the INSERT statement was successful + v_insert_successful := TRUE; + IF v_insert_successful THEN + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; + END IF; +EXCEPTION WHEN query_canceled THEN + -- The INSERT statement was canceled due to timeout + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + WHEN OTHERS THEN + -- Any other exception raised during the INSERT statement + RAISE; +END; +$$ +LANGUAGE plpgsql; + +step s2-insert-reference: <... completed> +s2: NOTICE: query_canceled exception raised. This means that citus_pause_node was able to get the lock. +step s2-end: +COMMIT; + +step s1-end: + COMMIT; + + +starting permutation: s1-begin s1-pause-node s1-pause-node s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node: +SET client_min_messages = 'notice'; +DO $$ +DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; +BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node(v_node_id) ; +END; +$$ +LANGUAGE plpgsql; + +s1: NOTICE: node name is localhost +s1: NOTICE: node port is 57638 +s1: NOTICE: +step s1-pause-node: +SET client_min_messages = 'notice'; +DO $$ +DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; +BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node(v_node_id) ; +END; +$$ +LANGUAGE plpgsql; + +s1: NOTICE: node name is localhost +s1: NOTICE: node port is 57638 +step s1-end: + COMMIT; + + starting permutation: s1-begin s1-node-not-found s1-end step s1-begin: BEGIN; @@ -83,22 +344,11 @@ DO $$ DECLARE v_node_id int := -1; v_node_exists boolean := true; - v_count int := -1; v_exception_message text; v_expected_exception_message text := ''; BEGIN - -- Get a node-id that does not exist in the cluster - while v_node_exists loop - --get a random node id in the range of 1000 to 2000 - v_node_id := FLOOR(RANDOM()*(2000- 1000 + 1)) + 1000; - begin - select count(0) into v_count from pg_dist_node where nodeid = v_node_id; - if v_count = 0 then - v_node_exists := false; - end if; - end; - end loop; - select citus_pause_node(v_node_id); + select nextval('pg_dist_node_nodeid_seq')::int into v_node_id; + select citus_pause_node(v_node_id); EXCEPTION WHEN SQLSTATE 'P0002' THEN GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index a25e3d2c23e..8b8dcded9c7 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -2,7 +2,10 @@ setup { SET citus.shard_replication_factor to 1; - CREATE TABLE company(id int primary key, name text); + create table city (id int , name text ); + SELECT create_reference_table('city'); + + CREATE TABLE company(id int primary key, name text, city_id int); select create_distributed_table('company', 'id'); create table employee(id int , name text, company_id int ); @@ -10,9 +13,13 @@ setup select create_distributed_table('employee', 'company_id'); - insert into company values(1,'c1'); - insert into company values(2,'c2'); - insert into company values(3,'c3'); + insert into city values(1,'city1'); + insert into city values(2,'city2'); + + + insert into company values(1,'c1', 1); + insert into company values(2,'c2',2); + insert into company values(3,'c3',1); insert into employee values(1,'e1',1); insert into employee values(2,'e2',1); @@ -32,7 +39,7 @@ setup teardown { - DROP TABLE company,employee; + DROP TABLE employee,company,city; } session "s1" @@ -46,28 +53,13 @@ step "s1-node-not-found" { DO $$ DECLARE - v_node_id int := -1; + v_node_id int:= -1; v_node_exists boolean := true; - v_count int := -1; v_exception_message text; v_expected_exception_message text := ''; BEGIN - - -- Get a node-id that does not exist in the cluster - - while v_node_exists loop - --get a random node id in the range of 1000 to 2000 - v_node_id := FLOOR(RANDOM()*(2000- 1000 + 1)) + 1000; - begin - select count(0) into v_count from pg_dist_node where nodeid = v_node_id; - if v_count = 0 then - v_node_exists := false; - end if; - end; - end loop; - select citus_pause_node(v_node_id); - - + select nextval('pg_dist_node_nodeid_seq')::int into v_node_id; + select citus_pause_node(v_node_id) ; EXCEPTION WHEN SQLSTATE 'P0002' THEN GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; @@ -125,7 +117,7 @@ step "s2-begin" BEGIN; } -step "s2-insert" +step "s2-insert-distributed" { -- Set statement_timeout for the session (in milliseconds) SET statement_timeout = 1000; -- 1 seconds @@ -162,10 +154,84 @@ step "s2-insert" LANGUAGE plpgsql; } +step "s2-insert-reference"{ + -- Set statement_timeout for the session (in milliseconds) + SET statement_timeout = 1000; -- 1 seconds + SET client_min_messages = 'notice'; + + -- Variable to track if the INSERT statement was successful + DO $$ + DECLARE + v_insert_successful BOOLEAN := FALSE; + BEGIN + + -- Execute the INSERT statement + insert into city values(3,'city3'); + + -- If we reach this point, the INSERT statement was successful + v_insert_successful := TRUE; + + IF v_insert_successful THEN + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; + END IF; + + EXCEPTION WHEN query_canceled THEN + -- The INSERT statement was canceled due to timeout + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + WHEN OTHERS THEN + -- Any other exception raised during the INSERT statement + RAISE; + END; + $$ + LANGUAGE plpgsql; +} + +step "s2-select-distributed"{ + select * from employee where id = 10; +} + + +step "s2-delete-distributed"{ + -- Set statement_timeout for the session (in milliseconds) + SET statement_timeout = 1000; -- 1 seconds + SET client_min_messages = 'notice'; + + -- Variable to track if the DELETE statement was successful + DO $$ + DECLARE + v_delete_successful BOOLEAN := FALSE; + BEGIN + + -- Execute the DELETE statement + delete from employee where id = 9; + + -- If we reach this point, the DELETE statement was successful + v_delete_successful := TRUE; + + IF v_delete_successful THEN + RAISE NOTICE 'DELETE statement completed successfully. This means that citus_pause_node could not get the lock.'; + END IF; + -- You can add additional processing here if needed + EXCEPTION + WHEN query_canceled THEN + -- The INSERT statement was canceled due to timeout + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + WHEN OTHERS THEN + -- Any other exception raised during the INSERT statement + RAISE; + END; + $$ + LANGUAGE plpgsql; +} + step "s2-end" { COMMIT; } -permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-insert" "s2-end" "s1-end" +permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-insert-distributed" "s2-end" "s1-end" +permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-delete-distributed" "s2-end" "s1-end" +permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-select-distributed" "s2-end" "s1-end" +permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-insert-reference" "s2-end" "s1-end" +permutation "s1-begin" "s1-pause-node" "s1-pause-node" "s1-end" permutation "s1-begin" "s1-node-not-found" "s1-end" From 1c05eebebe2bc0bf9851a37d5da454edaab24e16 Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 19:21:13 +0300 Subject: [PATCH 18/35] Updates udf name --- .../distributed/metadata/node_metadata.c | 4 +-- .../distributed/sql/citus--12.0-1--12.1-1.sql | 2 +- .../sql/downgrades/citus--12.1-1--12.0-1.sql | 2 +- .../sql/udfs/citus_pause_node/12.1-1.sql | 9 ------ .../sql/udfs/citus_pause_node/latest.sql | 9 ------ .../citus_pause_node_within_txn/12.1-1.sql | 9 ++++++ .../citus_pause_node_within_txn/latest.sql | 9 ++++++ .../expected/isolation_citus_pause_node.out | 32 +++++++++---------- .../spec/isolation_citus_pause_node.spec | 16 +++++----- 9 files changed, 46 insertions(+), 46 deletions(-) delete mode 100644 src/backend/distributed/sql/udfs/citus_pause_node/12.1-1.sql delete mode 100644 src/backend/distributed/sql/udfs/citus_pause_node/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 36f5c2a47e3..44e80404c2a 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -152,7 +152,7 @@ PG_FUNCTION_INFO_V1(master_disable_node); PG_FUNCTION_INFO_V1(citus_activate_node); PG_FUNCTION_INFO_V1(master_activate_node); PG_FUNCTION_INFO_V1(citus_update_node); -PG_FUNCTION_INFO_V1(citus_pause_node); +PG_FUNCTION_INFO_V1(citus_pause_node_within_txn); PG_FUNCTION_INFO_V1(master_update_node); PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid); @@ -1332,7 +1332,7 @@ citus_update_node(PG_FUNCTION_ARGS) Datum -citus_pause_node(PG_FUNCTION_ARGS) +citus_pause_node_within_txn(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); diff --git a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql index d314d0b90a2..c66860832da 100644 --- a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql +++ b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql @@ -1,6 +1,6 @@ -- citus--12.0-1--12.1-1 -#include "udfs/citus_pause_node/12.1-1.sql" +#include "udfs/citus_pause_node_within_txn/12.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql index 6088c967725..652b53cb10d 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql @@ -1,4 +1,4 @@ -- citus--12.1-1--12.0-1 -- this is an empty downgrade path since citus--12.0-1--12.1-1.sql is empty for now -DROP FUNCTION pg_catalog.citus_pause_node(int); +DROP FUNCTION pg_catalog.citus_pause_node_within_txn(int); diff --git a/src/backend/distributed/sql/udfs/citus_pause_node/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_pause_node/12.1-1.sql deleted file mode 100644 index b3496149225..00000000000 --- a/src/backend/distributed/sql/udfs/citus_pause_node/12.1-1.sql +++ /dev/null @@ -1,9 +0,0 @@ -CREATE FUNCTION pg_catalog.citus_pause_node(node_id int) - RETURNS void - LANGUAGE C STRICT - AS 'MODULE_PATHNAME', $$citus_pause_node$$; - -COMMENT ON FUNCTION pg_catalog.citus_pause_node(node_id int) - IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node'; - -REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node(int) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_pause_node/latest.sql b/src/backend/distributed/sql/udfs/citus_pause_node/latest.sql deleted file mode 100644 index b3496149225..00000000000 --- a/src/backend/distributed/sql/udfs/citus_pause_node/latest.sql +++ /dev/null @@ -1,9 +0,0 @@ -CREATE FUNCTION pg_catalog.citus_pause_node(node_id int) - RETURNS void - LANGUAGE C STRICT - AS 'MODULE_PATHNAME', $$citus_pause_node$$; - -COMMENT ON FUNCTION pg_catalog.citus_pause_node(node_id int) - IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node'; - -REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node(int) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql new file mode 100644 index 00000000000..298263b52a2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql @@ -0,0 +1,9 @@ +CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$; + +COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int) + IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql new file mode 100644 index 00000000000..298263b52a2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql @@ -0,0 +1,9 @@ +CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$; + +COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int) + IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int) FROM PUBLIC; diff --git a/src/test/regress/expected/isolation_citus_pause_node.out b/src/test/regress/expected/isolation_citus_pause_node.out index 18371b724fd..231b777cf29 100644 --- a/src/test/regress/expected/isolation_citus_pause_node.out +++ b/src/test/regress/expected/isolation_citus_pause_node.out @@ -26,7 +26,7 @@ BEGIN -- Get the node id for the shard id SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; -- Pause the node - perform pg_catalog.citus_pause_node(v_node_id) ; + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; END; $$ LANGUAGE plpgsql; @@ -50,13 +50,13 @@ step s2-insert-distributed: -- If we reach this point, the INSERT statement was successful v_insert_successful := TRUE; IF v_insert_successful THEN - RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; END IF; -- You can add additional processing here if needed EXCEPTION WHEN query_canceled THEN -- The INSERT statement was canceled due to timeout - RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; WHEN OTHERS THEN -- Any other exception raised during the INSERT statement RAISE; @@ -65,7 +65,7 @@ step s2-insert-distributed: LANGUAGE plpgsql; step s2-insert-distributed: <... completed> -s2: NOTICE: query_canceled exception raised. This means that citus_pause_node was able to get the lock. +s2: NOTICE: query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock. step s2-end: COMMIT; @@ -99,7 +99,7 @@ BEGIN -- Get the node id for the shard id SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; -- Pause the node - perform pg_catalog.citus_pause_node(v_node_id) ; + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; END; $$ LANGUAGE plpgsql; @@ -123,13 +123,13 @@ BEGIN -- If we reach this point, the DELETE statement was successful v_delete_successful := TRUE; IF v_delete_successful THEN - RAISE NOTICE 'DELETE statement completed successfully. This means that citus_pause_node could not get the lock.'; + RAISE NOTICE 'DELETE statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; END IF; -- You can add additional processing here if needed EXCEPTION WHEN query_canceled THEN -- The INSERT statement was canceled due to timeout - RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; WHEN OTHERS THEN -- Any other exception raised during the INSERT statement RAISE; @@ -138,7 +138,7 @@ $$ LANGUAGE plpgsql; step s2-delete-distributed: <... completed> -s2: NOTICE: query_canceled exception raised. This means that citus_pause_node was able to get the lock. +s2: NOTICE: query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock. step s2-end: COMMIT; @@ -172,7 +172,7 @@ BEGIN -- Get the node id for the shard id SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; -- Pause the node - perform pg_catalog.citus_pause_node(v_node_id) ; + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; END; $$ LANGUAGE plpgsql; @@ -223,7 +223,7 @@ BEGIN -- Get the node id for the shard id SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; -- Pause the node - perform pg_catalog.citus_pause_node(v_node_id) ; + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; END; $$ LANGUAGE plpgsql; @@ -247,11 +247,11 @@ BEGIN -- If we reach this point, the INSERT statement was successful v_insert_successful := TRUE; IF v_insert_successful THEN - RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; END IF; EXCEPTION WHEN query_canceled THEN -- The INSERT statement was canceled due to timeout - RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; WHEN OTHERS THEN -- Any other exception raised during the INSERT statement RAISE; @@ -260,7 +260,7 @@ $$ LANGUAGE plpgsql; step s2-insert-reference: <... completed> -s2: NOTICE: query_canceled exception raised. This means that citus_pause_node was able to get the lock. +s2: NOTICE: query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock. step s2-end: COMMIT; @@ -294,7 +294,7 @@ BEGIN -- Get the node id for the shard id SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; -- Pause the node - perform pg_catalog.citus_pause_node(v_node_id) ; + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; END; $$ LANGUAGE plpgsql; @@ -323,7 +323,7 @@ BEGIN -- Get the node id for the shard id SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; -- Pause the node - perform pg_catalog.citus_pause_node(v_node_id) ; + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; END; $$ LANGUAGE plpgsql; @@ -348,7 +348,7 @@ DECLARE v_expected_exception_message text := ''; BEGIN select nextval('pg_dist_node_nodeid_seq')::int into v_node_id; - select citus_pause_node(v_node_id); + select citus_pause_node_within_txn(v_node_id); EXCEPTION WHEN SQLSTATE 'P0002' THEN GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index 8b8dcded9c7..152d62bd002 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -59,7 +59,7 @@ step "s1-node-not-found" v_expected_exception_message text := ''; BEGIN select nextval('pg_dist_node_nodeid_seq')::int into v_node_id; - select citus_pause_node(v_node_id) ; + select citus_pause_node_within_txn(v_node_id) ; EXCEPTION WHEN SQLSTATE 'P0002' THEN GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; @@ -98,7 +98,7 @@ step "s1-pause-node" -- Pause the node - perform pg_catalog.citus_pause_node(v_node_id) ; + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; END; $$ LANGUAGE plpgsql; @@ -136,7 +136,7 @@ step "s2-insert-distributed" v_insert_successful := TRUE; IF v_insert_successful THEN - RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; END IF; @@ -144,7 +144,7 @@ step "s2-insert-distributed" EXCEPTION WHEN query_canceled THEN -- The INSERT statement was canceled due to timeout - RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; WHEN OTHERS THEN -- Any other exception raised during the INSERT statement RAISE; @@ -172,12 +172,12 @@ step "s2-insert-reference"{ v_insert_successful := TRUE; IF v_insert_successful THEN - RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node could not get the lock.'; + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; END IF; EXCEPTION WHEN query_canceled THEN -- The INSERT statement was canceled due to timeout - RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; WHEN OTHERS THEN -- Any other exception raised during the INSERT statement RAISE; @@ -209,13 +209,13 @@ step "s2-delete-distributed"{ v_delete_successful := TRUE; IF v_delete_successful THEN - RAISE NOTICE 'DELETE statement completed successfully. This means that citus_pause_node could not get the lock.'; + RAISE NOTICE 'DELETE statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; END IF; -- You can add additional processing here if needed EXCEPTION WHEN query_canceled THEN -- The INSERT statement was canceled due to timeout - RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node was able to get the lock.'; + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; WHEN OTHERS THEN -- Any other exception raised during the INSERT statement RAISE; From 339a47a18a791f621399018e33e3e2ea5bf6af31 Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 20:01:27 +0300 Subject: [PATCH 19/35] Fixes review comments --- .../distributed/metadata/node_metadata.c | 21 ++++++++++++------- .../expected/upgrade_list_citus_objects.out | 2 +- src/test/regress/isolation_schedule | 1 - 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 44e80404c2a..aac0f0155f4 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -134,6 +134,7 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); +static void lock_shards_in_worker_placement_list(WorkerNode *workerNode, LOCKMODE lockMode); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_set_coordinator_host); @@ -1160,6 +1161,15 @@ ActivateNodeList(MetadataSyncContext *context) SetNodeMetadata(context, localOnly); } +/* +* Adds locks into all shards placed into given workerNode. +*/ +void lock_shards_in_worker_placement_list(WorkerNode *workerNode , LOCKMODE lockMode){ + List *placementList = NIL; + + placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); + LockShardsInPlacementListMetadata(placementList, lockMode); +} /* * citus_update_node moves the requested node to a different nodename and nodeport. It @@ -1189,7 +1199,6 @@ citus_update_node(PG_FUNCTION_ARGS) int32 lock_cooldown = PG_GETARG_INT32(4); char *newNodeNameString = text_to_cstring(newNodeName); - List *placementList = NIL; BackgroundWorkerHandle *handle = NULL; WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, @@ -1282,8 +1291,7 @@ citus_update_node(PG_FUNCTION_ARGS) } } - placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); - LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); + lock_shards_in_worker_placement_list(workerNode, AccessExclusiveLock); } /* @@ -1331,13 +1339,14 @@ citus_update_node(PG_FUNCTION_ARGS) } + + Datum citus_pause_node_within_txn(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); int32 nodeId = PG_GETARG_INT32(0); - List *placementList = NIL; WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId); if (workerNode == NULL) @@ -1346,11 +1355,9 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS) errmsg("node %u not found", nodeId))); } - if (NodeIsPrimary(workerNode)) { - placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); - LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); + lock_shards_in_worker_placement_list(workerNode, AccessExclusiveLock); } PG_RETURN_VOID(); diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index efc93f6519f..b94a120bc03 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -103,7 +103,7 @@ ORDER BY 1; function citus_nodeid_for_gpid(bigint) function citus_nodename_for_nodeid(integer) function citus_nodeport_for_nodeid(integer) - function citus_pause_node(integer) + function citus_pause_node_within_txn(integer) function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 5fe9af7bc7e..d8cc77c73f3 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -80,7 +80,6 @@ test: isolation_schema_based_sharding test: isolation_citus_pause_node test: isolation_citus_schema_distribute_undistribute - # Rebalancer test: isolation_blocking_move_single_shard_commands test: isolation_blocking_move_multi_shard_commands From 4c3341e64be83068f1f6a2a291236054c64f1bd2 Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 20:07:24 +0300 Subject: [PATCH 20/35] Fixes indentation --- src/backend/distributed/metadata/node_metadata.c | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index aac0f0155f4..8e092bdfdb0 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -134,7 +134,8 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); -static void lock_shards_in_worker_placement_list(WorkerNode *workerNode, LOCKMODE lockMode); +static void lock_shards_in_worker_placement_list(WorkerNode *workerNode, LOCKMODE + lockMode); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_set_coordinator_host); @@ -1161,16 +1162,20 @@ ActivateNodeList(MetadataSyncContext *context) SetNodeMetadata(context, localOnly); } + /* -* Adds locks into all shards placed into given workerNode. -*/ -void lock_shards_in_worker_placement_list(WorkerNode *workerNode , LOCKMODE lockMode){ + * Adds locks into all shards placed into given workerNode. + */ +void +lock_shards_in_worker_placement_list(WorkerNode *workerNode, LOCKMODE lockMode) +{ List *placementList = NIL; placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); LockShardsInPlacementListMetadata(placementList, lockMode); } + /* * citus_update_node moves the requested node to a different nodename and nodeport. It * locks to ensure no queries are running concurrently; and is intended for customers who @@ -1339,8 +1344,6 @@ citus_update_node(PG_FUNCTION_ARGS) } - - Datum citus_pause_node_within_txn(PG_FUNCTION_ARGS) { From b69c36af492d21239869786a5f2d7ad36268ee8e Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 20:23:07 +0300 Subject: [PATCH 21/35] Fixes static code analysis issues --- src/backend/distributed/metadata/node_metadata.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 8e092bdfdb0..6d6aeadd1df 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -134,7 +134,7 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); -static void lock_shards_in_worker_placement_list(WorkerNode *workerNode, LOCKMODE +static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode); /* declarations for dynamic loading */ @@ -1167,11 +1167,10 @@ ActivateNodeList(MetadataSyncContext *context) * Adds locks into all shards placed into given workerNode. */ void -lock_shards_in_worker_placement_list(WorkerNode *workerNode, LOCKMODE lockMode) +LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode) { - List *placementList = NIL; - placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); + List *placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); LockShardsInPlacementListMetadata(placementList, lockMode); } @@ -1296,7 +1295,7 @@ citus_update_node(PG_FUNCTION_ARGS) } } - lock_shards_in_worker_placement_list(workerNode, AccessExclusiveLock); + LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock); } /* @@ -1360,7 +1359,7 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS) if (NodeIsPrimary(workerNode)) { - lock_shards_in_worker_placement_list(workerNode, AccessExclusiveLock); + LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock); } PG_RETURN_VOID(); From c41f93e40298ad17b880ffedb5aab0a97c34276d Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 20:28:27 +0300 Subject: [PATCH 22/35] Fixes indentation --- src/backend/distributed/metadata/node_metadata.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 6d6aeadd1df..7aa07933571 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -135,7 +135,7 @@ static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionP static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE - lockMode); + lockMode); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_set_coordinator_host); @@ -1169,7 +1169,6 @@ ActivateNodeList(MetadataSyncContext *context) void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode) { - List *placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); LockShardsInPlacementListMetadata(placementList, lockMode); } From 1a1b633d5575a84f239c5508d1252242ccefdab4 Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 20:31:45 +0300 Subject: [PATCH 23/35] Fixes multi extension tests --- src/test/regress/expected/multi_extension.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 79e5a85ec7a..db5ad3288a7 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1401,7 +1401,7 @@ ALTER EXTENSION citus UPDATE TO '12.1-1'; SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- - | function citus_pause_node(integer) void + | function citus_pause_node_within_txn(integer) void (1 row) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; From 3220bd9c476c0dfd92a169f8d169afdfd42fe1f5 Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 21:22:22 +0300 Subject: [PATCH 24/35] Fixes test errors after rebase --- src/backend/distributed/metadata/node_metadata.c | 3 +++ src/test/regress/expected/upgrade_list_citus_objects.out | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 7aa07933571..5b8ca1f0137 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1342,6 +1342,9 @@ citus_update_node(PG_FUNCTION_ARGS) } +/* + * + */ Datum citus_pause_node_within_txn(PG_FUNCTION_ARGS) { diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index b94a120bc03..d536bfdfa00 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -339,5 +339,5 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(331 rows) +(329 rows) From 6f2ddf44b434da795fadca9e2231b31895a3e0ed Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 21:30:34 +0300 Subject: [PATCH 25/35] Removes empty line --- src/test/regress/expected/upgrade_list_citus_objects.out | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 89428bf7872..e03a6656efa 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -5,8 +5,6 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND refobjid = e.oid AND deptype = 'e' AND e.extname='citus' - AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value(anyelement)' - AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value_agg(anyelement,anyelement)' ORDER BY 1; description --------------------------------------------------------------------- @@ -15,6 +13,8 @@ ORDER BY 1; function alter_old_partitions_set_access_method(regclass,timestamp with time zone,name) function alter_role_if_exists(text,text) function alter_table_set_access_method(regclass,text) + function any_value(anyelement) + function any_value_agg(anyelement,anyelement) function array_cat_agg(anycompatiblearray) function assign_distributed_transaction_id(integer,bigint,timestamp with time zone) function authinfo_valid(text) @@ -340,5 +340,3 @@ ORDER BY 1; view pg_dist_shard_placement view time_partitions (329 rows) - - From cc403bf458ec7ce740d9c889fa7debf14945eee7 Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 23:31:29 +0300 Subject: [PATCH 26/35] Fixes upgrade tests --- src/test/regress/expected/upgrade_list_citus_objects.out | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index e03a6656efa..7a45521748e 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -13,8 +13,6 @@ ORDER BY 1; function alter_old_partitions_set_access_method(regclass,timestamp with time zone,name) function alter_role_if_exists(text,text) function alter_table_set_access_method(regclass,text) - function any_value(anyelement) - function any_value_agg(anyelement,anyelement) function array_cat_agg(anycompatiblearray) function assign_distributed_transaction_id(integer,bigint,timestamp with time zone) function authinfo_valid(text) From a05d5fc5fcfec6470d269320ca36aaac78e7afcb Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 23:45:02 +0300 Subject: [PATCH 27/35] Fixes upgrade tests --- src/test/regress/expected/upgrade_list_citus_objects.out | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 7a45521748e..9dcc021f6c2 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -5,6 +5,8 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND refobjid = e.oid AND deptype = 'e' AND e.extname='citus' + AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value(anyelement)' + AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value_agg(anyelement,anyelement)' ORDER BY 1; description --------------------------------------------------------------------- @@ -338,3 +340,4 @@ ORDER BY 1; view pg_dist_shard_placement view time_partitions (329 rows) + From 29c5b0c98a83e82fec0503ec83ad67f0225c26df Mon Sep 17 00:00:00 2001 From: gindibay Date: Sun, 30 Jul 2023 00:41:51 +0300 Subject: [PATCH 28/35] Adds comments to methods --- src/backend/distributed/metadata/node_metadata.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 5b8ca1f0137..f0db234b272 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1164,7 +1164,11 @@ ActivateNodeList(MetadataSyncContext *context) /* - * Adds locks into all shards placed into given workerNode. + * Acquires shard metadata locks on all shards residing in the given worker node + * + * TODO: This function is not compatible with query from any node feature. + * To ensure proper behavior, it is essential to acquire locks on placements across all nodes + * rather than limiting it to just the coordinator (or the specific node from which this function is called) */ void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode) @@ -1343,8 +1347,12 @@ citus_update_node(PG_FUNCTION_ARGS) /* - * - */ +This function is designed to obtain locks for all the shards in a worker placement list. +Once the transaction is committed, the acquired locks will be automatically released. +Therefore, it is essential to invoke this function within a transaction. +This function proves beneficial when there is a need to temporarily disable writes to a specific node within a transaction. +*/ + Datum citus_pause_node_within_txn(PG_FUNCTION_ARGS) { From 9e79cd610e96ff92e37447595671e4fc76fec5ae Mon Sep 17 00:00:00 2001 From: gindibay Date: Sun, 30 Jul 2023 01:17:36 +0300 Subject: [PATCH 29/35] Fixes indent issues --- src/backend/distributed/metadata/node_metadata.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index f0db234b272..3a8df5ca922 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1347,12 +1347,11 @@ citus_update_node(PG_FUNCTION_ARGS) /* -This function is designed to obtain locks for all the shards in a worker placement list. -Once the transaction is committed, the acquired locks will be automatically released. -Therefore, it is essential to invoke this function within a transaction. -This function proves beneficial when there is a need to temporarily disable writes to a specific node within a transaction. -*/ - + * This function is designed to obtain locks for all the shards in a worker placement list. + * Once the transaction is committed, the acquired locks will be automatically released. + * Therefore, it is essential to invoke this function within a transaction. + * This function proves beneficial when there is a need to temporarily disable writes to a specific node within a transaction. + */ Datum citus_pause_node_within_txn(PG_FUNCTION_ARGS) { From de83b0130554bb40a62ae633bebcd80521eb121c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCrkan=20=C4=B0ndibay?= Date: Mon, 7 Aug 2023 18:44:50 +0300 Subject: [PATCH 30/35] Update src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql Co-authored-by: Hanefi Onaldi --- src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql index 958fa74f82e..f01b3087519 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql @@ -1,6 +1,5 @@ -- citus--12.1-1--12.0-1 --- this is an empty downgrade path since citus--12.0-1--12.1-1.sql is empty for now DROP FUNCTION pg_catalog.citus_pause_node_within_txn(int); From 3fbe5e48612df0651abd1e247b32d7a8c479290e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCrkan=20=C4=B0ndibay?= Date: Mon, 7 Aug 2023 18:45:30 +0300 Subject: [PATCH 31/35] Update src/backend/distributed/sql/citus--12.0-1--12.1-1.sql Co-authored-by: Hanefi Onaldi --- src/backend/distributed/sql/citus--12.0-1--12.1-1.sql | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql index 7214f7ece8a..a150ec2b018 100644 --- a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql +++ b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql @@ -1,10 +1,7 @@ -- citus--12.0-1--12.1-1 -#include "udfs/citus_pause_node_within_txn/12.1-1.sql" - - - -- bump version to 12.1-1 +#include "udfs/citus_pause_node_within_txn/12.1-1.sql" #include "udfs/citus_prepare_pg_upgrade/12.1-1.sql" #include "udfs/citus_finish_pg_upgrade/12.1-1.sql" From 48a5450d595a00be4734956522cee305a9cd9587 Mon Sep 17 00:00:00 2001 From: gindibay Date: Thu, 17 Aug 2023 12:05:58 +0300 Subject: [PATCH 32/35] Fixes review comments --- .../distributed/metadata/node_metadata.c | 159 ++++++++++-------- 1 file changed, 89 insertions(+), 70 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 3a8df5ca922..7a1ef237abf 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -9,7 +9,6 @@ #include "funcapi.h" #include "utils/plancache.h" - #include "access/genam.h" #include "access/heapam.h" #include "access/htup.h" @@ -102,8 +101,8 @@ static HeapTuple GetNodeByNodeId(int32 nodeId); static int32 GetNextGroupId(void); static int GetNextNodeId(void); static void InsertPlaceholderCoordinatorRecord(void); -static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata - *nodeMetadata); +static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, + NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); static void BlockDistributedQueriesOnMetadataNodes(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -163,7 +162,6 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid); PG_FUNCTION_INFO_V1(citus_is_coordinator); PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced); - /* * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to * sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK. @@ -547,7 +545,8 @@ citus_disable_node(PG_FUNCTION_ARGS) "metadata is not allowed"), errhint("You can force disabling node, SELECT " "citus_disable_node('%s', %d, " - "synchronous:=true);", workerNode->workerName, + "synchronous:=true);", + workerNode->workerName, nodePort), errdetail("Citus uses the first worker node in the " "metadata for certain internal operations when " @@ -696,8 +695,7 @@ citus_set_node_property(PG_FUNCTION_ARGS) else { ereport(ERROR, (errmsg( - "only the 'shouldhaveshards' property can be set using this function" - ))); + "only the 'shouldhaveshards' property can be set using this function"))); } TransactionModifiedNodeMetadata = true; @@ -1178,6 +1176,81 @@ LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode) } +BackgroundWorkerHandle * +CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown) +{ + BackgroundWorkerHandle *handle = NULL; + handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown); + if (!handle) + { + /* + * We failed to start a background worker, which probably means that we exceeded + * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want + * to repeatedly throw an error because if citus_update_node is called to complete a + * failover then finishing is the only way to bring the cluster back up. Therefore we + * give up on killing other backends and simply wait for the lock. We do set + * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock. + */ + SetLockTimeoutLocally(lock_cooldown); + ereport(WARNING, (errmsg( + "could not start background worker to kill backends with conflicting" + " locks to force the update. Degrading to acquiring locks " + "with a lock time out."), + errhint( + "Increasing max_worker_processes might help."))); + } + return handle; +} + + +/* + * This function is used to lock shards in a primary node. + * If force is true, we start a background worker to kill backends holding + * conflicting locks with this backend. + * + * If the node is a primary node we block reads and writes. + * + * This lock has two purposes: + * + * - Ensure buggy code in Citus doesn't cause failures when the + * nodename/nodeport of a node changes mid-query + * + * - Provide fencing during failover, after this function returns all + * connections will use the new node location. + * + * Drawback: + * + * - This function blocks until all previous queries have finished. This + * means that long-running queries will prevent failover. + * + * In case of node failure said long-running queries will fail in the end + * anyway as they will be unable to commit successfully on the failed + * machine. To cause quick failure of these queries use force => true + * during the invocation of citus_update_node to terminate conflicting + * backends proactively. + * + * It might be worth blocking reads to a secondary for the same reasons, + * though we currently only query secondaries on follower clusters + * where these locks will have no effect. + */ +BackgroundWorkerHandle * +LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode, bool force, int32 + lock_cooldown) +{ + BackgroundWorkerHandle *handle = NULL; + + if (NodeIsPrimary(workerNode)) + { + if (force) + { + handle = CheckBackgroundWorkerToObtainLocks(lock_cooldown); + } + LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock); + } + return handle; +} + + /* * citus_update_node moves the requested node to a different nodename and nodeport. It * locks to ensure no queries are running concurrently; and is intended for customers who @@ -1243,63 +1316,8 @@ citus_update_node(PG_FUNCTION_ARGS) EnsureTransactionalMetadataSyncMode(); } - /* - * If the node is a primary node we block reads and writes. - * - * This lock has two purposes: - * - * - Ensure buggy code in Citus doesn't cause failures when the - * nodename/nodeport of a node changes mid-query - * - * - Provide fencing during failover, after this function returns all - * connections will use the new node location. - * - * Drawback: - * - * - This function blocks until all previous queries have finished. This - * means that long-running queries will prevent failover. - * - * In case of node failure said long-running queries will fail in the end - * anyway as they will be unable to commit successfully on the failed - * machine. To cause quick failure of these queries use force => true - * during the invocation of citus_update_node to terminate conflicting - * backends proactively. - * - * It might be worth blocking reads to a secondary for the same reasons, - * though we currently only query secondaries on follower clusters - * where these locks will have no effect. - */ - if (NodeIsPrimary(workerNode)) - { - /* - * before acquiring the locks check if we want a background worker to help us to - * aggressively obtain the locks. - */ - if (force) - { - handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown); - if (!handle) - { - /* - * We failed to start a background worker, which probably means that we exceeded - * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want - * to repeatedly throw an error because if citus_update_node is called to complete a - * failover then finishing is the only way to bring the cluster back up. Therefore we - * give up on killing other backends and simply wait for the lock. We do set - * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock. - */ - SetLockTimeoutLocally(lock_cooldown); - ereport(WARNING, (errmsg( - "could not start background worker to kill backends with conflicting" - " locks to force the update. Degrading to acquiring locks " - "with a lock time out."), - errhint( - "Increasing max_worker_processes might help."))); - } - } - - LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock); - } + handle = LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, + lock_cooldown); /* * if we have planned statements such as prepared statements, we should clear the cache so that @@ -1358,6 +1376,8 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); int32 nodeId = PG_GETARG_INT32(0); + bool force = PG_GETARG_BOOL(1); + int32 lock_cooldown = PG_GETARG_INT32(2); WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId); if (workerNode == NULL) @@ -1366,10 +1386,7 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS) errmsg("node %u not found", nodeId))); } - if (NodeIsPrimary(workerNode)) - { - LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock); - } + LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown); PG_RETURN_VOID(); } @@ -1992,7 +2009,8 @@ ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode) ereport(ERROR, (errmsg("cannot remove or disable the node " "%s:%d because because it contains " "the only shard placement for " - "shard " UINT64_FORMAT, workerNode->workerName, + "shard " UINT64_FORMAT, + workerNode->workerName, workerNode->workerPort, placement->shardId), errdetail("One of the table(s) that prevents the operation " "complete successfully is %s", @@ -2544,7 +2562,8 @@ ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *fi if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID) { ereport(ERROR, (errmsg("cannot change \"%s\" field of the " - "coordinator node", field))); + "coordinator node", + field))); } } From 7ac5f21dc060dbc5d3ac1b473154fbda92db51b4 Mon Sep 17 00:00:00 2001 From: gindibay Date: Thu, 17 Aug 2023 12:19:32 +0300 Subject: [PATCH 33/35] Adds a new empty space --- src/backend/distributed/metadata/node_metadata.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 7a1ef237abf..431c69602c4 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1018,6 +1018,7 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly) BoolGetDatum(true)); updatedActivatedNodeList = lappend(updatedActivatedNodeList, node); + } /* reset activated nodes inside metadataSyncContext afer local update */ From 458edd82943dbc752719e2813b02ee06dbb4174b Mon Sep 17 00:00:00 2001 From: gindibay Date: Thu, 17 Aug 2023 13:40:48 +0300 Subject: [PATCH 34/35] Fixes code changes --- src/backend/distributed/metadata/node_metadata.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 431c69602c4..54988c0a673 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -135,6 +135,10 @@ static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode); +static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown); +static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode, bool force, int32 lock_cooldown); + +/* Function definitions go here */ /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_set_coordinator_host); From f641860175a1f47ec8a1904ca8d054edd42df34f Mon Sep 17 00:00:00 2001 From: gindibay Date: Thu, 17 Aug 2023 15:30:15 +0300 Subject: [PATCH 35/35] Fixes test issues --- .../sql/downgrades/citus--12.1-1--12.0-1.sql | 2 +- .../citus_pause_node_within_txn/12.1-1.sql | 10 +- .../citus_pause_node_within_txn/latest.sql | 8 +- .../expected/isolation_citus_pause_node.out | 540 ++++++++++-------- .../expected/upgrade_list_citus_objects.out | 2 +- .../spec/isolation_citus_pause_node.spec | 34 ++ 6 files changed, 355 insertions(+), 241 deletions(-) diff --git a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql index f01b3087519..6da7664e853 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql @@ -1,7 +1,7 @@ -- citus--12.1-1--12.0-1 -DROP FUNCTION pg_catalog.citus_pause_node_within_txn(int); +DROP FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int); -- we have modified the relevant upgrade script to include any_value changes diff --git a/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql index 298263b52a2..12cb54c0a1c 100644 --- a/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql +++ b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/12.1-1.sql @@ -1,9 +1,13 @@ -CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int) +CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int, + force bool DEFAULT false, + lock_cooldown int DEFAULT 10000) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$; -COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int) +COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int, + force bool, + lock_cooldown int) IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node'; -REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int) FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql index 298263b52a2..d540df80953 100644 --- a/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_pause_node_within_txn/latest.sql @@ -1,9 +1,11 @@ -CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int) +CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,force bool ,lock_cooldown int) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$; -COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int) +COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int, + force bool, + lock_cooldown int ) IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node'; -REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int) FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int) FROM PUBLIC; diff --git a/src/test/regress/expected/isolation_citus_pause_node.out b/src/test/regress/expected/isolation_citus_pause_node.out index 231b777cf29..df50af47147 100644 --- a/src/test/regress/expected/isolation_citus_pause_node.out +++ b/src/test/regress/expected/isolation_citus_pause_node.out @@ -6,30 +6,104 @@ step s1-begin: s1: NOTICE: step s1-pause-node: -SET client_min_messages = 'notice'; -DO $$ -DECLARE - v_shard_id int; - v_node_id int; - v_node_name text; - v_node_port int; -BEGIN ---The first message in the block is being printed on the top of the code block. So adding a dummy message ---to make sure that the first message is printed in correct place. - raise notice ''; - -- Get the shard id for the distribution column - SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; - --Get the node id for the shard id - SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; - raise notice 'node name is %',v_node_name; - raise notice 'node port is %',v_node_port; - -- Get the node id for the shard id - SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; - -- Pause the node - perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; -END; -$$ -LANGUAGE plpgsql; + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; + +s1: NOTICE: node name is localhost +s1: NOTICE: node port is 57638 +step s2-begin: + BEGIN; + +step s2-insert-distributed: + -- Set statement_timeout for the session (in milliseconds) + SET statement_timeout = 1000; -- 1 seconds + SET client_min_messages = 'notice'; + -- Variable to track if the INSERT statement was successful + DO $$ + DECLARE + v_insert_successful BOOLEAN := FALSE; + BEGIN + -- Execute the INSERT statement + insert into employee values(11,'e11',3); + -- If we reach this point, the INSERT statement was successful + v_insert_successful := TRUE; + IF v_insert_successful THEN + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; + END IF; + -- You can add additional processing here if needed + EXCEPTION + WHEN query_canceled THEN + -- The INSERT statement was canceled due to timeout + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; + WHEN OTHERS THEN + -- Any other exception raised during the INSERT statement + RAISE; + END; + $$ + LANGUAGE plpgsql; + +step s2-insert-distributed: <... completed> +s2: NOTICE: query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock. +step s2-end: + COMMIT; + +step s1-end: + COMMIT; + + +starting permutation: s1-begin s1-pause-node-force s2-begin s2-insert-distributed s2-end s1-end +step s1-begin: + BEGIN; + +s1: NOTICE: +step s1-pause-node-force: + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + v_force boolean := true; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node with force true + perform pg_catalog.citus_pause_node_within_txn(v_node_id,v_force) ; + END; + $$ + LANGUAGE plpgsql; s1: NOTICE: node name is localhost s1: NOTICE: node port is 57638 @@ -75,115 +149,115 @@ step s1-end: starting permutation: s1-begin s1-pause-node s2-begin s2-delete-distributed s2-end s1-end step s1-begin: - BEGIN; + BEGIN; s1: NOTICE: step s1-pause-node: -SET client_min_messages = 'notice'; -DO $$ -DECLARE - v_shard_id int; - v_node_id int; - v_node_name text; - v_node_port int; -BEGIN - --The first message in the block is being printed on the top of the code block. So adding a dummy message - --to make sure that the first message is printed in correct place. - raise notice ''; - -- Get the shard id for the distribution column - SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; - --Get the node id for the shard id - SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; - raise notice 'node name is %',v_node_name; - raise notice 'node port is %',v_node_port; - -- Get the node id for the shard id - SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; - -- Pause the node - perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; -END; -$$ -LANGUAGE plpgsql; + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; s1: NOTICE: node name is localhost s1: NOTICE: node port is 57638 step s2-begin: -BEGIN; + BEGIN; step s2-delete-distributed: --- Set statement_timeout for the session (in milliseconds) -SET statement_timeout = 1000; -- 1 seconds -SET client_min_messages = 'notice'; --- Variable to track if the DELETE statement was successful -DO $$ -DECLARE - v_delete_successful BOOLEAN := FALSE; -BEGIN - -- Execute the DELETE statement - delete from employee where id = 9; - -- If we reach this point, the DELETE statement was successful - v_delete_successful := TRUE; - IF v_delete_successful THEN - RAISE NOTICE 'DELETE statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; - END IF; --- You can add additional processing here if needed -EXCEPTION - WHEN query_canceled THEN - -- The INSERT statement was canceled due to timeout - RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; - WHEN OTHERS THEN - -- Any other exception raised during the INSERT statement - RAISE; -END; -$$ -LANGUAGE plpgsql; - + -- Set statement_timeout for the session (in milliseconds) + SET statement_timeout = 1000; -- 1 seconds + SET client_min_messages = 'notice'; + -- Variable to track if the DELETE statement was successful + DO $$ + DECLARE + v_delete_successful BOOLEAN := FALSE; + BEGIN + -- Execute the DELETE statement + delete from employee where id = 9; + -- If we reach this point, the DELETE statement was successful + v_delete_successful := TRUE; + IF v_delete_successful THEN + RAISE NOTICE 'DELETE statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; + END IF; + -- You can add additional processing here if needed + EXCEPTION + WHEN query_canceled THEN + -- The INSERT statement was canceled due to timeout + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; + WHEN OTHERS THEN + -- Any other exception raised during the INSERT statement + RAISE; + END; + $$ + LANGUAGE plpgsql; + step s2-delete-distributed: <... completed> s2: NOTICE: query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock. step s2-end: -COMMIT; + COMMIT; step s1-end: - COMMIT; + COMMIT; starting permutation: s1-begin s1-pause-node s2-begin s2-select-distributed s2-end s1-end step s1-begin: - BEGIN; + BEGIN; s1: NOTICE: step s1-pause-node: -SET client_min_messages = 'notice'; -DO $$ -DECLARE - v_shard_id int; - v_node_id int; - v_node_name text; - v_node_port int; -BEGIN - --The first message in the block is being printed on the top of the code block. So adding a dummy message - --to make sure that the first message is printed in correct place. - raise notice ''; - -- Get the shard id for the distribution column - SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; - --Get the node id for the shard id - SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; - raise notice 'node name is %',v_node_name; - raise notice 'node port is %',v_node_port; - -- Get the node id for the shard id - SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; - -- Pause the node - perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; -END; -$$ -LANGUAGE plpgsql; + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; s1: NOTICE: node name is localhost s1: NOTICE: node port is 57638 step s2-begin: -BEGIN; + BEGIN; step s2-select-distributed: -select * from employee where id = 10; + select * from employee where id = 10; id|name|company_id --------------------------------------------------------------------- @@ -191,175 +265,175 @@ id|name|company_id (1 row) step s2-end: -COMMIT; + COMMIT; step s1-end: - COMMIT; + COMMIT; starting permutation: s1-begin s1-pause-node s2-begin s2-insert-reference s2-end s1-end step s1-begin: - BEGIN; + BEGIN; s1: NOTICE: step s1-pause-node: -SET client_min_messages = 'notice'; -DO $$ -DECLARE - v_shard_id int; - v_node_id int; - v_node_name text; - v_node_port int; -BEGIN - --The first message in the block is being printed on the top of the code block. So adding a dummy message - --to make sure that the first message is printed in correct place. - raise notice ''; - -- Get the shard id for the distribution column - SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; - --Get the node id for the shard id - SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; - raise notice 'node name is %',v_node_name; - raise notice 'node port is %',v_node_port; - -- Get the node id for the shard id - SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; - -- Pause the node - perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; -END; -$$ -LANGUAGE plpgsql; + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; s1: NOTICE: node name is localhost s1: NOTICE: node port is 57638 step s2-begin: -BEGIN; + BEGIN; step s2-insert-reference: --- Set statement_timeout for the session (in milliseconds) -SET statement_timeout = 1000; -- 1 seconds -SET client_min_messages = 'notice'; --- Variable to track if the INSERT statement was successful -DO $$ -DECLARE - v_insert_successful BOOLEAN := FALSE; -BEGIN - -- Execute the INSERT statement - insert into city values(3,'city3'); - -- If we reach this point, the INSERT statement was successful - v_insert_successful := TRUE; - IF v_insert_successful THEN - RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; - END IF; -EXCEPTION WHEN query_canceled THEN - -- The INSERT statement was canceled due to timeout - RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; - WHEN OTHERS THEN - -- Any other exception raised during the INSERT statement - RAISE; -END; -$$ -LANGUAGE plpgsql; - + -- Set statement_timeout for the session (in milliseconds) + SET statement_timeout = 1000; -- 1 seconds + SET client_min_messages = 'notice'; + -- Variable to track if the INSERT statement was successful + DO $$ + DECLARE + v_insert_successful BOOLEAN := FALSE; + BEGIN + -- Execute the INSERT statement + insert into city values(3,'city3'); + -- If we reach this point, the INSERT statement was successful + v_insert_successful := TRUE; + IF v_insert_successful THEN + RAISE NOTICE 'INSERT statement completed successfully. This means that citus_pause_node_within_txn could not get the lock.'; + END IF; + EXCEPTION WHEN query_canceled THEN + -- The INSERT statement was canceled due to timeout + RAISE NOTICE 'query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock.'; + WHEN OTHERS THEN + -- Any other exception raised during the INSERT statement + RAISE; + END; + $$ + LANGUAGE plpgsql; + step s2-insert-reference: <... completed> s2: NOTICE: query_canceled exception raised. This means that citus_pause_node_within_txn was able to get the lock. step s2-end: -COMMIT; + COMMIT; step s1-end: - COMMIT; + COMMIT; starting permutation: s1-begin s1-pause-node s1-pause-node s1-end step s1-begin: - BEGIN; + BEGIN; s1: NOTICE: step s1-pause-node: -SET client_min_messages = 'notice'; -DO $$ -DECLARE - v_shard_id int; - v_node_id int; - v_node_name text; - v_node_port int; -BEGIN - --The first message in the block is being printed on the top of the code block. So adding a dummy message - --to make sure that the first message is printed in correct place. - raise notice ''; - -- Get the shard id for the distribution column - SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; - --Get the node id for the shard id - SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; - raise notice 'node name is %',v_node_name; - raise notice 'node port is %',v_node_port; - -- Get the node id for the shard id - SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; - -- Pause the node - perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; -END; -$$ -LANGUAGE plpgsql; + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; s1: NOTICE: node name is localhost s1: NOTICE: node port is 57638 s1: NOTICE: step s1-pause-node: -SET client_min_messages = 'notice'; -DO $$ -DECLARE - v_shard_id int; - v_node_id int; - v_node_name text; - v_node_port int; -BEGIN - --The first message in the block is being printed on the top of the code block. So adding a dummy message - --to make sure that the first message is printed in correct place. - raise notice ''; - -- Get the shard id for the distribution column - SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; - --Get the node id for the shard id - SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; - raise notice 'node name is %',v_node_name; - raise notice 'node port is %',v_node_port; - -- Get the node id for the shard id - SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; - -- Pause the node - perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; -END; -$$ -LANGUAGE plpgsql; + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + -- Pause the node + perform pg_catalog.citus_pause_node_within_txn(v_node_id) ; + END; + $$ + LANGUAGE plpgsql; s1: NOTICE: node name is localhost s1: NOTICE: node port is 57638 step s1-end: - COMMIT; + COMMIT; starting permutation: s1-begin s1-node-not-found s1-end step s1-begin: - BEGIN; + BEGIN; -s1: NOTICE: Node not found. +s1: NOTICE: Node not found. step s1-node-not-found: -DO $$ -DECLARE - v_node_id int := -1; - v_node_exists boolean := true; - v_exception_message text; - v_expected_exception_message text := ''; - BEGIN - select nextval('pg_dist_node_nodeid_seq')::int into v_node_id; - select citus_pause_node_within_txn(v_node_id); -EXCEPTION - WHEN SQLSTATE 'P0002' THEN - GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; - v_expected_exception_message := 'node ' || v_node_id || ' not found'; - if v_exception_message = v_expected_exception_message then - RAISE NOTICE 'Node not found.'; - end if; -END; -$$ -LANGUAGE plpgsql; + DO $$ + DECLARE + v_node_id int:= -1; + v_node_exists boolean := true; + v_exception_message text; + v_expected_exception_message text := ''; + BEGIN + select nextval('pg_dist_node_nodeid_seq')::int into v_node_id; + select citus_pause_node_within_txn(v_node_id) ; + EXCEPTION + WHEN SQLSTATE 'P0002' THEN + GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT; + v_expected_exception_message := 'node ' || v_node_id || ' not found'; + if v_exception_message = v_expected_exception_message then + RAISE NOTICE 'Node not found.'; + end if; + END; + $$ + LANGUAGE plpgsql; step s1-end: - COMMIT; + COMMIT; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 9dcc021f6c2..6652acd6f01 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -103,7 +103,7 @@ ORDER BY 1; function citus_nodeid_for_gpid(bigint) function citus_nodename_for_nodeid(integer) function citus_nodeport_for_nodeid(integer) - function citus_pause_node_within_txn(integer) + function citus_pause_node_within_txn(integer,bool,integer) function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() diff --git a/src/test/regress/spec/isolation_citus_pause_node.spec b/src/test/regress/spec/isolation_citus_pause_node.spec index 152d62bd002..d4132ec593d 100644 --- a/src/test/regress/spec/isolation_citus_pause_node.spec +++ b/src/test/regress/spec/isolation_citus_pause_node.spec @@ -104,6 +104,39 @@ step "s1-pause-node" LANGUAGE plpgsql; } +step "s1-pause-node-force" +{ + SET client_min_messages = 'notice'; + DO $$ + DECLARE + v_shard_id int; + v_node_id int; + v_node_name text; + v_node_port int; + v_force boolean := true; + BEGIN + --The first message in the block is being printed on the top of the code block. So adding a dummy message + --to make sure that the first message is printed in correct place. + raise notice ''; + -- Get the shard id for the distribution column + SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id; + + --Get the node id for the shard id + SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1; + raise notice 'node name is %',v_node_name; + raise notice 'node port is %',v_node_port; + + -- Get the node id for the shard id + SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1; + + + -- Pause the node with force true + perform pg_catalog.citus_pause_node_within_txn(v_node_id,v_force) ; + END; + $$ + LANGUAGE plpgsql; +} + step "s1-end" { COMMIT; @@ -230,6 +263,7 @@ step "s2-end" } permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-insert-distributed" "s2-end" "s1-end" +permutation "s1-begin" "s1-pause-node-force" "s2-begin" "s2-insert-distributed" "s2-end" "s1-end" permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-delete-distributed" "s2-end" "s1-end" permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-select-distributed" "s2-end" "s1-end" permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-insert-reference" "s2-end" "s1-end"