diff --git a/src/backend/distributed/citus--7.0-5--7.0-6.sql b/src/backend/distributed/citus--7.0-5--7.0-6.sql
index e30273161..e09520be4 100644
--- a/src/backend/distributed/citus--7.0-5--7.0-6.sql
+++ b/src/backend/distributed/citus--7.0-5--7.0-6.sql
@@ -16,3 +16,19 @@ LANGUAGE 'c' STRICT
 AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
 COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(int)
 IS 'returns a local list of blocked transactions originating from source_node_id';
+
+CREATE FUNCTION pg_catalog.dump_global_wait_edges(
+                    OUT waiting_pid int4,
+                    OUT waiting_node_id int4,
+                    OUT waiting_transaction_num int8,
+                    OUT waiting_transaction_stamp timestamptz,
+                    OUT blocking_pid int4,
+                    OUT blocking_node_id int4,
+                    OUT blocking_transaction_num int8,
+                    OUT blocking_transaction_stamp timestamptz,
+                    OUT blocking_transaction_waiting bool)
+RETURNS SETOF RECORD
+LANGUAGE 'c' STRICT
+AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$;
+COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges()
+IS 'returns a global list of blocked transactions originating from this node';
diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c
index 5416bba6c..9a790c0d4 100644
--- a/src/backend/distributed/transaction/lock_graph.c
+++ b/src/backend/distributed/transaction/lock_graph.c
@@ -19,9 +19,11 @@
 
 #include "access/hash.h"
 #include "distributed/backend_data.h"
+#include "distributed/connection_management.h"
 #include "distributed/hash_helpers.h"
 #include "distributed/lock_graph.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/remote_commands.h"
 #include "storage/proc.h"
 #include "utils/builtins.h"
 #include "utils/hsearch.h"
@@ -39,6 +41,10 @@ typedef struct PROCStack
 } PROCStack;
 
 
+static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
+static int64 ParseIntField(PGresult *result, int rowIndex, int colIndex);
+static bool ParseBoolField(PGresult *result, int rowIndex, int colIndex);
+static TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex);
 static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
 static WaitGraph * BuildWaitGraphForSourceNode(int sourceNodeId);
 static void LockLockData(void);
@@ -57,6 +63,217 @@ static bool IsInDistributedTransaction(BackendData *backendData);
 
 
 PG_FUNCTION_INFO_V1(dump_local_wait_edges);
