mirror of https://github.com/citusdata/citus.git
Dumping wait edges becomes optionally scan all backends
Before this commit, dumping wait edges can only be used for distributed deadlock detection purposes. With this commit, we open the possibility that we can use it for any backend.pull/5702/head
parent
c866f4b2d6
commit
331af3dce8
|
@ -17,6 +17,9 @@
|
|||
#include "udfs/get_all_active_transactions/11.0-1.sql"
|
||||
#include "udfs/get_global_active_transactions/11.0-1.sql"
|
||||
|
||||
#include "udfs/dump_local_wait_edges/11.0-1.sql"
|
||||
#include "udfs/dump_global_wait_edges/11.0-1.sql"
|
||||
|
||||
#include "udfs/citus_worker_stat_activity/11.0-1.sql"
|
||||
#include "udfs/worker_create_or_replace_object/11.0-1.sql"
|
||||
|
||||
|
|
|
@ -113,6 +113,40 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int
|
|||
|
||||
RESET search_path;
|
||||
|
||||
DROP FUNCTION dump_local_wait_edges CASCADE;
|
||||
CREATE FUNCTION pg_catalog.dump_local_wait_edges(
|
||||
OUT waiting_pid int4,
|
||||
OUT waiting_node_id int4,
|
||||
OUT waiting_transaction_num int8,
|
||||
OUT waiting_transaction_stamp timestamptz,
|
||||
OUT blocking_pid int4,
|
||||
OUT blocking_node_id int4,
|
||||
OUT blocking_transaction_num int8,
|
||||
OUT blocking_transaction_stamp timestamptz,
|
||||
OUT blocking_transaction_waiting bool)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
|
||||
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges()
|
||||
IS 'returns all local lock wait chains, that start from distributed transactions';
|
||||
|
||||
DROP FUNCTION dump_global_wait_edges CASCADE;
|
||||
CREATE FUNCTION pg_catalog.dump_global_wait_edges(
|
||||
OUT waiting_pid int4,
|
||||
OUT waiting_node_id int4,
|
||||
OUT waiting_transaction_num int8,
|
||||
OUT waiting_transaction_stamp timestamptz,
|
||||
OUT blocking_pid int4,
|
||||
OUT blocking_node_id int4,
|
||||
OUT blocking_transaction_num int8,
|
||||
OUT blocking_transaction_stamp timestamptz,
|
||||
OUT blocking_transaction_waiting bool)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE 'c' STRICT
|
||||
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
|
||||
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges()
|
||||
IS 'returns a global list of blocked transactions originating from this node';
|
||||
|
||||
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;
|
||||
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
|
||||
|
@ -211,7 +245,39 @@ GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC;
|
|||
DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]);
|
||||
#include "../udfs/worker_create_or_replace_object/9.0-1.sql"
|
||||
|
||||
RESET search_path;
|
||||
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE;
|
||||
DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE;
|
||||
|
||||
DROP FUNCTION pg_catalog.dump_global_wait_edges;
|
||||
CREATE FUNCTION pg_catalog.dump_local_wait_edges(
|
||||
OUT waiting_pid int4,
|
||||
OUT waiting_node_id int4,
|
||||
OUT waiting_transaction_num int8,
|
||||
OUT waiting_transaction_stamp timestamptz,
|
||||
OUT blocking_pid int4,
|
||||
OUT blocking_node_id int4,
|
||||
OUT blocking_transaction_num int8,
|
||||
OUT blocking_transaction_stamp timestamptz,
|
||||
OUT blocking_transaction_waiting bool)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
|
||||
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges()
|
||||
IS 'returns all local lock wait chains, that start from distributed transactions';
|
||||
|
||||
DROP FUNCTION pg_catalog.dump_global_wait_edges;
|
||||
CREATE FUNCTION pg_catalog.dump_global_wait_edges(
|
||||
OUT waiting_pid int4,
|
||||
OUT waiting_node_id int4,
|
||||
OUT waiting_transaction_num int8,
|
||||
OUT waiting_transaction_stamp timestamptz,
|
||||
OUT blocking_pid int4,
|
||||
OUT blocking_node_id int4,
|
||||
OUT blocking_transaction_num int8,
|
||||
OUT blocking_transaction_stamp timestamptz,
|
||||
OUT blocking_transaction_waiting bool)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE 'c' STRICT
|
||||
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
|
||||
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges()
|
||||
IS 'returns a global list of blocked transactions originating from this node';
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
|
||||
distributed_tx_only boolean DEFAULT true,
|
||||
OUT waiting_global_pid int8,
|
||||
OUT waiting_pid int4,
|
||||
OUT waiting_node_id int4,
|
||||
OUT waiting_transaction_num int8,
|
||||
OUT waiting_transaction_stamp timestamptz,
|
||||
OUT blocking_global_pid int8,
|
||||
OUT blocking_pid int4,
|
||||
OUT blocking_node_id int4,
|
||||
OUT blocking_transaction_num int8,
|
||||
OUT blocking_transaction_stamp timestamptz,
|
||||
OUT blocking_transaction_waiting bool)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
|
||||
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool)
|
||||
IS 'returns a global list of blocked backends originating from this node';
|
|
@ -0,0 +1,19 @@
|
|||
DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE;
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges(
|
||||
distributed_tx_only boolean DEFAULT true,
|
||||
OUT waiting_global_pid int8,
|
||||
OUT waiting_pid int4,
|
||||
OUT waiting_node_id int4,
|
||||
OUT waiting_transaction_num int8,
|
||||
OUT waiting_transaction_stamp timestamptz,
|
||||
OUT blocking_global_pid int8,
|
||||
OUT blocking_pid int4,
|
||||
OUT blocking_node_id int4,
|
||||
OUT blocking_transaction_num int8,
|
||||
OUT blocking_transaction_stamp timestamptz,
|
||||
OUT blocking_transaction_waiting bool)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
|
||||
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool)
|
||||
IS 'returns a global list of blocked backends originating from this node';
|
|
@ -0,0 +1,19 @@
|
|||
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
|
||||
distributed_tx_only boolean DEFAULT true,
|
||||
OUT waiting_global_pid int8,
|
||||
OUT waiting_pid int4,
|
||||
OUT waiting_node_id int4,
|
||||
OUT waiting_transaction_num int8,
|
||||
OUT waiting_transaction_stamp timestamptz,
|
||||
OUT blocking_global_pid int8,
|
||||
OUT blocking_pid int4,
|
||||
OUT blocking_node_id int4,
|
||||
OUT blocking_transaction_num int8,
|
||||
OUT blocking_transaction_stamp timestamptz,
|
||||
OUT blocking_transaction_waiting bool)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
|
||||
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool)
|
||||
IS 'returns all local lock wait chains, that start from any citus backend';
|
|
@ -0,0 +1,19 @@
|
|||
DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE;
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges(
|
||||
distributed_tx_only boolean DEFAULT true,
|
||||
OUT waiting_global_pid int8,
|
||||
OUT waiting_pid int4,
|
||||
OUT waiting_node_id int4,
|
||||
OUT waiting_transaction_num int8,
|
||||
OUT waiting_transaction_stamp timestamptz,
|
||||
OUT blocking_global_pid int8,
|
||||
OUT blocking_pid int4,
|
||||
OUT blocking_node_id int4,
|
||||
OUT blocking_transaction_num int8,
|
||||
OUT blocking_transaction_stamp timestamptz,
|
||||
OUT blocking_transaction_waiting bool)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
|
||||
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool)
|
||||
IS 'returns all local lock wait chains, that start from any citus backend';
|
|
@ -50,7 +50,10 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
|
|||
bool isNulls[2];
|
||||
|
||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
||||
WaitGraph *waitGraph = BuildGlobalWaitGraph();
|
||||
|
||||
/* distributed deadlock detection only considers distributed txs */
|
||||
bool onlyDistributedTx = true;
|
||||
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
|
||||
HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph);
|
||||
|
||||
/* iterate on all nodes */
|
||||
|
|
|
@ -119,7 +119,9 @@ CheckForDistributedDeadlocks(void)
|
|||
return false;
|
||||
}
|
||||
|
||||
WaitGraph *waitGraph = BuildGlobalWaitGraph();
|
||||
/* distributed deadlock detection only considers distributed txs */
|
||||
bool onlyDistributedTx = true;
|
||||
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
|
||||
HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph);
|
||||
|
||||
int edgeCount = waitGraph->edgeCount;
|
||||
|
|
|
@ -47,7 +47,7 @@ typedef struct PROCStack
|
|||
|
||||
static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
|
||||
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
|
||||
static WaitGraph * BuildLocalWaitGraph(void);
|
||||
static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx);
|
||||
static bool IsProcessWaitingForSafeOperations(PGPROC *proc);
|
||||
static void LockLockData(void);
|
||||
static void UnlockLockData(void);
|
||||
|
@ -74,7 +74,9 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges);
|
|||
Datum
|
||||
dump_global_wait_edges(PG_FUNCTION_ARGS)
|
||||
{
|
||||
WaitGraph *waitGraph = BuildGlobalWaitGraph();
|
||||
bool onlyDistributedTx = PG_GETARG_BOOL(0);
|
||||
|
||||
WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx);
|
||||
|
||||
ReturnWaitGraph(waitGraph, fcinfo);
|
||||
|
||||
|
@ -86,16 +88,23 @@ dump_global_wait_edges(PG_FUNCTION_ARGS)
|
|||
* BuildGlobalWaitGraph builds a wait graph for distributed transactions
|
||||
* that originate from this node, including edges from all (other) worker
|
||||
* nodes.
|
||||
*
|
||||
*
|
||||
* If onlyDistributedTx is true, we only return distributed transactions
|
||||
* (e.g., AssignDistributedTransaction() or assign_distributed_transactions())
|
||||
* has been called for the process. Distributed deadlock detection only
|
||||
* interested in these processes.
|
||||
*/
|
||||
WaitGraph *
|
||||
BuildGlobalWaitGraph(void)
|
||||
BuildGlobalWaitGraph(bool onlyDistributedTx)
|
||||
{
|
||||
List *workerNodeList = ActiveReadableNodeList();
|
||||
char *nodeUser = CitusExtensionOwnerName();
|
||||
List *connectionList = NIL;
|
||||
int32 localGroupId = GetLocalGroupId();
|
||||
|
||||
WaitGraph *waitGraph = BuildLocalWaitGraph();
|
||||
/* deadlock detection is only interested in */
|
||||
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
|
||||
|
||||
/* open connections in parallel */
|
||||
WorkerNode *workerNode = NULL;
|
||||
|
@ -124,9 +133,14 @@ BuildGlobalWaitGraph(void)
|
|||
MultiConnection *connection = NULL;
|
||||
foreach_ptr(connection, connectionList)
|
||||
{
|
||||
const char *command = "SELECT * FROM dump_local_wait_edges()";
|
||||
StringInfo queryString = makeStringInfo();
|
||||
const char *onlyDistributedTxStr = onlyDistributedTx ? "true" : "false";
|
||||
|
||||
int querySent = SendRemoteCommand(connection, command);
|
||||
appendStringInfo(queryString,
|
||||
"SELECT * FROM dump_local_wait_edges(%s)",
|
||||
onlyDistributedTxStr);
|
||||
|
||||
int querySent = SendRemoteCommand(connection, queryString->data);
|
||||
if (querySent == 0)
|
||||
{
|
||||
ReportConnectionError(connection, WARNING);
|
||||
|
@ -148,7 +162,7 @@ BuildGlobalWaitGraph(void)
|
|||
int64 rowCount = PQntuples(result);
|
||||
int64 colCount = PQnfields(result);
|
||||
|
||||
if (colCount != 9)
|
||||
if (colCount != 11)
|
||||
{
|
||||
ereport(WARNING, (errmsg("unexpected number of columns from "
|
||||
"dump_local_wait_edges")));
|
||||
|
@ -177,15 +191,17 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
|
|||
{
|
||||
WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
|
||||
|
||||
waitEdge->waitingPid = ParseIntField(result, rowIndex, 0);
|
||||
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1);
|
||||
waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2);
|
||||
waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3);
|
||||
waitEdge->blockingPid = ParseIntField(result, rowIndex, 4);
|
||||
waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5);
|
||||
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
|
||||
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7);
|
||||
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8);
|
||||
waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0);
|
||||
waitEdge->waitingPid = ParseIntField(result, rowIndex, 1);
|
||||
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2);
|
||||
waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 3);
|
||||
waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 4);
|
||||
waitEdge->blockingGPid = ParseIntField(result, rowIndex, 5);
|
||||
waitEdge->blockingPid = ParseIntField(result, rowIndex, 6);
|
||||
waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 7);
|
||||
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 8);
|
||||
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 9);
|
||||
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 10);
|
||||
}
|
||||
|
||||
|
||||
|
@ -256,7 +272,9 @@ ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex)
|
|||
Datum
|
||||
dump_local_wait_edges(PG_FUNCTION_ARGS)
|
||||
{
|
||||
WaitGraph *waitGraph = BuildLocalWaitGraph();
|
||||
bool onlyDistributedTx = PG_GETARG_BOOL(0);
|
||||
|
||||
WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx);
|
||||
ReturnWaitGraph(waitGraph, fcinfo);
|
||||
|
||||
return (Datum) 0;
|
||||
|
@ -274,51 +292,55 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
|
|||
|
||||
/*
|
||||
* Columns:
|
||||
* 00: waiting_pid
|
||||
* 01: waiting_node_id
|
||||
* 02: waiting_transaction_num
|
||||
* 03: waiting_transaction_stamp
|
||||
* 04: blocking_pid
|
||||
* 05: blocking__node_id
|
||||
* 06: blocking_transaction_num
|
||||
* 07: blocking_transaction_stamp
|
||||
* 08: blocking_transaction_waiting
|
||||
* 00: waiting_global_pid
|
||||
* 01: waiting_pid
|
||||
* 02: waiting_node_id
|
||||
* 03: waiting_transaction_num
|
||||
* 04: waiting_transaction_stamp
|
||||
* 05: blocking_global_pid
|
||||
* 06: blocking_pid
|
||||
* 07: blocking__node_id
|
||||
* 08: blocking_transaction_num
|
||||
* 09: blocking_transaction_stamp
|
||||
* 10: blocking_transaction_waiting
|
||||
*/
|
||||
for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++)
|
||||
{
|
||||
Datum values[9];
|
||||
bool nulls[9];
|
||||
Datum values[11];
|
||||
bool nulls[11];
|
||||
WaitEdge *curEdge = &waitGraph->edges[curEdgeNum];
|
||||
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(nulls, 0, sizeof(nulls));
|
||||
|
||||
values[0] = Int32GetDatum(curEdge->waitingPid);
|
||||
values[1] = Int32GetDatum(curEdge->waitingNodeId);
|
||||
values[0] = UInt64GetDatum(curEdge->waitingGPid);
|
||||
values[1] = Int32GetDatum(curEdge->waitingPid);
|
||||
values[2] = Int32GetDatum(curEdge->waitingNodeId);
|
||||
if (curEdge->waitingTransactionNum != 0)
|
||||
{
|
||||
values[2] = Int64GetDatum(curEdge->waitingTransactionNum);
|
||||
values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp);
|
||||
values[3] = Int64GetDatum(curEdge->waitingTransactionNum);
|
||||
values[4] = TimestampTzGetDatum(curEdge->waitingTransactionStamp);
|
||||
}
|
||||
else
|
||||
{
|
||||
nulls[2] = true;
|
||||
nulls[3] = true;
|
||||
nulls[4] = true;
|
||||
}
|
||||
|
||||
values[4] = Int32GetDatum(curEdge->blockingPid);
|
||||
values[5] = Int32GetDatum(curEdge->blockingNodeId);
|
||||
values[5] = UInt64GetDatum(curEdge->blockingGPid);
|
||||
values[6] = Int32GetDatum(curEdge->blockingPid);
|
||||
values[7] = Int32GetDatum(curEdge->blockingNodeId);
|
||||
if (curEdge->blockingTransactionNum != 0)
|
||||
{
|
||||
values[6] = Int64GetDatum(curEdge->blockingTransactionNum);
|
||||
values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp);
|
||||
values[8] = Int64GetDatum(curEdge->blockingTransactionNum);
|
||||
values[9] = TimestampTzGetDatum(curEdge->blockingTransactionStamp);
|
||||
}
|
||||
else
|
||||
{
|
||||
nulls[6] = true;
|
||||
nulls[7] = true;
|
||||
nulls[8] = true;
|
||||
nulls[9] = true;
|
||||
}
|
||||
values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting);
|
||||
values[10] = BoolGetDatum(curEdge->isBlockingXactWaiting);
|
||||
|
||||
tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
|
||||
}
|
||||
|
@ -328,9 +350,14 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
|
|||
/*
|
||||
* BuildLocalWaitGraph builds a wait graph for distributed transactions
|
||||
* that originate from the local node.
|
||||
*
|
||||
* If onlyDistributedTx is true, we only return distributed transactions
|
||||
* (e.g., AssignDistributedTransaction() or assign_distributed_transactions())
|
||||
* has been called for the process. Distributed deadlock detection only
|
||||
* interested in these processes.
|
||||
*/
|
||||
static WaitGraph *
|
||||
BuildLocalWaitGraph(void)
|
||||
BuildLocalWaitGraph(bool onlyDistributedTx)
|
||||
{
|
||||
PROCStack remaining;
|
||||
int totalProcs = TotalProcCount();
|
||||
|
@ -379,7 +406,8 @@ BuildLocalWaitGraph(void)
|
|||
* care about distributed transactions for the purpose of distributed
|
||||
* deadlock detection.
|
||||
*/
|
||||
if (!IsInDistributedTransaction(¤tBackendData))
|
||||
if (onlyDistributedTx &&
|
||||
!IsInDistributedTransaction(¤tBackendData))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
@ -627,6 +655,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
|
|||
}
|
||||
|
||||
curEdge->waitingPid = waitingProc->pid;
|
||||
curEdge->waitingGPid = waitingBackendData.globalPID;
|
||||
|
||||
if (IsInDistributedTransaction(&waitingBackendData))
|
||||
{
|
||||
|
@ -645,6 +674,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
|
|||
}
|
||||
|
||||
curEdge->blockingPid = blockingProc->pid;
|
||||
curEdge->blockingGPid = blockingBackendData.globalPID;
|
||||
|
||||
if (IsInDistributedTransaction(&blockingBackendData))
|
||||
{
|
||||
|
|
|
@ -31,11 +31,13 @@
|
|||
*/
|
||||
typedef struct WaitEdge
|
||||
{
|
||||
uint64 waitingGPid;
|
||||
int waitingPid;
|
||||
int waitingNodeId;
|
||||
int64 waitingTransactionNum;
|
||||
TimestampTz waitingTransactionStamp;
|
||||
|
||||
uint64 blockingGPid;
|
||||
int blockingPid;
|
||||
int blockingNodeId;
|
||||
int64 blockingTransactionNum;
|
||||
|
@ -58,7 +60,7 @@ typedef struct WaitGraph
|
|||
} WaitGraph;
|
||||
|
||||
|
||||
extern WaitGraph * BuildGlobalWaitGraph(void);
|
||||
extern WaitGraph * BuildGlobalWaitGraph(bool onlyDistributedTx);
|
||||
extern bool IsProcessWaitingForLock(PGPROC *proc);
|
||||
extern bool IsInDistributedTransaction(BackendData *backendData);
|
||||
extern TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex);
|
||||
|
|
|
@ -1005,6 +1005,8 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
---------------------------------------------------------------------
|
||||
function citus_disable_node(text,integer) void |
|
||||
function create_distributed_function(regprocedure,text,text) void |
|
||||
function dump_global_wait_edges() SETOF record |
|
||||
function dump_local_wait_edges() SETOF record |
|
||||
function master_append_table_to_shard(bigint,text,text,integer) real |
|
||||
function master_apply_delete_command(text) integer |
|
||||
function master_get_table_metadata(text) record |
|
||||
|
@ -1016,12 +1018,20 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
| function citus_shard_indexes_on_worker() SETOF record
|
||||
| function citus_shards_on_worker() SETOF record
|
||||
| function create_distributed_function(regprocedure,text,text,boolean) void
|
||||
<<<<<<< HEAD
|
||||
| function pg_cancel_backend(bigint) boolean
|
||||
| function pg_terminate_backend(bigint,bigint) boolean
|
||||
| function worker_create_or_replace_object(text[]) boolean
|
||||
| function worker_drop_sequence_dependency(text) void
|
||||
| function worker_drop_shell_table(text) void
|
||||
(18 rows)
|
||||
=======
|
||||
| function dump_global_wait_edges(boolean) SETOF record
|
||||
| function dump_local_wait_edges(boolean) SETOF record
|
||||
| function worker_drop_sequence_dependency(text) void
|
||||
| function worker_drop_shell_table(text) void
|
||||
(19 rows)
|
||||
>>>>>>> d4b956c7f (Use the optional APIs introduced for dump global/local wait edges)
|
||||
|
||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -127,8 +127,8 @@ ORDER BY 1;
|
|||
function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone)
|
||||
function distributed_tables_colocated(regclass,regclass)
|
||||
function drop_old_time_partitions(regclass,timestamp with time zone)
|
||||
function dump_global_wait_edges()
|
||||
function dump_local_wait_edges()
|
||||
function dump_global_wait_edges(boolean)
|
||||
function dump_local_wait_edges(boolean)
|
||||
function fetch_intermediate_results(text[],text,integer)
|
||||
function fix_all_partition_shard_index_names()
|
||||
function fix_partition_shard_index_names(regclass)
|
||||
|
|
Loading…
Reference in New Issue