Add citus_lock_waits to show locked distributed queries

pull/2381/head
velioglu 2018-09-13 11:00:27 +03:00
parent 14d514d1df
commit d7f75e5b48
10 changed files with 994 additions and 5 deletions

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '8.0-6'
default_version = '8.0-7'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
7.3-1 7.3-2 7.3-3 \
7.4-1 7.4-2 7.4-3 \
7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \
8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6
8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 8.0-7
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -227,6 +227,8 @@ $(EXTENSION)--8.0-5.sql: $(EXTENSION)--8.0-4.sql $(EXTENSION)--8.0-4--8.0-5.sql
cat $^ > $@
$(EXTENSION)--8.0-6.sql: $(EXTENSION)--8.0-5.sql $(EXTENSION)--8.0-5--8.0-6.sql
cat $^ > $@
$(EXTENSION)--8.0-7.sql: $(EXTENSION)--8.0-6.sql $(EXTENSION)--8.0-6--8.0-7.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,46 @@
/* citus--8.0-6--8.0-7 */
SET search_path = 'pg_catalog';
CREATE VIEW citus.citus_lock_waits AS
WITH
citus_dist_stat_activity AS
(
SELECT * FROM citus_dist_stat_activity
),
unique_global_wait_edges AS
(
SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges()
),
citus_dist_stat_activity_with_node_id AS
(
SELECT
citus_dist_stat_activity.*, (CASE citus_dist_stat_activity.master_query_host_name WHEN 'coordinator_host' THEN 0 ELSE pg_dist_node.nodeid END) as initiator_node_id
FROM
citus_dist_stat_activity LEFT JOIN pg_dist_node
ON
citus_dist_stat_activity.master_query_host_name = pg_dist_node.nodename AND
citus_dist_stat_activity.master_query_host_port = pg_dist_node.nodeport
)
SELECT
waiting.pid AS waiting_pid,
blocking.pid AS blocking_pid,
waiting.query AS blocked_statement,
blocking.query AS current_statement_in_blocking_process,
waiting.initiator_node_id AS waiting_node_id,
blocking.initiator_node_id AS blocking_node_id,
waiting.master_query_host_name AS waiting_node_name,
blocking.master_query_host_name AS blocking_node_name,
waiting.master_query_host_port AS waiting_node_port,
blocking.master_query_host_port AS blocking_node_port
FROM
unique_global_wait_edges
JOIN
citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id)
JOIN
citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id);
ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '8.0-6'
default_version = '8.0-7'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -142,12 +142,12 @@ run_commands_on_session_level_connection_to_node(PG_FUNCTION_ARGS)
"start_session_level_connection_to_node must be called first to open a session level connection");
}
ExecuteCriticalRemoteCommand(connection, queryString);
appendStringInfo(processStringInfo, ALTER_CURRENT_PROCESS_ID, MyProcPid);
appendStringInfo(workerProcessStringInfo, ALTER_CURRENT_WORKER_PROCESS_ID,
GetRemoteProcessId(connection));
ExecuteCriticalRemoteCommand(connection, queryString);
/*
* Since we cannot run `ALTER SYSTEM` command within a transaction, we are
* calling it from a self-connected session.

View File

@ -0,0 +1,749 @@
Parsed test spec with 3 sessions
starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-commit-worker s2-stop-connection
step s1-begin:
BEGIN;
step s1-update-ref-table-from-coordinator:
UPDATE ref_table SET value_1 = 15;
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1
UPDATE ref_table SET value_1 = 15;
localhost coordinator_host57638 57636
step s1-commit:
COMMIT;
step s2-update-ref-table: <... completed>
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
run_commands_on_session_level_connection_to_node
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1UPDATE ref_table SET value_1 = 12 WHERE user_id = 1localhost localhost 57638 57637
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table: <... completed>
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table s2-start-session-level-connection s2-begin-on-worker s2-update-dist-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-update-dist-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4');
run_commands_on_session_level_connection_to_node
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-update-dist-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
UPDATE tt1 SET value_1 = 5UPDATE tt1 SET value_1 = 4localhost localhost 57638 57637
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-update-dist-table: <... completed>
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-delete-from-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-delete-from-ref-table:
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1');
run_commands_on_session_level_connection_to_node
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1DELETE FROM ref_table WHERE user_id = 1localhost localhost 57638 57637
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table: <... completed>
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-insert-into-ref-table:
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
run_commands_on_session_level_connection_to_node
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1INSERT INTO ref_table VALUES(8,81),(9,91)localhost localhost 57638 57637
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table: <... completed>
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-insert-into-ref-table:
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
run_commands_on_session_level_connection_to_node
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-insert-into-ref-table:
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
run_commands_on_session_level_connection_to_node
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-copy-to-ref-table:
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
run_commands_on_session_level_connection_to_node
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1COPY ref_table FROM PROGRAM 'echo 10, 101 && echo 11, 111' WITH CSVlocalhost localhost 57638 57637
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table: <... completed>
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-copy-to-ref-table:
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
run_commands_on_session_level_connection_to_node
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-insert-into-ref-table:
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
run_commands_on_session_level_connection_to_node
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-copy-to-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-copy-to-ref-table:
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
run_commands_on_session_level_connection_to_node
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-copy-to-ref-table:
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
run_commands_on_session_level_connection_to_node
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-select-for-update s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-select-for-update:
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE');
run_commands_on_session_level_connection_to_node
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
UPDATE ref_table SET value_1 = 12 WHERE user_id = 1SELECT * FROM ref_table FOR UPDATElocalhost localhost 57638 57637
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-update-ref-table: <... completed>
run_commands_on_session_level_connection_to_node
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s1-begin s1-alter-table s3-select-distributed-waiting-queries s2-commit-worker s1-commit s2-stop-connection
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-insert-into-ref-table:
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
run_commands_on_session_level_connection_to_node
step s1-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
INSERT INTO ref_table VALUES(8,81),(9,91)coordinator_hostlocalhost 57636 57638
step s2-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-alter-table: <... completed>
step s1-commit:
COMMIT;
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func

View File

@ -149,6 +149,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-3';
ALTER EXTENSION citus UPDATE TO '8.0-4';
ALTER EXTENSION citus UPDATE TO '8.0-5';
ALTER EXTENSION citus UPDATE TO '8.0-6';
ALTER EXTENSION citus UPDATE TO '8.0-7';
-- show running version
SHOW citus.version;
citus.version

View File

@ -46,3 +46,4 @@ test: isolation_citus_dist_activity
# MX tests
test: isolation_reference_on_mx
test: isolation_get_distributed_wait_queries

View File

@ -0,0 +1,189 @@
# Create and use UDF to send commands from the same connection. Also make the cluster
# ready for testing MX functionalities.
setup
{
CREATE OR REPLACE FUNCTION start_session_level_connection_to_node(text, integer)
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'citus', $$start_session_level_connection_to_node$$;
CREATE OR REPLACE FUNCTION run_commands_on_session_level_connection_to_node(text)
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'citus', $$run_commands_on_session_level_connection_to_node$$;
CREATE OR REPLACE FUNCTION stop_session_level_connection_to_node()
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'citus', $$stop_session_level_connection_to_node$$;
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT start_metadata_sync_to_node('localhost', 57637);
SELECT start_metadata_sync_to_node('localhost', 57638);
SET citus.shard_replication_factor TO 1;
SET citus.replication_model to streaming;
CREATE TABLE ref_table(user_id int, value_1 int);
SELECT create_reference_table('ref_table');
INSERT INTO ref_table VALUES (1, 11), (2, 21), (3, 31), (4, 41), (5, 51), (6, 61), (7, 71);
CREATE TABLE tt1(user_id int, value_1 int);
SELECT create_distributed_table('tt1', 'user_id');
INSERT INTO tt1 VALUES (1, 11), (2, 21), (3, 31), (4, 41), (5, 51), (6, 61), (7, 71);
}
# Create and use UDF to close the connection opened in the setup step. Also return the cluster
# back to the initial state.
teardown
{
DROP TABLE ref_table;
DROP TABLE tt1;
SELECT citus.restore_isolation_tester_func();
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-update-ref-table-from-coordinator"
{
UPDATE ref_table SET value_1 = 15;
}
# We do not need to begin a transaction on coordinator, since it will be open on workers.
step "s1-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57637);
}
step "s1-begin-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "s1-update-dist-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4');
}
step "s1-update-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
}
step "s1-delete-from-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1');
}
step "s1-insert-into-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
}
step "s1-copy-to-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
}
step "s1-select-for-update"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE');
}
step "s1-commit-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
}
step "s1-alter-table"
{
ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
}
step "s1-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57638);
}
step "s2-begin-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "s2-update-dist-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
}
step "s2-update-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
}
step "s2-select-from-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT count(*) FROM ref_table');
}
step "s2-delete-from-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 2');
}
step "s2-insert-into-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
}
step "s2-copy-to-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
}
step "s2-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
step "s2-commit-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
}
session "s3"
step "s3-select-distributed-waiting-queries"
{
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
}
permutation "s1-begin" "s1-update-ref-table-from-coordinator" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit" "s2-commit-worker" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete-from-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-copy-to-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-begin" "s1-alter-table" "s3-select-distributed-waiting-queries" "s2-commit-worker" "s1-commit" "s2-stop-connection"

View File

@ -149,6 +149,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-3';
ALTER EXTENSION citus UPDATE TO '8.0-4';
ALTER EXTENSION citus UPDATE TO '8.0-5';
ALTER EXTENSION citus UPDATE TO '8.0-6';
ALTER EXTENSION citus UPDATE TO '8.0-7';
-- show running version
SHOW citus.version;