From 333bcc794851ea11c0f1729914daee9cc4c4a404 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20Ozan=20Akg=C3=BCl?= Date: Wed, 9 Mar 2022 13:15:59 +0300 Subject: [PATCH] Global PID Helper Functions (#5768) * Introduces citus_nodename_for_nodeid and citus_nodeport_for_nodeid functions * Introduces citus_nodeid_for_gpid and citus_pid_for_gpid functions * Add tests --- .../distributed/metadata/node_metadata.c | 58 +++++++++- .../distributed/sql/citus--10.2-4--11.0-1.sql | 6 + .../sql/downgrades/citus--11.0-1--10.2-4.sql | 12 +- .../sql/udfs/citus_nodeid_for_gpid/11.0-1.sql | 7 ++ .../sql/udfs/citus_nodeid_for_gpid/latest.sql | 7 ++ .../udfs/citus_nodename_for_nodeid/11.0-1.sql | 7 ++ .../udfs/citus_nodename_for_nodeid/latest.sql | 7 ++ .../udfs/citus_nodeport_for_nodeid/11.0-1.sql | 7 ++ .../udfs/citus_nodeport_for_nodeid/latest.sql | 7 ++ .../sql/udfs/citus_pid_for_gpid/11.0-1.sql | 7 ++ .../sql/udfs/citus_pid_for_gpid/latest.sql | 7 ++ src/backend/distributed/test/global_pid.c | 33 ++++++ .../distributed/transaction/backend_data.c | 31 +++++ src/test/regress/expected/global_cancel.out | 22 ++++ .../isolation_citus_dist_activity.out | 106 ++++++++++++------ src/test/regress/expected/multi_extension.out | 6 +- .../expected/upgrade_list_citus_objects.out | 6 +- .../spec/isolation_citus_dist_activity.spec | 25 ++++- src/test/regress/sql/global_cancel.sql | 7 ++ 19 files changed, 319 insertions(+), 49 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_nodeid_for_gpid/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_nodeid_for_gpid/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_nodename_for_nodeid/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_nodename_for_nodeid/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_nodeport_for_nodeid/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_nodeport_for_nodeid/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_pid_for_gpid/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_pid_for_gpid/latest.sql create mode 100644 src/backend/distributed/test/global_pid.c diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 4d36549fd..f9f070166 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -146,6 +146,8 @@ PG_FUNCTION_INFO_V1(master_activate_node); PG_FUNCTION_INFO_V1(citus_update_node); PG_FUNCTION_INFO_V1(master_update_node); PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); +PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid); +PG_FUNCTION_INFO_V1(citus_nodeport_for_nodeid); /* @@ -1473,6 +1475,50 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) } +/* + * citus_nodename_for_nodeid returns the node name for the node with given node id + */ +Datum +citus_nodename_for_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + int nodeId = PG_GETARG_INT32(0); + + bool missingOk = true; + WorkerNode *node = FindNodeWithNodeId(nodeId, missingOk); + + if (node == NULL) + { + PG_RETURN_NULL(); + } + + PG_RETURN_TEXT_P(cstring_to_text(node->workerName)); +} + + +/* + * citus_nodeport_for_nodeid returns the node port for the node with given node id + */ +Datum +citus_nodeport_for_nodeid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + int nodeId = PG_GETARG_INT32(0); + + bool missingOk = true; + WorkerNode *node = FindNodeWithNodeId(nodeId, missingOk); + + if (node == NULL) + { + PG_RETURN_NULL(); + } + + PG_RETURN_INT32(node->workerPort); +} + + /* * FindWorkerNode searches over the worker nodes and returns the workerNode * if it already exists. Else, the function returns NULL. @@ -1550,21 +1596,21 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort) WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk) { - List *workerList = ActiveReadableNodeList(); - WorkerNode *workerNode = NULL; + List *nodeList = ActiveReadableNodeList(); + WorkerNode *node = NULL; - foreach_ptr(workerNode, workerList) + foreach_ptr(node, nodeList) { - if (workerNode->nodeId == nodeId) + if (node->nodeId == nodeId) { - return workerNode; + return node; } } /* there isn't any node with nodeId in pg_dist_node */ if (!missingOk) { - elog(ERROR, "worker node with node id %d could not be found", nodeId); + elog(ERROR, "node with node id %d could not be found", nodeId); } return NULL; diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index 19ebefbfd..591092a0d 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -94,3 +94,9 @@ ALTER TABLE citus.pg_dist_object SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.pg_dist_object TO public; #include "udfs/citus_prepare_pg_upgrade/11.0-1.sql" #include "udfs/citus_finish_pg_upgrade/11.0-1.sql" + +#include "udfs/citus_nodename_for_nodeid/11.0-1.sql" +#include "udfs/citus_nodeport_for_nodeid/11.0-1.sql" + +#include "udfs/citus_nodeid_for_gpid/11.0-1.sql" +#include "udfs/citus_pid_for_gpid/11.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 3992dd62b..19d5bb22d 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -361,6 +361,12 @@ DROP FUNCTION get_nodeid_for_groupid(integer); RESET search_path; -DROP VIEW IF EXISTS pg_catalog.citus_stat_activity; -DROP FUNCTION IF EXISTS pg_catalog.citus_stat_activity; -DROP FUNCTION IF EXISTS pg_catalog.run_command_on_all_nodes; +DROP VIEW pg_catalog.citus_stat_activity; +DROP FUNCTION pg_catalog.citus_stat_activity; +DROP FUNCTION pg_catalog.run_command_on_all_nodes; + +DROP FUNCTION pg_catalog.citus_nodename_for_nodeid(integer); +DROP FUNCTION pg_catalog.citus_nodeport_for_nodeid(integer); + +DROP FUNCTION pg_catalog.citus_nodeid_for_gpid(bigint); +DROP FUNCTION pg_catalog.citus_pid_for_gpid(bigint); diff --git a/src/backend/distributed/sql/udfs/citus_nodeid_for_gpid/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_nodeid_for_gpid/11.0-1.sql new file mode 100644 index 000000000..c77dd94df --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_nodeid_for_gpid/11.0-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_nodeid_for_gpid(global_pid bigint) + RETURNS integer + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_nodeid_for_gpid$$; + +COMMENT ON FUNCTION pg_catalog.citus_nodeid_for_gpid(global_pid bigint) + IS 'returns node id for the global process with given global pid'; diff --git a/src/backend/distributed/sql/udfs/citus_nodeid_for_gpid/latest.sql b/src/backend/distributed/sql/udfs/citus_nodeid_for_gpid/latest.sql new file mode 100644 index 000000000..c77dd94df --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_nodeid_for_gpid/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_nodeid_for_gpid(global_pid bigint) + RETURNS integer + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_nodeid_for_gpid$$; + +COMMENT ON FUNCTION pg_catalog.citus_nodeid_for_gpid(global_pid bigint) + IS 'returns node id for the global process with given global pid'; diff --git a/src/backend/distributed/sql/udfs/citus_nodename_for_nodeid/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_nodename_for_nodeid/11.0-1.sql new file mode 100644 index 000000000..4032029c2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_nodename_for_nodeid/11.0-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_nodename_for_nodeid(nodeid integer) + RETURNS text + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_nodename_for_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_nodename_for_nodeid(nodeid integer) + IS 'returns node name for the node with given node id'; diff --git a/src/backend/distributed/sql/udfs/citus_nodename_for_nodeid/latest.sql b/src/backend/distributed/sql/udfs/citus_nodename_for_nodeid/latest.sql new file mode 100644 index 000000000..4032029c2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_nodename_for_nodeid/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_nodename_for_nodeid(nodeid integer) + RETURNS text + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_nodename_for_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_nodename_for_nodeid(nodeid integer) + IS 'returns node name for the node with given node id'; diff --git a/src/backend/distributed/sql/udfs/citus_nodeport_for_nodeid/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_nodeport_for_nodeid/11.0-1.sql new file mode 100644 index 000000000..d543949d8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_nodeport_for_nodeid/11.0-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_nodeport_for_nodeid(nodeid integer) + RETURNS integer + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_nodeport_for_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_nodeport_for_nodeid(nodeid integer) + IS 'returns node port for the node with given node id'; diff --git a/src/backend/distributed/sql/udfs/citus_nodeport_for_nodeid/latest.sql b/src/backend/distributed/sql/udfs/citus_nodeport_for_nodeid/latest.sql new file mode 100644 index 000000000..d543949d8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_nodeport_for_nodeid/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_nodeport_for_nodeid(nodeid integer) + RETURNS integer + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_nodeport_for_nodeid$$; + +COMMENT ON FUNCTION pg_catalog.citus_nodeport_for_nodeid(nodeid integer) + IS 'returns node port for the node with given node id'; diff --git a/src/backend/distributed/sql/udfs/citus_pid_for_gpid/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_pid_for_gpid/11.0-1.sql new file mode 100644 index 000000000..c6564d119 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pid_for_gpid/11.0-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_pid_for_gpid(global_pid bigint) + RETURNS integer + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_pid_for_gpid$$; + +COMMENT ON FUNCTION pg_catalog.citus_pid_for_gpid(global_pid bigint) + IS 'returns process id for the global process with given global pid'; diff --git a/src/backend/distributed/sql/udfs/citus_pid_for_gpid/latest.sql b/src/backend/distributed/sql/udfs/citus_pid_for_gpid/latest.sql new file mode 100644 index 000000000..c6564d119 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pid_for_gpid/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_pid_for_gpid(global_pid bigint) + RETURNS integer + LANGUAGE C STABLE STRICT + AS 'MODULE_PATHNAME', $$citus_pid_for_gpid$$; + +COMMENT ON FUNCTION pg_catalog.citus_pid_for_gpid(global_pid bigint) + IS 'returns process id for the global process with given global pid'; diff --git a/src/backend/distributed/test/global_pid.c b/src/backend/distributed/test/global_pid.c new file mode 100644 index 000000000..e5ef60a96 --- /dev/null +++ b/src/backend/distributed/test/global_pid.c @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * + * test/src/global_pid.c + * + * This file contains functions to test the global pid. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "fmgr.h" + +#include "distributed/backend_data.h" +#include "distributed/metadata_cache.h" + +PG_FUNCTION_INFO_V1(test_assign_global_pid); + + +/* + * test_assign_global_pid is the wrapper UDF for AssignGlobalPID and is only meant for use + * in tests. + */ +Datum +test_assign_global_pid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + AssignGlobalPID(); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index fd482c7eb..925071c35 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -100,6 +100,8 @@ PG_FUNCTION_INFO_V1(get_global_active_transactions); PG_FUNCTION_INFO_V1(get_all_active_transactions); PG_FUNCTION_INFO_V1(citus_calculate_gpid); PG_FUNCTION_INFO_V1(citus_backend_gpid); +PG_FUNCTION_INFO_V1(citus_nodeid_for_gpid); +PG_FUNCTION_INFO_V1(citus_pid_for_gpid); /* @@ -947,6 +949,35 @@ citus_backend_gpid(PG_FUNCTION_ARGS) } +/* + * citus_nodeid_for_gpid returns node id for the global process with given global pid + */ +Datum +citus_nodeid_for_gpid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + uint64 globalPID = PG_GETARG_INT64(0); + + bool missingOk = false; + PG_RETURN_INT32(ExtractNodeIdFromGlobalPID(globalPID, missingOk)); +} + + +/* + * citus_pid_for_gpid returns process id for the global process with given global pid + */ +Datum +citus_pid_for_gpid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + uint64 globalPID = PG_GETARG_INT64(0); + + PG_RETURN_INT32(ExtractProcessIdFromGlobalPID(globalPID)); +} + + /* * ExtractGlobalPID extracts the global process id from the application name and returns it * if the application name is not compatible with Citus' application names returns 0. diff --git a/src/test/regress/expected/global_cancel.out b/src/test/regress/expected/global_cancel.out index e8c016c56..ede4bc7e3 100644 --- a/src/test/regress/expected/global_cancel.out +++ b/src/test/regress/expected/global_cancel.out @@ -88,5 +88,27 @@ SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_back t (1 row) +SELECT nodename = citus_nodename_for_nodeid(nodeid) AND nodeport = citus_nodeport_for_nodeid(nodeid) +FROM pg_dist_node +WHERE isactive = true AND noderole = 'primary'; + ?column? +--------------------------------------------------------------------- + t + t + t +(3 rows) + +SELECT citus_nodeid_for_gpid(10000000000 * 2 + 3); + citus_nodeid_for_gpid +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT citus_pid_for_gpid(10000000000 * 2 + 3); + citus_pid_for_gpid +--------------------------------------------------------------------- + 3 +(1 row) + DROP SCHEMA global_cancel CASCADE; NOTICE: drop cascades to table global_cancel.dist_table diff --git a/src/test/regress/expected/isolation_citus_dist_activity.out b/src/test/regress/expected/isolation_citus_dist_activity.out index 4777da59c..171d2faef 100644 --- a/src/test/regress/expected/isolation_citus_dist_activity.out +++ b/src/test/regress/expected/isolation_citus_dist_activity.out @@ -1,4 +1,25 @@ -Parsed test spec with 3 sessions +Parsed test spec with 4 sessions + +starting permutation: add-coordinator-to-metadata +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step add-coordinator-to-metadata: + SELECT 1 FROM citus_add_node('localhost', 57636, groupid:=0); + SELECT test_assign_global_pid(); + +?column? +--------------------------------------------------------------------- + 1 +(1 row) + +test_assign_global_pid +--------------------------------------------------------------------- + +(1 row) + starting permutation: s1-cache-connections s1-begin s2-begin s3-begin s1-alter-table s2-sleep s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback create_distributed_table @@ -32,32 +53,32 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%'), ('%citus_add_node%')) AND backend_type = 'client backend' ORDER BY query DESC; -query |state |wait_event_type|wait_event|usename |datname +query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- ALTER TABLE test_table ADD COLUMN x INT; -|idle in transaction|Client |ClientRead|postgres|regression +|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s3-view-worker: - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; -query |state |wait_event_type|wait_event|usename |datname +query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -SELECT worker_apply_shard_ddl_command (1300004, 'public', ' +SELECT worker_apply_shard_ddl_command (1300009, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; -')|idle in transaction|Client |ClientRead|postgres|regression -SELECT worker_apply_shard_ddl_command (1300003, 'public', ' +')|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression +SELECT worker_apply_shard_ddl_command (1300008, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; -')|idle in transaction|Client |ClientRead|postgres|regression -SELECT worker_apply_shard_ddl_command (1300002, 'public', ' +')|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression +SELECT worker_apply_shard_ddl_command (1300007, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; -')|idle in transaction|Client |ClientRead|postgres|regression -SELECT worker_apply_shard_ddl_command (1300001, 'public', ' +')|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression +SELECT worker_apply_shard_ddl_command (1300006, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; -')|idle in transaction|Client |ClientRead|postgres|regression +')|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression (4 rows) step s2-rollback: @@ -102,21 +123,21 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%'), ('%citus_add_node%')) AND backend_type = 'client backend' ORDER BY query DESC; -query |state |wait_event_type|wait_event|usename |datname +query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- INSERT INTO test_table VALUES (100, 100); -|idle in transaction|Client |ClientRead|postgres|regression +|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s3-view-worker: - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; -query |state |wait_event_type|wait_event|usename |datname +query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -INSERT INTO public.test_table_1300008 (column1, column2) VALUES (100, 100)|idle in transaction|Client |ClientRead|postgres|regression +INSERT INTO public.test_table_1300013 (column1, column2) VALUES (100, 100)|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s2-rollback: @@ -166,24 +187,24 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%'), ('%citus_add_node%')) AND backend_type = 'client backend' ORDER BY query DESC; -query |state |wait_event_type|wait_event|usename |datname +query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- SELECT count(*) FROM test_table; -|idle in transaction|Client |ClientRead|postgres|regression +|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s3-view-worker: - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; -query |state |wait_event_type|wait_event|usename |datname +query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -SELECT count(*) AS count FROM public.test_table_1300014 test_table WHERE true|idle in transaction|Client |ClientRead|postgres|regression -SELECT count(*) AS count FROM public.test_table_1300013 test_table WHERE true|idle in transaction|Client |ClientRead|postgres|regression -SELECT count(*) AS count FROM public.test_table_1300012 test_table WHERE true|idle in transaction|Client |ClientRead|postgres|regression -SELECT count(*) AS count FROM public.test_table_1300011 test_table WHERE true|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300019 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300018 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300017 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300016 test_table WHERE true|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression (4 rows) step s2-rollback: @@ -233,21 +254,21 @@ pg_sleep (1 row) step s2-view-dist: - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%'), ('%citus_add_node%')) AND backend_type = 'client backend' ORDER BY query DESC; -query |state |wait_event_type|wait_event|usename |datname +query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- SELECT count(*) FROM test_table WHERE column1 = 55; -|idle in transaction|Client |ClientRead|postgres|regression +|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s3-view-worker: - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; -query |state |wait_event_type|wait_event|usename |datname +query |citus_nodename_for_nodeid|citus_nodeport_for_nodeid|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -SELECT count(*) AS count FROM public.test_table_1300017 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)|idle in transaction|Client |ClientRead|postgres|regression +SELECT count(*) AS count FROM public.test_table_1300022 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)|localhost | 57636|idle in transaction|Client |ClientRead|postgres|regression (1 row) step s2-rollback: @@ -259,3 +280,18 @@ step s1-commit: step s3-rollback: ROLLBACK; + +starting permutation: remove-coordinator-from-metadata +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step remove-coordinator-from-metadata: + SELECT citus_remove_node('localhost', 57636); + +citus_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index bcd998ee4..31f79c83a 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1009,6 +1009,10 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_internal_delete_colocation_metadata(integer) void | function citus_internal_global_blocked_processes() SETOF record | function citus_internal_local_blocked_processes() SETOF record + | function citus_nodeid_for_gpid(bigint) integer + | function citus_nodename_for_nodeid(integer) text + | function citus_nodeport_for_nodeid(integer) integer + | function citus_pid_for_gpid(bigint) integer | function citus_run_local_command(text) void | function citus_shard_indexes_on_worker() SETOF record | function citus_shards_on_worker() SETOF record @@ -1024,7 +1028,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record | table pg_dist_object | view citus_stat_activity -(36 rows) +(40 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 01dec9ef1..afd30523c 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -90,6 +90,10 @@ ORDER BY 1; function citus_local_disk_space_stats() function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) function citus_node_capacity_1(integer) + function citus_nodeid_for_gpid(bigint) + function citus_nodename_for_nodeid(integer) + function citus_nodeport_for_nodeid(integer) + function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() function citus_relation_size(regclass) @@ -278,5 +282,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(262 rows) +(266 rows) diff --git a/src/test/regress/spec/isolation_citus_dist_activity.spec b/src/test/regress/spec/isolation_citus_dist_activity.spec index 9a4e148a1..b2e977dda 100644 --- a/src/test/regress/spec/isolation_citus_dist_activity.spec +++ b/src/test/regress/spec/isolation_citus_dist_activity.spec @@ -1,5 +1,9 @@ setup { + CREATE OR REPLACE FUNCTION test_assign_global_pid() + RETURNS void + LANGUAGE C STRICT + AS 'citus', $$test_assign_global_pid$$; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 4; select setval('pg_dist_shardid_seq', GREATEST(1300000, nextval('pg_dist_shardid_seq'))); @@ -71,8 +75,7 @@ step "s2-sleep" step "s2-view-dist" { - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%')) AND backend_type = 'client backend' ORDER BY query DESC; - + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%BEGIN%'), ('%pg_catalog.pg_isolation_test_session_is_blocked%'), ('%citus_add_node%')) AND backend_type = 'client backend' ORDER BY query DESC; } session "s3" @@ -89,12 +92,28 @@ step "s3-rollback" step "s3-view-worker" { - SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; + SELECT query, citus_nodename_for_nodeid(citus_nodeid_for_gpid(global_pid)), citus_nodeport_for_nodeid(citus_nodeid_for_gpid(global_pid)), state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES('%pg_prepared_xacts%'), ('%COMMIT%'), ('%csa_from_one_node%')) AND is_worker_query = true AND backend_type = 'client backend' ORDER BY query DESC; } +session "s4" +step "add-coordinator-to-metadata" +{ + SELECT 1 FROM citus_add_node('localhost', 57636, groupid:=0); + SELECT test_assign_global_pid(); +} + +step "remove-coordinator-from-metadata" +{ + SELECT citus_remove_node('localhost', 57636); +} + +permutation "add-coordinator-to-metadata" + // we prefer to sleep before "s2-view-dist" so that we can ensure // the "wait_event" in the output doesn't change randomly (e.g., NULL to CliendRead etc.) permutation "s1-cache-connections" "s1-begin" "s2-begin" "s3-begin" "s1-alter-table" "s2-sleep" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" permutation "s1-cache-connections" "s1-begin" "s2-begin" "s3-begin" "s1-insert" "s2-sleep" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" permutation "s1-cache-connections" "s1-begin" "s2-begin" "s3-begin" "s1-select" "s2-sleep" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" permutation "s1-cache-connections" "s1-begin" "s2-begin" "s3-begin" "s1-select-router" "s2-sleep" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" + +permutation "remove-coordinator-from-metadata" diff --git a/src/test/regress/sql/global_cancel.sql b/src/test/regress/sql/global_cancel.sql index 13371b2d0..6c4341877 100644 --- a/src/test/regress/sql/global_cancel.sql +++ b/src/test/regress/sql/global_cancel.sql @@ -53,4 +53,11 @@ RESET client_min_messages; SELECT citus_backend_gpid() = citus_calculate_gpid(:coordinator_node_id, pg_backend_pid()); +SELECT nodename = citus_nodename_for_nodeid(nodeid) AND nodeport = citus_nodeport_for_nodeid(nodeid) +FROM pg_dist_node +WHERE isactive = true AND noderole = 'primary'; + +SELECT citus_nodeid_for_gpid(10000000000 * 2 + 3); +SELECT citus_pid_for_gpid(10000000000 * 2 + 3); + DROP SCHEMA global_cancel CASCADE;