Dumping wait edges can now optionally scan all backends

Before this commit, dumping wait edges could only be used for
distributed deadlock detection purposes. With this commit,
it can optionally be used for any backend.
add_worker_query_again
Onder Kalaci 2022-02-09 15:48:48 +01:00
parent bf5aa1e223
commit 32b5cf257c
10 changed files with 136 additions and 10 deletions

View File

@ -17,6 +17,9 @@
#include "udfs/get_all_active_transactions/11.0-1.sql" #include "udfs/get_all_active_transactions/11.0-1.sql"
#include "udfs/get_global_active_transactions/11.0-1.sql" #include "udfs/get_global_active_transactions/11.0-1.sql"
#include "udfs/dump_local_wait_edges/11.0-1.sql"
#include "udfs/dump_global_wait_edges/11.0-1.sql"
#include "udfs/citus_worker_stat_activity/11.0-1.sql" #include "udfs/citus_worker_stat_activity/11.0-1.sql"
CREATE VIEW citus.citus_worker_stat_activity AS CREATE VIEW citus.citus_worker_stat_activity AS

View File

@ -208,4 +208,39 @@ SELECT * FROM pg_catalog.citus_worker_stat_activity();
ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC;
-- Restore the zero-argument dump_local_wait_edges().
-- The existing local function has to be dropped here (not the global one):
-- dump_global_wait_edges is dropped separately right before it is recreated,
-- and dropping it twice would make the second DROP fail.
DROP FUNCTION pg_catalog.dump_local_wait_edges;
CREATE FUNCTION pg_catalog.dump_local_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges()
IS 'returns all local lock wait chains, that start from distributed transactions';
-- Restore the zero-argument dump_global_wait_edges(): drop the current
-- definition, then recreate it without any input parameter. Each row is one
-- wait edge (waiting side -> blocking side).
-- NOTE(review): the DROP uses an unqualified name, presumably resolved through
-- the search_path that is reset at the end of this script -- confirm.
DROP FUNCTION dump_global_wait_edges;
CREATE FUNCTION pg_catalog.dump_global_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE 'c' STRICT
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges()
IS 'returns a global list of blocked transactions originating from this node';
RESET search_path; RESET search_path;

View File

@ -0,0 +1,17 @@
-- Replace dump_global_wait_edges with a variant that takes a
-- distributed_tx_only flag. The flag is handed to the C implementation as its
-- first argument; when true, only backends that are in a distributed
-- transaction are considered, otherwise (the default) all backends are scanned.
-- The DROP omits the argument list, so it requires the function name to be
-- unique in pg_catalog; CASCADE also removes any dependent objects.
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
distributed_tx_only boolean DEFAULT false,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
-- STRICT: per PostgreSQL semantics a NULL argument is not passed to the C code.
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool)
IS 'returns a global list of blocked backends originating from this node';

View File

@ -0,0 +1,17 @@
-- Replace dump_global_wait_edges with a variant that takes a
-- distributed_tx_only flag. The flag is handed to the C implementation as its
-- first argument; when true, only backends that are in a distributed
-- transaction are considered, otherwise (the default) all backends are scanned.
-- The DROP omits the argument list, so it requires the function name to be
-- unique in pg_catalog; CASCADE also removes any dependent objects.
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
distributed_tx_only boolean DEFAULT false,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
-- STRICT: per PostgreSQL semantics a NULL argument is not passed to the C code.
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool)
IS 'returns a global list of blocked backends originating from this node';

View File

@ -0,0 +1,17 @@
-- Replace dump_local_wait_edges with a variant that takes a
-- distributed_tx_only flag. The flag is handed to the C implementation as its
-- first argument; when true, backends that are not in a distributed
-- transaction are skipped, otherwise (the default) all backends are scanned.
-- The DROP omits the argument list, so it requires the function name to be
-- unique in pg_catalog; CASCADE also removes any dependent objects.
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
distributed_tx_only boolean DEFAULT false,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
-- STRICT: per PostgreSQL semantics a NULL argument is not passed to the C code.
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool)
IS 'returns all local lock wait chains, that start from any citus backend';

View File

@ -0,0 +1,17 @@
-- Replace dump_local_wait_edges with a variant that takes a
-- distributed_tx_only flag. The flag is handed to the C implementation as its
-- first argument; when true, backends that are not in a distributed
-- transaction are skipped, otherwise (the default) all backends are scanned.
-- The DROP omits the argument list, so it requires the function name to be
-- unique in pg_catalog; CASCADE also removes any dependent objects.
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
distributed_tx_only boolean DEFAULT false,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
-- STRICT: per PostgreSQL semantics a NULL argument is not passed to the C code.
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool)
IS 'returns all local lock wait chains, that start from any citus backend';

View File

@ -50,7 +50,9 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
bool isNulls[2]; bool isNulls[2];
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
WaitGraph *waitGraph = BuildGlobalWaitGraph();
bool onlyDistributedTx = true;
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph); HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph);
/* iterate on all nodes */ /* iterate on all nodes */

