Overrides pg_cancel_backend and pg_terminate_backend to accept global pid

pull/5699/head
Halil Ozan Akgul 2022-02-09 10:09:09 +03:00
parent 70dc85239f
commit f6cd4d0f07
21 changed files with 462 additions and 15 deletions

View File

@@ -0,0 +1,127 @@
/*-------------------------------------------------------------------------
 *
 * citus_global_signal.c
 *    Commands for Citus' overridden versions of the pg_cancel_backend
 *    and pg_terminate_backend functions.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "distributed/pg_version_constants.h"
#include "distributed/backend_data.h"
#include "distributed/metadata_cache.h"
#include "distributed/worker_manager.h"
#include "lib/stringinfo.h"
#include "signal.h"
static bool CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig);
PG_FUNCTION_INFO_V1(pg_cancel_backend);
PG_FUNCTION_INFO_V1(pg_terminate_backend);


/*
 * pg_cancel_backend overrides Postgres' pg_cancel_backend so that a query
 * can be cancelled by its global pid, i.e., from any node in the cluster.
 *
 * To cancel a query running on another node, a pg_cancel_backend command is
 * sent to that node. That command carries the process id rather than the
 * global pid, so the original pg_cancel_backend function runs there.
 */
Datum
pg_cancel_backend(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	uint64 pid = PG_GETARG_INT64(0);

	int sig = SIGINT;
	uint64 timeout = 0;
	bool success = CitusSignalBackend(pid, timeout, sig);

	PG_RETURN_BOOL(success);
}


/*
 * pg_terminate_backend overrides Postgres' pg_terminate_backend so that a
 * query can be terminated by its global pid, i.e., from any node in the
 * cluster.
 *
 * To terminate a query running on another node, a pg_terminate_backend
 * command is sent to that node. That command carries the process id rather
 * than the global pid, so the original pg_terminate_backend function runs
 * there.
 */
Datum
pg_terminate_backend(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	uint64 pid = PG_GETARG_INT64(0);
	uint64 timeout = PG_GETARG_INT64(1);

	int sig = SIGTERM;
	bool success = CitusSignalBackend(pid, timeout, sig);

	PG_RETURN_BOOL(success);
}


/*
 * CitusSignalBackend takes a global pid and signals the backend it identifies,
 * which might be running on another node, by connecting to that node and
 * running either pg_cancel_backend or pg_terminate_backend based on the sig
 * argument.
 */
static bool
CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig)
{
	Assert((sig == SIGINT) || (sig == SIGTERM));

#if PG_VERSION_NUM < PG_VERSION_14
	if (timeout != 0)
	{
		elog(ERROR, "timeout parameter is only supported on Postgres 14 or later");
	}
#endif

	int nodeId = ExtractNodeIdFromGlobalPID(globalPID);
	int processId = ExtractProcessIdFromGlobalPID(globalPID);

	WorkerNode *workerNode = FindNodeWithNodeId(nodeId);

	StringInfo cancelQuery = makeStringInfo();

	if (sig == SIGINT)
	{
		appendStringInfo(cancelQuery, "SELECT pg_cancel_backend(%d::integer)", processId);
	}
	else
	{
#if PG_VERSION_NUM >= PG_VERSION_14
		appendStringInfo(cancelQuery,
						 "SELECT pg_terminate_backend(%d::integer, %lu::bigint)",
						 processId, timeout);
#else
		appendStringInfo(cancelQuery, "SELECT pg_terminate_backend(%d::integer)",
						 processId);
#endif
	}

	StringInfo queryResult = makeStringInfo();

	bool reportResultError = true;

	bool success = ExecuteRemoteQueryOrCommand(workerNode->workerName,
											   workerNode->workerPort, cancelQuery->data,
											   queryResult, reportResultError);

	if (success && queryResult && strcmp(queryResult->data, "f") == 0)
	{
		success = false;
	}

	return success;
}
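
Note: a minimal usage sketch of the overridden UDFs, mirroring the regression tests added below; the global pid is obtained from get_all_active_transactions():

SELECT global_pid FROM get_all_active_transactions() WHERE process_id = pg_backend_pid() \gset
-- cancel the backend by its global pid, from any node in the cluster
SELECT pg_cancel_backend(:global_pid);
-- or terminate it; the second argument is a timeout in milliseconds
SELECT pg_terminate_backend(:global_pid, 0);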

View File

@@ -31,6 +31,7 @@
 #include "commands/dbcommands.h"
 #include "commands/extension.h"
 #include "commands/trigger.h"
+#include "distributed/backend_data.h"
 #include "distributed/colocation_utils.h"
 #include "distributed/connection_management.h"
 #include "distributed/citus_ruleutils.h"
@@ -3666,9 +3667,10 @@ GetLocalNodeId(void)
 		/*
 		 * This is expected if the coordinator is not added to the metadata.
-		 * We'll return 0 for this case and for all cases so views can function almost normally
+		 * We'll return GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA for this case and
+		 * for all cases so views can function almost normally
 		 */
-		nodeId = 0;
+		nodeId = GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA;
 	}

 	LocalNodeId = nodeId;

View File

@@ -1539,6 +1539,31 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
 }

+
+/*
+ * FindNodeWithNodeId searches pg_dist_node and returns the node with the
+ * given nodeId. If the node cannot be found, this function errors out.
+ */
+WorkerNode *
+FindNodeWithNodeId(int nodeId)
+{
+	List *workerList = ActiveReadableNodeList();
+	WorkerNode *workerNode = NULL;
+
+	foreach_ptr(workerNode, workerList)
+	{
+		if (workerNode->nodeId == nodeId)
+		{
+			return workerNode;
+		}
+	}
+
+	/* there isn't any node with nodeId in pg_dist_node */
+	elog(ERROR, "worker node with node id %d could not be found", nodeId);
+
+	return NULL;
+}
+
 /*
  * ReadDistNode iterates over pg_dist_node table, converts each row
  * into its memory representation (i.e., WorkerNode) and adds them into
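
Note: for illustration, the SQL-level equivalent of this lookup against the pg_dist_node metadata table (node id 2 is an example value):

SELECT nodename, nodeport FROM pg_dist_node WHERE nodeid = 2;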

View File

@@ -14,6 +14,7 @@
 #include "access/htup_details.h"
 #include "catalog/pg_type.h"
+#include "distributed/backend_data.h"
 #include "distributed/connection_management.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_client_executor.h"
@@ -50,8 +51,6 @@ static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray,
 										   bool *statusArray,
 										   StringInfo *resultStringArray,
 										   int commandCount);
-static bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort,
-										char *queryString, StringInfo queryResult);
 static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
 										  StringInfo *nodeNameArray, int *nodePortArray,
 										  bool *statusArray,
@@ -474,9 +473,10 @@ ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray,
 		int32 nodePort = nodePortArray[commandIndex];
 		char *queryString = commandStringArray[commandIndex]->data;
 		StringInfo queryResultString = resultStringArray[commandIndex];
+		bool reportResultError = false;

 		bool success = ExecuteRemoteQueryOrCommand(nodeName, nodePort, queryString,
-												   queryResultString);
+												   queryResultString, reportResultError);

 		statusArray[commandIndex] = success;
@@ -491,9 +491,9 @@ ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray,
  * (success/failure), and query result. The query is expected to return a single
  * target containing zero or one rows.
  */
-static bool
-ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
-							StringInfo queryResultString)
+bool
+ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
+							StringInfo queryResultString, bool reportResultError)
 {
 	int connectionFlags = FORCE_NEW_CONNECTION;
 	MultiConnection *connection =
@@ -517,6 +517,11 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
 	PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts);
 	bool success = EvaluateQueryResult(connection, queryResult, queryResultString);

+	if (!success && reportResultError)
+	{
+		ReportResultError(connection, queryResult, ERROR);
+	}
+
 	PQclear(queryResult);

 	/* close the connection */
View File

@@ -35,6 +35,9 @@ GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC;
 -- we have to recreate this view because we recreated citus_dist_stat_activity, which this view depends on
 #include "udfs/citus_lock_waits/11.0-1.sql"

+#include "udfs/pg_cancel_backend/11.0-1.sql"
+#include "udfs/pg_terminate_backend/11.0-1.sql"
+
 DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);
 DROP FUNCTION pg_catalog.master_get_table_metadata(text);
 DROP FUNCTION pg_catalog.master_append_table_to_shard(bigint, text, text, integer);

View File

@@ -212,3 +212,6 @@ DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]);
 #include "../udfs/worker_create_or_replace_object/9.0-1.sql"

 RESET search_path;
+
+DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE;
+DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE;

View File

@@ -0,0 +1,9 @@
DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(global_pid bigint) CASCADE;

CREATE OR REPLACE FUNCTION pg_catalog.pg_cancel_backend(global_pid bigint)
    RETURNS BOOL
    LANGUAGE C
AS 'MODULE_PATHNAME', $$pg_cancel_backend$$;

COMMENT ON FUNCTION pg_catalog.pg_cancel_backend(global_pid bigint)
    IS 'cancels a Citus query which might be on any node in the Citus cluster';

View File

@@ -0,0 +1,9 @@
DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(global_pid bigint) CASCADE;

CREATE OR REPLACE FUNCTION pg_catalog.pg_cancel_backend(global_pid bigint)
    RETURNS BOOL
    LANGUAGE C
AS 'MODULE_PATHNAME', $$pg_cancel_backend$$;

COMMENT ON FUNCTION pg_catalog.pg_cancel_backend(global_pid bigint)
    IS 'cancels a Citus query which might be on any node in the Citus cluster';

View File

@@ -0,0 +1,9 @@
DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint) CASCADE;

CREATE OR REPLACE FUNCTION pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint DEFAULT 0)
    RETURNS BOOL
    LANGUAGE C
AS 'MODULE_PATHNAME', $$pg_terminate_backend$$;

COMMENT ON FUNCTION pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint)
    IS 'terminates a Citus query which might be on any node in the Citus cluster';

View File

@@ -0,0 +1,9 @@
DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint) CASCADE;

CREATE OR REPLACE FUNCTION pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint DEFAULT 0)
    RETURNS BOOL
    LANGUAGE C
AS 'MODULE_PATHNAME', $$pg_terminate_backend$$;

COMMENT ON FUNCTION pg_catalog.pg_terminate_backend(global_pid bigint, timeout bigint)
    IS 'terminates a Citus query which might be on any node in the Citus cluster';
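
Note: a usage sketch with a hypothetical global pid (node id 1, process id 2468). A non-zero timeout is forwarded to Postgres' two-argument pg_terminate_backend on the target node, so it requires Postgres 14 or later there, as CitusSignalBackend enforces:

-- terminate immediately
SELECT pg_terminate_backend(10000002468);
-- wait up to 5000 ms for the backend to actually exit (PG14+ only)
SELECT pg_terminate_backend(10000002468, 5000);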

View File

@@ -33,6 +33,7 @@
 #include "distributed/shared_connection_stats.h"
 #include "distributed/transaction_identifier.h"
 #include "distributed/tuplestore.h"
+#include "distributed/worker_manager.h"
 #include "nodes/execnodes.h"
 #include "postmaster/autovacuum.h" /* to access autovacuum_max_workers */
 #include "replication/walsender.h"
@@ -47,6 +48,7 @@
 #define GET_ACTIVE_TRANSACTION_QUERY "SELECT * FROM get_all_active_transactions();"
 #define ACTIVE_TRANSACTION_COLUMN_COUNT 7
+#define GLOBAL_PID_NODE_ID_MULTIPLIER 10000000000

 /*
  * Each backend's data reside in the shared memory
@@ -864,7 +866,7 @@ GenerateGlobalPID(void)
 	 * node ids might cause overflow. But even for applications that add around 50 nodes
 	 * every day, it'd take about 100K years. So we are not worried.
 	 */
-	return (((uint64) GetLocalNodeId()) * 10000000000) + getpid();
+	return (((uint64) GetLocalNodeId()) * GLOBAL_PID_NODE_ID_MULTIPLIER) + getpid();
 }
@@ -907,6 +909,42 @@ ExtractGlobalPID(char *applicationName)
 }

+
+/*
+ * ExtractNodeIdFromGlobalPID extracts the node id from the global pid.
+ * A global pid is constructed by multiplying the node id by
+ * GLOBAL_PID_NODE_ID_MULTIPLIER and adding the process id, so integer
+ * division of the global pid by GLOBAL_PID_NODE_ID_MULTIPLIER gives us
+ * the node id.
+ */
+int
+ExtractNodeIdFromGlobalPID(uint64 globalPID)
+{
+	int nodeId = (int) (globalPID / GLOBAL_PID_NODE_ID_MULTIPLIER);
+
+	if (nodeId == GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA)
+	{
+		ereport(ERROR, (errmsg("originator node of the query with the global pid "
+							   "%lu is not in Citus' metadata", globalPID),
+						errhint("connect to the node directly and run "
+								"pg_cancel_backend(pid) or pg_terminate_backend(pid)")));
+	}
+
+	return nodeId;
+}
+
+
+/*
+ * ExtractProcessIdFromGlobalPID extracts the process id from the global pid.
+ * A global pid is constructed by multiplying the node id by
+ * GLOBAL_PID_NODE_ID_MULTIPLIER and adding the process id, so the global
+ * pid modulo GLOBAL_PID_NODE_ID_MULTIPLIER gives us the process id.
+ */
+int
+ExtractProcessIdFromGlobalPID(uint64 globalPID)
+{
+	return (int) (globalPID % GLOBAL_PID_NODE_ID_MULTIPLIER);
+}
+
+
 /*
  * CurrentDistributedTransactionNumber returns the transaction number of the
  * current distributed transaction. The caller must make sure a distributed
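
Note: a quick SQL sanity check of the gpid encoding and of the overflow estimate in GenerateGlobalPID (node id 3 and pid 54321 are example values):

-- compose: gpid = node_id * 10000000000 + pid
SELECT 3 * 10000000000 + 54321 AS gpid;
-- decompose: integer division recovers the node id, modulo the process id
SELECT 30000054321 / 10000000000 AS node_id, 30000054321 % 10000000000 AS process_id;
-- capacity: distinct node ids before uint64 overflow, in years at ~50 new node ids per day
SELECT (2::numeric ^ 64 / 10000000000) / (50 * 365) AS years;  -- roughly 100K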

View File

@@ -68,6 +68,8 @@ extern void MarkCitusInitiatedCoordinatorBackend(void);
 extern void AssignGlobalPID(void);
 extern uint64 GetGlobalPID(void);
 extern uint64 ExtractGlobalPID(char *applicationName);
+extern int ExtractNodeIdFromGlobalPID(uint64 globalPID);
+extern int ExtractProcessIdFromGlobalPID(uint64 globalPID);
 extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
 extern void CancelTransactionDueToDeadlock(PGPROC *proc);
 extern bool MyBackendGotCancelledDueToDeadlock(bool clearState);
@@ -78,6 +80,11 @@ extern int GetAllActiveClientBackendCount(void);
 extern void IncrementClientBackendCounter(void);
 extern void DecrementClientBackendCounter(void);

+extern bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort,
+										char *queryString, StringInfo queryResultString,
+										bool reportResultError);
+
 #define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
+#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999

 #endif /* BACKEND_DATA_H */

View File

@@ -86,6 +86,7 @@ extern List * ActiveReadableNodeList(void);
 extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
 extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
+extern WorkerNode * FindNodeWithNodeId(int nodeId);
 extern List * ReadDistNode(bool includeNodesFromOtherClusters);
 extern void EnsureCoordinator(void);
 extern void InsertCoordinatorIfClusterEmpty(void);

View File

@@ -256,3 +256,7 @@ s/CREATE TABLESPACE test_tablespace LOCATION.*/CREATE TABLESPACE test_tablespace
 s/(.*absolute correlation \()([0,1]\.[0-9]+)(\) of var attribute [0-9]+ is smaller than.*)/\1X\.YZ\3/g
 s/NOTICE: issuing WITH placement_data\(shardid, shardstate, shardlength, groupid, placementid\) AS \(VALUES \([0-9]+, [0-9]+, [0-9]+, [0-9]+, [0-9]+\)\)/NOTICE: issuing WITH placement_data\(shardid, shardstate, shardlength, groupid, placementid\) AS \(VALUES \(xxxxxx, xxxxxx, xxxxxx, xxxxxx, xxxxxx\)\)/g
+
+# global_pid when pg_cancel_backend is sent to workers
+s/pg_cancel_backend\('[0-9]+'::bigint\)/pg_cancel_backend('xxxxx'::bigint)/g
+s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_backend(xxxxx::integer)/g

View File

@@ -239,7 +239,7 @@ ERROR: node group 0 does not have a secondary node
 -- should work this time
 \c -reuse-previous=off regression - - :master_port
 SET search_path TO single_node;
-SELECT 1 FROM master_add_node('localhost', :follower_master_port, groupid => 0, noderole => 'secondary');
+SELECT 1 FROM master_add_node('localhost', :follower_master_port, groupid => 0, noderole => 'secondary', nodecluster => 'second-cluster');
  ?column?
---------------------------------------------------------------------
         1
@@ -251,7 +251,7 @@ SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
         1
 (1 row)

-\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always'"
+\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
 SET search_path TO single_node;
 SELECT * FROM test WHERE x = 1;
  x | y
@@ -387,6 +387,40 @@ SELECT * FROM columnar_test ORDER BY 1,2;
 1 | 8
 (6 rows)

+\c -reuse-previous=off regression - - :master_port
+SET citus.shard_replication_factor TO 1;
+SET search_path TO single_node;
+CREATE TABLE dist_table (a INT, b INT);
+SELECT create_distributed_table ('dist_table', 'a', shard_count:=4);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO dist_table VALUES (1, 1);
+\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
+SET search_path TO single_node;
+SELECT * FROM dist_table;
+ a | b
+---------------------------------------------------------------------
+ 1 | 1
+(1 row)
+
+SELECT global_pid AS follower_coordinator_gpid FROM get_all_active_transactions() WHERE process_id = pg_backend_pid() \gset
+SELECT pg_typeof(:follower_coordinator_gpid);
+ pg_typeof
+---------------------------------------------------------------------
+ bigint
+(1 row)
+
+SELECT pg_cancel_backend(:follower_coordinator_gpid);
+ERROR: canceling statement due to user request
+SET citus.log_remote_commands TO ON;
+SELECT pg_cancel_backend(:follower_coordinator_gpid) FROM dist_table WHERE a = 1;
+NOTICE: executing the command locally: SELECT pg_cancel_backend('xxxxx'::bigint) AS pg_cancel_backend FROM single_node.dist_table_102008 dist_table WHERE (a OPERATOR(pg_catalog.=) 1)
+NOTICE: issuing SELECT pg_cancel_backend(xxxxx::integer)
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+ERROR: canceling statement due to user request
 -- Cleanup
 \c -reuse-previous=off regression - - :master_port
 SET search_path TO single_node;

View File

@@ -0,0 +1,83 @@
CREATE SCHEMA global_cancel;
SET search_path TO global_cancel;
SET citus.next_shard_id TO 56789000;
CREATE TABLE dist_table (a INT, b INT);
SELECT create_distributed_table ('dist_table', 'a', shard_count:=4);
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO dist_table VALUES (1, 1);
SELECT global_pid AS coordinator_gpid FROM get_all_active_transactions() WHERE process_id = pg_backend_pid() \gset
SELECT pg_typeof(:coordinator_gpid);
 pg_typeof
---------------------------------------------------------------------
 bigint
(1 row)

SELECT pg_cancel_backend(:coordinator_gpid);
ERROR: canceling statement due to user request
SET citus.log_remote_commands TO ON;
SELECT pg_cancel_backend(:coordinator_gpid) FROM dist_table WHERE a = 1;
NOTICE: issuing SELECT pg_cancel_backend('xxxxx'::bigint) AS pg_cancel_backend FROM global_cancel.dist_table_56789000 dist_table WHERE (a OPERATOR(pg_catalog.=) 1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
ERROR: canceling statement due to user request
BEGIN;
SELECT pg_cancel_backend(:coordinator_gpid) FROM dist_table WHERE a = 1;
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_cancel_backend('xxxxx'::bigint) AS pg_cancel_backend FROM global_cancel.dist_table_56789000 dist_table WHERE (a OPERATOR(pg_catalog.=) 1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
ERROR: canceling statement due to user request
END;
SET citus.log_remote_commands TO OFF;
SELECT global_pid AS maintenance_daemon_gpid
FROM pg_stat_activity psa JOIN get_all_active_transactions() gaat ON psa.pid = gaat.process_id
WHERE application_name = 'Citus Maintenance Daemon' \gset
SET client_min_messages TO ERROR;
CREATE USER global_cancel_user;
SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
 ?column?
---------------------------------------------------------------------
        1
        1
(2 rows)

RESET client_min_messages;
SET ROLE global_cancel_user;
SELECT pg_typeof(:maintenance_daemon_gpid);
 pg_typeof
---------------------------------------------------------------------
 bigint
(1 row)

SELECT pg_cancel_backend(:maintenance_daemon_gpid);
ERROR: must be a superuser to cancel superuser query
CONTEXT: while executing command on localhost:xxxxx
SELECT pg_terminate_backend(:maintenance_daemon_gpid);
ERROR: must be a superuser to terminate superuser process
CONTEXT: while executing command on localhost:xxxxx
RESET ROLE;
SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset
SET client_min_messages TO DEBUG;
-- 10000000000 is the node id multiplier for global pid
SELECT pg_cancel_backend(10000000000 * :coordinator_node_id + 0);
DEBUG: PID 0 is not a PostgreSQL server process
DETAIL: from localhost:xxxxx
 pg_cancel_backend
---------------------------------------------------------------------
 f
(1 row)

SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0);
DEBUG: PID 0 is not a PostgreSQL server process
DETAIL: from localhost:xxxxx
 pg_terminate_backend
---------------------------------------------------------------------
 f
(1 row)

RESET client_min_messages;
DROP SCHEMA global_cancel CASCADE;
NOTICE: drop cascades to table dist_table

View File

@@ -1016,10 +1016,12 @@ SELECT * FROM multi_extension.print_extension_changes();
 | function citus_shard_indexes_on_worker() SETOF record
 | function citus_shards_on_worker() SETOF record
 | function create_distributed_function(regprocedure,text,text,boolean) void
+| function pg_cancel_backend(bigint) boolean
+| function pg_terminate_backend(bigint,bigint) boolean
 | function worker_create_or_replace_object(text[]) boolean
 | function worker_drop_sequence_dependency(text) void
 | function worker_drop_shell_table(text) void
-(16 rows)
+(18 rows)

 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version

View File

@@ -172,6 +172,8 @@ ORDER BY 1;
 function master_update_shard_statistics(bigint)
 function master_update_table_statistics(regclass)
 function notify_constraint_dropped()
+function pg_cancel_backend(bigint)
+function pg_terminate_backend(bigint, bigint)
 function poolinfo_valid(text)
 function read_intermediate_result(text,citus_copy_format)
 function read_intermediate_results(text[],citus_copy_format)
@@ -268,5 +270,5 @@ ORDER BY 1;
 view citus_worker_stat_activity
 view pg_dist_shard_placement
 view time_partitions
-(252 rows)
+(254 rows)

View File

@@ -282,6 +282,7 @@ test: create_citus_local_table_cascade
 test: fkeys_between_local_ref
 test: auto_undist_citus_local
 test: mx_regular_user
+test: global_cancel
 test: remove_coordinator

 # ----------

View File

@@ -107,10 +107,10 @@ SELECT * FROM test WHERE x = 1;
 \c -reuse-previous=off regression - - :master_port
 SET search_path TO single_node;

-SELECT 1 FROM master_add_node('localhost', :follower_master_port, groupid => 0, noderole => 'secondary');
+SELECT 1 FROM master_add_node('localhost', :follower_master_port, groupid => 0, noderole => 'secondary', nodecluster => 'second-cluster');

 SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);

-\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always'"
+\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
 SET search_path TO single_node;

 SELECT * FROM test WHERE x = 1;
@@ -169,6 +169,29 @@ INSERT INTO columnar_test(a, b) VALUES (1, 8);
 \c - - - :follower_master_port
 SELECT * FROM columnar_test ORDER BY 1,2;

+\c -reuse-previous=off regression - - :master_port
+SET citus.shard_replication_factor TO 1;
+SET search_path TO single_node;
+
+CREATE TABLE dist_table (a INT, b INT);
+SELECT create_distributed_table ('dist_table', 'a', shard_count:=4);
+INSERT INTO dist_table VALUES (1, 1);
+
+\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
+SET search_path TO single_node;
+
+SELECT * FROM dist_table;
+
+SELECT global_pid AS follower_coordinator_gpid FROM get_all_active_transactions() WHERE process_id = pg_backend_pid() \gset
+SELECT pg_typeof(:follower_coordinator_gpid);
+
+SELECT pg_cancel_backend(:follower_coordinator_gpid);
+
+SET citus.log_remote_commands TO ON;
+SELECT pg_cancel_backend(:follower_coordinator_gpid) FROM dist_table WHERE a = 1;
+
 -- Cleanup
 \c -reuse-previous=off regression - - :master_port
 SET search_path TO single_node;

View File

@@ -0,0 +1,51 @@
CREATE SCHEMA global_cancel;
SET search_path TO global_cancel;
SET citus.next_shard_id TO 56789000;

CREATE TABLE dist_table (a INT, b INT);
SELECT create_distributed_table ('dist_table', 'a', shard_count:=4);
INSERT INTO dist_table VALUES (1, 1);

SELECT global_pid AS coordinator_gpid FROM get_all_active_transactions() WHERE process_id = pg_backend_pid() \gset
SELECT pg_typeof(:coordinator_gpid);

SELECT pg_cancel_backend(:coordinator_gpid);

SET citus.log_remote_commands TO ON;
SELECT pg_cancel_backend(:coordinator_gpid) FROM dist_table WHERE a = 1;

BEGIN;
SELECT pg_cancel_backend(:coordinator_gpid) FROM dist_table WHERE a = 1;
END;

SET citus.log_remote_commands TO OFF;

SELECT global_pid AS maintenance_daemon_gpid
FROM pg_stat_activity psa JOIN get_all_active_transactions() gaat ON psa.pid = gaat.process_id
WHERE application_name = 'Citus Maintenance Daemon' \gset

SET client_min_messages TO ERROR;
CREATE USER global_cancel_user;
SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
RESET client_min_messages;

SET ROLE global_cancel_user;

SELECT pg_typeof(:maintenance_daemon_gpid);

SELECT pg_cancel_backend(:maintenance_daemon_gpid);
SELECT pg_terminate_backend(:maintenance_daemon_gpid);

RESET ROLE;

SELECT nodeid AS coordinator_node_id FROM pg_dist_node WHERE nodeport = :master_port \gset

SET client_min_messages TO DEBUG;

-- 10000000000 is the node id multiplier for global pid
SELECT pg_cancel_backend(10000000000 * :coordinator_node_id + 0);
SELECT pg_terminate_backend(10000000000 * :coordinator_node_id + 0);

RESET client_min_messages;

DROP SCHEMA global_cancel CASCADE;