follow feedback

add_worker_query_again
Onder Kalaci 2022-02-14 17:59:46 +01:00
parent c8cb25aab3
commit 6179e6a5ac
15 changed files with 26 additions and 26 deletions

View File

@ -17,8 +17,8 @@
#include "udfs/get_all_active_transactions/11.0-1.sql" #include "udfs/get_all_active_transactions/11.0-1.sql"
#include "udfs/get_global_active_transactions/11.0-1.sql" #include "udfs/get_global_active_transactions/11.0-1.sql"
#include "udfs/dump_local_blocked_processes/11.0-1.sql" #include "udfs/citus_internal_local_blocked_processes/11.0-1.sql"
#include "udfs/dump_global_blocked_processes/11.0-1.sql" #include "udfs/citus_internal_global_blocked_processes/11.0-1.sql"
#include "udfs/citus_worker_stat_activity/11.0-1.sql" #include "udfs/citus_worker_stat_activity/11.0-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql" #include "udfs/citus_isolation_test_session_is_blocked/11.0-1.sql"

View File

@ -113,8 +113,8 @@ CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int
RESET search_path; RESET search_path;
DROP FUNCTION dump_local_blocked_processes CASCADE; DROP FUNCTION citus_internal_local_blocked_processes CASCADE;
DROP FUNCTION dump_global_blocked_processes CASCADE; DROP FUNCTION citus_internal_global_blocked_processes CASCADE;
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;

View File

@ -20,7 +20,7 @@ RETURNS int4[] AS $$
WITH activeTransactions AS ( WITH activeTransactions AS (
SELECT global_pid FROM get_all_active_transactions() SELECT global_pid FROM get_all_active_transactions()
), blockingTransactions AS ( ), blockingTransactions AS (
SELECT blocking_global_pid FROM dump_global_blocked_processes() SELECT blocking_global_pid FROM citus_internal_global_blocked_processes()
WHERE waiting_global_pid = mLocalGlobalPid WHERE waiting_global_pid = mLocalGlobalPid
) )
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions

View File

@ -20,7 +20,7 @@ RETURNS int4[] AS $$
WITH activeTransactions AS ( WITH activeTransactions AS (
SELECT global_pid FROM get_all_active_transactions() SELECT global_pid FROM get_all_active_transactions()
), blockingTransactions AS ( ), blockingTransactions AS (
SELECT blocking_global_pid FROM dump_global_blocked_processes() SELECT blocking_global_pid FROM citus_internal_global_blocked_processes()
WHERE waiting_global_pid = mLocalGlobalPid WHERE waiting_global_pid = mLocalGlobalPid
) )
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions

View File

