diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index e65016b8c..feea26e7c 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -17,8 +17,8 @@ #include "udfs/get_all_active_transactions/11.0-1.sql" #include "udfs/get_global_active_transactions/11.0-1.sql" -#include "udfs/dump_local_wait_edges/11.0-1.sql" -#include "udfs/dump_global_wait_edges/11.0-1.sql" +#include "udfs/dump_local_blocked_processes/11.0-1.sql" +#include "udfs/dump_global_blocked_processes/11.0-1.sql" #include "udfs/citus_worker_stat_activity/11.0-1.sql" #include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index db56725e1..49967ddf5 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -113,39 +113,8 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int RESET search_path; -DROP FUNCTION dump_local_wait_edges CASCADE; -CREATE FUNCTION pg_catalog.dump_local_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() -IS 'returns all local lock wait chains, that start from distributed transactions'; - -DROP FUNCTION dump_global_wait_edges CASCADE; -CREATE FUNCTION pg_catalog.dump_global_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE 'c' STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() -IS 'returns a global list of blocked transactions originating from this node'; +DROP FUNCTION dump_local_blocked_processes CASCADE; +DROP FUNCTION dump_global_blocked_processes CASCADE; DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; diff --git a/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql index 3f5c32099..c63d98ec9 100644 --- a/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql @@ -20,7 +20,7 @@ RETURNS int4[] AS $$ WITH activeTransactions AS ( SELECT global_pid FROM get_all_active_transactions() ), blockingTransactions AS ( - SELECT blocking_global_pid FROM dump_global_wait_edges(distributed_tx_only:=false) + SELECT blocking_global_pid FROM dump_global_blocked_processes() WHERE waiting_global_pid = mLocalGlobalPid ) SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions diff --git a/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql b/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql index 3f5c32099..c63d98ec9 100644 --- a/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql @@ -20,7 +20,7 @@ RETURNS int4[] AS $$ WITH activeTransactions AS ( SELECT global_pid FROM get_all_active_transactions() ), blockingTransactions AS ( - SELECT blocking_global_pid FROM dump_global_wait_edges(distributed_tx_only:=false) + SELECT blocking_global_pid FROM dump_global_blocked_processes() WHERE waiting_global_pid = mLocalGlobalPid ) SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions diff --git a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql index c97ac5ee5..94071a86f 100644 --- a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql @@ -24,7 +24,7 @@ RETURNS boolean AS $$ END IF; RETURN EXISTS ( - SELECT 1 FROM dump_global_wait_edges(distributed_tx_only:=false) + SELECT 1 FROM dump_global_blocked_processes() WHERE waiting_global_pid = mBlockedGlobalPid ) OR EXISTS ( -- Check on the workers if any logical replication job spawned by the diff --git a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql index c97ac5ee5..94071a86f 100644 --- a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql @@ -24,7 +24,7 @@ RETURNS boolean AS $$ END IF; RETURN EXISTS ( - SELECT 1 FROM dump_global_wait_edges(distributed_tx_only:=false) + SELECT 1 FROM dump_global_blocked_processes() WHERE waiting_global_pid = mBlockedGlobalPid ) OR EXISTS ( -- Check on the workers if any logical replication job spawned by the diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql index e10a47731..8b19fad18 100644 --- a/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql @@ -8,7 +8,7 @@ citus_dist_stat_activity AS ), unique_global_wait_edges AS ( - SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_wait_edges(distributed_tx_only:=false) + SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_blocked_processes() ), citus_dist_stat_activity_with_node_id AS ( diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql index e10a47731..8b19fad18 100644 --- a/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql @@ -8,7 +8,7 @@ citus_dist_stat_activity AS ), unique_global_wait_edges AS ( - SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_wait_edges(distributed_tx_only:=false) + SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_blocked_processes() ), citus_dist_stat_activity_with_node_id AS ( diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/dump_global_blocked_processes/11.0-1.sql similarity index 62% rename from src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql rename to src/backend/distributed/sql/udfs/dump_global_blocked_processes/11.0-1.sql index 32e25e217..753dfa606 100644 --- a/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/dump_global_blocked_processes/11.0-1.sql @@ -1,12 +1,10 @@ -DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.dump_global_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, - OUT blocking_global_pid int8, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$dump_global_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.dump_global_blocked_processes() IS 'returns a global list of blocked backends originating from this node'; diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/dump_global_blocked_processes/latest.sql similarity index 62% rename from src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql rename to src/backend/distributed/sql/udfs/dump_global_blocked_processes/latest.sql index 32e25e217..753dfa606 100644 --- a/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql +++ b/src/backend/distributed/sql/udfs/dump_global_blocked_processes/latest.sql @@ -1,12 +1,10 @@ -DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.dump_global_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, - OUT blocking_global_pid int8, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$dump_global_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.dump_global_blocked_processes() IS 'returns a global list of blocked backends originating from this node'; diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/dump_local_blocked_processes/11.0-1.sql similarity index 67% rename from src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql rename to src/backend/distributed/sql/udfs/dump_local_blocked_processes/11.0-1.sql index 190e4c0d5..4f800c726 100644 --- a/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/dump_local_blocked_processes/11.0-1.sql @@ -1,7 +1,5 @@ -DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.dump_local_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$dump_local_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.dump_local_blocked_processes() IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/dump_local_blocked_processes/latest.sql similarity index 67% rename from src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql rename to src/backend/distributed/sql/udfs/dump_local_blocked_processes/latest.sql index 190e4c0d5..4f800c726 100644 --- a/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql +++ b/src/backend/distributed/sql/udfs/dump_local_blocked_processes/latest.sql @@ -1,7 +1,5 @@ -DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.dump_local_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$dump_local_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.dump_local_blocked_processes() IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 18f391d74..d1ff733da 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -47,6 +47,9 @@ typedef struct PROCStack static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); +static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, + int rowIndex); +static void ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx); static bool IsProcessWaitingForSafeOperations(PGPROC *proc); static void LockLockData(void); @@ -62,10 +65,30 @@ static void AddProcToVisit(PROCStack *remaining, PGPROC *proc); static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc); static bool IsConflictingLockMask(int holdMask, int conflictMask); - +/* + * We almost have 2 sets of identical functions. The first set (e.g., dump_wait_edges) + * functions are intended for distributed deadlock detection purposes. + * + * The second set of functions (e.g., dump_blocked_processes) are intended for + * citus_lock_waits view. + * + * The main difference is that the former functions only show processes that are blocked + * inside a distributed transaction (e.g., see AssignDistributedTransactionId()). + * The latter functions return a superset, where any blocked process is returned. + * + * We kept two different set of functions for two purposes. First, the deadlock detection + * is a performance critical code-path happening very frequently and we don't add any + * performance overhead. Secondly, to be able to do rolling upgrades, we cannot change + * the API of dump_global_wait_edges/dump_local_wait_edges such that they take a boolean + * parameter. If we do that, until all nodes are upgraded, the deadlock detection would fail, + * which is not acceptable. + */ PG_FUNCTION_INFO_V1(dump_local_wait_edges); PG_FUNCTION_INFO_V1(dump_global_wait_edges); +PG_FUNCTION_INFO_V1(dump_local_blocked_processes); +PG_FUNCTION_INFO_V1(dump_global_blocked_processes); + /* * dump_global_wait_edges returns global wait edges for distributed transactions @@ -74,7 +97,7 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges); Datum dump_global_wait_edges(PG_FUNCTION_ARGS) { - bool onlyDistributedTx = PG_GETARG_BOOL(0); + bool onlyDistributedTx = true; WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); @@ -84,6 +107,23 @@ dump_global_wait_edges(PG_FUNCTION_ARGS) } +/* + * dump_global_blocked_processes returns global wait edges including all processes + * running on the cluster. + */ +Datum +dump_global_blocked_processes(PG_FUNCTION_ARGS) +{ + bool onlyDistributedTx = false; + + WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); + + ReturnBlockedProcessGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + /* * BuildGlobalWaitGraph builds a wait graph for distributed transactions * that originate from this node, including edges from all (other) worker @@ -103,7 +143,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) List *connectionList = NIL; int32 localGroupId = GetLocalGroupId(); - /* deadlock detection is only interested in */ + /* deadlock detection is only interested in distributed transactions */ WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); /* open connections in parallel */ @@ -134,11 +174,17 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) foreach_ptr(connection, connectionList) { StringInfo queryString = makeStringInfo(); - const char *onlyDistributedTxStr = onlyDistributedTx ? "true" : "false"; - appendStringInfo(queryString, - "SELECT * FROM dump_local_wait_edges(%s)", - onlyDistributedTxStr); + if (onlyDistributedTx) + { + appendStringInfo(queryString, + "SELECT * FROM dump_local_wait_edges()"); + } + else + { + appendStringInfo(queryString, + "SELECT * FROM dump_local_blocked_processes()"); + } int querySent = SendRemoteCommand(connection, queryString->data); if (querySent == 0) @@ -162,16 +208,29 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) int64 rowCount = PQntuples(result); int64 colCount = PQnfields(result); - if (colCount != 11) + if (onlyDistributedTx && colCount != 9) { ereport(WARNING, (errmsg("unexpected number of columns from " "dump_local_wait_edges"))); continue; } + else if (!onlyDistributedTx && colCount != 11) + { + ereport(WARNING, (errmsg("unexpected number of columns from " + "dump_local_blocked_processes"))); + continue; + } for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) { - AddWaitEdgeFromResult(waitGraph, result, rowIndex); + if (onlyDistributedTx) + { + AddWaitEdgeFromResult(waitGraph, result, rowIndex); + } + else + { + AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex); + } } PQclear(result); @@ -191,6 +250,29 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) { WaitEdge *waitEdge = AllocWaitEdge(waitGraph); + waitEdge->waitingGPid = 0; /* not requested for deadlock detection */ + waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); + waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); + waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); + waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); + waitEdge->blockingGPid = 0; /* not requested for deadlock detection */ + waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); + waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); + waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); + waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); + waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); +} + + +/* + * AddWaitEdgeFromBlockedProcessResult adds an edge to the wait graph that + * is read from a PGresult. + */ +static void +AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) +{ + WaitEdge *waitEdge = AllocWaitEdge(waitGraph); + waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0); waitEdge->waitingPid = ParseIntField(result, rowIndex, 1); waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2); @@ -281,6 +363,22 @@ dump_local_wait_edges(PG_FUNCTION_ARGS) } +/* + * dump_local_blocked_processes returns global wait edges including + * all processes running on the node. + */ +Datum +dump_local_blocked_processes(PG_FUNCTION_ARGS) +{ + bool onlyDistributedTx = false; + + WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); + ReturnBlockedProcessGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + /* * ReturnWaitGraph returns a wait graph for a set returning function. */ @@ -290,6 +388,68 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) TupleDesc tupleDesc; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc); + /* + * Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_num + * 03: waiting_transaction_stamp + * 04: blocking_pid + * 05: blocking__node_id + * 06: blocking_transaction_num + * 07: blocking_transaction_stamp + * 08: blocking_transaction_waiting + */ + for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++) + { + Datum values[9]; + bool nulls[9]; + WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(curEdge->waitingPid); + values[1] = Int32GetDatum(curEdge->waitingNodeId); + if (curEdge->waitingTransactionNum != 0) + { + values[2] = Int64GetDatum(curEdge->waitingTransactionNum); + values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); + } + else + { + nulls[2] = true; + nulls[3] = true; + } + + values[4] = Int32GetDatum(curEdge->blockingPid); + values[5] = Int32GetDatum(curEdge->blockingNodeId); + if (curEdge->blockingTransactionNum != 0) + { + values[6] = Int64GetDatum(curEdge->blockingTransactionNum); + values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); + } + else + { + nulls[6] = true; + nulls[7] = true; + } + values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting); + + tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); + } +} + + +/* + * ReturnBlockedProcessGraph returns a wait graph for a set returning function. + */ +static void +ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) +{ + TupleDesc tupleDesc; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc); + /* * Columns: * 00: waiting_global_pid diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 8aad5297f..ee064dd90 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1000,8 +1000,6 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- function citus_disable_node(text,integer) void | function create_distributed_function(regprocedure,text,text) void | - function dump_global_wait_edges() SETOF record | - function dump_local_wait_edges() SETOF record | function master_append_table_to_shard(bigint,text,text,integer) real | function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | @@ -1013,11 +1011,11 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_shard_indexes_on_worker() SETOF record | function citus_shards_on_worker() SETOF record | function create_distributed_function(regprocedure,text,text,boolean) void - | function dump_global_wait_edges(boolean) SETOF record - | function dump_local_wait_edges(boolean) SETOF record + | function dump_global_blocked_processes() SETOF record + | function dump_local_blocked_processes() SETOF record | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void -(19 rows) +(17 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version