Accept invalidation messages before accessing the metadata cache (#1406)

* Accept invalidation messages before accessing the metadata cache

This commit is crucial to prevent stale metadata reads from the
cache. Without this commit, some of the operations may use stale
metadata which could end up with various bugs such as crashes,
inconsistent/lost data etc.

As an example, consider that a COPY operation is blocked on shard
metadata lock. Another concurrent session updates the metadata and
invalidates the cache. However, since Citus doesn't accept invalidations,
COPY continues with the stale metadata once it acquires the lock.

With this commit, we make sure that invalidation messages are accepted
just before accessing the metadata cache, which prevents any operation
from using stale metadata.

* Add isolation tests for placement changes and concurrent operations

   - add node with reference table vs COPY/insert/update/DDL
   - repair shard vs COPY/insert/update/DDL
   - repair shard vs repair shard
pull/1410/head
Önder Kalacı 2017-05-12 12:32:35 +03:00 committed by GitHub
parent 94151c9aef
commit e0257aecd9
10 changed files with 1361 additions and 15 deletions

View File

@ -385,7 +385,15 @@ LookupShardCacheEntry(int64 shardId)
recheck = true; recheck = true;
} }
else if (!shardEntry->tableEntry->isValid) else
{
/*
* We might have some concurrent metadata changes. In order to get the changes,
* we first need to accept the cache invalidation messages.
*/
AcceptInvalidationMessages();
if (!shardEntry->tableEntry->isValid)
{ {
/* /*
* The cache entry might not be valid right now. Reload cache entry * The cache entry might not be valid right now. Reload cache entry
@ -394,6 +402,7 @@ LookupShardCacheEntry(int64 shardId)
LookupDistTableCacheEntry(shardEntry->tableEntry->relationId); LookupDistTableCacheEntry(shardEntry->tableEntry->relationId);
recheck = true; recheck = true;
} }
}
/* /*
* If we (re-)loaded the table cache, re-search the shard cache - the * If we (re-)loaded the table cache, re-search the shard cache - the
@ -472,6 +481,12 @@ LookupDistTableCacheEntry(Oid relationId)
/* return valid matches */ /* return valid matches */
if (foundInCache) if (foundInCache)
{ {
/*
* We might have some concurrent metadata changes. In order to get the changes,
* we first need to accept the cache invalidation messages.
*/
AcceptInvalidationMessages();
if (cacheEntry->isValid) if (cacheEntry->isValid)
{ {
return cacheEntry; return cacheEntry;
@ -1927,6 +1942,12 @@ InitializeDistTableCache(void)
HTAB * HTAB *
GetWorkerNodeHash(void) GetWorkerNodeHash(void)
{ {
/*
* We might have some concurrent metadata changes. In order to get the changes,
* we first need to accept the cache invalidation messages.
*/
AcceptInvalidationMessages();
if (!workerNodeHashValid) if (!workerNodeHashValid)
{ {
InitializeWorkerNodeCache(); InitializeWorkerNodeCache();

View File

@ -0,0 +1,460 @@
Parsed test spec with 2 sessions
starting permutation: s2-load-metadata-cache s1-begin s1-add-second-worker s2-copy-to-reference-table s1-commit s2-print-content
create_reference_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-copy-to-reference-table:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
<waiting ...>
step s1-commit:
COMMIT;
step s2-copy-to-reference-table: <... completed>
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 10
57638 t 10
master_remove_node
starting permutation: s2-load-metadata-cache s2-begin s2-copy-to-reference-table s1-add-second-worker s2-commit s2-print-content
create_reference_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
step s2-begin:
BEGIN;
step s2-copy-to-reference-table:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-add-second-worker: <... completed>
nodename nodeport isactive
localhost 57638 t
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 10
57638 t 10
master_remove_node
starting permutation: s2-load-metadata-cache s1-begin s1-add-second-worker s2-insert-to-reference-table s1-commit s2-print-content
create_reference_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-insert-to-reference-table:
INSERT INTO test_reference_table VALUES (6);
<waiting ...>
step s1-commit:
COMMIT;
step s2-insert-to-reference-table: <... completed>
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 6
57638 t 6
master_remove_node
starting permutation: s2-load-metadata-cache s2-begin s2-insert-to-reference-table s1-add-second-worker s2-commit s2-print-content
create_reference_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
step s2-begin:
BEGIN;
step s2-insert-to-reference-table:
INSERT INTO test_reference_table VALUES (6);
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-add-second-worker: <... completed>
nodename nodeport isactive
localhost 57638 t
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 6
57638 t 6
master_remove_node
starting permutation: s2-load-metadata-cache s1-begin s1-add-second-worker s2-ddl-on-reference-table s1-commit s2-print-index-count
create_reference_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-ddl-on-reference-table:
CREATE INDEX reference_index ON test_reference_table(test_id);
<waiting ...>
step s1-commit:
COMMIT;
step s2-ddl-on-reference-table: <... completed>
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
starting permutation: s2-load-metadata-cache s2-begin s2-ddl-on-reference-table s1-add-second-worker s2-commit s2-print-index-count
create_reference_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
step s2-begin:
BEGIN;
step s2-ddl-on-reference-table:
CREATE INDEX reference_index ON test_reference_table(test_id);
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-add-second-worker: <... completed>
nodename nodeport isactive
localhost 57638 t
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
starting permutation: s1-begin s1-add-second-worker s2-copy-to-reference-table s1-commit s2-print-content
create_reference_table
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-copy-to-reference-table:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
<waiting ...>
step s1-commit:
COMMIT;
step s2-copy-to-reference-table: <... completed>
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 5
57638 t 5
master_remove_node
starting permutation: s2-begin s2-copy-to-reference-table s1-add-second-worker s2-commit s2-print-content
create_reference_table
step s2-begin:
BEGIN;
step s2-copy-to-reference-table:
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
<waiting ...>
step s2-commit: <... completed>
step s1-add-second-worker: <... completed>
error in steps s2-commit s1-add-second-worker: ERROR: deadlock detected
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 5
master_remove_node
starting permutation: s1-begin s1-add-second-worker s2-insert-to-reference-table s1-commit s2-print-content
create_reference_table
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-insert-to-reference-table:
INSERT INTO test_reference_table VALUES (6);
<waiting ...>
step s1-commit:
COMMIT;
step s2-insert-to-reference-table: <... completed>
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
starting permutation: s2-begin s2-insert-to-reference-table s1-add-second-worker s2-commit s2-print-content
create_reference_table
step s2-begin:
BEGIN;
step s2-insert-to-reference-table:
INSERT INTO test_reference_table VALUES (6);
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-add-second-worker: <... completed>
nodename nodeport isactive
localhost 57638 t
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
starting permutation: s1-begin s1-add-second-worker s2-ddl-on-reference-table s1-commit s2-print-index-count
create_reference_table
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-ddl-on-reference-table:
CREATE INDEX reference_index ON test_reference_table(test_id);
<waiting ...>
step s1-commit:
COMMIT;
step s2-ddl-on-reference-table: <... completed>
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
starting permutation: s2-begin s2-ddl-on-reference-table s1-add-second-worker s2-commit s2-print-index-count
create_reference_table
step s2-begin:
BEGIN;
step s2-ddl-on-reference-table:
CREATE INDEX reference_index ON test_reference_table(test_id);
step s1-add-second-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-add-second-worker: <... completed>
nodename nodeport isactive
localhost 57638 t
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node

View File

@ -2,12 +2,12 @@ Parsed test spec with 1 sessions
starting permutation: s1a starting permutation: s1a
step s1a: step s1a:
SELECT master_add_node('localhost', 57637); SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57637);
SELECT master_add_node('localhost', 57638); SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
master_add_node nodename nodeport isactive
(1,1,localhost,57637,default,f,t) localhost 57637 t
master_add_node nodename nodeport isactive
(2,2,localhost,57638,default,f,t) localhost 57638 t

View File

@ -0,0 +1,51 @@
Parsed test spec with 2 sessions
starting permutation: s1-load-cache s2-load-cache s2-set-placement-inactive s2-begin s2-repair-placement s1-repair-placement s2-commit
step s1-load-cache:
COPY test_hash_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
step s2-load-cache:
COPY test_hash_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard_for_test_table) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-repair-placement: <... completed>
error in steps s2-commit s1-repair-placement: ERROR: target placement must be in inactive state
starting permutation: s2-set-placement-inactive s2-begin s2-repair-placement s1-repair-placement s2-commit
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard_for_test_table) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-repair-placement: <... completed>
error in steps s2-commit s1-repair-placement: ERROR: target placement must be in inactive state

View File

@ -0,0 +1,498 @@
Parsed test spec with 2 sessions
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content
step s1-load-cache:
TRUNCATE test_table;
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
1
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-update:
UPDATE test_table SET y = 5 WHERE x = 5;
<waiting ...>
step s2-commit:
COMMIT;
step s1-update: <... completed>
step s1-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
nodeport;
nodeport success result
57637 t 5
57638 t 5
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content
step s1-load-cache:
TRUNCATE test_table;
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
1
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-delete:
UPDATE test_table SET y = 5 WHERE x = 5;
<waiting ...>
step s2-commit:
COMMIT;
step s1-delete: <... completed>
step s1-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
nodeport;
nodeport success result
57637 t 5
57638 t 5
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content
step s1-load-cache:
TRUNCATE test_table;
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
0
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
<waiting ...>
step s2-commit:
COMMIT;
step s1-insert: <... completed>
step s1-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
nodeport;
nodeport success result
57637 t 10
57638 t 10
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content
step s1-load-cache:
TRUNCATE test_table;
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
0
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-copy:
COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
<waiting ...>
step s2-commit:
COMMIT;
step s1-copy: <... completed>
step s1-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
nodeport;
nodeport success result
57637 t 5
57638 t 5
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count
step s1-load-cache:
TRUNCATE test_table;
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
0
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-ddl:
CREATE INDEX test_table_index ON test_table(x);
<waiting ...>
step s2-commit:
COMMIT;
step s1-ddl: <... completed>
step s1-commit:
COMMIT;
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57637 t 1
57638 t 1
57638 t 1
starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
1
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-update:
UPDATE test_table SET y = 5 WHERE x = 5;
<waiting ...>
step s2-commit:
COMMIT;
step s1-update: <... completed>
step s1-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
nodeport;
nodeport success result
57637 t 5
57638 t 5
starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
1
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-delete:
UPDATE test_table SET y = 5 WHERE x = 5;
<waiting ...>
step s2-commit:
COMMIT;
step s1-delete: <... completed>
step s1-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
nodeport;
nodeport success result
57637 t 5
57638 t 5
starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
0
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
<waiting ...>
step s2-commit:
COMMIT;
step s1-insert: <... completed>
step s1-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
nodeport;
nodeport success result
57637 t 10
57638 t 10
starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
0
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-copy:
COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
<waiting ...>
step s2-commit:
COMMIT;
step s1-copy: <... completed>
step s1-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
nodeport;
nodeport success result
57637 t 5
57638 t 5
starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
count
0
step s2-set-placement-inactive:
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
step s2-begin:
BEGIN;
step s2-repair-placement:
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_copy_shard_placement
step s1-ddl:
CREATE INDEX test_table_index ON test_table(x);
<waiting ...>
step s2-commit:
COMMIT;
step s1-ddl: <... completed>
step s1-commit:
COMMIT;
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57637 t 1
57638 t 1
57638 t 1

View File

@ -1,4 +1,10 @@
test: isolation_add_node_vs_reference_table_operations
# tests that change node metadata should precede
# isolation_cluster_management such that tests
# that come later can be parallelized
test: isolation_cluster_management test: isolation_cluster_management
test: isolation_dml_vs_repair
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement
test: isolation_concurrent_dml isolation_data_migration test: isolation_concurrent_dml isolation_data_migration
test: isolation_drop_shards test: isolation_drop_shards isolation_copy_placement_vs_modification

View File

@ -0,0 +1,112 @@
# The test expects pg_dist_node to be empty at the beginning. Setup registers
# only the first worker (port 57637) and then creates the reference table, so
# that the second worker (port 57638) can be added by session "s1" while
# session "s2" operates on the table.
setup
{
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57637);
CREATE TABLE test_reference_table (test_id integer);
SELECT create_reference_table('test_reference_table');
}
# Drop the reference table and remove every node listed in pg_dist_node, so
# that the next permutation starts again from an empty pg_dist_node.
# NOTE(review): this comment used to say that teardown ensures both nodes
# exist for the remaining isolation tests, but the statements below remove
# nodes rather than add them -- confirm which behaviour is intended.
teardown
{
DROP TABLE test_reference_table;
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
}
# Session "s1" performs the concurrent metadata change: it adds the second
# worker (port 57638), optionally inside an explicit transaction.
session "s1"
step "s1-begin"
{
BEGIN;
}
# Only the node name, port and active flag are selected from the result of
# master_add_node, not the whole returned record.
step "s1-add-second-worker"
{
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
}
# NOTE(review): this step is not referenced by any permutation at the end of
# this spec.
step "s1-remove-second-worker"
{
SELECT master_remove_node('localhost', 57638);
}
step "s1-commit"
{
COMMIT;
}
# Session "s2" runs the operations under test (COPY, INSERT, DDL) on the
# reference table and prints per-placement results afterwards.
session "s2"
# COPY accesses all shard/placement metadata, so running it once should be
# enough to load the metadata cache of this session.
step "s2-load-metadata-cache"
{
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
}
# Same statement as "s2-load-metadata-cache"; here it is the COPY whose
# behaviour against the concurrent node addition is being tested.
step "s2-copy-to-reference-table"
{
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
}
step "s2-insert-to-reference-table"
{
INSERT INTO test_reference_table VALUES (6);
}
step "s2-ddl-on-reference-table"
{
CREATE INDEX reference_index ON test_reference_table(test_id);
}
step "s2-begin"
{
BEGIN;
}
step "s2-commit"
{
COMMIT;
}
# Runs a row count through run_command_on_placements and lists the result per
# nodeport, so the output shows whether every placement holds the same number
# of rows. Presumably %s is replaced with the placement's shard table name --
# confirm against run_command_on_placements.
step "s2-print-content"
{
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
}
# Same idea as "s2-print-content", but counts the pg_indexes rows whose
# tablename matches the substituted name, to show whether the index created by
# "s2-ddl-on-reference-table" is present on each placement.
step "s2-print-index-count"
{
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
}
# Verify that COPY/INSERT/DDL receive the invalidation and rebuild their
# metadata cache. "s2-load-metadata-cache" runs first so that the metadata is
# already cached in session "s2"; otherwise the test would be useless, since
# the cache would be empty and the metadata would be read from the tables
# directly.
# Each operation is tested in both orders: node addition first (the operation
# waits for "s1-commit"), and operation first (the node addition waits for
# "s2-commit").
permutation "s2-load-metadata-cache" "s1-begin" "s1-add-second-worker" "s2-copy-to-reference-table" "s1-commit" "s2-print-content"
permutation "s2-load-metadata-cache" "s2-begin" "s2-copy-to-reference-table" "s1-add-second-worker" "s2-commit" "s2-print-content"
permutation "s2-load-metadata-cache" "s1-begin" "s1-add-second-worker" "s2-insert-to-reference-table" "s1-commit" "s2-print-content"
permutation "s2-load-metadata-cache" "s2-begin" "s2-insert-to-reference-table" "s1-add-second-worker" "s2-commit" "s2-print-content"
permutation "s2-load-metadata-cache" "s1-begin" "s1-add-second-worker" "s2-ddl-on-reference-table" "s1-commit" "s2-print-index-count"
permutation "s2-load-metadata-cache" "s2-begin" "s2-ddl-on-reference-table" "s1-add-second-worker" "s2-commit" "s2-print-index-count"
# same tests without loading the cache
permutation "s1-begin" "s1-add-second-worker" "s2-copy-to-reference-table" "s1-commit" "s2-print-content"
# NOTE(review): the recorded expected output for the next permutation ends
# with "ERROR: deadlock detected" and lists only the placement on port 57637.
# Confirm that this outcome is intended and stable across runs.
permutation "s2-begin" "s2-copy-to-reference-table" "s1-add-second-worker" "s2-commit" "s2-print-content"
permutation "s1-begin" "s1-add-second-worker" "s2-insert-to-reference-table" "s1-commit" "s2-print-content"
permutation "s2-begin" "s2-insert-to-reference-table" "s1-add-second-worker" "s2-commit" "s2-print-content"
permutation "s1-begin" "s1-add-second-worker" "s2-ddl-on-reference-table" "s1-commit" "s2-print-index-count"
permutation "s2-begin" "s2-ddl-on-reference-table" "s1-add-second-worker" "s2-commit" "s2-print-index-count"

View File

@ -1,8 +1,8 @@
session "s1" session "s1"
step "s1a" step "s1a"
{ {
SELECT master_add_node('localhost', 57637); SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57637);
SELECT master_add_node('localhost', 57638); SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
} }
permutation "s1a" permutation "s1a"

View File

@ -0,0 +1,71 @@
# The test uses 5 as the distribution column value throughout, so setup
# stores the id of the shard that value maps to in the table
# selected_shard_for_test_table; the steps below look the shard up from there.
# The table is created with 2 shards and a replication factor of 2.
setup
{
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 2;
CREATE TABLE test_hash_table (x int, y int);
SELECT create_distributed_table('test_hash_table', 'x');
SELECT get_shard_id_for_distribution_column('test_hash_table', 5) INTO selected_shard_for_test_table;
}
# Drop the distributed table and the helper table that setup created with
# SELECT ... INTO.
teardown
{
DROP TABLE test_hash_table;
DROP TABLE selected_shard_for_test_table;
}
# Session "s1" issues the second, competing repair of the selected shard.
session "s1"
# Since test_hash_table has a replication factor above 1, a simple SELECT
# does not hit all placements, hence not all placements would be cached;
# with COPY all placements are cached.
step "s1-load-cache"
{
COPY test_hash_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
}
# Copies the selected shard's placement from the node on port 57637 to the
# node on port 57638.
step "s1-repair-placement"
{
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
}
# Session "s2" marks the placement on port 57638 as needing repair and then
# repairs it inside a transaction.
session "s2"
step "s2-begin"
{
BEGIN;
}
# Updates the placement metadata of the selected shard on port 57638 directly.
# NOTE(review): shardstate 3 is presumably the "inactive" state (per the step
# name) -- confirm against the Citus shard state constants.
step "s2-set-placement-inactive"
{
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard_for_test_table) AND nodeport = 57638;
}
# Same repair command as "s1-repair-placement", issued from this session.
step "s2-repair-placement"
{
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
}
# Since test_hash_table has a replication factor above 1, a simple SELECT
# does not hit all placements, hence not all placements would be cached;
# with COPY all placements are cached.
step "s2-load-cache"
{
COPY test_hash_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
}
step "s2-commit"
{
COMMIT;
}
# Two concurrent shard repairs on the same shard.
# Note that "s1-repair-placement" errors out, but that is expected: once
# "s2-repair-placement" succeeds and commits, the placement is already
# repaired and therefore no longer in the inactive state.
permutation "s1-load-cache" "s2-load-cache" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-repair-placement" "s2-commit"
# the same test without the load caches
permutation "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-repair-placement" "s2-commit"

View File

@ -0,0 +1,127 @@
# The test uses 5 as the distribution column value throughout, so setup
# stores the id of the shard that value maps to in the table selected_shard;
# the steps below look the shard up from there.
# The table is created with 2 shards and a replication factor of 2.
setup
{
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 2;
CREATE TABLE test_table (x int, y int);
SELECT create_distributed_table('test_table', 'x');
SELECT get_shard_id_for_distribution_column('test_table', 5) INTO selected_shard;
}
# Drop the distributed table and the helper table that setup created with
# SELECT ... INTO.
teardown
{
DROP TABLE test_table;
DROP TABLE selected_shard;
}
# Session "s1" runs the DML/DDL/COPY statements that compete with the shard
# repair performed by session "s2".
session "s1"
step "s1-begin"
{
BEGIN;
}
# Since test_table has a replication factor above 1, a simple SELECT does not
# hit all placements, hence not all placements would be cached. TRUNCATE is
# used as the cache-loading statement instead (presumably because it touches
# every placement -- confirm).
step "s1-load-cache"
{
TRUNCATE test_table;
}
step "s1-insert"
{
INSERT INTO test_table VALUES (5, 10);
}
step "s1-update"
{
UPDATE test_table SET y = 5 WHERE x = 5;
}
# NOTE(review): despite its name, this step issues the same UPDATE as
# "s1-update" rather than a DELETE, so the "delete" permutations do not
# exercise DELETE at all. This looks like a copy-paste slip. Changing the
# statement also requires regenerating the expected output, which echoes this
# SQL and records y = 5 on both placements.
step "s1-delete"
{
UPDATE test_table SET y = 5 WHERE x = 5;
}
# Also used right after "s1-begin" in the permutations so that the session
# has accessed the table's metadata inside the transaction.
step "s1-select"
{
SELECT count(*) FROM test_table WHERE x = 5;
}
step "s1-ddl"
{
CREATE INDEX test_table_index ON test_table(x);
}
step "s1-copy"
{
COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
}
step "s1-commit"
{
COMMIT;
}
# Session "s2" marks the placement on port 57638 as needing repair, repairs
# it inside a transaction, and prints per-placement results at the end.
session "s2"
step "s2-begin"
{
BEGIN;
}
# Updates the placement metadata of the selected shard on port 57638 directly.
# NOTE(review): shardstate 3 is presumably the "inactive" state (per the step
# name) -- confirm against the Citus shard state constants.
step "s2-set-placement-inactive"
{
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638;
}
# Copies the selected shard's placement from the node on port 57637 to the
# node on port 57638.
step "s2-repair-placement"
{
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
}
step "s2-commit"
{
COMMIT;
}
# Reads y for x = 5, restricted to the selected shard, and lists the result
# per nodeport so the two placements can be compared.
step "s2-print-content"
{
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
nodeport;
}
# Unlike "s2-print-content", this query has no shardid filter, so it reports
# one row per placement of every shard (the expected output shows two rows per
# nodeport). Rows are ordered by nodeport only.
step "s2-print-index-count"
{
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
}
# Repair a placement while concurrently performing an update/delete/insert/
# copy/DDL on the same table.
# Note that "s1-select" is run just after "s1-begin" because BEGIN may
# invalidate the cache in certain cases.
# In every permutation the s1 statement under test is issued after
# "s2-repair-placement" and completes only after "s2-commit".
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-update" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-delete" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-insert" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-copy" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-ddl" "s2-commit" "s1-commit" "s2-print-index-count"
# the same tests without loading the cache at first
permutation "s1-insert" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-update" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-insert" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-delete" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-insert" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-copy" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-ddl" "s2-commit" "s1-commit" "s2-print-index-count"