@ -24,7 +24,7 @@ RETURNS boolean AS $$
END IF; END IF;
RETURN EXISTS ( RETURN EXISTS (
SELECT 1 FROM dump_global_blocked_processes() SELECT 1 FROM citus_internal_global_blocked_processes()
WHERE waiting_global_pid = mBlockedGlobalPid WHERE waiting_global_pid = mBlockedGlobalPid
) OR EXISTS ( ) OR EXISTS (
-- Check on the workers if any logical replication job spawned by the -- Check on the workers if any logical replication job spawned by the

View File

@ -24,7 +24,7 @@ RETURNS boolean AS $$
END IF; END IF;
RETURN EXISTS ( RETURN EXISTS (
SELECT 1 FROM dump_global_blocked_processes() SELECT 1 FROM citus_internal_global_blocked_processes()
WHERE waiting_global_pid = mBlockedGlobalPid WHERE waiting_global_pid = mBlockedGlobalPid
) OR EXISTS ( ) OR EXISTS (
-- Check on the workers if any logical replication job spawned by the -- Check on the workers if any logical replication job spawned by the

View File

@ -8,7 +8,7 @@ citus_dist_stat_activity AS
), ),
unique_global_wait_edges AS unique_global_wait_edges AS
( (
SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_blocked_processes() SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes()
), ),
citus_dist_stat_activity_with_node_id AS citus_dist_stat_activity_with_node_id AS
( (

View File

@ -8,7 +8,7 @@ citus_dist_stat_activity AS
), ),
unique_global_wait_edges AS unique_global_wait_edges AS
( (
SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM dump_global_blocked_processes() SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM citus_internal_global_blocked_processes()
), ),
citus_dist_stat_activity_with_node_id AS citus_dist_stat_activity_with_node_id AS
( (

View File

@ -69,8 +69,8 @@ static bool IsConflictingLockMask(int holdMask, int conflictMask);
* We almost have 2 sets of identical functions. The first set (e.g., dump_wait_edges) * We almost have 2 sets of identical functions. The first set (e.g., dump_wait_edges)
* functions are intended for distributed deadlock detection purposes. * functions are intended for distributed deadlock detection purposes.
* *
* The second set of functions (e.g., dump_blocked_processes) are intended for * The second set of functions (e.g., citus_internal_local_blocked_processes) are
* citus_lock_waits view. * intended for citus_lock_waits view.
* *
* The main difference is that the former functions only show processes that are blocked * The main difference is that the former functions only show processes that are blocked
* inside a distributed transaction (e.g., see AssignDistributedTransactionId()). * inside a distributed transaction (e.g., see AssignDistributedTransactionId()).
@ -86,8 +86,8 @@ static bool IsConflictingLockMask(int holdMask, int conflictMask);
PG_FUNCTION_INFO_V1(dump_local_wait_edges); PG_FUNCTION_INFO_V1(dump_local_wait_edges);
PG_FUNCTION_INFO_V1(dump_global_wait_edges); PG_FUNCTION_INFO_V1(dump_global_wait_edges);
PG_FUNCTION_INFO_V1(dump_local_blocked_processes); PG_FUNCTION_INFO_V1(citus_internal_local_blocked_processes);
PG_FUNCTION_INFO_V1(dump_global_blocked_processes); PG_FUNCTION_INFO_V1(citus_internal_global_blocked_processes);
/* /*
@ -108,11 +108,11 @@ dump_global_wait_edges(PG_FUNCTION_ARGS)
/* /*
* dump_global_blocked_processes returns global wait edges including all processes * citus_internal_global_blocked_processes returns global wait edges
* running on the cluster. * including all processes running on the cluster.
*/ */
Datum Datum
dump_global_blocked_processes(PG_FUNCTION_ARGS) citus_internal_global_blocked_processes(PG_FUNCTION_ARGS)
{ {
bool onlyDistributedTx = false; bool onlyDistributedTx = false;
@ -183,7 +183,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
else else
{ {
appendStringInfo(queryString, appendStringInfo(queryString,
"SELECT * FROM dump_local_blocked_processes()"); "SELECT * FROM citus_internal_local_blocked_processes()");
} }
int querySent = SendRemoteCommand(connection, queryString->data); int querySent = SendRemoteCommand(connection, queryString->data);
@ -217,7 +217,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
else if (!onlyDistributedTx && colCount != 11) else if (!onlyDistributedTx && colCount != 11)
{ {
ereport(WARNING, (errmsg("unexpected number of columns from " ereport(WARNING, (errmsg("unexpected number of columns from "
"dump_local_blocked_processes"))); "citus_internal_local_blocked_processes")));
continue; continue;
} }
@ -364,11 +364,11 @@ dump_local_wait_edges(PG_FUNCTION_ARGS)
/* /*
* dump_local_blocked_processes returns global wait edges including * citus_internal_local_blocked_processes returns global wait edges
* all processes running on the node. * including all processes running on the node.
*/ */
Datum Datum
dump_local_blocked_processes(PG_FUNCTION_ARGS) citus_internal_local_blocked_processes(PG_FUNCTION_ARGS)
{ {
bool onlyDistributedTx = false; bool onlyDistributedTx = false;

View File

@ -1007,12 +1007,12 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_check_connection_to_node(text,integer) boolean | function citus_check_connection_to_node(text,integer) boolean
| function citus_disable_node(text,integer,boolean) void | function citus_disable_node(text,integer,boolean) void
| function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void | function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void
| function citus_internal_local_blocked_processes() SETOF record
| function citus_internal_global_blocked_processes() SETOF record
| function citus_run_local_command(text) void | function citus_run_local_command(text) void
| function citus_shard_indexes_on_worker() SETOF record | function citus_shard_indexes_on_worker() SETOF record
| function citus_shards_on_worker() SETOF record | function citus_shards_on_worker() SETOF record
| function create_distributed_function(regprocedure,text,text,boolean) void | function create_distributed_function(regprocedure,text,text,boolean) void
| function dump_global_blocked_processes() SETOF record
| function dump_local_blocked_processes() SETOF record
| function worker_drop_sequence_dependency(text) void | function worker_drop_sequence_dependency(text) void
| function worker_drop_shell_table(text) void | function worker_drop_shell_table(text) void
(17 rows) (17 rows)

View File

@ -74,6 +74,8 @@ ORDER BY 1;
function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
function citus_internal_delete_shard_metadata(bigint) function citus_internal_delete_shard_metadata(bigint)
function citus_internal_global_blocked_processes()
function citus_internal_local_blocked_processes()
function citus_internal_update_placement_metadata(bigint,integer,integer) function citus_internal_update_placement_metadata(bigint,integer,integer)
function citus_internal_update_relation_colocation(oid,integer) function citus_internal_update_relation_colocation(oid,integer)
function citus_isolation_test_session_is_blocked(integer,integer[]) function citus_isolation_test_session_is_blocked(integer,integer[])
@ -127,9 +129,7 @@ ORDER BY 1;
function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone) function create_time_partitions(regclass,interval,timestamp with time zone,timestamp with time zone)
function distributed_tables_colocated(regclass,regclass) function distributed_tables_colocated(regclass,regclass)
function drop_old_time_partitions(regclass,timestamp with time zone) function drop_old_time_partitions(regclass,timestamp with time zone)
function dump_global_blocked_processes()
function dump_global_wait_edges() function dump_global_wait_edges()
function dump_local_blocked_processes()
function dump_local_wait_edges() function dump_local_wait_edges()
function fetch_intermediate_results(text[],text,integer) function fetch_intermediate_results(text[],text,integer)
function fix_all_partition_shard_index_names() function fix_all_partition_shard_index_names()