mirror of https://github.com/citusdata/citus.git
Give isolation tester ability to see locks on workers
parent
db5206846e
commit
32e16ffe02
|
@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
|||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
||||
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
|
||||
6.2-1 6.2-2 6.2-3 6.2-4 \
|
||||
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6
|
||||
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7
|
||||
|
||||
# All citus--*.sql files in the source directory
|
||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||
|
@ -151,6 +151,8 @@ $(EXTENSION)--7.0-5.sql: $(EXTENSION)--7.0-4.sql $(EXTENSION)--7.0-4--7.0-5.sql
|
|||
cat $^ > $@
|
||||
$(EXTENSION)--7.0-6.sql: $(EXTENSION)--7.0-5.sql $(EXTENSION)--7.0-5--7.0-6.sql
|
||||
cat $^ > $@
|
||||
$(EXTENSION)--7.0-7.sql: $(EXTENSION)--7.0-6.sql $(EXTENSION)--7.0-6--7.0-7.sql
|
||||
cat $^ > $@
|
||||
|
||||
NO_PGXS = 1
|
||||
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
/* citus--7.0-6--7.0-7 */
|
||||
|
||||
CREATE FUNCTION citus.replace_isolation_tester_func()
|
||||
RETURNS void AS $$
|
||||
DECLARE
|
||||
version integer := current_setting('server_version_num');
|
||||
BEGIN
|
||||
IF version >= 100000 THEN
|
||||
ALTER FUNCTION pg_catalog.pg_isolation_test_session_is_blocked(integer, integer[])
|
||||
RENAME TO old_pg_isolation_test_session_is_blocked;
|
||||
ALTER FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(integer, integer[])
|
||||
RENAME TO pg_isolation_test_session_is_blocked;
|
||||
ELSE
|
||||
ALTER FUNCTION pg_catalog.pg_blocking_pids(integer)
|
||||
RENAME TO old_pg_blocking_pids;
|
||||
ALTER FUNCTION pg_catalog.citus_blocking_pids(integer)
|
||||
RENAME TO pg_blocking_pids;
|
||||
END IF;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE FUNCTION citus.restore_isolation_tester_func()
|
||||
RETURNS void AS $$
|
||||
DECLARE
|
||||
version integer := current_setting('server_version_num');
|
||||
BEGIN
|
||||
IF version >= 100000 THEN
|
||||
ALTER FUNCTION pg_catalog.pg_isolation_test_session_is_blocked(integer, integer[])
|
||||
RENAME TO citus_isolation_test_session_is_blocked;
|
||||
ALTER FUNCTION pg_catalog.old_pg_isolation_test_session_is_blocked(integer, integer[])
|
||||
RENAME TO pg_isolation_test_session_is_blocked;
|
||||
ELSE
|
||||
ALTER FUNCTION pg_catalog.pg_blocking_pids(integer)
|
||||
RENAME TO citus_blocking_pids;
|
||||
ALTER FUNCTION pg_catalog.old_pg_blocking_pids(integer)
|
||||
RENAME TO pg_blocking_pids;
|
||||
END IF;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE FUNCTION citus.refresh_isolation_tester_prepared_statement()
|
||||
RETURNS void AS $$
|
||||
BEGIN
|
||||
-- isolation creates a prepared statement using the old function before tests have a
|
||||
-- chance to call replace_isolation_tester_func. By calling that prepared statement
|
||||
-- with a different search_path we force a re-parse which picks up the new function
|
||||
SET search_path TO 'citus';
|
||||
EXECUTE 'EXECUTE isolationtester_waiting (0)';
|
||||
RESET search_path;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
|
||||
RETURNS int4[] AS $$
|
||||
DECLARE
|
||||
mLocalBlockingPids int4[];
|
||||
mRemoteBlockingPids int4[];
|
||||
mLocalTransactionNum int8;
|
||||
BEGIN
|
||||
SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids;
|
||||
|
||||
IF (array_length(mLocalBlockingPids, 1) > 0) THEN
|
||||
RETURN mLocalBlockingPids;
|
||||
END IF;
|
||||
|
||||
-- pg says we're not blocked locally; check whether we're blocked globally.
|
||||
SELECT transaction_number INTO mLocalTransactionNum
|
||||
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
|
||||
|
||||
SELECT array_agg(process_id) INTO mRemoteBlockingPids FROM (
|
||||
WITH activeTransactions AS (
|
||||
SELECT process_id, transaction_number FROM get_all_active_transactions()
|
||||
), blockingTransactions AS (
|
||||
SELECT blocking_transaction_num AS txn_num FROM dump_global_wait_edges()
|
||||
WHERE waiting_transaction_num = mLocalTransactionNum
|
||||
)
|
||||
SELECT activeTransactions.process_id FROM activeTransactions, blockingTransactions
|
||||
WHERE activeTransactions.transaction_number = blockingTransactions.txn_num
|
||||
) AS sub;
|
||||
|
||||
RETURN mRemoteBlockingPids;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
|
||||
RETURNS boolean AS $$
|
||||
DECLARE
|
||||
mBlockedTransactionNum int8;
|
||||
BEGIN
|
||||
IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
|
||||
RETURN true;
|
||||
END IF;
|
||||
|
||||
-- pg says we're not blocked locally; check whether we're blocked globally.
|
||||
SELECT transaction_number INTO mBlockedTransactionNum
|
||||
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
|
||||
|
||||
RETURN EXISTS (
|
||||
SELECT 1 FROM dump_global_wait_edges()
|
||||
WHERE waiting_transaction_num = mBlockedTransactionNum
|
||||
);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '7.0-6'
|
||||
default_version = '7.0-7'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-insert-1 s2-insert s1-finish s2-finish
|
||||
create_distributed_table
|
||||
|
||||
|
||||
step s1-insert-1:
|
||||
BEGIN;
|
||||
INSERT INTO test_locking (a) VALUES (1);
|
||||
|
||||
step s2-insert:
|
||||
BEGIN;
|
||||
INSERT INTO test_locking (a) VALUES (1);
|
||||
<waiting ...>
|
||||
step s1-finish:
|
||||
COMMIT;
|
||||
|
||||
step s2-insert: <... completed>
|
||||
error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102751"
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
|
@ -116,6 +116,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-3';
|
|||
ALTER EXTENSION citus UPDATE TO '7.0-4';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-5';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-6';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-7';
|
||||
-- show running version
|
||||
SHOW citus.version;
|
||||
citus.version
|
||||
|
|
|
@ -11,3 +11,5 @@ test: isolation_drop_shards isolation_copy_placement_vs_modification
|
|||
test: isolation_insert_vs_vacuum isolation_transaction_recovery
|
||||
test: isolation_distributed_transaction_id isolation_progress_monitoring
|
||||
test: isolation_dump_local_wait_edges isolation_dump_global_wait_edges
|
||||
|
||||
test: isolation_replace_wait_function
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
# check that replace_isolation_tester_func correctly replaces the functions isolation
|
||||
# tester uses while searching for locks. If those functions aren't correctly replaced
|
||||
# this test will timeout, since isolation tester will never notice that s2 is blocked
|
||||
# by s1 on a lock it's taken out on one of the workers
|
||||
|
||||
setup
|
||||
{
|
||||
SELECT citus.replace_isolation_tester_func();
|
||||
SELECT citus.refresh_isolation_tester_prepared_statement();
|
||||
|
||||
CREATE TABLE test_locking (a int unique);
|
||||
SELECT create_distributed_table('test_locking', 'a');
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE test_locking;
|
||||
SELECT citus.restore_isolation_tester_func();
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-insert-1"
|
||||
{
|
||||
BEGIN;
|
||||
INSERT INTO test_locking (a) VALUES (1);
|
||||
}
|
||||
|
||||
step "s1-finish"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-insert"
|
||||
{
|
||||
BEGIN;
|
||||
INSERT INTO test_locking (a) VALUES (1);
|
||||
}
|
||||
|
||||
step "s2-finish"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
permutation "s1-insert-1" "s2-insert" "s1-finish" "s2-finish"
|
|
@ -116,6 +116,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-3';
|
|||
ALTER EXTENSION citus UPDATE TO '7.0-4';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-5';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-6';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-7';
|
||||
|
||||
-- show running version
|
||||
SHOW citus.version;
|
||||
|
|
Loading…
Reference in New Issue