Use the optional APIs introduced for dump global/local wait edges

Do not change any behavior, simply use them. In the follow-up commits,
we'll use the API with different parameters.
add_worker_query_again
Onder Kalaci 2022-02-09 16:24:53 +01:00
parent 32b5cf257c
commit d4b956c7f2
11 changed files with 109 additions and 81 deletions

View File

@ -113,6 +113,40 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int
RESET search_path; RESET search_path;
DROP FUNCTION dump_local_wait_edges CASCADE;
CREATE FUNCTION pg_catalog.dump_local_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges()
IS 'returns all local lock wait chains, that start from distributed transactions';
DROP FUNCTION dump_global_wait_edges CASCADE;
CREATE FUNCTION pg_catalog.dump_global_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE 'c' STRICT
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges()
IS 'returns a global list of blocked transactions originating from this node';
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
@ -208,39 +242,4 @@ SELECT * FROM pg_catalog.citus_worker_stat_activity();
ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC;
DROP FUNCTION dump_global_wait_edges;
CREATE FUNCTION pg_catalog.dump_local_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges()
IS 'returns all local lock wait chains, that start from distributed transactions';
DROP FUNCTION dump_global_wait_edges;
CREATE FUNCTION pg_catalog.dump_global_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE 'c' STRICT
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges()
IS 'returns a global list of blocked transactions originating from this node';
RESET search_path; RESET search_path;

View File

@ -1,10 +1,12 @@
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
distributed_tx_only boolean DEFAULT false, distributed_tx_only boolean DEFAULT true,
OUT waiting_global_pid int8,
OUT waiting_pid int4, OUT waiting_pid int4,
OUT waiting_node_id int4, OUT waiting_node_id int4,
OUT waiting_transaction_num int8, OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz, OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4, OUT blocking_pid int4,
OUT blocking_node_id int4, OUT blocking_node_id int4,
OUT blocking_transaction_num int8, OUT blocking_transaction_num int8,

View File

@ -1,10 +1,12 @@
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
distributed_tx_only boolean DEFAULT false, distributed_tx_only boolean DEFAULT true,
OUT waiting_global_pid int8,
OUT waiting_pid int4, OUT waiting_pid int4,
OUT waiting_node_id int4, OUT waiting_node_id int4,
OUT waiting_transaction_num int8, OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz, OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4, OUT blocking_pid int4,
OUT blocking_node_id int4, OUT blocking_node_id int4,
OUT blocking_transaction_num int8, OUT blocking_transaction_num int8,

View File

@ -1,10 +1,12 @@
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
distributed_tx_only boolean DEFAULT false, distributed_tx_only boolean DEFAULT true,
OUT waiting_global_pid int8,
OUT waiting_pid int4, OUT waiting_pid int4,
OUT waiting_node_id int4, OUT waiting_node_id int4,
OUT waiting_transaction_num int8, OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz, OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4, OUT blocking_pid int4,
OUT blocking_node_id int4, OUT blocking_node_id int4,
OUT blocking_transaction_num int8, OUT blocking_transaction_num int8,

View File

@ -1,10 +1,12 @@
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
distributed_tx_only boolean DEFAULT false, distributed_tx_only boolean DEFAULT true,
OUT waiting_global_pid int8,
OUT waiting_pid int4, OUT waiting_pid int4,
OUT waiting_node_id int4, OUT waiting_node_id int4,
OUT waiting_transaction_num int8, OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz, OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4, OUT blocking_pid int4,
OUT blocking_node_id int4, OUT blocking_node_id int4,
OUT blocking_transaction_num int8, OUT blocking_transaction_num int8,

View File

@ -51,6 +51,7 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
/* distributed deadlock detection only considers distributed txs */
bool onlyDistributedTx = true; bool onlyDistributedTx = true;
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph); HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph);

View File

@ -119,6 +119,7 @@ CheckForDistributedDeadlocks(void)
return false; return false;
} }
/* distributed deadlock detection only considers distributed txs */
bool onlyDistributedTx = true; bool onlyDistributedTx = true;
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph); HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph);

View File