+PG_FUNCTION_INFO_V1(dump_global_wait_edges);
+
+
+/*
+ * dump_global_wait_edges returns global wait edges for distributed transactions
+ * originating from the node on which it is started.
+ */
+Datum
+dump_global_wait_edges(PG_FUNCTION_ARGS)
+{
+	WaitGraph *waitGraph = NULL;
+
+	waitGraph = BuildGlobalWaitGraph();
+
+	ReturnWaitGraph(waitGraph, fcinfo);
+
+	return (Datum) 0;
+}
+
+
+/*
+ * BuildGlobalWaitGraph builds a wait graph for distributed transactions
+ * that originate from this node, including edges from all (other) worker
+ * nodes.
+ *
+ * Connections to the other nodes are established in parallel, and the
+ * dump_local_wait_edges command is sent to all of them before any result
+ * is read. Any connection or query failure raises an ERROR.
+ */
+WaitGraph *
+BuildGlobalWaitGraph(void)
+{
+	List *workerNodeList = ActivePrimaryNodeList();
+	ListCell *workerNodeCell = NULL;
+	char *nodeUser = CitusExtensionOwnerName();
+	List *connectionList = NIL;
+	ListCell *connectionCell = NULL;
+	int localNodeId = GetLocalGroupId();
+
+	WaitGraph *waitGraph = BuildWaitGraphForSourceNode(localNodeId);
+
+	/* open connections in parallel */
+	foreach(workerNodeCell, workerNodeList)
+	{
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
+		char *nodeName = workerNode->workerName;
+		int nodePort = workerNode->workerPort;
+		MultiConnection *connection = NULL;
+		int connectionFlags = 0;
+
+		if (workerNode->groupId == localNodeId)
+		{
+			/* we already have local wait edges */
+			continue;
+		}
+
+		connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
+													 nodeUser, NULL);
+
+		connectionList = lappend(connectionList, connection);
+	}
+
+	FinishConnectionListEstablishment(connectionList);
+
+	/* send commands in parallel */
+	foreach(connectionCell, connectionList)
+	{
+		MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
+		int querySent = 0;
+		char *command = NULL;
+		const char *params[1];
+
+		params[0] = psprintf("%d", localNodeId);
+		command = "SELECT * FROM dump_local_wait_edges($1)";
+
+		querySent = SendRemoteCommandParams(connection, command, 1,
+											NULL, params);
+		if (querySent == 0)
+		{
+			ReportConnectionError(connection, ERROR);
+		}
+	}
+
+	/* receive dump_local_wait_edges results */
+	foreach(connectionCell, connectionList)
+	{
+		MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
+		PGresult *result = NULL;
+		bool raiseInterrupts = true;
+		int rowIndex = 0;
+		int rowCount = 0;
+		int colCount = 0;
+
+		result = GetRemoteCommandResult(connection, raiseInterrupts);
+		if (!IsResponseOK(result))
+		{
+			ReportResultError(connection, result, ERROR);
+		}
+
+		rowCount = PQntuples(result);
+		colCount = PQnfields(result);
+
+		if (colCount != 9)
+		{
+			/* libpq results are not freed by ereport, release it first */
+			PQclear(result);
+			ereport(ERROR, (errmsg("unexpected number of columns from "
+								   "dump_local_wait_edges")));
+		}
+
+		for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
+		{
+			AddWaitEdgeFromResult(waitGraph, result, rowIndex);
+		}
+
+		PQclear(result);
+		ForgetResults(connection);
+	}
+
+	return waitGraph;
+}
+
+
+/*
+ * AddWaitEdgeFromResult adds an edge to the wait graph that is read from
+ * a PGresult.
+ */
+static void
+AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
+{
+	WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
+
+	waitEdge->waitingPid = ParseIntField(result, rowIndex, 0);
+	waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1);
+	waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2);
+	waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3);
+	waitEdge->blockingPid = ParseIntField(result, rowIndex, 4);
+	waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5);
+	waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
+	waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7);
+	waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8);
+}
+
+
+/*
+ * ParseIntField parses an int64 from a remote result or returns 0 if the
+ * result is NULL.
+ */
+static int64
+ParseIntField(PGresult *result, int rowIndex, int colIndex)
+{
+	char *resultString = NULL;
+
+	if (PQgetisnull(result, rowIndex, colIndex))
+	{
+		return 0;
+	}
+
+	resultString = PQgetvalue(result, rowIndex, colIndex);
+
+	return pg_strtouint64(resultString, NULL, 10);
+}
+
+
+/*
+ * ParseBoolField parses a bool from a remote result or returns false if the
+ * result is NULL.
+ */
+static bool
+ParseBoolField(PGresult *result, int rowIndex, int colIndex)
+{
+	char *resultString = NULL;
+
+	if (PQgetisnull(result, rowIndex, colIndex))
+	{
+		return false;
+	}
+
+	resultString = PQgetvalue(result, rowIndex, colIndex);
+	if (strlen(resultString) != 1)
+	{
+		return false;
+	}
+
+	return resultString[0] == 't';
+}
+
+
+/*
+ * ParseTimestampTzField parses a timestamptz from a remote result or returns
+ * 0 if the result is NULL.
+ */
+static TimestampTz
+ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex)
+{
+	char *resultString = NULL;
+	Datum resultStringDatum = 0;
+	Datum timestampDatum = 0;
+
+	if (PQgetisnull(result, rowIndex, colIndex))
+	{
+		return 0;
+	}
+
+	resultString = PQgetvalue(result, rowIndex, colIndex);
+	resultStringDatum = CStringGetDatum(resultString);
+	timestampDatum = DirectFunctionCall3(timestamptz_in, resultStringDatum,
+										 ObjectIdGetDatum(InvalidOid), Int32GetDatum(-1));
+
+	return DatumGetTimestampTz(timestampDatum);
+}
 
 
 /*
@@ -81,22 +298,22 @@ dump_local_wait_edges(PG_FUNCTION_ARGS)
 static void
 ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
 {
-	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-	TupleDesc tupdesc = NULL;
-	Tuplestorestate *tupstore = NULL;
+	ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc tupleDesc = NULL;
+	Tuplestorestate *tupleStore = NULL;
 	MemoryContext per_query_ctx = NULL;
-	MemoryContext oldcontext = NULL;
+	MemoryContext oldContext = NULL;
 	size_t curEdgeNum = 0;
 
 	/* check to see if caller supports us returning a tuplestore */
