From 331af3dce80ca6aa4e4d395a32ae5ee9b9dbbc78 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 9 Feb 2022 15:48:48 +0100 Subject: [PATCH 1/2] Dumping wait edges becomes optionally scan all backends Before this commit, dumping wait edges can only be used for distributed deadlock detection purposes. With this commit, we open the possibility that we can use it for any backend. --- .../distributed/sql/citus--10.2-4--11.0-1.sql | 3 + .../sql/downgrades/citus--11.0-1--10.2-4.sql | 70 ++++++++++- .../udfs/dump_global_wait_edges/11.0-1.sql | 19 +++ .../udfs/dump_global_wait_edges/latest.sql | 19 +++ .../sql/udfs/dump_local_wait_edges/11.0-1.sql | 19 +++ .../sql/udfs/dump_local_wait_edges/latest.sql | 19 +++ .../test/distributed_deadlock_detection.c | 5 +- .../distributed_deadlock_detection.c | 4 +- .../distributed/transaction/lock_graph.c | 114 +++++++++++------- src/include/distributed/lock_graph.h | 4 +- src/test/regress/expected/multi_extension.out | 10 ++ .../expected/upgrade_list_citus_objects.out | 4 +- 12 files changed, 241 insertions(+), 49 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql create mode 100644 src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index a8cfa24fd..de6748c12 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -17,6 +17,9 @@ #include "udfs/get_all_active_transactions/11.0-1.sql" #include "udfs/get_global_active_transactions/11.0-1.sql" +#include "udfs/dump_local_wait_edges/11.0-1.sql" +#include "udfs/dump_global_wait_edges/11.0-1.sql" + #include "udfs/citus_worker_stat_activity/11.0-1.sql" #include "udfs/worker_create_or_replace_object/11.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 399e42ff0..53d03f986 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -113,6 +113,40 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int RESET search_path; +DROP FUNCTION dump_local_wait_edges CASCADE; +CREATE FUNCTION pg_catalog.dump_local_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() +IS 'returns all local lock wait chains, that start from distributed transactions'; + +DROP FUNCTION dump_global_wait_edges CASCADE; +CREATE FUNCTION pg_catalog.dump_global_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE 'c' STRICT +AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() +IS 'returns a global list of blocked transactions originating from this node'; + DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, @@ -211,7 +245,39 @@ GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]); #include "../udfs/worker_create_or_replace_object/9.0-1.sql" -RESET search_path; - DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE; DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE; + +DROP FUNCTION pg_catalog.dump_global_wait_edges; +CREATE FUNCTION pg_catalog.dump_local_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() +IS 'returns all local lock wait chains, that start from distributed transactions'; + +DROP FUNCTION pg_catalog.dump_global_wait_edges; +CREATE FUNCTION pg_catalog.dump_global_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE 'c' STRICT +AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() +IS 'returns a global list of blocked transactions originating from this node'; diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql new file mode 100644 index 000000000..32e25e217 --- /dev/null +++ b/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql @@ -0,0 +1,19 @@ +DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; +CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( + distributed_tx_only boolean DEFAULT true, + OUT waiting_global_pid int8, + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_global_pid int8, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool) +IS 'returns a global list of blocked backends originating from this node'; diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql new file mode 100644 index 000000000..32e25e217 --- /dev/null +++ b/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql @@ -0,0 +1,19 @@ +DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; +CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( + distributed_tx_only boolean DEFAULT true, + OUT waiting_global_pid int8, + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_global_pid int8, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool) +IS 'returns a global list of blocked backends originating from this node'; diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql new file mode 100644 index 000000000..190e4c0d5 --- /dev/null +++ b/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql @@ -0,0 +1,19 @@ +DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; +CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( + distributed_tx_only boolean DEFAULT true, + OUT waiting_global_pid int8, + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_global_pid int8, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) +IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql new file mode 100644 index 000000000..190e4c0d5 --- /dev/null +++ b/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql @@ -0,0 +1,19 @@ +DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; +CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( + distributed_tx_only boolean DEFAULT true, + OUT waiting_global_pid int8, + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_global_pid int8, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) +IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/test/distributed_deadlock_detection.c b/src/backend/distributed/test/distributed_deadlock_detection.c index 448228158..d3fa34db2 100644 --- a/src/backend/distributed/test/distributed_deadlock_detection.c +++ b/src/backend/distributed/test/distributed_deadlock_detection.c @@ -50,7 +50,10 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS) bool isNulls[2]; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); - WaitGraph *waitGraph = BuildGlobalWaitGraph(); + + /* distributed deadlock detection only considers distributed txs */ + bool onlyDistributedTx = true; + WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph); /* iterate on all nodes */ diff --git a/src/backend/distributed/transaction/distributed_deadlock_detection.c b/src/backend/distributed/transaction/distributed_deadlock_detection.c index f9e4adca0..82c274661 100644 --- a/src/backend/distributed/transaction/distributed_deadlock_detection.c +++ b/src/backend/distributed/transaction/distributed_deadlock_detection.c @@ -119,7 +119,9 @@ CheckForDistributedDeadlocks(void) return false; } - WaitGraph *waitGraph = BuildGlobalWaitGraph(); + /* distributed deadlock detection only considers distributed txs */ + bool onlyDistributedTx = true; + WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph); int edgeCount = waitGraph->edgeCount; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index aa37e4371..18f391d74 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -47,7 +47,7 @@ typedef struct PROCStack static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); -static WaitGraph * BuildLocalWaitGraph(void); +static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx); static bool IsProcessWaitingForSafeOperations(PGPROC *proc); static void LockLockData(void); static void UnlockLockData(void); @@ -74,7 +74,9 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges); Datum dump_global_wait_edges(PG_FUNCTION_ARGS) { - WaitGraph *waitGraph = BuildGlobalWaitGraph(); + bool onlyDistributedTx = PG_GETARG_BOOL(0); + + WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); ReturnWaitGraph(waitGraph, fcinfo); @@ -86,16 +88,23 @@ dump_global_wait_edges(PG_FUNCTION_ARGS) * BuildGlobalWaitGraph builds a wait graph for distributed transactions * that originate from this node, including edges from all (other) worker * nodes. + * + * + * If onlyDistributedTx is true, we only return distributed transactions + * (e.g., AssignDistributedTransaction() or assign_distributed_transactions()) + * has been called for the process. Distributed deadlock detection only + * interested in these processes. */ WaitGraph * -BuildGlobalWaitGraph(void) +BuildGlobalWaitGraph(bool onlyDistributedTx) { List *workerNodeList = ActiveReadableNodeList(); char *nodeUser = CitusExtensionOwnerName(); List *connectionList = NIL; int32 localGroupId = GetLocalGroupId(); - WaitGraph *waitGraph = BuildLocalWaitGraph(); + /* deadlock detection is only interested in */ + WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); /* open connections in parallel */ WorkerNode *workerNode = NULL; @@ -124,9 +133,14 @@ BuildGlobalWaitGraph(void) MultiConnection *connection = NULL; foreach_ptr(connection, connectionList) { - const char *command = "SELECT * FROM dump_local_wait_edges()"; + StringInfo queryString = makeStringInfo(); + const char *onlyDistributedTxStr = onlyDistributedTx ? "true" : "false"; - int querySent = SendRemoteCommand(connection, command); + appendStringInfo(queryString, + "SELECT * FROM dump_local_wait_edges(%s)", + onlyDistributedTxStr); + + int querySent = SendRemoteCommand(connection, queryString->data); if (querySent == 0) { ReportConnectionError(connection, WARNING); @@ -148,7 +162,7 @@ BuildGlobalWaitGraph(void) int64 rowCount = PQntuples(result); int64 colCount = PQnfields(result); - if (colCount != 9) + if (colCount != 11) { ereport(WARNING, (errmsg("unexpected number of columns from " "dump_local_wait_edges"))); @@ -177,15 +191,17 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) { WaitEdge *waitEdge = AllocWaitEdge(waitGraph); - waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); - waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); - waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); - waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); - waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); - waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); - waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); - waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); - waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); + waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0); + waitEdge->waitingPid = ParseIntField(result, rowIndex, 1); + waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2); + waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 3); + waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 4); + waitEdge->blockingGPid = ParseIntField(result, rowIndex, 5); + waitEdge->blockingPid = ParseIntField(result, rowIndex, 6); + waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 7); + waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 8); + waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 9); + waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 10); } @@ -256,7 +272,9 @@ ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex) Datum dump_local_wait_edges(PG_FUNCTION_ARGS) { - WaitGraph *waitGraph = BuildLocalWaitGraph(); + bool onlyDistributedTx = PG_GETARG_BOOL(0); + + WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); ReturnWaitGraph(waitGraph, fcinfo); return (Datum) 0; @@ -274,51 +292,55 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) /* * Columns: - * 00: waiting_pid - * 01: waiting_node_id - * 02: waiting_transaction_num - * 03: waiting_transaction_stamp - * 04: blocking_pid - * 05: blocking__node_id - * 06: blocking_transaction_num - * 07: blocking_transaction_stamp - * 08: blocking_transaction_waiting + * 00: waiting_global_pid + * 01: waiting_pid + * 02: waiting_node_id + * 03: waiting_transaction_num + * 04: waiting_transaction_stamp + * 05: blocking_global_pid + * 06: blocking_pid + * 07: blocking__node_id + * 08: blocking_transaction_num + * 09: blocking_transaction_stamp + * 10: blocking_transaction_waiting */ for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++) { - Datum values[9]; - bool nulls[9]; + Datum values[11]; + bool nulls[11]; WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); - values[0] = Int32GetDatum(curEdge->waitingPid); - values[1] = Int32GetDatum(curEdge->waitingNodeId); + values[0] = UInt64GetDatum(curEdge->waitingGPid); + values[1] = Int32GetDatum(curEdge->waitingPid); + values[2] = Int32GetDatum(curEdge->waitingNodeId); if (curEdge->waitingTransactionNum != 0) { - values[2] = Int64GetDatum(curEdge->waitingTransactionNum); - values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); + values[3] = Int64GetDatum(curEdge->waitingTransactionNum); + values[4] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); } else { - nulls[2] = true; nulls[3] = true; + nulls[4] = true; } - values[4] = Int32GetDatum(curEdge->blockingPid); - values[5] = Int32GetDatum(curEdge->blockingNodeId); + values[5] = UInt64GetDatum(curEdge->blockingGPid); + values[6] = Int32GetDatum(curEdge->blockingPid); + values[7] = Int32GetDatum(curEdge->blockingNodeId); if (curEdge->blockingTransactionNum != 0) { - values[6] = Int64GetDatum(curEdge->blockingTransactionNum); - values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); + values[8] = Int64GetDatum(curEdge->blockingTransactionNum); + values[9] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); } else { - nulls[6] = true; - nulls[7] = true; + nulls[8] = true; + nulls[9] = true; } - values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting); + values[10] = BoolGetDatum(curEdge->isBlockingXactWaiting); tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); } @@ -328,9 +350,14 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) /* * BuildLocalWaitGraph builds a wait graph for distributed transactions * that originate from the local node. + * + * If onlyDistributedTx is true, we only return distributed transactions + * (e.g., AssignDistributedTransaction() or assign_distributed_transactions()) + * has been called for the process. Distributed deadlock detection only + * interested in these processes. */ static WaitGraph * -BuildLocalWaitGraph(void) +BuildLocalWaitGraph(bool onlyDistributedTx) { PROCStack remaining; int totalProcs = TotalProcCount(); @@ -379,7 +406,8 @@ BuildLocalWaitGraph(void) * care about distributed transactions for the purpose of distributed * deadlock detection. */ - if (!IsInDistributedTransaction(¤tBackendData)) + if (onlyDistributedTx && + !IsInDistributedTransaction(¤tBackendData)) { continue; } @@ -627,6 +655,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc, } curEdge->waitingPid = waitingProc->pid; + curEdge->waitingGPid = waitingBackendData.globalPID; if (IsInDistributedTransaction(&waitingBackendData)) { @@ -645,6 +674,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc, } curEdge->blockingPid = blockingProc->pid; + curEdge->blockingGPid = blockingBackendData.globalPID; if (IsInDistributedTransaction(&blockingBackendData)) { diff --git a/src/include/distributed/lock_graph.h b/src/include/distributed/lock_graph.h index 46fec1dee..f204ebb03 100644 --- a/src/include/distributed/lock_graph.h +++ b/src/include/distributed/lock_graph.h @@ -31,11 +31,13 @@ */ typedef struct WaitEdge { + uint64 waitingGPid; int waitingPid; int waitingNodeId; int64 waitingTransactionNum; TimestampTz waitingTransactionStamp; + uint64 blockingGPid; int blockingPid; int blockingNodeId; int64 blockingTransactionNum; @@ -58,7 +60,7 @@ typedef struct WaitGraph } WaitGraph; -extern WaitGraph * BuildGlobalWaitGraph(void); +extern WaitGraph * BuildGlobalWaitGraph(bool onlyDistributedTx); extern bool IsProcessWaitingForLock(PGPROC *proc); extern bool IsInDistributedTransaction(BackendData *backendData); extern TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index dd93bdd44..2746d8ea4 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1005,6 +1005,8 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- function citus_disable_node(text,integer) void | function create_distributed_function(regprocedure,text,text) void | + function dump_global_wait_edges() SETOF record | + function dump_local_wait_edges() SETOF record | function master_append_table_to_shard(bigint,text,text,integer) real | function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | @@ -1016,12 +1018,20 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_shard_indexes_on_worker() SETOF record | function citus_shards_on_worker() SETOF record | function create_distributed_function(regprocedure,text,text,boolean) void +<<<<<<< HEAD | function pg_cancel_backend(bigint) boolean | function pg_terminate_backend(bigint,bigint) boolean | function worker_create_or_replace_object(text[]) boolean | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void (18 rows) +======= + | function dump_global_wait_edges(boolean) SETOF record + | function dump_local_wait_edges(boolean) SETOF record + | function worker_drop_sequence_dependency(text) void + | function worker_drop_shell_table(text) void +(19 rows) +>>>>>>> d4b956c7f (Use the optional APIs introduced for dump global/local wait edges) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 13a15c658..2071dca18 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -127,8 +127,8 @@ ORDER BY 1; function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone) function distributed_tables_colocated(regclass,regclass) function drop_old_time_partitions(regclass,timestamp with time zone) - function dump_global_wait_edges() - function dump_local_wait_edges() + function dump_global_wait_edges(boolean) + function dump_local_wait_edges(boolean) function fetch_intermediate_results(text[],text,integer) function fix_all_partition_shard_index_names() function fix_partition_shard_index_names(regclass) From dffcafc0967de66429458d5910bcc61478d40824 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 9 Feb 2022 17:35:42 +0100 Subject: [PATCH 2/2] Use global pids in citus_lock_waits --- .../distributed/sql/citus--10.2-4--11.0-1.sql | 6 +- .../sql/downgrades/citus--11.0-1--10.2-4.sql | 258 ++++++++----- .../sql/udfs/citus_blocking_pids/11.0-1.sql | 34 ++ .../sql/udfs/citus_blocking_pids/latest.sql | 34 ++ .../11.0-1.sql | 12 +- .../latest.sql | 12 +- .../11.0-1.sql | 10 +- .../latest.sql | 10 +- .../11.0-1.sql | 56 +++ .../latest.sql | 12 +- .../sql/udfs/citus_lock_waits/11.0-1.sql | 8 +- .../sql/udfs/citus_lock_waits/latest.sql | 8 +- .../distributed/transaction/lock_graph.c | 188 ++++++++- ...lation_get_distributed_wait_queries_mx.out | 361 ++++++++++++------ .../isolation_rebalancer_deferred_drop.out | 7 +- ...licate_reference_tables_to_coordinator.out | 7 +- src/test/regress/expected/multi_extension.out | 118 +++--- .../expected/upgrade_list_citus_objects.out | 8 +- ...ation_get_distributed_wait_queries_mx.spec | 18 +- ...icate_reference_tables_to_coordinator.spec | 3 +- 20 files changed, 833 insertions(+), 337 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql rename src/backend/distributed/sql/udfs/{dump_global_wait_edges => citus_internal_global_blocked_processes}/11.0-1.sql (62%) rename src/backend/distributed/sql/udfs/{dump_global_wait_edges => citus_internal_global_blocked_processes}/latest.sql (62%) rename src/backend/distributed/sql/udfs/{dump_local_wait_edges => citus_internal_local_blocked_processes}/11.0-1.sql (67%) rename src/backend/distributed/sql/udfs/{dump_local_wait_edges => citus_internal_local_blocked_processes}/latest.sql (67%) create mode 100644 src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index de6748c12..849b28761 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -17,11 +17,13 @@ #include "udfs/get_all_active_transactions/11.0-1.sql" #include "udfs/get_global_active_transactions/11.0-1.sql" -#include "udfs/dump_local_wait_edges/11.0-1.sql" -#include "udfs/dump_global_wait_edges/11.0-1.sql" +#include "udfs/citus_internal_local_blocked_processes/11.0-1.sql" +#include "udfs/citus_internal_global_blocked_processes/11.0-1.sql" #include "udfs/citus_worker_stat_activity/11.0-1.sql" #include "udfs/worker_create_or_replace_object/11.0-1.sql" +#include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql" +#include "udfs/citus_blocking_pids/11.0-1.sql" CREATE VIEW citus.citus_worker_stat_activity AS SELECT * FROM pg_catalog.citus_worker_stat_activity(); diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 53d03f986..ba13b134a 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -113,39 +113,8 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int RESET search_path; -DROP FUNCTION dump_local_wait_edges CASCADE; -CREATE FUNCTION pg_catalog.dump_local_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() -IS 'returns all local lock wait chains, that start from distributed transactions'; - -DROP FUNCTION dump_global_wait_edges CASCADE; -CREATE FUNCTION pg_catalog.dump_global_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE 'c' STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() -IS 'returns a global list of blocked transactions originating from this node'; +DROP FUNCTION citus_internal_local_blocked_processes CASCADE; +DROP FUNCTION citus_internal_global_blocked_processes CASCADE; DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; @@ -173,6 +142,162 @@ ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; SET search_path = 'pg_catalog'; +DROP FUNCTION citus_worker_stat_activity CASCADE; + +CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_worker_stat_activity$$; + +COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +IS 'returns distributed transaction activity on shards of distributed tables'; + +DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]); +#include "../udfs/worker_create_or_replace_object/9.0-1.sql" + +DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE; +DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE; + +DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; +CREATE FUNCTION pg_catalog.dump_local_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE C STRICT +AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() +IS 'returns all local lock wait chains, that start from distributed transactions'; + +DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; +CREATE FUNCTION pg_catalog.dump_global_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE 'c' STRICT +AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() +IS 'returns a global list of blocked transactions originating from this node'; + +DROP FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]); +CREATE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) +RETURNS boolean AS $$ + DECLARE + mBlockedTransactionNum int8; + workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); + coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); + BEGIN + IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN + RETURN true; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + -- Note that worker process may be blocked or waiting for a lock. So we need to + -- get transaction number for both of them. Following IF provides the transaction + -- number when the worker process waiting for other session. + IF EXISTS (SELECT transaction_number FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN + SELECT transaction_number INTO mBlockedTransactionNum FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; + ELSE + -- Check whether transactions initiated from the coordinator get locked + SELECT transaction_number INTO mBlockedTransactionNum + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + END IF; + + RETURN EXISTS ( + SELECT 1 FROM dump_global_wait_edges() + WHERE waiting_transaction_num = mBlockedTransactionNum + ) OR EXISTS ( + -- Check on the workers if any logical replication job spawned by the + -- current PID is blocked, by checking it's application name + -- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring + SELECT result FROM run_command_on_workers($two$ + SELECT blocked_activity.application_name AS blocked_application + FROM pg_catalog.pg_locks blocked_locks + JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid + JOIN pg_catalog.pg_locks blocking_locks + ON blocking_locks.locktype = blocked_locks.locktype + AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE + AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation + AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page + AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple + AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid + AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid + AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid + AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid + AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid + AND blocking_locks.pid != blocked_locks.pid + JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid + WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%' + $two$) where result='citus_shard_move_subscription_' || pBlockedPid); + + END; +$$ LANGUAGE plpgsql; + +REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC; + +DROP FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer); +CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) +RETURNS int4[] AS $$ + DECLARE + mLocalBlockingPids int4[]; + mRemoteBlockingPids int4[]; + mLocalTransactionNum int8; + BEGIN + SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids; + + IF (array_length(mLocalBlockingPids, 1) > 0) THEN + RETURN mLocalBlockingPids; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + SELECT transaction_number INTO mLocalTransactionNum + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + + SELECT array_agg(process_id) INTO mRemoteBlockingPids FROM ( + WITH activeTransactions AS ( + SELECT process_id, transaction_number FROM get_all_active_transactions() + ), blockingTransactions AS ( + SELECT blocking_transaction_num AS txn_num FROM dump_global_wait_edges() + WHERE waiting_transaction_num = mLocalTransactionNum + ) + SELECT activeTransactions.process_id FROM activeTransactions, blockingTransactions + WHERE activeTransactions.transaction_number = blockingTransactions.txn_num + ) AS sub; + + RETURN mRemoteBlockingPids; + END; +$$ LANGUAGE plpgsql; +REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC; + +CREATE VIEW citus.citus_worker_stat_activity AS +SELECT * FROM pg_catalog.citus_worker_stat_activity(); +ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; -- we have to recreate this view because we drop citus_dist_stat_activity that this view depends CREATE VIEW citus.citus_lock_waits AS @@ -184,7 +309,7 @@ citus_dist_stat_activity AS ), unique_global_wait_edges AS ( - SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() + SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() ), citus_dist_stat_activity_with_node_id AS ( @@ -217,67 +342,4 @@ JOIN ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; -DROP FUNCTION citus_worker_stat_activity CASCADE; - -CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) -RETURNS SETOF RECORD -LANGUAGE C STRICT AS 'MODULE_PATHNAME', -$$citus_worker_stat_activity$$; - -COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, - OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, - OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, - OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, - OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) -IS 'returns distributed transaction activity on shards of distributed tables'; - -CREATE VIEW citus.citus_worker_stat_activity AS -SELECT * FROM pg_catalog.citus_worker_stat_activity(); -ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; -GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; - -DROP FUNCTION pg_catalog.worker_create_or_replace_object(text[]); -#include "../udfs/worker_create_or_replace_object/9.0-1.sql" - -DROP FUNCTION IF EXISTS pg_catalog.pg_cancel_backend(bigint) CASCADE; -DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_backend(bigint, bigint) CASCADE; - -DROP FUNCTION pg_catalog.dump_global_wait_edges; -CREATE FUNCTION pg_catalog.dump_local_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges() -IS 'returns all local lock wait chains, that start from distributed transactions'; - -DROP FUNCTION pg_catalog.dump_global_wait_edges; -CREATE FUNCTION pg_catalog.dump_global_wait_edges( - OUT waiting_pid int4, - OUT waiting_node_id int4, - OUT waiting_transaction_num int8, - OUT waiting_transaction_stamp timestamptz, - OUT blocking_pid int4, - OUT blocking_node_id int4, - OUT blocking_transaction_num int8, - OUT blocking_transaction_stamp timestamptz, - OUT blocking_transaction_waiting bool) -RETURNS SETOF RECORD -LANGUAGE 'c' STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() -IS 'returns a global list of blocked transactions originating from this node'; +RESET search_path; diff --git a/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql new file mode 100644 index 000000000..c7e607c1c --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_blocking_pids/11.0-1.sql @@ -0,0 +1,34 @@ +DROP FUNCTION pg_catalog.citus_blocking_pids; +CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) +RETURNS int4[] AS $$ + DECLARE + mLocalBlockingPids int4[]; + mRemoteBlockingPids int4[]; + mLocalGlobalPid int8; + BEGIN + SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids; + + IF (array_length(mLocalBlockingPids, 1) > 0) THEN + RETURN mLocalBlockingPids; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + SELECT global_pid INTO mLocalGlobalPid + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + + SELECT array_agg(global_pid) INTO mRemoteBlockingPids FROM ( + WITH activeTransactions AS ( + SELECT global_pid FROM get_all_active_transactions() + ), blockingTransactions AS ( + SELECT blocking_global_pid FROM citus_internal_global_blocked_processes() + WHERE waiting_global_pid = mLocalGlobalPid + ) + SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions + WHERE activeTransactions.global_pid = blockingTransactions.blocking_global_pid + ) AS sub; + + RETURN mRemoteBlockingPids; + END; +$$ LANGUAGE plpgsql; + +REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql b/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql new file mode 100644 index 000000000..c7e607c1c --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_blocking_pids/latest.sql @@ -0,0 +1,34 @@ +DROP FUNCTION pg_catalog.citus_blocking_pids; +CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) +RETURNS int4[] AS $$ + DECLARE + mLocalBlockingPids int4[]; + mRemoteBlockingPids int4[]; + mLocalGlobalPid int8; + BEGIN + SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids; + + IF (array_length(mLocalBlockingPids, 1) > 0) THEN + RETURN mLocalBlockingPids; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + SELECT global_pid INTO mLocalGlobalPid + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + + SELECT array_agg(global_pid) INTO mRemoteBlockingPids FROM ( + WITH activeTransactions AS ( + SELECT global_pid FROM get_all_active_transactions() + ), blockingTransactions AS ( + SELECT blocking_global_pid FROM citus_internal_global_blocked_processes() + WHERE waiting_global_pid = mLocalGlobalPid + ) + SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions + WHERE activeTransactions.global_pid = blockingTransactions.blocking_global_pid + ) AS sub; + + RETURN mRemoteBlockingPids; + END; +$$ LANGUAGE plpgsql; + +REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/11.0-1.sql similarity index 62% rename from src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql rename to src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/11.0-1.sql index 32e25e217..510cdf93d 100644 --- a/src/backend/distributed/sql/udfs/dump_global_wait_edges/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/11.0-1.sql @@ -1,12 +1,10 @@ -DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, - OUT blocking_global_pid int8, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_global_blocked_processes() IS 'returns a global list of blocked backends originating from this node'; diff --git a/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/latest.sql similarity index 62% rename from src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql rename to src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/latest.sql index 32e25e217..510cdf93d 100644 --- a/src/backend/distributed/sql/udfs/dump_global_wait_edges/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_global_blocked_processes/latest.sql @@ -1,12 +1,10 @@ -DROP FUNCTION pg_catalog.dump_global_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, OUT waiting_transaction_stamp timestamptz, - OUT blocking_global_pid int8, + OUT blocking_global_pid int8, OUT blocking_pid int4, OUT blocking_node_id int4, OUT blocking_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_global_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_global_blocked_processes() IS 'returns a global list of blocked backends originating from this node'; diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/11.0-1.sql similarity index 67% rename from src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql rename to src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/11.0-1.sql index 190e4c0d5..3157a9aad 100644 --- a/src/backend/distributed/sql/udfs/dump_local_wait_edges/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/11.0-1.sql @@ -1,7 +1,5 @@ -DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_local_blocked_processes() IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/latest.sql similarity index 67% rename from src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql rename to src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/latest.sql index 190e4c0d5..3157a9aad 100644 --- a/src/backend/distributed/sql/udfs/dump_local_wait_edges/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_local_blocked_processes/latest.sql @@ -1,7 +1,5 @@ -DROP FUNCTION pg_catalog.dump_local_wait_edges CASCADE; -CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( - distributed_tx_only boolean DEFAULT true, - OUT waiting_global_pid int8, +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes( + OUT waiting_global_pid int8, OUT waiting_pid int4, OUT waiting_node_id int4, OUT waiting_transaction_num int8, @@ -14,6 +12,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.dump_local_wait_edges( OUT blocking_transaction_waiting bool) RETURNS SETOF RECORD LANGUAGE C STRICT -AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; -COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(bool) +AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_local_blocked_processes() IS 'returns all local lock wait chains, that start from any citus backend'; diff --git a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql new file mode 100644 index 000000000..64b89ec0e --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.0-1.sql @@ -0,0 +1,56 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) +RETURNS boolean AS $$ + DECLARE + mBlockedGlobalPid int8; + workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); + coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); + BEGIN + IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN + RETURN true; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + -- Note that worker process may be blocked or waiting for a lock. So we need to + -- get transaction number for both of them. Following IF provides the transaction + -- number when the worker process waiting for other session. + IF EXISTS (SELECT 1 FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN + SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; + ELSE + -- Check whether transactions initiated from the coordinator get locked + SELECT global_pid INTO mBlockedGlobalPid + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + END IF; + + RETURN EXISTS ( + SELECT 1 FROM citus_internal_global_blocked_processes() + WHERE waiting_global_pid = mBlockedGlobalPid + ) OR EXISTS ( + -- Check on the workers if any logical replication job spawned by the + -- current PID is blocked, by checking it's application name + -- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring + SELECT result FROM run_command_on_workers($two$ + SELECT blocked_activity.application_name AS blocked_application + FROM pg_catalog.pg_locks blocked_locks + JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid + JOIN pg_catalog.pg_locks blocking_locks + ON blocking_locks.locktype = blocked_locks.locktype + AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE + AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation + AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page + AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple + AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid + AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid + AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid + AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid + AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid + AND blocking_locks.pid != blocked_locks.pid + JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid + WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%' + $two$) where result='citus_shard_move_subscription_' || pBlockedPid); + + END; +$$ LANGUAGE plpgsql; + +REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql index 0b91cc37c..64b89ec0e 100644 --- a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql @@ -1,7 +1,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) RETURNS boolean AS $$ DECLARE - mBlockedTransactionNum int8; + mBlockedGlobalPid int8; workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); BEGIN @@ -13,19 +13,19 @@ RETURNS boolean AS $$ -- Note that worker process may be blocked or waiting for a lock. So we need to -- get transaction number for both of them. Following IF provides the transaction -- number when the worker process waiting for other session. - IF EXISTS (SELECT transaction_number FROM get_global_active_transactions() + IF EXISTS (SELECT 1 FROM get_global_active_transactions() WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN - SELECT transaction_number INTO mBlockedTransactionNum FROM get_global_active_transactions() + SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions() WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; ELSE -- Check whether transactions initiated from the coordinator get locked - SELECT transaction_number INTO mBlockedTransactionNum + SELECT global_pid INTO mBlockedGlobalPid FROM get_all_active_transactions() WHERE process_id = pBlockedPid; END IF; RETURN EXISTS ( - SELECT 1 FROM dump_global_wait_edges() - WHERE waiting_transaction_num = mBlockedTransactionNum + SELECT 1 FROM citus_internal_global_blocked_processes() + WHERE waiting_global_pid = mBlockedGlobalPid ) OR EXISTS ( -- Check on the workers if any logical replication job spawned by the -- current PID is blocked, by checking it's application name diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql index 2ae40374a..779341657 100644 --- a/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql @@ -8,7 +8,7 @@ citus_dist_stat_activity AS ), unique_global_wait_edges AS ( - SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() + SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes() ), citus_dist_stat_activity_with_node_id AS ( @@ -21,6 +21,8 @@ citus_dist_stat_activity_with_node_id AS citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport ) SELECT + waiting.global_pid as waiting_gpid, + blocking.global_pid as blocking_gpid, waiting.pid AS waiting_pid, blocking.pid AS blocking_pid, waiting.query AS blocked_statement, @@ -34,9 +36,9 @@ SELECT FROM unique_global_wait_edges JOIN - citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id) + citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid) JOIN - citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id); + citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid); ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql index 2ae40374a..779341657 100644 --- a/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql @@ -8,7 +8,7 @@ citus_dist_stat_activity AS ), unique_global_wait_edges AS ( - SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() + SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes() ), citus_dist_stat_activity_with_node_id AS ( @@ -21,6 +21,8 @@ citus_dist_stat_activity_with_node_id AS citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport ) SELECT + waiting.global_pid as waiting_gpid, + blocking.global_pid as blocking_gpid, waiting.pid AS waiting_pid, blocking.pid AS blocking_pid, waiting.query AS blocked_statement, @@ -34,9 +36,9 @@ SELECT FROM unique_global_wait_edges JOIN - citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id) + citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid) JOIN - citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id); + citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid); ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 18f391d74..62b5e4e04 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -47,6 +47,9 @@ typedef struct PROCStack static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); +static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, + int rowIndex); +static void ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx); static bool IsProcessWaitingForSafeOperations(PGPROC *proc); static void LockLockData(void); @@ -62,10 +65,30 @@ static void AddProcToVisit(PROCStack *remaining, PGPROC *proc); static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc); static bool IsConflictingLockMask(int holdMask, int conflictMask); - +/* + * We almost have 2 sets of identical functions. The first set (e.g., dump_wait_edges) + * functions are intended for distributed deadlock detection purposes. + * + * The second set of functions (e.g., citus_internal_local_blocked_processes) are + * intended for citus_lock_waits view. + * + * The main difference is that the former functions only show processes that are blocked + * inside a distributed transaction (e.g., see AssignDistributedTransactionId()). + * The latter functions return a superset, where any blocked process is returned. + * + * We kept two different set of functions for two purposes. First, the deadlock detection + * is a performance critical code-path happening very frequently and we don't add any + * performance overhead. Secondly, to be able to do rolling upgrades, we cannot change + * the API of dump_global_wait_edges/dump_local_wait_edges such that they take a boolean + * parameter. If we do that, until all nodes are upgraded, the deadlock detection would fail, + * which is not acceptable. + */ PG_FUNCTION_INFO_V1(dump_local_wait_edges); PG_FUNCTION_INFO_V1(dump_global_wait_edges); +PG_FUNCTION_INFO_V1(citus_internal_local_blocked_processes); +PG_FUNCTION_INFO_V1(citus_internal_global_blocked_processes); + /* * dump_global_wait_edges returns global wait edges for distributed transactions @@ -74,7 +97,7 @@ PG_FUNCTION_INFO_V1(dump_global_wait_edges); Datum dump_global_wait_edges(PG_FUNCTION_ARGS) { - bool onlyDistributedTx = PG_GETARG_BOOL(0); + bool onlyDistributedTx = true; WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); @@ -84,6 +107,23 @@ dump_global_wait_edges(PG_FUNCTION_ARGS) } +/* + * citus_internal_global_blocked_processes returns global wait edges + * including all processes running on the cluster. + */ +Datum +citus_internal_global_blocked_processes(PG_FUNCTION_ARGS) +{ + bool onlyDistributedTx = false; + + WaitGraph *waitGraph = BuildGlobalWaitGraph(onlyDistributedTx); + + ReturnBlockedProcessGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + /* * BuildGlobalWaitGraph builds a wait graph for distributed transactions * that originate from this node, including edges from all (other) worker @@ -103,7 +143,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) List *connectionList = NIL; int32 localGroupId = GetLocalGroupId(); - /* deadlock detection is only interested in */ + /* deadlock detection is only interested in distributed transactions */ WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); /* open connections in parallel */ @@ -134,11 +174,25 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) foreach_ptr(connection, connectionList) { StringInfo queryString = makeStringInfo(); - const char *onlyDistributedTxStr = onlyDistributedTx ? "true" : "false"; - appendStringInfo(queryString, - "SELECT * FROM dump_local_wait_edges(%s)", - onlyDistributedTxStr); + if (onlyDistributedTx) + { + appendStringInfo(queryString, + "SELECT waiting_pid, waiting_node_id, " + "waiting_transaction_num, waiting_transaction_stamp, " + "blocking_pid, blocking_node_id, blocking_transaction_num, " + "blocking_transaction_stamp, blocking_transaction_waiting " + "FROM dump_local_wait_edges()"); + } + else + { + appendStringInfo(queryString, + "SELECT waiting_global_pid, waiting_pid, " + "waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, " + "blocking_global_pid,blocking_pid, blocking_node_id, " + "blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting " + "FROM citus_internal_local_blocked_processes()"); + } int querySent = SendRemoteCommand(connection, queryString->data); if (querySent == 0) @@ -162,16 +216,29 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) int64 rowCount = PQntuples(result); int64 colCount = PQnfields(result); - if (colCount != 11) + if (onlyDistributedTx && colCount != 9) { ereport(WARNING, (errmsg("unexpected number of columns from " "dump_local_wait_edges"))); continue; } + else if (!onlyDistributedTx && colCount != 11) + { + ereport(WARNING, (errmsg("unexpected number of columns from " + "citus_internal_local_blocked_processes"))); + continue; + } for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) { - AddWaitEdgeFromResult(waitGraph, result, rowIndex); + if (onlyDistributedTx) + { + AddWaitEdgeFromResult(waitGraph, result, rowIndex); + } + else + { + AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex); + } } PQclear(result); @@ -191,6 +258,29 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) { WaitEdge *waitEdge = AllocWaitEdge(waitGraph); + waitEdge->waitingGPid = 0; /* not requested for deadlock detection */ + waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); + waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); + waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); + waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); + waitEdge->blockingGPid = 0; /* not requested for deadlock detection */ + waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); + waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); + waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); + waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); + waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); +} + + +/* + * AddWaitEdgeFromBlockedProcessResult adds an edge to the wait graph that + * is read from a PGresult. + */ +static void +AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) +{ + WaitEdge *waitEdge = AllocWaitEdge(waitGraph); + waitEdge->waitingGPid = ParseIntField(result, rowIndex, 0); waitEdge->waitingPid = ParseIntField(result, rowIndex, 1); waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 2); @@ -272,7 +362,7 @@ ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex) Datum dump_local_wait_edges(PG_FUNCTION_ARGS) { - bool onlyDistributedTx = PG_GETARG_BOOL(0); + bool onlyDistributedTx = true; WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); ReturnWaitGraph(waitGraph, fcinfo); @@ -281,6 +371,22 @@ dump_local_wait_edges(PG_FUNCTION_ARGS) } +/* + * citus_internal_local_blocked_processes returns global wait edges + * including all processes running on the node. + */ +Datum +citus_internal_local_blocked_processes(PG_FUNCTION_ARGS) +{ + bool onlyDistributedTx = false; + + WaitGraph *waitGraph = BuildLocalWaitGraph(onlyDistributedTx); + ReturnBlockedProcessGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + /* * ReturnWaitGraph returns a wait graph for a set returning function. */ @@ -290,6 +396,68 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) TupleDesc tupleDesc; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc); + /* + * Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_num + * 03: waiting_transaction_stamp + * 04: blocking_pid + * 05: blocking__node_id + * 06: blocking_transaction_num + * 07: blocking_transaction_stamp + * 08: blocking_transaction_waiting + */ + for (size_t curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++) + { + Datum values[9]; + bool nulls[9]; + WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(curEdge->waitingPid); + values[1] = Int32GetDatum(curEdge->waitingNodeId); + if (curEdge->waitingTransactionNum != 0) + { + values[2] = Int64GetDatum(curEdge->waitingTransactionNum); + values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); + } + else + { + nulls[2] = true; + nulls[3] = true; + } + + values[4] = Int32GetDatum(curEdge->blockingPid); + values[5] = Int32GetDatum(curEdge->blockingNodeId); + if (curEdge->blockingTransactionNum != 0) + { + values[6] = Int64GetDatum(curEdge->blockingTransactionNum); + values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); + } + else + { + nulls[6] = true; + nulls[7] = true; + } + values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting); + + tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); + } +} + + +/* + * ReturnBlockedProcessGraph returns a wait graph for a set returning function. + */ +static void +ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) +{ + TupleDesc tupleDesc; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc); + /* * Columns: * 00: waiting_global_pid diff --git a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out index 27e13263d..cae2222ed 100644 --- a/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out +++ b/src/test/regress/expected/isolation_get_distributed_wait_queries_mx.out @@ -2,13 +2,13 @@ Parsed test spec with 4 sessions starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-commit-worker s2-stop-connection step s1-begin: - BEGIN; + BEGIN; step s1-update-ref-table-from-coordinator: - UPDATE ref_table SET value_1 = 15; + UPDATE ref_table SET value_1 = 15; step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -16,7 +16,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -24,20 +24,20 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| - UPDATE ref_table SET value_1 = 15; + UPDATE ref_table SET value_1 = 15; |localhost |coordinator_host | 57638| 57636 (1 row) step s1-commit: - COMMIT; + COMMIT; step s2-update-ref-table: <... completed> run_commands_on_session_level_connection_to_node @@ -54,7 +54,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -69,7 +69,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -77,7 +77,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -85,7 +85,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -93,7 +93,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -101,7 +101,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -109,10 +109,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -142,7 +142,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -150,7 +150,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -165,7 +165,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table s2-start-session-level-connection s2-begin-on-worker s2-update-dist-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -173,7 +173,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -181,7 +181,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-update-dist-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -189,7 +189,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -197,7 +197,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -205,10 +205,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-dist-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -238,7 +238,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -246,7 +246,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -261,7 +261,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-delete-from-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -269,7 +269,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -277,7 +277,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-delete-from-ref-table: - SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -285,7 +285,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -293,7 +293,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -301,10 +301,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -334,7 +334,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -342,7 +342,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -357,7 +357,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -365,7 +365,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -373,7 +373,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -381,7 +381,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -389,7 +389,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -397,10 +397,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -430,7 +430,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -438,7 +438,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -453,7 +453,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -461,7 +461,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -469,7 +469,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -477,7 +477,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -485,7 +485,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -493,7 +493,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -501,7 +501,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -524,7 +524,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -532,7 +532,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -547,7 +547,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -555,7 +555,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -563,7 +563,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-copy-to-ref-table: - SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -571,7 +571,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -579,7 +579,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -587,10 +587,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -620,7 +620,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -628,7 +628,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -643,7 +643,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -651,7 +651,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -659,7 +659,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-copy-to-ref-table: - SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -667,7 +667,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -675,7 +675,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -683,7 +683,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -691,7 +691,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -714,7 +714,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -722,7 +722,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -737,7 +737,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-copy-to-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -745,7 +745,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -753,7 +753,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-copy-to-ref-table: - SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -761,7 +761,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -769,7 +769,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -777,7 +777,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-copy-to-ref-table: - SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -785,7 +785,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement|current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -808,7 +808,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -816,7 +816,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -831,7 +831,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-select-for-update s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -839,7 +839,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -847,7 +847,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-select-for-update: - SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); + SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -855,7 +855,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -863,7 +863,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -871,10 +871,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-update-ref-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -904,7 +904,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -912,7 +912,7 @@ stop_session_level_connection_to_node (1 row) step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -927,7 +927,7 @@ restore_isolation_tester_func starting permutation: s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s1-begin s1-alter-table s3-select-distributed-waiting-queries s2-commit-worker s1-commit s2-stop-connection step s2-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57638); + SELECT start_session_level_connection_to_node('localhost', 57638); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -935,7 +935,7 @@ start_session_level_connection_to_node (1 row) step s2-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -943,7 +943,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s2-insert-into-ref-table: - SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -951,18 +951,18 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-begin: - BEGIN; + BEGIN; step s1-alter-table: - ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); + ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- - ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); + ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); |INSERT INTO ref_table VALUES(8,81),(9,91)|coordinator_host |localhost | 57636| 57638 (1 row) @@ -976,10 +976,10 @@ run_commands_on_session_level_connection_to_node step s1-alter-table: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -994,28 +994,28 @@ restore_isolation_tester_func starting permutation: s1-begin s1-update-on-the-coordinator s2-update-on-the-coordinator s3-select-distributed-waiting-queries s1-commit step s1-begin: - BEGIN; + BEGIN; step s1-update-on-the-coordinator: - UPDATE tt1 SET value_1 = 4; + UPDATE tt1 SET value_1 = 4; step s2-update-on-the-coordinator: - UPDATE tt1 SET value_1 = 4; + UPDATE tt1 SET value_1 = 4; step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- - UPDATE tt1 SET value_1 = 4; + UPDATE tt1 SET value_1 = 4; | - UPDATE tt1 SET value_1 = 4; + UPDATE tt1 SET value_1 = 4; |coordinator_host |coordinator_host | 57636| 57636 (1 row) step s1-commit: - COMMIT; + COMMIT; step s2-update-on-the-coordinator: <... completed> restore_isolation_tester_func @@ -1026,7 +1026,7 @@ restore_isolation_tester_func starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table s4-start-session-level-connection s4-begin-on-worker s4-update-dist-table s3-select-distributed-waiting-queries s1-commit-worker s4-commit-worker s1-stop-connection s4-stop-connection step s1-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -1034,7 +1034,7 @@ start_session_level_connection_to_node (1 row) step s1-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -1042,7 +1042,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-update-dist-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -1050,7 +1050,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s4-start-session-level-connection: - SELECT start_session_level_connection_to_node('localhost', 57637); + SELECT start_session_level_connection_to_node('localhost', 57637); start_session_level_connection_to_node --------------------------------------------------------------------- @@ -1058,7 +1058,7 @@ start_session_level_connection_to_node (1 row) step s4-begin-on-worker: - SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); run_commands_on_session_level_connection_to_node --------------------------------------------------------------------- @@ -1066,10 +1066,10 @@ run_commands_on_session_level_connection_to_node (1 row) step s4-update-dist-table: - SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); step s3-select-distributed-waiting-queries: - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port --------------------------------------------------------------------- @@ -1099,7 +1099,7 @@ run_commands_on_session_level_connection_to_node (1 row) step s1-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -1107,7 +1107,138 @@ stop_session_level_connection_to_node (1 row) step s4-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +restore_isolation_tester_func +--------------------------------------------------------------------- + +(1 row) + + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table-id-1 s2-start-session-level-connection s2-update-dist-table-id-1 s3-select-distributed-waiting-queries s1-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-update-dist-table-id-1: + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-update-dist-table-id-1: + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1'); + +step s3-select-distributed-waiting-queries: + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + +blocked_statement |current_statement_in_blocking_process |waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +--------------------------------------------------------------------- +UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|UPDATE tt1 SET value_1 = 4 WHERE user_id = 1|localhost |localhost | 57638| 57637 +(1 row) + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-update-dist-table-id-1: <... completed> +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +restore_isolation_tester_func +--------------------------------------------------------------------- + +(1 row) + + +starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-stop-connection +step s1-begin: + BEGIN; + +step s1-update-ref-table-from-coordinator: + UPDATE ref_table SET value_1 = 15; + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-update-ref-table: + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + +step s3-select-distributed-waiting-queries: + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; + +blocked_statement |current_statement_in_blocking_process|waiting_node_name|blocking_node_name|waiting_node_port|blocking_node_port +--------------------------------------------------------------------- +UPDATE ref_table SET value_1 = 12 WHERE user_id = 1| + UPDATE ref_table SET value_1 = 15; +|localhost |coordinator_host | 57638| 57636 +(1 row) + +step s1-commit: + COMMIT; + +step s2-update-ref-table: <... completed> +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index d89a71ece..6963e9122 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -96,7 +96,7 @@ step s1-commit: COMMIT; step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- @@ -148,7 +148,8 @@ step s1-move-placement-back: SET client_min_messages to NOTICE; SHOW log_error_verbosity; SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637); - + +step s1-move-placement-back: <... completed> log_error_verbosity --------------------------------------------------------------------- verbose @@ -159,7 +160,7 @@ step s1-commit: COMMIT; step s2-stop-connection: - SELECT stop_session_level_connection_to_node(); + SELECT stop_session_level_connection_to_node(); stop_session_level_connection_to_node --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out index e83e71919..2d49f8586 100644 --- a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out @@ -101,13 +101,14 @@ step s2-view-worker: FROM citus_worker_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND - query NOT ILIKE '%dump_local_wait_edges%' + query NOT ILIKE '%dump_local_%' AND + query NOT ILIKE '%citus_internal_local_blocked_processes%' ORDER BY query, query_hostport DESC; query |query_hostname|query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname --------------------------------------------------------------------- -UPDATE public.ref_table_1500767 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression -UPDATE public.ref_table_1500767 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression +UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression (2 rows) step s2-end: diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 2746d8ea4..e5cbcd994 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -428,20 +428,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE ALTER EXTENSION citus UPDATE TO '9.4-2'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -469,20 +469,20 @@ SELECT * FROM multi_extension.print_extension_changes(); ALTER EXTENSION citus UPDATE TO '9.4-1'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -578,20 +578,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE ALTER EXTENSION citus UPDATE TO '9.5-2'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -619,20 +619,20 @@ SELECT * FROM multi_extension.print_extension_changes(); ALTER EXTENSION citus UPDATE TO '9.5-1'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -1005,8 +1005,6 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- function citus_disable_node(text,integer) void | function create_distributed_function(regprocedure,text,text) void | - function dump_global_wait_edges() SETOF record | - function dump_local_wait_edges() SETOF record | function master_append_table_to_shard(bigint,text,text,integer) real | function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | @@ -1014,24 +1012,18 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_check_connection_to_node(text,integer) boolean | function citus_disable_node(text,integer,boolean) void | function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void + | function citus_internal_global_blocked_processes() SETOF record + | function citus_internal_local_blocked_processes() SETOF record | function citus_run_local_command(text) void | function citus_shard_indexes_on_worker() SETOF record | function citus_shards_on_worker() SETOF record | function create_distributed_function(regprocedure,text,text,boolean) void -<<<<<<< HEAD | function pg_cancel_backend(bigint) boolean | function pg_terminate_backend(bigint,bigint) boolean | function worker_create_or_replace_object(text[]) boolean | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void -(18 rows) -======= - | function dump_global_wait_edges(boolean) SETOF record - | function dump_local_wait_edges(boolean) SETOF record - | function worker_drop_sequence_dependency(text) void - | function worker_drop_shell_table(text) void -(19 rows) ->>>>>>> d4b956c7f (Use the optional APIs introduced for dump global/local wait edges) +(20 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 2071dca18..185bf19c5 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -74,6 +74,8 @@ ORDER BY 1; function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) function citus_internal_delete_shard_metadata(bigint) + function citus_internal_global_blocked_processes() + function citus_internal_local_blocked_processes() function citus_internal_update_placement_metadata(bigint,integer,integer) function citus_internal_update_relation_colocation(oid,integer) function citus_isolation_test_session_is_blocked(integer,integer[]) @@ -127,8 +129,8 @@ ORDER BY 1; function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone) function distributed_tables_colocated(regclass,regclass) function drop_old_time_partitions(regclass,timestamp with time zone) - function dump_global_wait_edges(boolean) - function dump_local_wait_edges(boolean) + function dump_global_wait_edges() + function dump_local_wait_edges() function fetch_intermediate_results(text[],text,integer) function fix_all_partition_shard_index_names() function fix_partition_shard_index_names(regclass) @@ -270,5 +272,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(254 rows) +(256 rows) diff --git a/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec b/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec index b43708ab1..84daaf792 100644 --- a/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec +++ b/src/test/regress/spec/isolation_get_distributed_wait_queries_mx.spec @@ -73,6 +73,11 @@ step "s1-select-for-update" SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); } +step "s1-update-dist-table-id-1" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1'); +} + step "s1-commit-worker" { SELECT run_commands_on_session_level_connection_to_node('COMMIT'); @@ -115,6 +120,11 @@ step "s2-update-dist-table" SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5'); } +step "s2-update-dist-table-id-1" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4 WHERE user_id = 1'); +} + step "s2-update-ref-table" { SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); @@ -149,7 +159,7 @@ session "s3" step "s3-select-distributed-waiting-queries" { - SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits; + SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits WHERE blocked_statement NOT ILIKE '%run_commands_on_session_level_connection_to_node%' AND current_statement_in_blocking_process NOT ILIKE '%run_commands_on_session_level_connection_to_node%'; } // session s1 and s4 executes the commands on the same worker node @@ -196,3 +206,9 @@ permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert- // blocked on the same node permutation "s1-begin" "s1-update-on-the-coordinator" "s2-update-on-the-coordinator" "s3-select-distributed-waiting-queries" "s1-commit" permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s4-start-session-level-connection" "s4-begin-on-worker" "s4-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s4-commit-worker" "s1-stop-connection" "s4-stop-connection" + + +// show that even if the commands are not in a transaction block +// we can find the blocking relationship +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table-id-1" "s2-start-session-level-connection" "s2-update-dist-table-id-1" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s1-begin" "s1-update-ref-table-from-coordinator" "s2-start-session-level-connection" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit" "s2-stop-connection" diff --git a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec index 0defcf549..fa2079ba5 100644 --- a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec +++ b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec @@ -93,7 +93,8 @@ step "s2-view-worker" FROM citus_worker_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' AND - query NOT ILIKE '%dump_local_wait_edges%' + query NOT ILIKE '%dump_local_%' AND + query NOT ILIKE '%citus_internal_local_blocked_processes%' ORDER BY query, query_hostport DESC; }