@ -74,7 +74,7 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges);
Datum Datum
dump_global_wait_edges(PG_FUNCTION_ARGS) dump_global_wait_edges(PG_FUNCTION_ARGS)
{ {
uint64 onlyDistributedTx = PG_GETARG_BOOL(0); bool onlyDistributedTx = PG_GETARG_BOOL(0);
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
@ -133,9 +133,14 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList) foreach_ptr(connection, connectionList)
{ {
const char *command = "SELECT * FROM dump_local_wait_edges()"; StringInfo queryString = makeStringInfo();
const char *onlyDistributedTxStr = onlyDistributedTx ? "true" : "false";
int querySent = SendRemoteCommand(connection, command); appendStringInfo(queryString,
"SELECT * FROM dump_local_wait_edges(%s)",
onlyDistributedTxStr);
int querySent = SendRemoteCommand(connection, queryString->data);
if (querySent == 0) if (querySent == 0)
{ {
ReportConnectionError(connection, WARNING); ReportConnectionError(connection, WARNING);
@ -157,7 +162,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
int64 rowCount = PQntuples(result); int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result); int64 colCount = PQnfields(result);
if (colCount != 9) if (colCount != 11)
{ {
ereport(WARNING, (errmsg("unexpected number of columns from " ereport(WARNING, (errmsg("unexpected number of columns from "
"dump_local_wait_edges"))); "dump_local_wait_edges")));
@ -186,15 +191,17 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
{ {
WaitEdge *waitEdge = AllocWaitEdge(waitGraph); WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0);
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); waitEdge->waitingPid = ParseIntField(result, rowIndex, 1);
waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2);
waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 3);
waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 4);
waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); waitEdge->blockingGPid = ParseIntField(result, rowIndex, 5);
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); waitEdge->blockingPid = ParseIntField(result, rowIndex, 6);
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 7);
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 8);
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 9);
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 10);
} }
@ -265,7 +272,7 @@ ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex)
Datum Datum
dump_local_wait_edges(PG_FUNCTION_ARGS) dump_local_wait_edges(PG_FUNCTION_ARGS)
{ {
uint64 onlyDistributedTx = PG_GETARG_BOOL(0); bool onlyDistributedTx = PG_GETARG_BOOL(0);
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
ReturnWaitGraph(waitGraph, fcinfo); ReturnWaitGraph(waitGraph, fcinfo);
@ -285,51 +292,55 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
/* /*
* Columns: * Columns:
* 00: waiting_pid * 00: waiting_global_pid
* 01: waiting_node_id * 01: waiting_pid
* 02: waiting_transaction_num * 02: waiting_node_id
* 03: waiting_transaction_stamp * 03: waiting_transaction_num
* 04: blocking_pid * 04: waiting_transaction_stamp
* 05: blocking__node_id * 05: blocking_global_pid
* 06: blocking_transaction_num * 06: blocking_pid
* 07: blocking_transaction_stamp * 07: blocking__node_id
* 08: blocking_transaction_waiting * 08: blocking_transaction_num
* 09: blocking_transaction_stamp
* 10: blocking_transaction_waiting
*/ */
for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++) for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++)
{ {
Datum values[9]; Datum values[11];
bool nulls[9]; bool nulls[11];
WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; WaitEdge *curEdge = &waitGraph->edges[curEdgeNum];
memset(values, 0, sizeof(values)); memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls)); memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(curEdge->waitingPid); values[0] = UInt64GetDatum(curEdge->waitingGPid);
values[1] = Int32GetDatum(curEdge->waitingNodeId); values[1] = Int32GetDatum(curEdge->waitingPid);
values[2] = Int32GetDatum(curEdge->waitingNodeId);
if (curEdge->waitingTransactionNum != 0) if (curEdge->waitingTransactionNum != 0)
{ {
values[2] = Int64GetDatum(curEdge->waitingTransactionNum); values[3] = Int64GetDatum(curEdge->waitingTransactionNum);
values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); values[4] = TimestampTzGetDatum(curEdge->waitingTransactionStamp);
} }
else else
{ {
nulls[2] = true;
nulls[3] = true; nulls[3] = true;
nulls[4] = true;
} }
values[4] = Int32GetDatum(curEdge->blockingPid); values[5] = UInt64GetDatum(curEdge->blockingGPid);
values[5] = Int32GetDatum(curEdge->blockingNodeId); values[6] = Int32GetDatum(curEdge->blockingPid);
values[7] = Int32GetDatum(curEdge->blockingNodeId);
if (curEdge->blockingTransactionNum != 0) if (curEdge->blockingTransactionNum != 0)
{ {
values[6] = Int64GetDatum(curEdge->blockingTransactionNum); values[8] = Int64GetDatum(curEdge->blockingTransactionNum);
values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); values[9] = TimestampTzGetDatum(curEdge->blockingTransactionStamp);
} }
else else
{ {
nulls[6] = true; nulls[8] = true;
nulls[7] = true; nulls[9] = true;
} }
values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting); values[10] = BoolGetDatum(curEdge->isBlockingXactWaiting);
tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
} }
@ -644,6 +655,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
} }
curEdge->waitingPid = waitingProc->pid; curEdge->waitingPid = waitingProc->pid;
curEdge->waitingGPid = waitingBackendData.globalPID;
if (IsInDistributedTransaction(&waitingBackendData)) if (IsInDistributedTransaction(&waitingBackendData))
{ {
@ -662,6 +674,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
} }
curEdge->blockingPid = blockingProc->pid; curEdge->blockingPid = blockingProc->pid;
curEdge->blockingGPid = blockingBackendData.globalPID;
if (IsInDistributedTransaction(&blockingBackendData)) if (IsInDistributedTransaction(&blockingBackendData))
{ {

View File

@ -31,11 +31,13 @@
*/ */
typedef struct WaitEdge typedef struct WaitEdge
{ {
uint64 waitingGPid;
int waitingPid; int waitingPid;
int waitingNodeId; int waitingNodeId;
int64 waitingTransactionNum; int64 waitingTransactionNum;
TimestampTz waitingTransactionStamp; TimestampTz waitingTransactionStamp;
uint64 blockingGPid;
int blockingPid; int blockingPid;
int blockingNodeId; int blockingNodeId;
int64 blockingTransactionNum; int64 blockingTransactionNum;

View File

@ -1000,6 +1000,8 @@ SELECT * FROM multi_extension.print_extension_changes();
--------------------------------------------------------------------- ---------------------------------------------------------------------
function citus_disable_node(text,integer) void | function citus_disable_node(text,integer) void |
function create_distributed_function(regprocedure,text,text) void | function create_distributed_function(regprocedure,text,text) void |
function dump_global_wait_edges() SETOF record |
function dump_local_wait_edges() SETOF record |
function master_append_table_to_shard(bigint,text,text,integer) real | function master_append_table_to_shard(bigint,text,text,integer) real |
function master_apply_delete_command(text) integer | function master_apply_delete_command(text) integer |
function master_get_table_metadata(text) record | function master_get_table_metadata(text) record |
@ -1011,9 +1013,11 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_shard_indexes_on_worker() SETOF record | function citus_shard_indexes_on_worker() SETOF record
| function citus_shards_on_worker() SETOF record | function citus_shards_on_worker() SETOF record
| function create_distributed_function(regprocedure,text,text,boolean) void | function create_distributed_function(regprocedure,text,text,boolean) void
| function dump_global_wait_edges(boolean) SETOF record
| function dump_local_wait_edges(boolean) SETOF record
| function worker_drop_sequence_dependency(text) void | function worker_drop_sequence_dependency(text) void
| function worker_drop_shell_table(text) void | function worker_drop_shell_table(text) void
(15 rows) (19 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -127,8 +127,8 @@ ORDER BY 1;
function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone) function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone)
function distributed_tables_colocated(regclass,regclass) function distributed_tables_colocated(regclass,regclass)
function drop_old_time_partitions(regclass,timestamp with time zone) function drop_old_time_partitions(regclass,timestamp with time zone)
function dump_global_wait_edges() function dump_global_wait_edges(boolean)
function dump_local_wait_edges() function dump_local_wait_edges(boolean)
function fetch_intermediate_results(text[],text,integer) function fetch_intermediate_results(text[],text,integer)
function fix_all_partition_shard_index_names() function fix_all_partition_shard_index_names()
function fix_partition_shard_index_names(regclass) function fix_partition_shard_index_names(regclass)