diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 690e79f3f..dec092d9c 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -113,6 +113,40 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int RESET search_path; +DROP FUNCTION dump_local_wait_edges CASCADE; +CREATE FUNCTION pg_catalog.dump_local_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() +IS 'returns all local lock wait chains, that start from distributed transactions'; + +DROP FUNCTION dump_global_wait_edges CASCADE; +CREATE FUNCTION pg_catalog.dump_global_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE 'c' STRICT +AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() +IS 'returns a global list of blocked transactions originating from this node'; + DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, @@ -208,39 +242,4 @@ SELECT * FROM pg_catalog.citus_worker_stat_activity(); ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; -DROP FUNCTION dump_global_wait_edges; -CREATE FUNCTION pg_catalog.dump_local_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() -IS 'returns all local lock wait chains, that start from distributed transactions'; - -DROP FUNCTION dump_global_wait_edges; -CREATE FUNCTION pg_catalog.dump_global_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE 'c' STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() -IS 'returns a global list of blocked transactions originating from this node'; - - RESET search_path; diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql index 950d08ccc..32e25e217 100644 --- a/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql @@ -1,10 +1,12 @@ DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( - distributed_tx_only boolean DEFAULT false, + distributed_tx_only boolean DEFAULT true, + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql index 950d08ccc..32e25e217 100644 --- a/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql +++ b/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql @@ -1,10 +1,12 @@ DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( - distributed_tx_only boolean DEFAULT false, + distributed_tx_only boolean DEFAULT true, + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql index 705210f9d..190e4c0d5 100644 --- a/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql @@ -1,10 +1,12 @@ DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( - distributed_tx_only boolean DEFAULT false, + distributed_tx_only boolean DEFAULT true, + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, @@ -14,4 +16,4 @@ RETURNS SETOF RECORD LANGUAGE C STRICT AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) -IS 'returns all local lock wait chains, that start from any citus backend'; \ No newline at end of file +IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql index 705210f9d..190e4c0d5 100644 --- a/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql +++ b/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql @@ -1,10 +1,12 @@ DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( - distributed_tx_only boolean DEFAULT false, + distributed_tx_only boolean DEFAULT true, + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, @@ -14,4 +16,4 @@ RETURNS SETOF RECORD LANGUAGE C STRICT AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) -IS 'returns all local lock wait chains, that start from any citus backend'; \ No newline at end of file +IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/test/distributed_deadlock_detection.c b/src/backend/distributed/test/distributed_deadlock_detection.c index 2a66ad2e0..d3fa34db2 100644 --- a/src/backend/distributed/test/distributed_deadlock_detection.c +++ b/src/backend/distributed/test/distributed_deadlock_detection.c @@ -51,6 +51,7 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS) Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + /* distributed deadlock detection only considers distributed txs */ bool onlyDistributedTx = true; WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph); diff --git a/src/backend/distributed/transaction/distributed_deadlock_detection.c b/src/backend/distributed/transaction/distributed_deadlock_detection.c index 60a8519fe..82c274661 100644 --- a/src/backend/distributed/transaction/distributed_deadlock_detection.c +++ b/src/backend/distributed/transaction/distributed_deadlock_detection.c @@ -119,6 +119,7 @@ CheckForDistributedDeadlocks(void) return false; } + /* distributed deadlock detection only considers distributed txs */ bool onlyDistributedTx = true; WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph); diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 33dc7bbcf..18f391d74 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -74,7 +74,7 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges); Datum dump_global_wait_edges(PG_FUNCTION_ARGS) { - uint64 onlyDistributedTx = PG_GETARG_BOOL(0); + bool onlyDistributedTx = PG_GETARG_BOOL(0); WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); @@ -133,9 +133,14 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) MultiConnection *connection = NULL; foreach_ptr(connection, connectionList) { - const char *command = "SELECT * FROM dump_local_wait_edges()"; + StringInfo queryString = makeStringInfo(); + const char *onlyDistributedTxStr = onlyDistributedTx ? "true" : "false"; - int querySent = SendRemoteCommand(connection, command); + appendStringInfo(queryString, + "SELECT * FROM dump_local_wait_edges(%s)", + onlyDistributedTxStr); + + int querySent = SendRemoteCommand(connection, queryString->data); if (querySent == 0) { ReportConnectionError(connection, WARNING); @@ -157,7 +162,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) int64 rowCount = PQntuples(result); int64 colCount = PQnfields(result); - if (colCount != 9) + if (colCount != 11) { ereport(WARNING, (errmsg("unexpected number of columns from " "dump_local_wait_edges"))); @@ -186,15 +191,17 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) { WaitEdge *waitEdge = AllocWaitEdge(waitGraph); - waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); - waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); - waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); - waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); - waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); - waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); - waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); - waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); - waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); + waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0); + waitEdge->waitingPid = ParseIntField(result, rowIndex, 1); + waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2); + waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 3); + waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 4); + waitEdge->blockingGPid = ParseIntField(result, rowIndex, 5); + waitEdge->blockingPid = ParseIntField(result, rowIndex, 6); + waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 7); + waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 8); + waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 9); + waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 10); } @@ -265,7 +272,7 @@ ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex) Datum dump_local_wait_edges(PG_FUNCTION_ARGS) { - uint64 onlyDistributedTx = PG_GETARG_BOOL(0); + bool onlyDistributedTx = PG_GETARG_BOOL(0); WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); ReturnWaitGraph(waitGraph, fcinfo); @@ -285,51 +292,55 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) /* * Columns: - * 00: waiting_pid - * 01: waiting_node_id - * 02: waiting_transaction_num - * 03: waiting_transaction_stamp - * 04: blocking_pid - * 05: blocking__node_id - * 06: blocking_transaction_num - * 07: blocking_transaction_stamp - * 08: blocking_transaction_waiting + * 00: waiting_global_pid + * 01: waiting_pid + * 02: waiting_node_id + * 03: waiting_transaction_num + * 04: waiting_transaction_stamp + * 05: blocking_global_pid + * 06: blocking_pid + * 07: blocking__node_id + * 08: blocking_transaction_num + * 09: blocking_transaction_stamp + * 10: blocking_transaction_waiting */ for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++) { - Datum values[9]; - bool nulls[9]; + Datum values[11]; + bool nulls[11]; WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); - values[0] = Int32GetDatum(curEdge->waitingPid); - values[1] = Int32GetDatum(curEdge->waitingNodeId); + values[0] = UInt64GetDatum(curEdge->waitingGPid); + values[1] = Int32GetDatum(curEdge->waitingPid); + values[2] = Int32GetDatum(curEdge->waitingNodeId); if (curEdge->waitingTransactionNum != 0) { - values[2] = Int64GetDatum(curEdge->waitingTransactionNum); - values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); + values[3] = Int64GetDatum(curEdge->waitingTransactionNum); + values[4] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); } else { - nulls[2] = true; nulls[3] = true; + nulls[4] = true; } - values[4] = Int32GetDatum(curEdge->blockingPid); - values[5] = Int32GetDatum(curEdge->blockingNodeId); + values[5] = UInt64GetDatum(curEdge->blockingGPid); + values[6] = Int32GetDatum(curEdge->blockingPid); + values[7] = Int32GetDatum(curEdge->blockingNodeId); if (curEdge->blockingTransactionNum != 0) { - values[6] = Int64GetDatum(curEdge->blockingTransactionNum); - values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); + values[8] = Int64GetDatum(curEdge->blockingTransactionNum); + values[9] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); } else { - nulls[6] = true; - nulls[7] = true; + nulls[8] = true; + nulls[9] = true; } - values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting); + values[10] = BoolGetDatum(curEdge->isBlockingXactWaiting); tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); } @@ -644,6 +655,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc, } curEdge->waitingPid = waitingProc->pid; + curEdge->waitingGPid = waitingBackendData.globalPID; if (IsInDistributedTransaction(&waitingBackendData)) { @@ -662,6 +674,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc, } curEdge->blockingPid = blockingProc->pid; + curEdge->blockingGPid = blockingBackendData.globalPID; if (IsInDistributedTransaction(&blockingBackendData)) { diff --git a/src/include/distributed/lock_graph.h b/src/include/distributed/lock_graph.h index 8668fe798..f204ebb03 100644 --- a/src/include/distributed/lock_graph.h +++ b/src/include/distributed/lock_graph.h @@ -31,11 +31,13 @@ */ typedef struct WaitEdge { + uint64 waitingGPid; int waitingPid; int waitingNodeId; int64 waitingTransactionNum; TimestampTz waitingTransactionStamp; + uint64 blockingGPid; int blockingPid; int blockingNodeId; int64 blockingTransactionNum; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 70dc4c2a0..8aad5297f 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1000,6 +1000,8 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- function citus_disable_node(text,integer) void | function create_distributed_function(regprocedure,text,text) void | + function dump_global_wait_edges() SETOF record | + function dump_local_wait_edges() SETOF record | function master_append_table_to_shard(bigint,text,text,integer) real | function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | @@ -1011,9 +1013,11 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_shard_indexes_on_worker() SETOF record | function citus_shards_on_worker() SETOF record | function create_distributed_function(regprocedure,text,text,boolean) void + | function dump_global_wait_edges(boolean) SETOF record + | function dump_local_wait_edges(boolean) SETOF record | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void -(15 rows) +(19 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 86c121568..7e8f4635f 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -127,8 +127,8 @@ ORDER BY 1; function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone) function distributed_tables_colocated(regclass,regclass) function drop_old_time_partitions(regclass,timestamp with time zone) - function dump_global_wait_edges() - function dump_local_wait_edges() + function dump_global_wait_edges(boolean) + function dump_local_wait_edges(boolean) function fetch_intermediate_results(text[],text,integer) function fix_all_partition_shard_index_names() function fix_partition_shard_index_names(regclass)