-	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+	if (resultInfo == NULL || !IsA(resultInfo, ReturnSetInfo))
 	{
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg(
 					 "set-valued function called in context that cannot accept a set")));
 	}
-	if (!(rsinfo->allowedModes & SFRM_Materialize))
+	if (!(resultInfo->allowedModes & SFRM_Materialize))
 	{
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -105,19 +322,19 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
 	}
 
 	/* Build a tuple descriptor for our result type */
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+	if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
 	{
 		elog(ERROR, "return type must be a row type");
 	}
 
-	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
-	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+	per_query_ctx = resultInfo->econtext->ecxt_per_query_memory;
+	oldContext = MemoryContextSwitchTo(per_query_ctx);
 
-	tupstore = tuplestore_begin_heap(true, false, work_mem);
-	rsinfo->returnMode = SFRM_Materialize;
-	rsinfo->setResult = tupstore;
-	rsinfo->setDesc = tupdesc;
-	MemoryContextSwitchTo(oldcontext);
+	tupleStore = tuplestore_begin_heap(true, false, work_mem);
+	resultInfo->returnMode = SFRM_Materialize;
+	resultInfo->setResult = tupleStore;
+	resultInfo->setDesc = tupleDesc;
+	MemoryContextSwitchTo(oldContext);
 
 	/*
 	 * Columns:
@@ -167,11 +384,11 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
 		}
 		values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting);
 
-		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+		tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
 	}
 
 	/* clean up and return the tuplestore */
-	tuplestore_donestoring(tupstore);
+	tuplestore_donestoring(tupleStore);
 }
 
 
diff --git a/src/include/distributed/lock_graph.h b/src/include/distributed/lock_graph.h
index 548b12ecb..5084e38bc 100644
--- a/src/include/distributed/lock_graph.h
+++ b/src/include/distributed/lock_graph.h
@@ -54,4 +54,7 @@ typedef struct WaitGraph
 } WaitGraph;
 
 
+extern WaitGraph * BuildGlobalWaitGraph(void);
+
+
 #endif /* LOCK_GRAPH_H */
diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out
new file mode 100644
index 000000000..585a9fd9e
--- /dev/null
+++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out
@@ -0,0 +1,85 @@
+Parsed test spec with 4 sessions
+
+starting permutation: s1-begin s2-begin s1-update s2-update detector-dump-wait-edges s1-abort s2-abort
+step s1-begin: 
+	BEGIN;
+
+step s2-begin: 
+	BEGIN;
+
+step s1-update: 
+	UPDATE distributed_table SET y = 1 WHERE x = 1;
+
+step s2-update: 
+	UPDATE distributed_table SET y = 2 WHERE x = 1;
+
+step detector-dump-wait-edges: 
+	SELECT
+		waiting_transaction_num,
+		blocking_transaction_num,
+		blocking_transaction_waiting
+	FROM
+		dump_global_wait_edges()
+	ORDER BY
+		waiting_transaction_num,
+		blocking_transaction_num,
+		blocking_transaction_waiting;
+
+waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
+
+192            191            f              
+step s1-abort: 
+	ABORT;
+
+step s2-update: <... completed>
+step s2-abort: 
+	ABORT;
+
+
+starting permutation: s1-begin s2-begin s3-begin s1-update s2-update s3-update detector-dump-wait-edges s1-abort s2-abort s3-abort
+step s1-begin: 
+	BEGIN;
+
+step s2-begin: 
+	BEGIN;
+
+step s3-begin: 
+	BEGIN;
+
+step s1-update: 
+	UPDATE distributed_table SET y = 1 WHERE x = 1;
+
+step s2-update: 
+	UPDATE distributed_table SET y = 2 WHERE x = 1;
+
+step s3-update: 
+	UPDATE distributed_table SET y = 3 WHERE x = 1;
+
+step detector-dump-wait-edges: 
+	SELECT
+		waiting_transaction_num,
+		blocking_transaction_num,
+		blocking_transaction_waiting
+	FROM
+		dump_global_wait_edges()
+	ORDER BY
+		waiting_transaction_num,
+		blocking_transaction_num,
+		blocking_transaction_waiting;
+
+waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
+
+196            195            f              
+197            195            f              
+197            196            t              
+step s1-abort: 
+	ABORT;
+
+step s2-update: <... completed>
+step s2-abort: 
+	ABORT;
+
+step s3-update: <... completed>
+step s3-abort: 
+	ABORT;
+
diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule
index 84e796045..32aef1141 100644
--- a/src/test/regress/isolation_schedule
+++ b/src/test/regress/isolation_schedule
@@ -10,4 +10,4 @@ test: isolation_concurrent_dml isolation_data_migration
 test: isolation_drop_shards isolation_copy_placement_vs_modification
 test: isolation_insert_vs_vacuum isolation_transaction_recovery
 test: isolation_distributed_transaction_id
-test: isolation_dump_local_wait_edges
+test: isolation_dump_local_wait_edges isolation_dump_global_wait_edges
diff --git a/src/test/regress/specs/isolation_dump_global_wait_edges.spec b/src/test/regress/specs/isolation_dump_global_wait_edges.spec
new file mode 100644
index 000000000..869ccf569
--- /dev/null
+++ b/src/test/regress/specs/isolation_dump_global_wait_edges.spec
@@ -0,0 +1,85 @@
+setup
+{
+	CREATE TABLE distributed_table (x int primary key, y int);
+	SELECT create_distributed_table('distributed_table', 'x');
+	INSERT INTO distributed_table VALUES (1,0);
+}
+
+teardown
+{
+	DROP TABLE distributed_table;
+}
+
+session "s1"
+
+step "s1-begin"
+{
+	BEGIN;
+}
+
+step "s1-update"
+{
+	UPDATE distributed_table SET y = 1 WHERE x = 1;
+}
+
+step "s1-abort"
+{
+	ABORT;
+}
+
+session "s2"
+
+step "s2-begin"
+{
+	BEGIN;
+}
+
+step "s2-update"
+{
+	UPDATE distributed_table SET y = 2 WHERE x = 1;
+}
+
+step "s2-abort"
+{
+	ABORT;
+}
+
+session "s3"
+
+step "s3-begin"
+{
+	BEGIN;
+}
+
+step "s3-update"
+{
+	UPDATE distributed_table SET y = 3 WHERE x = 1;
+}
+
+step "s3-abort"
+{
+	ABORT;
+}
+
+
+session "detector"
+
+step "detector-dump-wait-edges"
+{
+	SELECT
+		waiting_transaction_num,
+		blocking_transaction_num,
+		blocking_transaction_waiting
+	FROM
+		dump_global_wait_edges()
+	ORDER BY
+		waiting_transaction_num,
+		blocking_transaction_num,
+		blocking_transaction_waiting;
+}
+
+# Distributed transaction blocked by another distributed transaction
+permutation "s1-begin" "s2-begin" "s1-update" "s2-update" "detector-dump-wait-edges" "s1-abort" "s2-abort"
+
+# Distributed transaction blocked by another distributed transaction blocked by another distributed transaction
+permutation "s1-begin" "s2-begin" "s3-begin" "s1-update" "s2-update" "s3-update" "detector-dump-wait-edges" "s1-abort" "s2-abort" "s3-abort"