mirror of https://github.com/citusdata/citus.git
Address feedback
parent
d5bf975f31
commit
d77bf1e1ae
|
@ -17,8 +17,8 @@
|
||||||
#include "udfs/get_all_active_transactions/11.0-1.sql"
|
#include "udfs/get_all_active_transactions/11.0-1.sql"
|
||||||
#include "udfs/get_global_active_transactions/11.0-1.sql"
|
#include "udfs/get_global_active_transactions/11.0-1.sql"
|
||||||
|
|
||||||
#include "udfs/dump_local_wait_edges/11.0-1.sql"
|
#include "udfs/dump_local_blocked_processes/11.0-1.sql"
|
||||||
#include "udfs/dump_global_wait_edges/11.0-1.sql"
|
#include "udfs/dump_global_blocked_processes/11.0-1.sql"
|
||||||
|
|
||||||
#include "udfs/citus_worker_stat_activity/11.0-1.sql"
|
#include "udfs/citus_worker_stat_activity/11.0-1.sql"
|
||||||
#include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql"
|
#include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql"
|
||||||
|
|
|
@ -113,39 +113,8 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int
|
||||||
|
|
||||||
RESET search_path;
|
RESET search_path;
|
||||||
|
|
||||||
DROP FUNCTION dump_local_wait_edges CASCADE;
|
DROP FUNCTION dump_local_blocked_processes CASCADE;
|
||||||
CREATE FUNCTION pg_catalog.dump_local_wait_edges(
|
DROP FUNCTION dump_global_blocked_processes CASCADE;
|
||||||
OUT waiting_pid int4,
|
|
||||||
OUT waiting_node_id int4,
|
|
||||||
OUT waiting_transaction_num int8,
|
|
||||||
OUT waiting_transaction_stamp timestamptz,
|
|
||||||
OUT blocking_pid int4,
|
|
||||||
OUT blocking_node_id int4,
|
|
||||||
OUT blocking_transaction_num int8,
|
|
||||||
OUT blocking_transaction_stamp timestamptz,
|
|
||||||
OUT blocking_transaction_waiting bool)
|
|
||||||
RETURNS SETOF RECORD
|
|
||||||
LANGUAGE C STRICT
|
|
||||||
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
|
|
||||||
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges()
|
|
||||||
IS 'returns all local lock wait chains, that start from distributed transactions';
|
|
||||||
|
|
||||||
DROP FUNCTION dump_global_wait_edges CASCADE;
|
|
||||||
CREATE FUNCTION pg_catalog.dump_global_wait_edges(
|
|
||||||
OUT waiting_pid int4,
|
|
||||||
OUT waiting_node_id int4,
|
|
||||||
OUT waiting_transaction_num int8,
|
|
||||||
OUT waiting_transaction_stamp timestamptz,
|
|
||||||
OUT blocking_pid int4,
|
|
||||||
OUT blocking_node_id int4,
|
|
||||||
OUT blocking_transaction_num int8,
|
|
||||||
OUT blocking_transaction_stamp timestamptz,
|
|
||||||
OUT blocking_transaction_waiting bool)
|
|
||||||
RETURNS SETOF RECORD
|
|
||||||
LANGUAGE 'c' STRICT
|
|
||||||
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
|
|
||||||
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges()
|
|
||||||
IS 'returns a global list of blocked transactions originating from this node';
|
|
||||||
|
|
||||||
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;
|
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ RETURNS int4[] AS $$
|
||||||
WITH activeTransactions AS (
|
WITH activeTransactions AS (
|
||||||
SELECT global_pid FROM get_all_active_transactions()
|
SELECT global_pid FROM get_all_active_transactions()
|
||||||
), blockingTransactions AS (
|
), blockingTransactions AS (
|
||||||
SELECT blocking_global_pid FROM dump_global_wait_edges(distributed_tx_only:=false)
|
SELECT blocking_global_pid FROM dump_global_blocked_processes()
|
||||||
WHERE waiting_global_pid = mLocalGlobalPid
|
WHERE waiting_global_pid = mLocalGlobalPid
|
||||||
)
|
)
|
||||||
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions
|
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions
|
||||||
|
|
|
@ -20,7 +20,7 @@ RETURNS int4[] AS $$
|
||||||
WITH activeTransactions AS (
|
WITH activeTransactions AS (
|
||||||
SELECT global_pid FROM get_all_active_transactions()
|
SELECT global_pid FROM get_all_active_transactions()
|
||||||
), blockingTransactions AS (
|
), blockingTransactions AS (
|
||||||
SELECT blocking_global_pid FROM dump_global_wait_edges(distributed_tx_only:=false)
|
SELECT blocking_global_pid FROM dump_global_blocked_processes()
|
||||||
WHERE waiting_global_pid = mLocalGlobalPid
|
WHERE waiting_global_pid = mLocalGlobalPid
|
||||||
)
|
)
|
||||||
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions
|
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions
|
||||||
|
|
|
@ -24,7 +24,7 @@ RETURNS boolean AS $$
|
||||||
END IF;
|
END IF;
|
||||||
|
|
||||||
RETURN EXISTS (
|
RETURN EXISTS (
|
||||||
SELECT 1 FROM dump_global_wait_edges(distributed_tx_only:=false)
|
SELECT 1 FROM dump_global_blocked_processes()
|
||||||
WHERE waiting_global_pid = mBlockedGlobalPid
|
WHERE waiting_global_pid = mBlockedGlobalPid
|
||||||
) OR EXISTS (
|
) OR EXISTS (
|
||||||
-- Check on the workers if any logical replication job spawned by the
|
-- Check on the workers if any logical replication job spawned by the
|
||||||
|
|
|
@ -24,7 +24,7 @@ RETURNS boolean AS $$
|
||||||
END IF;
|
END IF;
|
||||||
|
|
||||||
RETURN EXISTS (
|
RETURN EXISTS (
|
||||||
SELECT 1 FROM dump_global_wait_edges(distributed_tx_only:=false)
|
SELECT 1 FROM dump_global_blocked_processes()
|
||||||
WHERE waiting_global_pid = mBlockedGlobalPid
|
WHERE waiting_global_pid = mBlockedGlobalPid
|
||||||
) OR EXISTS (
|
) OR EXISTS (
|
||||||
-- Check on the workers if any logical replication job spawned by the
|
-- Check on the workers if any logical replication job spawned by the
|
||||||
|
|
|
@ -8,7 +8,7 @@ citus_dist_stat_activity AS
|
||||||
),
|
),
|
||||||
unique_global_wait_edges AS
|
unique_global_wait_edges AS
|
||||||
(
|
(
|
||||||
SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_wait_edges(distributed_tx_only:=false)
|
SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_blocked_processes()
|
||||||
),
|
),
|
||||||
citus_dist_stat_activity_with_node_id AS
|
citus_dist_stat_activity_with_node_id AS
|
||||||
(
|
(
|
||||||
|
|
|
@ -8,7 +8,7 @@ citus_dist_stat_activity AS
|
||||||
),
|
),
|
||||||
unique_global_wait_edges AS
|
unique_global_wait_edges AS
|
||||||
(
|
(
|
||||||
SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_wait_edges(distributed_tx_only:=false)
|
SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_blocked_processes()
|
||||||
),
|
),
|
||||||
citus_dist_stat_activity_with_node_id AS
|
citus_dist_stat_activity_with_node_id AS
|
||||||
(
|
(
|
||||||
|
|
|
@ -1,6 +1,4 @@
|
||||||
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
|
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_blocked_processes(
|
||||||
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
|
|
||||||
distributed_tx_only boolean DEFAULT true,
|
|
||||||
OUT waiting_global_pid int8,
|
OUT waiting_global_pid int8,
|
||||||
OUT waiting_pid int4,
|
OUT waiting_pid int4,
|
||||||
OUT waiting_node_id int4,
|
OUT waiting_node_id int4,
|
||||||
|
@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
|
||||||
OUT blocking_transaction_waiting bool)
|
OUT blocking_transaction_waiting bool)
|
||||||
RETURNS SETOF RECORD
|
RETURNS SETOF RECORD
|
||||||
LANGUAGE C STRICT
|
LANGUAGE C STRICT
|
||||||
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
|
AS $$MODULE_PATHNAME$$, $$dump_global_blocked_processes$$;
|
||||||
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool)
|
COMMENT ON FUNCTION pg_catalog.dump_global_blocked_processes()
|
||||||
IS 'returns a global list of blocked backends originating from this node';
|
IS 'returns a global list of blocked backends originating from this node';
|
|
@ -1,6 +1,4 @@
|
||||||
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
|
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_blocked_processes(
|
||||||
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
|
|
||||||
distributed_tx_only boolean DEFAULT true,
|
|
||||||
OUT waiting_global_pid int8,
|
OUT waiting_global_pid int8,
|
||||||
OUT waiting_pid int4,
|
OUT waiting_pid int4,
|
||||||
OUT waiting_node_id int4,
|
OUT waiting_node_id int4,
|
||||||
|
@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
|
||||||
OUT blocking_transaction_waiting bool)
|
OUT blocking_transaction_waiting bool)
|
||||||
RETURNS SETOF RECORD
|
RETURNS SETOF RECORD
|
||||||
LANGUAGE C STRICT
|
LANGUAGE C STRICT
|
||||||
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
|
AS $$MODULE_PATHNAME$$, $$dump_global_blocked_processes$$;
|
||||||
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool)
|
COMMENT ON FUNCTION pg_catalog.dump_global_blocked_processes()
|
||||||
IS 'returns a global list of blocked backends originating from this node';
|
IS 'returns a global list of blocked backends originating from this node';
|
|
@ -1,6 +1,4 @@
|
||||||
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
|
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_blocked_processes(
|
||||||
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
|
|
||||||
distributed_tx_only boolean DEFAULT true,
|
|
||||||
OUT waiting_global_pid int8,
|
OUT waiting_global_pid int8,
|
||||||
OUT waiting_pid int4,
|
OUT waiting_pid int4,
|
||||||
OUT waiting_node_id int4,
|
OUT waiting_node_id int4,
|
||||||
|
@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
|
||||||
OUT blocking_transaction_waiting bool)
|
OUT blocking_transaction_waiting bool)
|
||||||
RETURNS SETOF RECORD
|
RETURNS SETOF RECORD
|
||||||
LANGUAGE C STRICT
|
LANGUAGE C STRICT
|
||||||
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
|
AS $$MODULE_PATHNAME$$, $$dump_local_blocked_processes$$;
|
||||||
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool)
|
COMMENT ON FUNCTION pg_catalog.dump_local_blocked_processes()
|
||||||
IS 'returns all local lock wait chains, that start from any citus backend';
|
IS 'returns all local lock wait chains, that start from any citus backend';
|
|
@ -1,6 +1,4 @@
|
||||||
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
|
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_blocked_processes(
|
||||||
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
|
|
||||||
distributed_tx_only boolean DEFAULT true,
|
|
||||||
OUT waiting_global_pid int8,
|
OUT waiting_global_pid int8,
|
||||||
OUT waiting_pid int4,
|
OUT waiting_pid int4,
|
||||||
OUT waiting_node_id int4,
|
OUT waiting_node_id int4,
|
||||||
|
@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
|
||||||
OUT blocking_transaction_waiting bool)
|
OUT blocking_transaction_waiting bool)
|
||||||
RETURNS SETOF RECORD
|
RETURNS SETOF RECORD
|
||||||
LANGUAGE C STRICT
|
LANGUAGE C STRICT
|
||||||
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
|
AS $$MODULE_PATHNAME$$, $$dump_local_blocked_processes$$;
|
||||||
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool)
|
COMMENT ON FUNCTION pg_catalog.dump_local_blocked_processes()
|
||||||
IS 'returns all local lock wait chains, that start from any citus backend';
|
IS 'returns all local lock wait chains, that start from any citus backend';
|
|
@ -47,6 +47,9 @@ typedef struct PROCStack
|
||||||
|
|
||||||
static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
|
static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
|
||||||
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
|
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
|
||||||
|
static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result,
|
||||||
|
int rowIndex);
|
||||||
|
static void ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
|
||||||
static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx);
|
static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx);
|
||||||
static bool IsProcessWaitingForSafeOperations(PGPROC *proc);
|
static bool IsProcessWaitingForSafeOperations(PGPROC *proc);
|
||||||
static void LockLockData(void);
|
static void LockLockData(void);
|
||||||
|
@ -62,10 +65,30 @@ static void AddProcToVisit(PROCStack *remaining, PGPROC *proc);
|
||||||
static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc);
|
static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc);
|
||||||
static bool IsConflictingLockMask(int holdMask, int conflictMask);
|
static bool IsConflictingLockMask(int holdMask, int conflictMask);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We almost have 2 sets of identical functions. The first set (e.g., dump_wait_edges)
|
||||||
|
* functions are intended for distributed deadlock detection purposes.
|
||||||
|
*
|
||||||
|
* The second set of functions (e.g., dump_blocked_processes) are intended for
|
||||||
|
* citus_lock_waits view.
|
||||||
|
*
|
||||||
|
* The main difference is that the former functions only show processes that are blocked
|
||||||
|
* inside a distributed transaction (e.g., see AssignDistributedTransactionId()).
|
||||||
|
* The latter functions return a superset, where any blocked process is returned.
|
||||||
|
*
|
||||||
|
* We kept two different set of functions for two purposes. First, the deadlock detection
|
||||||
|
* is a performance critical code-path happening very frequently and we don't add any
|
||||||
|
* performance overhead. Secondly, to be able to do rolling upgrades, we cannot change
|
||||||
|
* the API of dump_global_wait_edges/dump_local_wait_edges such that they take a boolean
|
||||||
|
* parameter. If we do that, until all nodes are upgraded, the deadlock detection would fail,
|
||||||
|
* which is not acceptable.
|
||||||
|
*/
|
||||||
PG_FUNCTION_INFO_V1(dump_local_wait_edges);
|
PG_FUNCTION_INFO_V1(dump_local_wait_edges);
|
||||||
PG_FUNCTION_INFO_V1(dump_global_wait_edges);
|
PG_FUNCTION_INFO_V1(dump_global_wait_edges);
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(dump_local_blocked_processes);
|
||||||
|
PG_FUNCTION_INFO_V1(dump_global_blocked_processes);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* dump_global_wait_edges returns global wait edges for distributed transactions
|
* dump_global_wait_edges returns global wait edges for distributed transactions
|
||||||
|
@ -74,7 +97,7 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges);
|
||||||
Datum
|
Datum
|
||||||
dump_global_wait_edges(PG_FUNCTION_ARGS)
|
dump_global_wait_edges(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
bool onlyDistributedTx = PG_GETARG_BOOL(0);
|
bool onlyDistributedTx = true;
|
||||||
|
|
||||||
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
|
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
|
||||||
|
|
||||||
|
@ -84,6 +107,23 @@ dump_global_wait_edges(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* dump_global_blocked_processes returns global wait edges including all processes
|
||||||
|
* running on the cluster.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
dump_global_blocked_processes(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
bool onlyDistributedTx = false;
|
||||||
|
|
||||||
|
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
|
||||||
|
|
||||||
|
ReturnBlockedProcessGraph(waitGraph, fcinfo);
|
||||||
|
|
||||||
|
return (Datum) 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* BuildGlobalWaitGraph builds a wait graph for distributed transactions
|
* BuildGlobalWaitGraph builds a wait graph for distributed transactions
|
||||||
* that originate from this node, including edges from all (other) worker
|
* that originate from this node, including edges from all (other) worker
|
||||||
|
@ -103,7 +143,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
|
||||||
List *connectionList = NIL;
|
List *connectionList = NIL;
|
||||||
int32 localGroupId = GetLocalGroupId();
|
int32 localGroupId = GetLocalGroupId();
|
||||||
|
|
||||||
/* deadlock detection is only interested in */
|
/* deadlock detection is only interested in distributed transactions */
|
||||||
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
|
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
|
||||||
|
|
||||||
/* open connections in parallel */
|
/* open connections in parallel */
|
||||||
|
@ -134,11 +174,17 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
|
||||||
foreach_ptr(connection, connectionList)
|
foreach_ptr(connection, connectionList)
|
||||||
{
|
{
|
||||||
StringInfo queryString = makeStringInfo();
|
StringInfo queryString = makeStringInfo();
|
||||||
const char *onlyDistributedTxStr = onlyDistributedTx ? "true" : "false";
|
|
||||||
|
|
||||||
|
if (onlyDistributedTx)
|
||||||
|
{
|
||||||
appendStringInfo(queryString,
|
appendStringInfo(queryString,
|
||||||
"SELECT * FROM dump_local_wait_edges(%s)",
|
"SELECT * FROM dump_local_wait_edges()");
|
||||||
onlyDistributedTxStr);
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(queryString,
|
||||||
|
"SELECT * FROM dump_local_blocked_processes()");
|
||||||
|
}
|
||||||
|
|
||||||
int querySent = SendRemoteCommand(connection, queryString->data);
|
int querySent = SendRemoteCommand(connection, queryString->data);
|
||||||
if (querySent == 0)
|
if (querySent == 0)
|
||||||
|
@ -162,17 +208,30 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
|
||||||
int64 rowCount = PQntuples(result);
|
int64 rowCount = PQntuples(result);
|
||||||
int64 colCount = PQnfields(result);
|
int64 colCount = PQnfields(result);
|
||||||
|
|
||||||
if (colCount != 11)
|
if (onlyDistributedTx && colCount != 9)
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("unexpected number of columns from "
|
ereport(WARNING, (errmsg("unexpected number of columns from "
|
||||||
"dump_local_wait_edges")));
|
"dump_local_wait_edges")));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
else if (!onlyDistributedTx && colCount != 11)
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errmsg("unexpected number of columns from "
|
||||||
|
"dump_local_blocked_processes")));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
||||||
|
{
|
||||||
|
if (onlyDistributedTx)
|
||||||
{
|
{
|
||||||
AddWaitEdgeFromResult(waitGraph, result, rowIndex);
|
AddWaitEdgeFromResult(waitGraph, result, rowIndex);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
|
@ -191,6 +250,29 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
|
||||||
{
|
{
|
||||||
WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
|
WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
|
||||||
|
|
||||||
|
waitEdge->waitingGPid = 0; /* not requested for deadlock detection */
|
||||||
|
waitEdge->waitingPid = ParseIntField(result, rowIndex, 0);
|
||||||
|
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1);
|
||||||
|
waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2);
|
||||||
|
waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3);
|
||||||
|
waitEdge->blockingGPid = 0; /* not requested for deadlock detection */
|
||||||
|
waitEdge->blockingPid = ParseIntField(result, rowIndex, 4);
|
||||||
|
waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5);
|
||||||
|
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
|
||||||
|
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7);
|
||||||
|
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AddWaitEdgeFromBlockedProcessResult adds an edge to the wait graph that
|
||||||
|
* is read from a PGresult.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
|
||||||
|
{
|
||||||
|
WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
|
||||||
|
|
||||||
waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0);
|
waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0);
|
||||||
waitEdge->waitingPid = ParseIntField(result, rowIndex, 1);
|
waitEdge->waitingPid = ParseIntField(result, rowIndex, 1);
|
||||||
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2);
|
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2);
|
||||||
|
@ -281,6 +363,22 @@ dump_local_wait_edges(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* dump_local_blocked_processes returns global wait edges including
|
||||||
|
* all processes running on the node.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
dump_local_blocked_processes(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
bool onlyDistributedTx = false;
|
||||||
|
|
||||||
|
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
|
||||||
|
ReturnBlockedProcessGraph(waitGraph, fcinfo);
|
||||||
|
|
||||||
|
return (Datum) 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReturnWaitGraph returns a wait graph for a set returning function.
|
* ReturnWaitGraph returns a wait graph for a set returning function.
|
||||||
*/
|
*/
|
||||||
|
@ -290,6 +388,68 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
|
||||||
TupleDesc tupleDesc;
|
TupleDesc tupleDesc;
|
||||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc);
|
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Columns:
|
||||||
|
* 00: waiting_pid
|
||||||
|
* 01: waiting_node_id
|
||||||
|
* 02: waiting_transaction_num
|
||||||
|
* 03: waiting_transaction_stamp
|
||||||
|
* 04: blocking_pid
|
||||||
|
* 05: blocking__node_id
|
||||||
|
* 06: blocking_transaction_num
|
||||||
|
* 07: blocking_transaction_stamp
|
||||||
|
* 08: blocking_transaction_waiting
|
||||||
|
*/
|
||||||
|
for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++)
|
||||||
|
{
|
||||||
|
Datum values[9];
|
||||||
|
bool nulls[9];
|
||||||
|
WaitEdge *curEdge = &waitGraph->edges[curEdgeNum];
|
||||||
|
|
||||||
|
memset(values, 0, sizeof(values));
|
||||||
|
memset(nulls, 0, sizeof(nulls));
|
||||||
|
|
||||||
|
values[0] = Int32GetDatum(curEdge->waitingPid);
|
||||||
|
values[1] = Int32GetDatum(curEdge->waitingNodeId);
|
||||||
|
if (curEdge->waitingTransactionNum != 0)
|
||||||
|
{
|
||||||
|
values[2] = Int64GetDatum(curEdge->waitingTransactionNum);
|
||||||
|
values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
nulls[2] = true;
|
||||||
|
nulls[3] = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
values[4] = Int32GetDatum(curEdge->blockingPid);
|
||||||
|
values[5] = Int32GetDatum(curEdge->blockingNodeId);
|
||||||
|
if (curEdge->blockingTransactionNum != 0)
|
||||||
|
{
|
||||||
|
values[6] = Int64GetDatum(curEdge->blockingTransactionNum);
|
||||||
|
values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
nulls[6] = true;
|
||||||
|
nulls[7] = true;
|
||||||
|
}
|
||||||
|
values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting);
|
||||||
|
|
||||||
|
tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReturnBlockedProcessGraph returns a wait graph for a set returning function.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
|
||||||
|
{
|
||||||
|
TupleDesc tupleDesc;
|
||||||
|
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Columns:
|
* Columns:
|
||||||
* 00: waiting_global_pid
|
* 00: waiting_global_pid
|
||||||
|
|
|
@ -1000,8 +1000,6 @@ SELECT * FROM multi_extension.print_extension_changes();
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
function citus_disable_node(text,integer) void |
|
function citus_disable_node(text,integer) void |
|
||||||
function create_distributed_function(regprocedure,text,text) void |
|
function create_distributed_function(regprocedure,text,text) void |
|
||||||
function dump_global_wait_edges() SETOF record |
|
|
||||||
function dump_local_wait_edges() SETOF record |
|
|
||||||
function master_append_table_to_shard(bigint,text,text,integer) real |
|
function master_append_table_to_shard(bigint,text,text,integer) real |
|
||||||
function master_apply_delete_command(text) integer |
|
function master_apply_delete_command(text) integer |
|
||||||
function master_get_table_metadata(text) record |
|
function master_get_table_metadata(text) record |
|
||||||
|
@ -1013,11 +1011,11 @@ SELECT * FROM multi_extension.print_extension_changes();
|
||||||
| function citus_shard_indexes_on_worker() SETOF record
|
| function citus_shard_indexes_on_worker() SETOF record
|
||||||
| function citus_shards_on_worker() SETOF record
|
| function citus_shards_on_worker() SETOF record
|
||||||
| function create_distributed_function(regprocedure,text,text,boolean) void
|
| function create_distributed_function(regprocedure,text,text,boolean) void
|
||||||
| function dump_global_wait_edges(boolean) SETOF record
|
| function dump_global_blocked_processes() SETOF record
|
||||||
| function dump_local_wait_edges(boolean) SETOF record
|
| function dump_local_blocked_processes() SETOF record
|
||||||
| function worker_drop_sequence_dependency(text) void
|
| function worker_drop_sequence_dependency(text) void
|
||||||
| function worker_drop_shell_table(text) void
|
| function worker_drop_shell_table(text) void
|
||||||
(19 rows)
|
(17 rows)
|
||||||
|
|
||||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||||
-- show running version
|
-- show running version
|
||||||
|
|
Loading…
Reference in New Issue