Add function for dumping global wait edges

pull/1495/head
Marco Slot 2017-07-17 17:26:01 +02:00
parent 81198a1d02
commit 80ea233ec1
6 changed files with 416 additions and 17 deletions

View File

@ -16,3 +16,19 @@ LANGUAGE 'c' STRICT
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(int)
IS 'returns a local list of blocked transactions originating from source_node_id';
CREATE FUNCTION pg_catalog.dump_global_wait_edges(
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE 'c' STRICT
AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges()
IS 'returns a global list of blocked transactions originating from this node';

View File

@ -19,9 +19,11 @@
#include "access/hash.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/hsearch.h"
@ -39,6 +41,10 @@ typedef struct PROCStack
} PROCStack;
static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
static int64 ParseIntField(PGresult *result, int rowIndex, int colIndex);
static bool ParseBoolField(PGresult *result, int rowIndex, int colIndex);
static TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex);
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
static WaitGraph * BuildWaitGraphForSourceNode(int sourceNodeId);
static void LockLockData(void);
@ -57,6 +63,210 @@ static bool IsInDistributedTransaction(BackendData *backendData);
PG_FUNCTION_INFO_V1(dump_local_wait_edges);
PG_FUNCTION_INFO_V1(dump_global_wait_edges);
/*
* dump_global_wait_edges returns global wait edges for distributed transactions
* originating from the node on which it is started.
*/
Datum
dump_global_wait_edges(PG_FUNCTION_ARGS)
{
WaitGraph *waitGraph = NULL;
waitGraph = BuildGlobalWaitGraph();
ReturnWaitGraph(waitGraph, fcinfo);
return (Datum) 0;
}
/*
* BuildGlobalWaitGraph builds a wait graph for distributed transactions
* that originate from this node, including edges from all (other) worker
* nodes.
*/
WaitGraph *
BuildGlobalWaitGraph(void)
{
List *workerNodeList = ActivePrimaryNodeList();
ListCell *workerNodeCell = NULL;
char *nodeUser = CitusExtensionOwnerName();
List *connectionList = NIL;
ListCell *connectionCell = NULL;
int localNodeId = GetLocalGroupId();
WaitGraph *waitGraph = BuildWaitGraphForSourceNode(localNodeId);
/* open connections in parallel */
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
MultiConnection *connection = NULL;
int connectionFlags = 0;
if (workerNode->groupId == localNodeId)
{
/* we already have local wait edges */
continue;
}
connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
nodeUser, NULL);
connectionList = lappend(connectionList, connection);
}
FinishConnectionListEstablishment(connectionList);
/* send commands in parallel */
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
int querySent = false;
char *command = NULL;
const char *params[1];
params[0] = psprintf("%d", GetLocalGroupId());
command = "SELECT * FROM dump_local_wait_edges($1)";
querySent = SendRemoteCommandParams(connection, command, 1,
NULL, params);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
/* receive dump_local_wait_edges results */
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
PGresult *result = NULL;
bool raiseInterrupts = true;
int64 rowIndex = 0;
int64 rowCount = 0;
int64 colCount = 0;
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
rowCount = PQntuples(result);
colCount = PQnfields(result);
if (colCount != 9)
{
ereport(ERROR, (errmsg("unexpected number of columns from "
"dump_local_wait_edges")));
}
for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
AddWaitEdgeFromResult(waitGraph, result, rowIndex);
}
PQclear(result);
ForgetResults(connection);
}
return waitGraph;
}
/*
* AddWaitEdgeFromResult adds an edge to the wait graph that is read from
* a PGresult.
*/
static void
AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
{
WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
waitEdge->waitingPid = ParseIntField(result, rowIndex, 0);
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1);
waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2);
waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3);
waitEdge->blockingPid = ParseIntField(result, rowIndex, 4);
waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5);
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7);
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8);
}
/*
* ParseIntField parses a int64 from a remote result or returns 0 if the
* result is NULL.
*/
static int64
ParseIntField(PGresult *result, int rowIndex, int colIndex)
{
char *resultString = NULL;
if (PQgetisnull(result, rowIndex, colIndex))
{
return 0;
}
resultString = PQgetvalue(result, rowIndex, colIndex);
return pg_strtouint64(resultString, NULL, 10);
}
/*
* ParseBoolField parses a bool from a remote result or returns false if the
* result is NULL.
*/
static bool
ParseBoolField(PGresult *result, int rowIndex, int colIndex)
{
char *resultString = NULL;
if (PQgetisnull(result, rowIndex, colIndex))
{
return false;
}
resultString = PQgetvalue(result, rowIndex, colIndex);
if (strlen(resultString) != 1)
{
return false;
}
return resultString[0] == 't';
}
/*
* ParseTimestampTzField parses a timestamptz from a remote result or returns
* 0 if the result is NULL.
*/
static TimestampTz
ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex)
{
char *resultString = NULL;
Datum resultStringDatum = 0;
Datum timestampDatum = 0;
if (PQgetisnull(result, rowIndex, colIndex))
{
return 0;
}
resultString = PQgetvalue(result, rowIndex, colIndex);
resultStringDatum = CStringGetDatum(resultString);
timestampDatum = DirectFunctionCall3(timestamptz_in, resultStringDatum, 0, -1);
return DatumGetTimestampTz(timestampDatum);
}
/*
@ -81,22 +291,22 @@ dump_local_wait_edges(PG_FUNCTION_ARGS)
static void
ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc = NULL;
Tuplestorestate *tupstore = NULL;
ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupleDesc = NULL;
Tuplestorestate *tupleStore = NULL;
MemoryContext per_query_ctx = NULL;
MemoryContext oldcontext = NULL;
MemoryContext oldContext = NULL;
size_t curEdgeNum = 0;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
if (resultInfo == NULL || !IsA(resultInfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"set-valued function called in context that cannot accept a set")));
}
if (!(rsinfo->allowedModes & SFRM_Materialize))
if (!(resultInfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -105,19 +315,19 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
}
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
per_query_ctx = resultInfo->econtext->ecxt_per_query_memory;
oldContext = MemoryContextSwitchTo(per_query_ctx);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
tupleStore = tuplestore_begin_heap(true, false, work_mem);
resultInfo->returnMode = SFRM_Materialize;
resultInfo->setResult = tupleStore;
resultInfo->setDesc = tupleDesc;
MemoryContextSwitchTo(oldContext);
/*
* Columns:
@ -167,11 +377,11 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
}
values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
}
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
tuplestore_donestoring(tupleStore);
}

View File

@ -54,4 +54,7 @@ typedef struct WaitGraph
} WaitGraph;
extern WaitGraph * BuildGlobalWaitGraph(void);
#endif /* LOCK_GRAPH_H */

View File

@ -0,0 +1,85 @@
Parsed test spec with 4 sessions
starting permutation: s1-begin s2-begin s1-update s2-update detector-dump-wait-edges s1-abort s2-abort
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-update:
UPDATE distributed_table SET y = 1 WHERE x = 1;
step s2-update:
UPDATE distributed_table SET y = 2 WHERE x = 1;
<waiting ...>
step detector-dump-wait-edges:
SELECT
waiting_transaction_num,
blocking_transaction_num,
blocking_transaction_waiting
FROM
dump_global_wait_edges()
ORDER BY
waiting_transaction_num,
blocking_transaction_num,
blocking_transaction_waiting;
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
192 191 f
step s1-abort:
ABORT;
step s2-update: <... completed>
step s2-abort:
ABORT;
starting permutation: s1-begin s2-begin s3-begin s1-update s2-update s3-update detector-dump-wait-edges s1-abort s2-abort s3-abort
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s1-update:
UPDATE distributed_table SET y = 1 WHERE x = 1;
step s2-update:
UPDATE distributed_table SET y = 2 WHERE x = 1;
<waiting ...>
step s3-update:
UPDATE distributed_table SET y = 3 WHERE x = 1;
<waiting ...>
step detector-dump-wait-edges:
SELECT
waiting_transaction_num,
blocking_transaction_num,
blocking_transaction_waiting
FROM
dump_global_wait_edges()
ORDER BY
waiting_transaction_num,
blocking_transaction_num,
blocking_transaction_waiting;
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
196 195 f
197 195 f
197 196 t
step s1-abort:
ABORT;
step s2-update: <... completed>
step s2-abort:
ABORT;
step s3-update: <... completed>
step s3-abort:
ABORT;

View File

@ -10,4 +10,4 @@ test: isolation_concurrent_dml isolation_data_migration
test: isolation_drop_shards isolation_copy_placement_vs_modification
test: isolation_insert_vs_vacuum isolation_transaction_recovery
test: isolation_distributed_transaction_id
test: isolation_dump_local_wait_edges
test: isolation_dump_local_wait_edges isolation_dump_global_wait_edges

View File

@ -0,0 +1,85 @@
setup
{
CREATE TABLE distributed_table (x int primary key, y int);
SELECT create_distributed_table('distributed_table', 'x');
INSERT INTO distributed_table VALUES (1,0);
}
teardown
{
DROP TABLE distributed_table;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-update"
{
UPDATE distributed_table SET y = 1 WHERE x = 1;
}
step "s1-abort"
{
ABORT;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-update"
{
UPDATE distributed_table SET y = 2 WHERE x = 1;
}
step "s2-abort"
{
ABORT;
}
session "s3"
step "s3-begin"
{
BEGIN;
}
step "s3-update"
{
UPDATE distributed_table SET y = 3 WHERE x = 1;
}
step "s3-abort"
{
ABORT;
}
session "detector"
step "detector-dump-wait-edges"
{
SELECT
waiting_transaction_num,
blocking_transaction_num,
blocking_transaction_waiting
FROM
dump_global_wait_edges()
ORDER BY
waiting_transaction_num,
blocking_transaction_num,
blocking_transaction_waiting;
}
# Distributed transaction blocked by another distributed transaction
permutation "s1-begin" "s2-begin" "s1-update" "s2-update" "detector-dump-wait-edges" "s1-abort" "s2-abort"
# Distributed transaction blocked by another distributed transaction blocked by another distributed transaction
permutation "s1-begin" "s2-begin" "s3-begin" "s1-update" "s2-update" "s3-update" "detector-dump-wait-edges" "s1-abort" "s2-abort" "s3-abort"