View File

@ -119,7 +119,8 @@ CheckForDistributedDeadlocks(void)
return false; return false;
} }
WaitGraph *waitGraph = BuildGlobalWaitGraph(); bool onlyDistributedTx = true;
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph); HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph);
int edgeCount = waitGraph->edgeCount; int edgeCount = waitGraph->edgeCount;

View File

@ -47,7 +47,7 @@ typedef struct PROCStack
static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
static WaitGraph * BuildLocalWaitGraph(void); static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx);
static bool IsProcessWaitingForSafeOperations(PGPROC *proc); static bool IsProcessWaitingForSafeOperations(PGPROC *proc);
static void LockLockData(void); static void LockLockData(void);
static void UnlockLockData(void); static void UnlockLockData(void);
@ -74,7 +74,9 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges);
Datum Datum
dump_global_wait_edges(PG_FUNCTION_ARGS) dump_global_wait_edges(PG_FUNCTION_ARGS)
{ {
WaitGraph *waitGraph = BuildGlobalWaitGraph(); uint64 onlyDistributedTx = PG_GETARG_BOOL(0);
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
ReturnWaitGraph(waitGraph, fcinfo); ReturnWaitGraph(waitGraph, fcinfo);
@ -86,16 +88,23 @@ dump_global_wait_edges(PG_FUNCTION_ARGS)
* BuildGlobalWaitGraph builds a wait graph for distributed transactions * BuildGlobalWaitGraph builds a wait graph for distributed transactions
* that originate from this node, including edges from all (other) worker * that originate from this node, including edges from all (other) worker
* nodes. * nodes.
*
* If onlyDistributedTx is true, we only consider processes that are in a
* distributed transaction (i.e., AssignDistributedTransaction() or
* assign_distributed_transactions() has been called for the process).
* Distributed deadlock detection is only interested in these processes.
*/ */
WaitGraph * WaitGraph *
BuildGlobalWaitGraph(void) BuildGlobalWaitGraph(bool onlyDistributedTx)
{ {
List *workerNodeList = ActiveReadableNodeList(); List *workerNodeList = ActiveReadableNodeList();
char *nodeUser = CitusExtensionOwnerName(); char *nodeUser = CitusExtensionOwnerName();
List *connectionList = NIL; List *connectionList = NIL;
int32 localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
WaitGraph *waitGraph = BuildLocalWaitGraph(); /* deadlock detection is only interested in distributed transactions */
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
/* open connections in parallel */ /* open connections in parallel */
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
@ -256,7 +265,9 @@ ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex)
Datum Datum
dump_local_wait_edges(PG_FUNCTION_ARGS) dump_local_wait_edges(PG_FUNCTION_ARGS)
{ {
WaitGraph *waitGraph = BuildLocalWaitGraph(); uint64 onlyDistributedTx = PG_GETARG_BOOL(0);
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
ReturnWaitGraph(waitGraph, fcinfo); ReturnWaitGraph(waitGraph, fcinfo);
return (Datum) 0; return (Datum) 0;
@ -328,9 +339,14 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
/* /*
* BuildLocalWaitGraph builds a wait graph for distributed transactions * BuildLocalWaitGraph builds a wait graph for distributed transactions
* that originate from the local node. * that originate from the local node.
*
* If onlyDistributedTx is true, we only consider processes that are in a
* distributed transaction (i.e., AssignDistributedTransaction() or
* assign_distributed_transactions() has been called for the process).
* Distributed deadlock detection is only interested in these processes.
*/ */
static WaitGraph * static WaitGraph *
BuildLocalWaitGraph(void) BuildLocalWaitGraph(bool onlyDistributedTx)
{ {
PROCStack remaining; PROCStack remaining;
int totalProcs = TotalProcCount(); int totalProcs = TotalProcCount();
@ -379,7 +395,8 @@ BuildLocalWaitGraph(void)
* care about distributed transactions for the purpose of distributed * care about distributed transactions for the purpose of distributed
* deadlock detection. * deadlock detection.
*/ */
if (!IsInDistributedTransaction(&currentBackendData)) if (onlyDistributedTx &&
!IsInDistributedTransaction(&currentBackendData))
{ {
continue; continue;
} }

View File

@ -58,7 +58,7 @@ typedef struct WaitGraph
} WaitGraph; } WaitGraph;
extern WaitGraph * BuildGlobalWaitGraph(void); extern WaitGraph * BuildGlobalWaitGraph(bool onlyDistributedTx);
extern bool IsProcessWaitingForLock(PGPROC *proc); extern bool IsProcessWaitingForLock(PGPROC *proc);
extern bool IsInDistributedTransaction(BackendData *backendData); extern bool IsInDistributedTransaction(BackendData *backendData);
extern TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex); extern TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex);