From f6cd4d0f0722483d5ffa32b30812161dd166a3b1 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Wed, 9 Feb 2022 10:09:09 +0300 Subject: [PATCH] Overrides pg_cancel_backend and pg_terminate_backend to accept global pid --- .../commands/citus_global_signal.c | 127 ++++++++++++++++++ .../distributed/metadata/metadata_cache.c | 6 +- .../distributed/metadata/node_metadata.c | 25 ++++ .../distributed/operations/citus_tools.c | 15 ++- .../distributed/sql/citus--10.2-4--11.0-1.sql | 3 + .../sql/downgrades/citus--11.0-1--10.2-4.sql | 3 + .../sql/udfs/pg_cancel_backend/11.0-1.sql | 9 ++ .../sql/udfs/pg_cancel_backend/latest.sql | 9 ++ .../sql/udfs/pg_terminate_backend/11.0-1.sql | 9 ++ .../sql/udfs/pg_terminate_backend/latest.sql | 9 ++ .../distributed/transaction/backend_data.c | 40 +++++- src/include/distributed/backend_data.h | 7 + src/include/distributed/worker_manager.h | 1 + src/test/regress/bin/normalize.sed | 4 + .../regress/expected/follower_single_node.out | 38 +++++- src/test/regress/expected/global_cancel.out | 83 ++++++++++++ src/test/regress/expected/multi_extension.out | 6 +- .../expected/upgrade_list_citus_objects.out | 4 +- src/test/regress/multi_1_schedule | 1 + src/test/regress/sql/follower_single_node.sql | 27 +++- src/test/regress/sql/global_cancel.sql | 51 +++++++ 21 files changed, 462 insertions(+), 15 deletions(-) create mode 100644 src/backend/distributed/commands/citus_global_signal.c create mode 100644 src/backend/distributed/sql/udfs/pg_cancel_backend/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/pg_cancel_backend/latest.sql create mode 100644 src/backend/distributed/sql/udfs/pg_terminate_backend/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/pg_terminate_backend/latest.sql create mode 100644 src/test/regress/expected/global_cancel.out create mode 100644 src/test/regress/sql/global_cancel.sql diff --git a/src/backend/distributed/commands/citus_global_signal.c 
b/src/backend/distributed/commands/citus_global_signal.c new file mode 100644 index 000000000..fc7618159 --- /dev/null +++ b/src/backend/distributed/commands/citus_global_signal.c @@ -0,0 +1,127 @@ +/*------------------------------------------------------------------------- + * + * citus_global_signal.c + * Commands for Citus' overridden versions of pg_cancel_backend + * and pg_terminate_backend statements. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/pg_version_constants.h" + +#include "distributed/backend_data.h" + +#include "distributed/metadata_cache.h" + +#include "distributed/worker_manager.h" + +#include "lib/stringinfo.h" + +#include "signal.h" + +static bool CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig); + +PG_FUNCTION_INFO_V1(pg_cancel_backend); +PG_FUNCTION_INFO_V1(pg_terminate_backend); + +/* + * pg_cancel_backend overrides the Postgres' pg_cancel_backend to cancel + * a query with a global pid so a query can be cancelled from another node. + * + * To cancel a query that is on another node, a pg_cancel_backend command is sent + * to that node. This new command is sent with pid instead of global pid, so original + * pg_cancel_backend function is used. + */ +Datum +pg_cancel_backend(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + uint64 pid = PG_GETARG_INT64(0); + + int sig = SIGINT; + uint64 timeout = 0; + bool success = CitusSignalBackend(pid, timeout, sig); + + PG_RETURN_BOOL(success); +} + + +/* + * pg_terminate_backend overrides the Postgres' pg_terminate_backend to terminate + * a query with a global pid so a query can be terminated from another node. + * + * To terminate a query that is on another node, a pg_terminate_backend command is sent + * to that node. This new command is sent with pid instead of global pid, so original + * pg_terminate_backend function is used. 
+ */ +Datum +pg_terminate_backend(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + uint64 pid = PG_GETARG_INT64(0); + uint64 timeout = PG_GETARG_INT64(1); + + int sig = SIGTERM; + bool success = CitusSignalBackend(pid, timeout, sig); + + PG_RETURN_BOOL(success); +} + + +/* + * CitusSignalBackend gets a global pid and ends the original query with the global pid + * that might have started in another node by connecting to that node and running either + * pg_cancel_backend or pg_terminate_backend based on the sig argument. + */ +static bool +CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig) +{ + Assert((sig == SIGINT) || (sig == SIGTERM)); + +#if PG_VERSION_NUM < PG_VERSION_14 + if (timeout != 0) + { + elog(ERROR, "timeout parameter is only supported on Postgres 14 or later"); + } +#endif + + int nodeId = ExtractNodeIdFromGlobalPID(globalPID); + int processId = ExtractProcessIdFromGlobalPID(globalPID); + + WorkerNode *workerNode = FindNodeWithNodeId(nodeId); + + StringInfo cancelQuery = makeStringInfo(); + + if (sig == SIGINT) + { + appendStringInfo(cancelQuery, "SELECT pg_cancel_backend(%d::integer)", processId); + } + else + { +#if PG_VERSION_NUM >= PG_VERSION_14 + appendStringInfo(cancelQuery, + "SELECT pg_terminate_backend(%d::integer, %lu::bigint)", + processId, timeout); +#else + appendStringInfo(cancelQuery, "SELECT pg_terminate_backend(%d::integer)", + processId); +#endif + } + + StringInfo queryResult = makeStringInfo(); + + bool reportResultError = true; + + bool success = ExecuteRemoteQueryOrCommand(workerNode->workerName, + workerNode->workerPort, cancelQuery->data, + queryResult, reportResultError); + + if (success && queryResult && strcmp(queryResult->data, "f") == 0) + { + success = false; + } + + return success; +} diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 161a56942..2265ca691 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ 
b/src/backend/distributed/metadata/metadata_cache.c @@ -31,6 +31,7 @@ #include "commands/dbcommands.h" #include "commands/extension.h" #include "commands/trigger.h" +#include "distributed/backend_data.h" #include "distributed/colocation_utils.h" #include "distributed/connection_management.h" #include "distributed/citus_ruleutils.h" @@ -3666,9 +3667,10 @@ GetLocalNodeId(void) /* * This is expected if the coordinator is not added to the metadata. - * We'll return 0 for this case and for all cases so views can function almost normally + * We'll return GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA for this case and + * for all cases so views can function almost normally */ - nodeId = 0; + nodeId = GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA; } LocalNodeId = nodeId; diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index a32e0aa20..706f000cb 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1539,6 +1539,31 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort) } + +/* + * FindNodeWithNodeId searches pg_dist_node and returns the node with the nodeId. + * If the node cannot be found this function errors. 
+ */ +WorkerNode * +FindNodeWithNodeId(int nodeId) +{ + List *workerList = ActiveReadableNodeList(); + WorkerNode *workerNode = NULL; + + foreach_ptr(workerNode, workerList) + { + if (workerNode->nodeId == nodeId) + { + return workerNode; + } + } + + /* there isn't any node with nodeId in pg_dist_node */ + elog(ERROR, "worker node with node id %d could not be found", nodeId); + + return NULL; +} + + /* * ReadDistNode iterates over pg_dist_node table, converts each row * into it's memory representation (i.e., WorkerNode) and adds them into diff --git a/src/backend/distributed/operations/citus_tools.c b/src/backend/distributed/operations/citus_tools.c index cb87e70b1..b7905d9f8 100644 --- a/src/backend/distributed/operations/citus_tools.c +++ b/src/backend/distributed/operations/citus_tools.c @@ -14,6 +14,7 @@ #include "access/htup_details.h" #include "catalog/pg_type.h" +#include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" @@ -50,8 +51,6 @@ static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, bool *statusArray, StringInfo *resultStringArray, int commandCount); -static bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, - char *queryString, StringInfo queryResult); static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor, StringInfo *nodeNameArray, int *nodePortArray, bool *statusArray, @@ -474,9 +473,10 @@ ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray, int32 nodePort = nodePortArray[commandIndex]; char *queryString = commandStringArray[commandIndex]->data; StringInfo queryResultString = resultStringArray[commandIndex]; + bool reportResultError = false; bool success = ExecuteRemoteQueryOrCommand(nodeName, nodePort, queryString, - queryResultString); + queryResultString, reportResultError); statusArray[commandIndex] = success; @@ -491,9 +491,9 @@ 
ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray, * (success/failure), and query result. The query is expected to return a single * target containing zero or one rows. */ -static bool +bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, - StringInfo queryResultString) + StringInfo queryResultString, bool reportResultError) { int connectionFlags = FORCE_NEW_CONNECTION; MultiConnection *connection = @@ -517,6 +517,11 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts); bool success = EvaluateQueryResult(connection, queryResult, queryResultString); + if (!success && reportResultError) + { + ReportResultError(connection, queryResult, ERROR); + } + PQclear(queryResult); /* close the connection */ diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index d81b2c719..a8cfa24fd 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -35,6 +35,9 @@ GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; -- we have to recreate this view because recreated citus_dist_stat_activity that this view depends #include "udfs/citus_lock_waits/11.0-1.sql" +#include "udfs/pg_cancel_backend/11.0-1.sql" +#include "udfs/pg_terminate_backend/11.0-1.sql" + DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text); DROP FUNCTION pg_catalog.master_get_table_metadata(text); DROP FUNCTION pg_catalog.master_append_table_to_shard(bigint, text, text, integer); diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 204548c51..399e42ff0 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql 
@@ -212,3 +212,6 @@ DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]); #include "../udfs/worker_create_or_replace_object/9.0-1.sql" RESET search_path; + +DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE; diff --git a/src/backend/distributed/sql/udfs/pg_cancel_backend/11.0-1.sql b/src/backend/distributed/sql/udfs/pg_cancel_backend/11.0-1.sql new file mode 100644 index 000000000..3a355aa6d --- /dev/null +++ b/src/backend/distributed/sql/udfs/pg_cancel_backend/11.0-1.sql @@ -0,0 +1,9 @@ +DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(global_pid bigint) CASCADE; + +CREATE OR REPLACE FUNCTION pg_catalog.pg_cancel_backend(global_pid bigint) + RETURNS BOOL + LANGUAGE C +AS 'MODULE_PATHNAME', $$pg_cancel_backend$$; + +COMMENT ON FUNCTION pg_catalog.pg_cancel_backend(global_pid bigint) + IS 'cancels a Citus query which might be on any node in the Citus cluster'; diff --git a/src/backend/distributed/sql/udfs/pg_cancel_backend/latest.sql b/src/backend/distributed/sql/udfs/pg_cancel_backend/latest.sql new file mode 100644 index 000000000..3a355aa6d --- /dev/null +++ b/src/backend/distributed/sql/udfs/pg_cancel_backend/latest.sql @@ -0,0 +1,9 @@ +DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(global_pid bigint) CASCADE; + +CREATE OR REPLACE FUNCTION pg_catalog.pg_cancel_backend(global_pid bigint) + RETURNS BOOL + LANGUAGE C +AS 'MODULE_PATHNAME', $$pg_cancel_backend$$; + +COMMENT ON FUNCTION pg_catalog.pg_cancel_backend(global_pid bigint) + IS 'cancels a Citus query which might be on any node in the Citus cluster'; diff --git a/src/backend/distributed/sql/udfs/pg_terminate_backend/11.0-1.sql b/src/backend/distributed/sql/udfs/pg_terminate_backend/11.0-1.sql new file mode 100644 index 000000000..9b9798b76 --- /dev/null +++ b/src/backend/distributed/sql/udfs/pg_terminate_backend/11.0-1.sql @@ -0,0 +1,9 @@ +DROP FUNCTION IF EXISTS 
pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint) CASCADE; + +CREATE OR REPLACE FUNCTION pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint DEFAULT 0) + RETURNS BOOL + LANGUAGE C +AS 'MODULE_PATHNAME', $$pg_terminate_backend$$; + +COMMENT ON FUNCTION pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint) + IS 'terminates a Citus query which might be on any node in the Citus cluster'; diff --git a/src/backend/distributed/sql/udfs/pg_terminate_backend/latest.sql b/src/backend/distributed/sql/udfs/pg_terminate_backend/latest.sql new file mode 100644 index 000000000..9b9798b76 --- /dev/null +++ b/src/backend/distributed/sql/udfs/pg_terminate_backend/latest.sql @@ -0,0 +1,9 @@ +DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint) CASCADE; + +CREATE OR REPLACE FUNCTION pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint DEFAULT 0) + RETURNS BOOL + LANGUAGE C +AS 'MODULE_PATHNAME', $$pg_terminate_backend$$; + +COMMENT ON FUNCTION pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint) + IS 'terminates a Citus query which might be on any node in the Citus cluster'; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index bc54e1da9..90f7b719a 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -33,6 +33,7 @@ #include "distributed/shared_connection_stats.h" #include "distributed/transaction_identifier.h" #include "distributed/tuplestore.h" +#include "distributed/worker_manager.h" #include "nodes/execnodes.h" #include "postmaster/autovacuum.h" /* to access autovacuum_max_workers */ #include "replication/walsender.h" @@ -47,6 +48,7 @@ #define GET_ACTIVE_TRANSACTION_QUERY "SELECT * FROM get_all_active_transactions();" #define ACTIVE_TRANSACTION_COLUMN_COUNT 7 +#define GLOBAL_PID_NODE_ID_MULTIPLIER 10000000000 /* * Each backend's data 
reside in the shared memory @@ -864,7 +866,7 @@ GenerateGlobalPID(void) * node ids might cause overflow. But even for the applications that scale around 50 nodes every * day it'd take about 100K years. So we are not worried. */ - return (((uint64) GetLocalNodeId()) * 10000000000) + getpid(); + return (((uint64) GetLocalNodeId()) * GLOBAL_PID_NODE_ID_MULTIPLIER) + getpid(); } @@ -907,6 +909,42 @@ ExtractGlobalPID(char *applicationName) } +/* + * ExtractNodeIdFromGlobalPID extracts the node id from the global pid. + * Global pid is constructed by multiplying node id with GLOBAL_PID_NODE_ID_MULTIPLIER + * and adding process id. So integer division of global pid by GLOBAL_PID_NODE_ID_MULTIPLIER + * gives us the node id. + */ +int +ExtractNodeIdFromGlobalPID(uint64 globalPID) +{ + int nodeId = (int) (globalPID / GLOBAL_PID_NODE_ID_MULTIPLIER); + + if (nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA) + { + ereport(ERROR, (errmsg("originator node of the query with the global pid " + "%lu is not in Citus' metadata", globalPID), + errhint("connect to the node directly run pg_cancel_backend(pid) " + "or pg_terminate_backend(pid)"))); + } + + return nodeId; +} + + +/* + * ExtractProcessIdFromGlobalPID extracts the process id from the global pid. + * Global pid is constructed by multiplying node id with GLOBAL_PID_NODE_ID_MULTIPLIER + * and adding process id. So global pid mod GLOBAL_PID_NODE_ID_MULTIPLIER gives us the + * process id. + */ +int +ExtractProcessIdFromGlobalPID(uint64 globalPID) +{ + return (int) (globalPID % GLOBAL_PID_NODE_ID_MULTIPLIER); +} + + /* * CurrentDistributedTransactionNumber returns the transaction number of the * current distributed transaction. 
The caller must make sure a distributed diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index 7f3a81e88..f01358407 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -68,6 +68,8 @@ extern void MarkCitusInitiatedCoordinatorBackend(void); extern void AssignGlobalPID(void); extern uint64 GetGlobalPID(void); extern uint64 ExtractGlobalPID(char *applicationName); +extern int ExtractNodeIdFromGlobalPID(uint64 globalPID); +extern int ExtractProcessIdFromGlobalPID(uint64 globalPID); extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); extern void CancelTransactionDueToDeadlock(PGPROC *proc); extern bool MyBackendGotCancelledDueToDeadlock(bool clearState); @@ -78,6 +80,11 @@ extern int GetAllActiveClientBackendCount(void); extern void IncrementClientBackendCounter(void); extern void DecrementClientBackendCounter(void); +extern bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, + char *queryString, StringInfo queryResultString, + bool reportResultError); + #define INVALID_CITUS_INTERNAL_BACKEND_GPID 0 +#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999 #endif /* BACKEND_DATA_H */ diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 0a6b637b3..db8adaedb 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -86,6 +86,7 @@ extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); +extern WorkerNode * FindNodeWithNodeId(int nodeId); extern List * ReadDistNode(bool includeNodesFromOtherClusters); extern void EnsureCoordinator(void); extern void InsertCoordinatorIfClusterEmpty(void); diff --git 
a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 25f4388e1..916d62afa 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -256,3 +256,7 @@ s/CREATE TABLESPACE test_tablespace LOCATION.*/CREATE TABLESPACE test_tablespace s/(.*absolute correlation \()([0,1]\.[0-9]+)(\) of var attribute [0-9]+ is smaller than.*)/\1X\.YZ\3/g s/NOTICE: issuing WITH placement_data\(shardid, shardstate, shardlength, groupid, placementid\) AS \(VALUES \([0-9]+, [0-9]+, [0-9]+, [0-9]+, [0-9]+\)\)/NOTICE: issuing WITH placement_data\(shardid, shardstate, shardlength, groupid, placementid\) AS \(VALUES \(xxxxxx, xxxxxx, xxxxxx, xxxxxx, xxxxxx\)\)/g + +# global_pid when pg_cancel_backend is sent to workers +s/pg_cancel_backend\('[0-9]+'::bigint\)/pg_cancel_backend('xxxxx'::bigint)/g +s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_backend(xxxxx::integer)/g diff --git a/src/test/regress/expected/follower_single_node.out b/src/test/regress/expected/follower_single_node.out index 7fbc40280..de2f88b6e 100644 --- a/src/test/regress/expected/follower_single_node.out +++ b/src/test/regress/expected/follower_single_node.out @@ -239,7 +239,7 @@ ERROR: node group 0 does not have a secondary node -- should work this time \c -reuse-previous=off regression - - :master_port SET search_path TO single_node; -SELECT 1 FROM master_add_node('localhost', :follower_master_port, groupid => 0, noderole => 'secondary'); +SELECT 1 FROM master_add_node('localhost', :follower_master_port, groupid => 0, noderole => 'secondary', nodecluster => 'second-cluster'); ?column? 
--------------------------------------------------------------------- 1 @@ -251,7 +251,7 @@ SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhavesha 1 (1 row) -\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always'" +\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" SET search_path TO single_node; SELECT * FROM test WHERE x = 1; x | y @@ -387,6 +387,40 @@ SELECT * FROM columnar_test ORDER BY 1,2; 1 | 8 (6 rows) +\c -reuse-previous=off regression - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path TO single_node; +CREATE TABLE dist_table (a INT, b INT); +SELECT create_distributed_table ('dist_table', 'a', shard_count:=4); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_table VALUES (1, 1); +\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" +SET search_path TO single_node; +SELECT * FROM dist_table; + a | b +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +SELECT global_pid AS follower_coordinator_gpid FROM get_all_active_transactions() WHERE process_id = pg_backend_pid() \gset +SELECT pg_typeof(:follower_coordinator_gpid); + pg_typeof +--------------------------------------------------------------------- + bigint +(1 row) + +SELECT pg_cancel_backend(:follower_coordinator_gpid); +ERROR: canceling statement due to user request +SET citus.log_remote_commands TO ON; +SELECT pg_cancel_backend(:follower_coordinator_gpid) FROM dist_table WHERE a = 1; +NOTICE: executing the command locally: SELECT pg_cancel_backend('xxxxx'::bigint) AS pg_cancel_backend FROM single_node.dist_table_102008 dist_table WHERE (a OPERATOR(pg_catalog.=) 1) +NOTICE: issuing SELECT pg_cancel_backend(xxxxx::integer) +DETAIL: on server postgres@localhost:xxxxx connectionId: 
xxxxxxx +ERROR: canceling statement due to user request -- Cleanup \c -reuse-previous=off regression - - :master_port SET search_path TO single_node; diff --git a/src/test/regress/expected/global_cancel.out b/src/test/regress/expected/global_cancel.out new file mode 100644 index 000000000..5ebc4098d --- /dev/null +++ b/src/test/regress/expected/global_cancel.out @@ -0,0 +1,83 @@ +CREATE SCHEMA global_cancel; +SET search_path TO global_cancel; +SET citus.next_shard_id TO 56789000; +CREATE TABLE dist_table (a INT, b INT); +SELECT create_distributed_table ('dist_table', 'a', shard_count:=4); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_table VALUES (1, 1); +SELECT global_pid AS coordinator_gpid FROM get_all_active_transactions() WHERE process_id = pg_backend_pid() \gset +SELECT pg_typeof(:coordinator_gpid); + pg_typeof +--------------------------------------------------------------------- + bigint +(1 row) + +SELECT pg_cancel_backend(:coordinator_gpid); +ERROR: canceling statement due to user request +SET citus.log_remote_commands TO ON; +SELECT pg_cancel_backend(:coordinator_gpid) FROM dist_table WHERE a = 1; +NOTICE: issuing SELECT pg_cancel_backend('xxxxx'::bigint) AS pg_cancel_backend FROM global_cancel.dist_table_56789000 dist_table WHERE (a OPERATOR(pg_catalog.=) 1) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +ERROR: canceling statement due to user request +BEGIN; +SELECT pg_cancel_backend(:coordinator_gpid) FROM dist_table WHERE a = 1; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT pg_cancel_backend('xxxxx'::bigint) AS pg_cancel_backend FROM global_cancel.dist_table_56789000 dist_table WHERE (a OPERATOR(pg_catalog.=) 1) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx 
+ERROR: canceling statement due to user request +END; +SET citus.log_remote_commands TO OFF; +SELECT global_pid AS maintenance_daemon_gpid +FROM pg_stat_activity psa JOIN get_all_active_transactions() gaat ON psa.pid = gaat.process_id +WHERE application_name = 'Citus Maintenance Daemon' \gset +SET client_min_messages TO ERROR; +CREATE USER global_cancel_user; +SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user'); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +RESET client_min_messages; +SET ROLE global_cancel_user; +SELECT pg_typeof(:maintenance_daemon_gpid); + pg_typeof +--------------------------------------------------------------------- + bigint +(1 row) + +SELECT pg_cancel_backend(:maintenance_daemon_gpid); +ERROR: must be a superuser to cancel superuser query +CONTEXT: while executing command on localhost:xxxxx +SELECT pg_terminate_backend(:maintenance_daemon_gpid); +ERROR: must be a superuser to terminate superuser process +CONTEXT: while executing command on localhost:xxxxx +RESET ROLE; +SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset +SET client_min_messages TO DEBUG; +-- 10000000000 is the node id multiplier for global pid +SELECT pg_cancel_backend(10000000000 * :coordinator_node_id + 0); +DEBUG: PID 0 is not a PostgreSQL server process +DETAIL: from localhost:xxxxx + pg_cancel_backend +--------------------------------------------------------------------- + f +(1 row) + +SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0); +DEBUG: PID 0 is not a PostgreSQL server process +DETAIL: from localhost:xxxxx + pg_terminate_backend +--------------------------------------------------------------------- + f +(1 row) + +RESET client_min_messages; +DROP SCHEMA global_cancel CASCADE; +NOTICE: drop cascades to table dist_table diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 
0b185659b..dd93bdd44 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1001,7 +1001,7 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 11.0-1 ALTER EXTENSION citus UPDATE TO '11.0-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- function citus_disable_node(text,integer) void | function create_distributed_function(regprocedure,text,text) void | @@ -1016,10 +1016,12 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_shard_indexes_on_worker() SETOF record | function citus_shards_on_worker() SETOF record | function create_distributed_function(regprocedure,text,text,boolean) void + | function pg_cancel_backend(bigint) boolean + | function pg_terminate_backend(bigint,bigint) boolean | function worker_create_or_replace_object(text[]) boolean | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void -(16 rows) +(18 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 7ae524a3b..13a15c658 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -172,6 +172,8 @@ ORDER BY 1; function master_update_shard_statistics(bigint) function master_update_table_statistics(regclass) function notify_constraint_dropped() + function pg_cancel_backend(bigint) + function pg_terminate_backend(bigint, bigint) function poolinfo_valid(text) function read_intermediate_result(text,citus_copy_format) function read_intermediate_results(text[],citus_copy_format) @@ -268,5 +270,5 @@ ORDER BY 1; view citus_worker_stat_activity 
view pg_dist_shard_placement view time_partitions -(252 rows) +(254 rows) diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 4d7f68e73..a95efbac9 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -282,6 +282,7 @@ test: create_citus_local_table_cascade test: fkeys_between_local_ref test: auto_undist_citus_local test: mx_regular_user +test: global_cancel test: remove_coordinator # ---------- diff --git a/src/test/regress/sql/follower_single_node.sql b/src/test/regress/sql/follower_single_node.sql index 4cd7d9d0a..482c0b575 100644 --- a/src/test/regress/sql/follower_single_node.sql +++ b/src/test/regress/sql/follower_single_node.sql @@ -107,10 +107,10 @@ SELECT * FROM test WHERE x = 1; \c -reuse-previous=off regression - - :master_port SET search_path TO single_node; -SELECT 1 FROM master_add_node('localhost', :follower_master_port, groupid => 0, noderole => 'secondary'); +SELECT 1 FROM master_add_node('localhost', :follower_master_port, groupid => 0, noderole => 'secondary', nodecluster => 'second-cluster'); SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); -\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always'" +\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" SET search_path TO single_node; SELECT * FROM test WHERE x = 1; @@ -169,6 +169,29 @@ INSERT INTO columnar_test(a, b) VALUES (1, 8); \c - - - :follower_master_port SELECT * FROM columnar_test ORDER BY 1,2; + +\c -reuse-previous=off regression - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path TO single_node; + +CREATE TABLE dist_table (a INT, b INT); +SELECT create_distributed_table ('dist_table', 'a', shard_count:=4); +INSERT INTO dist_table VALUES (1, 1); + +\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ 
citus.cluster_name=second-cluster'" +SET search_path TO single_node; + +SELECT * FROM dist_table; + +SELECT global_pid AS follower_coordinator_gpid FROM get_all_active_transactions() WHERE process_id = pg_backend_pid() \gset +SELECT pg_typeof(:follower_coordinator_gpid); + +SELECT pg_cancel_backend(:follower_coordinator_gpid); + +SET citus.log_remote_commands TO ON; +SELECT pg_cancel_backend(:follower_coordinator_gpid) FROM dist_table WHERE a = 1; + + -- Cleanup \c -reuse-previous=off regression - - :master_port SET search_path TO single_node; diff --git a/src/test/regress/sql/global_cancel.sql b/src/test/regress/sql/global_cancel.sql new file mode 100644 index 000000000..edf380771 --- /dev/null +++ b/src/test/regress/sql/global_cancel.sql @@ -0,0 +1,51 @@ +CREATE SCHEMA global_cancel; +SET search_path TO global_cancel; +SET citus.next_shard_id TO 56789000; + +CREATE TABLE dist_table (a INT, b INT); +SELECT create_distributed_table ('dist_table', 'a', shard_count:=4); +INSERT INTO dist_table VALUES (1, 1); + +SELECT global_pid AS coordinator_gpid FROM get_all_active_transactions() WHERE process_id = pg_backend_pid() \gset +SELECT pg_typeof(:coordinator_gpid); + +SELECT pg_cancel_backend(:coordinator_gpid); + +SET citus.log_remote_commands TO ON; +SELECT pg_cancel_backend(:coordinator_gpid) FROM dist_table WHERE a = 1; + +BEGIN; +SELECT pg_cancel_backend(:coordinator_gpid) FROM dist_table WHERE a = 1; +END; + +SET citus.log_remote_commands TO OFF; + +SELECT global_pid AS maintenance_daemon_gpid +FROM pg_stat_activity psa JOIN get_all_active_transactions() gaat ON psa.pid = gaat.process_id +WHERE application_name = 'Citus Maintenance Daemon' \gset + +SET client_min_messages TO ERROR; +CREATE USER global_cancel_user; +SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user'); +RESET client_min_messages; + +SET ROLE global_cancel_user; + +SELECT pg_typeof(:maintenance_daemon_gpid); + +SELECT pg_cancel_backend(:maintenance_daemon_gpid); +SELECT 
pg_terminate_backend(:maintenance_daemon_gpid); + +RESET ROLE; + +SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset + +SET client_min_messages TO DEBUG; + +-- 10000000000 is the node id multiplier for global pid +SELECT pg_cancel_backend(10000000000 * :coordinator_node_id + 0); +SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0); + +RESET client_min_messages; + +DROP SCHEMA global_cancel CASCADE;