From 76a8b3286afaaacc519a72cc2751b4ff8f09bb3b Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Thu, 5 Jan 2017 12:07:12 +0300 Subject: [PATCH 1/2] Modify tests to create clean workspace Since we will now replicate reference tables each time we add node, we need to ensure that test space is clean in terms of reference tables before any add node operation. For this purpose we had to change order of multi_drop_extension test which caused change of some of the colocation ids. --- .../expected/multi_colocation_utils.out | 88 ++++----- .../expected/multi_reference_table.out | 2 +- .../multi_upgrade_reference_table.out | 167 ++++++++++-------- .../input/multi_outer_join_reference.source | 6 + src/test/regress/multi_schedule | 10 +- .../output/multi_outer_join_reference.source | 5 + .../regress/sql/multi_colocation_utils.sql | 4 +- .../regress/sql/multi_reference_table.sql | 2 +- .../multi_unsupported_worker_operations.sql | 1 + .../sql/multi_upgrade_reference_table.sql | 37 ++-- 10 files changed, 180 insertions(+), 142 deletions(-) diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 724f5fa04..048fa084d 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -436,10 +436,10 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1 | 2 | 2 | 23 - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 + 4 | 2 | 2 | 23 + 5 | 2 | 1 | 23 + 6 | 2 | 2 | 25 + 7 | 4 | 2 | 23 (4 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition @@ -447,28 +447,28 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition ORDER BY logicalrelid; logicalrelid | colocationid ---------------+-------------- - table1_groupa | 1 - table2_groupa | 1 - table1_groupb | 2 - table2_groupb | 2 - table1_groupc | 3 - table2_groupc | 3 - table1_groupd | 4 - table2_groupd | 4 - table3_groupd | 4 + table1_groupa | 4 + table2_groupa | 4 + table1_groupb | 5 + table2_groupb | 5 + table1_groupc | 6 + table2_groupc | 6 + table1_groupd | 7 + table2_groupd | 7 + table3_groupd | 7 (9 rows) -- check effects of dropping tables DROP TABLE table1_groupA; -SELECT * FROM pg_dist_colocation WHERE colocationid = 1; +SELECT * FROM pg_dist_colocation WHERE colocationid = 4; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1 | 2 | 2 | 23 + 4 | 2 | 2 | 23 (1 row) -- dropping all tables in a colocation group also deletes the colocation group DROP TABLE table2_groupA; -SELECT * FROM pg_dist_colocation WHERE colocationid = 1; +SELECT * FROM pg_dist_colocation WHERE colocationid = 4; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ (0 rows) @@ -558,11 +558,11 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 - 5 | 2 | 2 | 23 - 9 | 3 | 2 | 23 + 5 | 2 | 1 | 23 + 6 | 2 | 2 | 25 + 7 | 4 | 2 | 23 + 8 | 2 | 2 | 23 + 12 | 3 | 2 | 23 (5 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition @@ -570,23 +570,23 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition ORDER BY colocationid, logicalrelid; logicalrelid | colocationid ----------------------------------+-------------- - table1_groupb | 2 - table2_groupb | 2 - table1_groupc | 3 - table2_groupc | 3 - table1_groupd | 4 - table2_groupd | 4 - table3_groupd | 4 - table1_groupe | 5 - table2_groupe | 5 - table3_groupe | 5 - schema_collocation.table4_groupe | 5 - table4_groupe | 5 - table1_group_none_1 | 6 - table2_group_none_1 | 6 - table1_group_none_2 | 7 - table1_group_none_3 | 8 - table1_group_default | 9 + table1_groupb | 5 + table2_groupb | 5 + table1_groupc | 6 + table2_groupc | 6 + table1_groupd | 7 + table2_groupd | 7 + table3_groupd | 7 + table1_groupe | 8 + table2_groupe | 8 + table3_groupe | 8 + schema_collocation.table4_groupe | 8 + table4_groupe | 8 + table1_group_none_1 | 9 + table2_group_none_1 | 9 + table1_group_none_2 | 10 + table1_group_none_3 | 11 + table1_group_default | 12 (17 rows) -- check failing colocate_with options @@ -651,12 +651,12 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 - 5 | 2 | 2 | 23 - 9 | 3 | 2 | 23 - 10 | 1 | 2 | 0 + 5 | 2 | 1 | 23 + 6 | 2 | 2 | 25 + 7 | 4 | 2 | 23 + 8 | 2 | 2 | 23 + 12 | 3 | 2 | 23 + 13 | 1 | 2 | 0 (6 rows) -- cross check with internal colocation API diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index d8b4a123e..9157c0331 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1574,7 +1574,7 @@ ERROR: single-shard DML commands must not appear in transaction blocks which co ROLLBACK; -- clean up tables DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, - reference_table_test_fourth, reference_table_ddl; + reference_table_test_fourth, reference_table_ddl, reference_table_composite; DROP SCHEMA reference_schema CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table reference_schema.reference_table_test_sixth diff --git a/src/test/regress/expected/multi_upgrade_reference_table.out b/src/test/regress/expected/multi_upgrade_reference_table.out index 3ced545fa..348e9137f 100644 --- a/src/test/regress/expected/multi_upgrade_reference_table.out +++ b/src/test/regress/expected/multi_upgrade_reference_table.out @@ -5,6 +5,7 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1360000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000; -- test with not distributed table CREATE TABLE upgrade_reference_table_local(column1 int); SELECT upgrade_to_reference_table('upgrade_reference_table_local'); @@ -138,15 +139,16 @@ WHERE colocationid IN --------------+------------+-------------------+------------------------ (0 rows) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360009 | 1 | 8192 | localhost | 57637 | 379 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360009 | 1 | 8192 | localhost | 57637 (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_append'); @@ -164,7 +166,7 @@ WHERE logicalrelid = 'upgrade_reference_table_append'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -186,19 +188,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360009 | 1 | 8192 | localhost | 57637 | 379 - 1360009 | 1 | 0 | localhost | 57638 | 380 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360009 | 1 | 8192 | localhost | 57637 + 1360009 | 1 | 0 | localhost | 57638 (2 rows) -- test valid cases, shard exists at one worker @@ -218,7 +221,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 32 | s + h | f | 1360001 | s (1 row) SELECT @@ -240,18 +243,19 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 32 | 1 | 1 | 23 + 1360001 | 1 | 1 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360010 | 1 | 0 | localhost | 57637 | 381 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360010 | 1 | 0 | localhost | 57637 (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker'); @@ -269,7 +273,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -291,19 +295,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360010 | 1 | 0 | localhost | 57637 | 381 - 1360010 | 1 | 0 | localhost | 57638 | 382 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360010 | 1 | 0 | localhost | 57637 + 1360010 | 1 | 0 | localhost | 57638 (2 rows) -- test valid cases, shard exists at both workers but one is unhealthy @@ -325,7 +330,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 34 | c + h | f | 1360003 | c (1 row) SELECT @@ -347,19 +352,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 34 | 1 | 2 | 23 + 1360003 | 1 | 2 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360011 | 1 | 0 | localhost | 57637 | 383 - 1360011 | 1 | 0 | localhost | 57638 | 384 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360011 | 1 | 0 | localhost | 57637 + 1360011 | 1 | 0 | localhost | 57638 (2 rows) SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy'); @@ -377,7 +383,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -399,19 +405,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360011 | 1 | 0 | localhost | 57637 | 383 - 1360011 | 1 | 0 | localhost | 57638 | 384 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360011 | 1 | 0 | localhost | 57637 + 1360011 | 1 | 0 | localhost | 57638 (2 rows) -- test valid cases, shard exists at both workers and both are healthy @@ -431,7 +438,7 @@ WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 35 | c + h | f | 1360004 | c (1 row) SELECT @@ -453,19 +460,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 35 | 1 | 2 | 23 + 1360004 | 1 | 2 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360012 | 1 | 0 | localhost | 57637 | 385 - 1360012 | 1 | 0 | localhost | 57638 | 386 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360012 | 1 | 0 | localhost | 57637 + 1360012 | 1 | 0 | localhost | 57638 (2 rows) SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy'); @@ -483,7 +491,7 @@ WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -505,19 +513,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360012 | 1 | 0 | localhost | 57637 | 385 - 1360012 | 1 | 0 | localhost | 57638 | 386 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360012 | 1 | 0 | localhost | 57637 + 1360012 | 1 | 0 | localhost | 57638 (2 rows) -- test valid cases, do it in transaction and ROLLBACK @@ -538,7 +547,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 32 | s + h | f | 1360001 | s (1 row) SELECT @@ -560,18 +569,19 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 32 | 1 | 1 | 23 + 1360001 | 1 | 1 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360013 | 1 | 0 | localhost | 57637 | 387 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360013 | 1 | 0 | localhost | 57637 (1 row) BEGIN; @@ -591,7 +601,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 32 | s + h | f | 1360001 | s (1 row) SELECT @@ -613,18 +623,19 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 32 | 1 | 1 | 23 + 1360001 | 1 | 1 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360013 | 1 | 0 | localhost | 57637 | 387 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360013 | 1 | 0 | localhost | 57637 (1 row) -- test valid cases, do it in transaction and COMMIT @@ -645,7 +656,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 32 | s + h | f | 1360001 | s (1 row) SELECT @@ -667,18 +678,19 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 32 | 1 | 1 | 23 + 1360001 | 1 | 1 | 23 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360014 | 1 | 0 | localhost | 57637 | 389 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360014 | 1 | 0 | localhost | 57637 (1 row) BEGIN; @@ -698,7 +710,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 33 | t + n | t | 1360002 | t (1 row) SELECT @@ -720,19 +732,20 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 33 | 1 | 2 | 0 + 1360002 | 1 | 2 | 0 (1 row) -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); - shardid | shardstate | shardlength | nodename | nodeport | placementid ----------+------------+-------------+-----------+----------+------------- - 1360014 | 1 | 0 | localhost | 57637 | 389 - 1360014 | 1 | 0 | localhost | 57638 | 390 + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360014 | 1 | 0 | localhost | 57637 + 1360014 | 1 | 0 | localhost | 57638 (2 rows) -- verify that shard is replicated to other worker diff --git a/src/test/regress/input/multi_outer_join_reference.source b/src/test/regress/input/multi_outer_join_reference.source index 40f1cf56d..be121413e 100644 --- a/src/test/regress/input/multi_outer_join_reference.source +++ b/src/test/regress/input/multi_outer_join_reference.source @@ -449,3 +449,9 @@ SELECT FROM multi_outer_join_right_reference FULL JOIN multi_outer_join_third_reference ON (t_custkey = r_custkey); + +-- DROP unused tables to clean up workspace +DROP TABLE multi_outer_join_left_hash; +DROP TABLE multi_outer_join_right_reference; +DROP TABLE multi_outer_join_third_reference; +DROP TABLE multi_outer_join_right_hash; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 3fb257d68..79a6dc0e1 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -157,6 +157,11 @@ test: multi_router_planner # ---------- test: multi_large_shardid +# ---------- +# multi_drop_extension makes sure we can safely drop and recreate the extension +# ---------- +test: multi_drop_extension + # ---------- # multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers # multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers @@ -164,11 +169,6 @@ test: multi_large_shardid test: multi_metadata_sync test: multi_unsupported_worker_operations -# ---------- -# multi_drop_extension makes sure we can safely drop and recreate the extension -# ---------- -test: multi_drop_extension - # ---------- # multi_schema_support makes sure we can work with tables in schemas other than public with no problem # ---------- diff --git a/src/test/regress/output/multi_outer_join_reference.source b/src/test/regress/output/multi_outer_join_reference.source index a508aeb42..502d3cf2e 100644 --- a/src/test/regress/output/multi_outer_join_reference.source +++ b/src/test/regress/output/multi_outer_join_reference.source @@ -834,3 +834,8 @@ FROM 7 | (30 rows) +-- DROP unused tables to clean up workspace +DROP TABLE multi_outer_join_left_hash; +DROP TABLE multi_outer_join_right_reference; +DROP TABLE multi_outer_join_third_reference; +DROP TABLE multi_outer_join_right_hash; diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 336758f56..91867b426 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -219,11 +219,11 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition -- check effects of dropping tables DROP TABLE table1_groupA; -SELECT * FROM pg_dist_colocation WHERE colocationid = 1; +SELECT * FROM pg_dist_colocation WHERE colocationid = 4; -- dropping all tables in a colocation group also deletes the colocation group DROP TABLE table2_groupA; -SELECT * FROM pg_dist_colocation WHERE colocationid = 1; +SELECT * FROM pg_dist_colocation WHERE colocationid = 4; -- create dropped colocation group again SET citus.shard_count = 2; diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index 32af6ce74..eaea4ab66 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -986,5 +986,5 @@ ROLLBACK; -- clean up tables DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, - reference_table_test_fourth, reference_table_ddl; + reference_table_test_fourth, reference_table_ddl, reference_table_composite; DROP SCHEMA reference_schema CASCADE; diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 9b30428d4..1ab19c3e2 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -94,6 +94,7 @@ SELECT master_apply_delete_command('DELETE FROM mx_table'); SELECT count(*) FROM mx_table; -- master_add_node + SELECT master_add_node('localhost', 5432); SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; diff --git a/src/test/regress/sql/multi_upgrade_reference_table.sql b/src/test/regress/sql/multi_upgrade_reference_table.sql index acfec782e..0bec3ade7 100644 --- a/src/test/regress/sql/multi_upgrade_reference_table.sql +++ b/src/test/regress/sql/multi_upgrade_reference_table.sql @@ -6,6 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1360000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000; -- test with not distributed table CREATE TABLE upgrade_reference_table_local(column1 int); @@ -91,7 +92,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -122,7 +124,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -155,7 +158,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -186,7 +190,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -221,7 +226,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -252,7 +258,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -285,7 +292,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -316,7 +324,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -350,7 +359,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -383,7 +393,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -417,7 +428,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid @@ -450,7 +462,8 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); -SELECT * +SELECT + shardid, shardstate, shardlength, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid From 9d756de3ae2fb4dca84d32f6e465d5076d7fd9c9 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Thu, 5 Jan 2017 12:29:32 +0300 Subject: [PATCH 2/2] Replicate reference tables when new node is added With this change, we start to replicate all reference tables to the new node when new node is added to the cluster with master_add_node command. We also update replication factor of reference table's colocation group. --- .../master/master_metadata_utility.c | 58 ++ .../distributed/utils/metadata_cache.c | 16 +- src/backend/distributed/utils/node_metadata.c | 29 +- .../distributed/utils/reference_table_utils.c | 100 ++- .../distributed/master_metadata_utility.h | 2 + src/include/distributed/metadata_cache.h | 2 + .../distributed/reference_table_utils.h | 1 + .../multi_replicate_reference_table.out | 598 ++++++++++++++++++ src/test/regress/multi_schedule | 2 + .../sql/multi_replicate_reference_table.sql | 387 ++++++++++++ 10 files changed, 1187 insertions(+), 8 deletions(-) create mode 100644 src/test/regress/expected/multi_replicate_reference_table.out create mode 100644 src/test/regress/sql/multi_replicate_reference_table.sql diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index f44e03a4d..74c9f39d5 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -31,6 +31,7 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/multi_logical_optimizer.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard_placement.h" @@ -867,6 +868,63 @@ UpdateShardPlacementState(uint64 placementId, char shardState) } +/* + * UpdateColocationGroupReplicationFactor finds colocation group record for given + * colocationId and updates its replication factor to given replicationFactor value. + * Since we do not cache pg_dist_colocation table, we do not need to invalidate the + * cache after updating replication factor. + */ +void +UpdateColocationGroupReplicationFactor(uint32 colocationId, int replicationFactor) +{ + Relation pgDistColocation = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; + + Datum values[Natts_pg_dist_colocation]; + bool isnull[Natts_pg_dist_colocation]; + bool replace[Natts_pg_dist_colocation]; + + /* we first search for colocation group by its colocation id */ + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistColocation); + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(colocationId)); + + scanDescriptor = systable_beginscan(pgDistColocation, + DistColocationColocationidIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("could not find valid entry for colocation group " + "%d", colocationId))); + } + + /* after we find colocation group, we update it with new values */ + memset(replace, 0, sizeof(replace)); + + values[Anum_pg_dist_colocation_replicationfactor - 1] = Int32GetDatum( + replicationFactor); + isnull[Anum_pg_dist_colocation_replicationfactor - 1] = false; + replace[Anum_pg_dist_colocation_replicationfactor - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + simple_heap_update(pgDistColocation, &heapTuple->t_self, heapTuple); + + CatalogUpdateIndexes(pgDistColocation, heapTuple); + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, NoLock); +} + + /* * Check that the current user has `mode` permissions on relationId, error out * if not. Superusers always have such permissions. diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index bb1bd75f9..55b5329c1 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -58,6 +58,7 @@ static Oid distNodeRelationId = InvalidOid; static Oid distLocalGroupRelationId = InvalidOid; static Oid distColocationRelationId = InvalidOid; static Oid distColocationConfigurationIndexId = InvalidOid; +static Oid distColocationColocationidIndexId = InvalidOid; static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid; @@ -105,7 +106,6 @@ static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); -static List * DistTableOidList(void); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId); static List * LookupDistShardTuples(Oid relationId); @@ -762,6 +762,17 @@ DistColocationConfigurationIndexId(void) } +/* return oid of pg_dist_colocation_pkey index */ +Oid +DistColocationColocationidIndexId(void) +{ + CachedRelationLookup("pg_dist_colocation_pkey", + &distColocationColocationidIndexId); + + return distColocationColocationidIndexId; +} + + /* return oid of pg_dist_partition relation */ Oid DistPartitionRelationId(void) @@ -1565,6 +1576,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distNodeRelationId = InvalidOid; distColocationRelationId = InvalidOid; distColocationConfigurationIndexId = InvalidOid; + distColocationColocationidIndexId = InvalidOid; distPartitionRelationId = InvalidOid; distPartitionLogicalRelidIndexId = InvalidOid; distPartitionColocationidIndexId = InvalidOid; @@ -1583,7 +1595,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) * DistTableOidList iterates over the pg_dist_partition table and returns * a list that consists of the logicalrelids. */ -static List * +List * DistTableOidList(void) { SysScanDesc scanDescriptor = NULL; diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 01911ed0b..ac8254955 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -30,6 +30,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" #include "distributed/pg_dist_node.h" +#include "distributed/reference_table_utils.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" @@ -48,7 +49,7 @@ int GroupSize = 1; /* local function forward declarations */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, - char *nodeRack, bool hasMetadata); + char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists); static Datum GenerateNodeTuple(WorkerNode *workerNode); static int32 GetNextGroupId(void); static uint32 GetMaxGroupId(void); @@ -67,7 +68,8 @@ PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); /* - * master_add_node function adds a new node to the cluster and returns its data. + * master_add_node function adds a new node to the cluster and returns its data. It also + * replicates all reference tables to the new node. */ Datum master_add_node(PG_FUNCTION_ARGS) @@ -78,9 +80,21 @@ master_add_node(PG_FUNCTION_ARGS) int32 groupId = 0; char *nodeRack = WORKER_DEFAULT_RACK; bool hasMetadata = false; + bool nodeAlreadyExists = false; Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata); + hasMetadata, &nodeAlreadyExists); + + /* + * After adding new node, if the node is not already exist, we replicate all existing + * reference tables to the new node. ReplicateAllReferenceTablesToAllNodes replicates + * reference tables to all nodes however, it skips nodes which already has healthy + * placement of particular reference table. + */ + if (!nodeAlreadyExists) + { + ReplicateAllReferenceTablesToAllNodes(); + } PG_RETURN_CSTRING(returnData); } @@ -137,13 +151,14 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) { ListCell *workerNodeCell = NULL; List *workerNodes = ParseWorkerNodeFileAndRename(); + bool nodeAlreadyExists = false; foreach(workerNodeCell, workerNodes) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, - workerNode->workerRack, false); + workerNode->workerRack, false, &nodeAlreadyExists); } PG_RETURN_BOOL(true); @@ -336,7 +351,7 @@ ReadWorkerNodes() */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, - bool hasMetadata) + bool hasMetadata, bool *nodeAlreadyExists) { Relation pgDistNode = NULL; int nextNodeIdInt = 0; @@ -349,6 +364,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, EnsureSchemaNode(); EnsureSuperUser(); + *nodeAlreadyExists = false; + /* acquire a lock so that no one can do this concurrently */ pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); @@ -362,6 +379,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, /* close the heap */ heap_close(pgDistNode, AccessExclusiveLock); + *nodeAlreadyExists = true; + PG_RETURN_DATUM(returnData); } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 5f80a1bfb..905d7b8d2 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -13,9 +13,12 @@ #include "miscadmin.h" #include "access/heapam.h" +#include "access/htup_details.h" +#include "access/genam.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_protocol.h" +#include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" #include "distributed/reference_table_utils.h" @@ -23,14 +26,16 @@ #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/rel.h" /* local function forward declarations */ static void ReplicateSingleShardTableToAllWorkers(Oid relationId); static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); - +static List * ReferenceTableOidList(void); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); @@ -93,6 +98,66 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) } +/* + * ReplicateAllReferenceTablesToAllNodes function finds all reference tables and + * replicates them to all worker nodes. It also modifies pg_dist_colocation table to + * update the replication factor column. This function skips a worker node if that node + * already has healthy placement of a particular reference table to prevent unnecessary + * data transfer. + */ +void +ReplicateAllReferenceTablesToAllNodes() +{ + List *referenceTableList = ReferenceTableOidList(); + ListCell *referenceTableCell = NULL; + + Relation pgDistNode = NULL; + List *workerNodeList = NIL; + int workerCount = 0; + + Oid firstReferenceTableId = InvalidOid; + uint32 referenceTableColocationId = INVALID_COLOCATION_ID; + + /* if there is no reference table, we do not need to do anything */ + if (list_length(referenceTableList) == 0) + { + return; + } + + /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ + pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + workerNodeList = WorkerNodeList(); + workerCount = list_length(workerNodeList); + + foreach(referenceTableCell, referenceTableList) + { + Oid referenceTableId = lfirst_oid(referenceTableCell); + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + char *relationName = get_rel_name(referenceTableId); + + LockShardDistributionMetadata(shardId, ExclusiveLock); + + ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to all workers", + relationName))); + + ReplicateShardToAllWorkers(shardInterval); + } + + /* + * After replicating reference tables, we will update replication factor column for + * colocation group of reference tables so that worker count will be equal to + * replication factor again. + */ + firstReferenceTableId = linitial_oid(referenceTableList); + referenceTableColocationId = TableColocationId(firstReferenceTableId); + UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); + + heap_close(pgDistNode, NoLock); +} + + /* * ReplicateSingleShardTableToAllWorkers accepts a broadcast table and replicates it to * all worker nodes. It assumes that caller of this function ensures that given broadcast @@ -176,6 +241,7 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval) ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, nodeName, nodePort, missingWorkerOk); + if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) { SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, @@ -250,3 +316,35 @@ CreateReferenceTableColocationId() return colocationId; } + + +/* + * ReferenceTableOidList function scans pg_dist_partition to create a list of all + * reference tables. To create the list, it performs sequential scan. Since it is not + * expected that this function will be called frequently, it is OK not to use index scan. + * If this function becomes performance bottleneck, it is possible to modify this function + * to perform index scan. + */ +static List * +ReferenceTableOidList() +{ + List *distTableOidList = DistTableOidList(); + ListCell *distTableOidCell = NULL; + + List *referenceTableList = NIL; + + foreach(distTableOidCell, distTableOidList) + { + DistTableCacheEntry *cacheEntry = NULL; + Oid relationId = lfirst_oid(distTableOidCell); + + cacheEntry = DistributedTableCacheEntry(relationId); + + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) + { + referenceTableList = lappend_oid(referenceTableList, relationId); + } + } + + return referenceTableList; +} diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 4e3298c05..a6e6c9070 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -85,6 +85,8 @@ extern void DeleteShardRow(uint64 shardId); extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort); +extern void UpdateColocationGroupReplicationFactor(uint32 colocationId, + int replicationFactor); extern void CreateTruncateTrigger(Oid relationId); /* Remaining metadata utility functions */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 4b8753694..38fbb9cb1 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -57,6 +57,7 @@ extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern int GetLocalGroupId(void); +extern List * DistTableOidList(void); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId); @@ -68,6 +69,7 @@ extern HTAB * GetWorkerNodeHash(void); /* relation oids */ extern Oid DistColocationRelationId(void); extern Oid DistColocationConfigurationIndexId(void); +extern Oid DistColocationColocationidIndexId(void); extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistShardPlacementRelationId(void); diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 0ad98bbd9..cde2664ca 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -13,5 +13,6 @@ #define REFERENCE_TABLE_UTILS_H_ extern uint32 CreateReferenceTableColocationId(void); +extern void ReplicateAllReferenceTablesToAllNodes(void); #endif /* REFERENCE_TABLE_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out new file mode 100644 index 000000000..82ec296ee --- /dev/null +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -0,0 +1,598 @@ +-- +-- MULTI_REPLICATE_REFERENCE_TABLE +-- +-- Tests that check the metadata returned by the master node. +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000; +-- remove a node for testing purposes +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- test adding new node with no reference tables +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (4,4,localhost,57638,default,f) +(1 row) + +-- verify node is added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +-- test adding new node with a reference table which does not have any healthy placement +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +CREATE TABLE replicate_reference_table_unhealthy(column1 int); +SELECT create_reference_table('replicate_reference_table_unhealthy'); + create_reference_table +------------------------ + +(1 row) + +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_unhealthy" to all workers +ERROR: could not find any healthy placement for shard 1370000 +-- verify node is not added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +DROP TABLE replicate_reference_table_unhealthy; +-- test replicating a reference table when a new node added +CREATE TABLE replicate_reference_table_valid(column1 int); +SELECT create_reference_table('replicate_reference_table_valid'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 1 | 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers + master_add_node +--------------------------------- + (6,6,localhost,57638,default,f) +(1 row) + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370001 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 2 | 0 +(1 row) + +-- test add same node twice +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370001 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 2 | 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (6,6,localhost,57638,default,f) +(1 row) + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370001 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370001 | 1 | 2 | 0 +(1 row) + +DROP TABLE replicate_reference_table_valid; +-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_rollback(column1 int); +SELECT create_reference_table('replicate_reference_table_rollback'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370002 | 1 | 1 | 0 +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers + master_add_node +--------------------------------- + (7,7,localhost,57638,default,f) +(1 row) + +ROLLBACK; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370002 | 1 | 1 | 0 +(1 row) + +DROP TABLE replicate_reference_table_rollback; +-- test replicating a reference table when a new node added in TRANSACTION + COMMIT +CREATE TABLE replicate_reference_table_commit(column1 int); +SELECT create_reference_table('replicate_reference_table_commit'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370003 | 1 | 1 | 0 +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers + master_add_node +--------------------------------- + (8,8,localhost,57638,default,f) +(1 row) + +COMMIT; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370003 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370003 | 1 | 2 | 0 +(1 row) + +DROP TABLE replicate_reference_table_commit; +-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_reference_one(column1 int); +SELECT create_reference_table('replicate_reference_table_reference_one'); + create_reference_table +------------------------ + +(1 row) + +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicate_reference_table_hash(column1 int); +SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_reference_two(column1 int); +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370004 | 1 | 1 | 0 +(1 row) + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid | partmethod | colocationid | repmodel +-----------------------------------------+------------+--------------+---------- + replicate_reference_table_reference_one | n | 1370004 | t + replicate_reference_table_hash | h | 1370005 | s +(2 rows) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers + master_add_node +--------------------------------- + (9,9,localhost,57638,default,f) +(1 row) + +SELECT upgrade_to_reference_table('replicate_reference_table_hash'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +SELECT create_reference_table('replicate_reference_table_reference_two'); + create_reference_table +------------------------ + +(1 row) + +COMMIT; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370004 | 1 | 0 | localhost | 57638 + 1370005 | 1 | 0 | localhost | 57638 + 1370006 | 1 | 0 | localhost | 57638 +(3 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370004 | 1 | 2 | 0 +(1 row) + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid | partmethod | colocationid | repmodel +-----------------------------------------+------------+--------------+---------- + replicate_reference_table_reference_one | n | 1370004 | t + replicate_reference_table_hash | n | 1370004 | t + replicate_reference_table_reference_two | n | 1370004 | t +(3 rows) + +DROP TABLE replicate_reference_table_reference_one; +DROP TABLE replicate_reference_table_hash; +DROP TABLE replicate_reference_table_reference_two; +-- test inserting a value then adding a new node in a transaction +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE replicate_reference_table_insert(column1 int); +SELECT create_reference_table('replicate_reference_table_insert'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +INSERT INTO replicate_reference_table_insert VALUES(1); +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_insert" to all workers +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +DROP TABLE replicate_reference_table_insert; +-- test COPY then adding a new node in a transaction +CREATE TABLE replicate_reference_table_copy(column1 int); +SELECT create_reference_table('replicate_reference_table_copy'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +COPY replicate_reference_table_copy FROM STDIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_copy" to all workers +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +DROP TABLE replicate_reference_table_copy; +-- test executing DDL command then adding a new node in a transaction +CREATE TABLE replicate_reference_table_ddl(column1 int); +SELECT create_reference_table('replicate_reference_table_ddl'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +ALTER TABLE replicate_reference_table_ddl ADD column2 int; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_ddl" to all workers +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +DROP TABLE replicate_reference_table_ddl; +-- test DROP table after adding new node in a transaction +CREATE TABLE replicate_reference_table_drop(column1 int); +SELECT create_reference_table('replicate_reference_table_drop'); + create_reference_table +------------------------ + +(1 row) + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers + master_add_node +----------------------------------- + (13,13,localhost,57638,default,f) +(1 row) + +DROP TABLE replicate_reference_table_drop; +ERROR: DROP distributed table cannot run inside a transaction block +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 21 at PERFORM +ROLLBACK; +DROP TABLE replicate_reference_table_drop; +-- test adding a node while there is a reference table at another schema +CREATE SCHEMA replicate_reference_table_schema; +CREATE TABLE replicate_reference_table_schema.table1(column1 int); +SELECT create_reference_table('replicate_reference_table_schema.table1'); + create_reference_table +------------------------ + +(1 row) + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370010 | 1 | 1 | 0 +(1 row) + +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "table1" to all workers + master_add_node +----------------------------------- + (14,14,localhost,57638,default,f) +(1 row) + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370011 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370010 | 1 | 2 | 0 +(1 row) + +DROP TABLE replicate_reference_table_schema.table1; +DROP SCHEMA replicate_reference_table_schema CASCADE; +-- reload pg_dist_shard_placement table +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 79a6dc0e1..2043cdae8 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -208,5 +208,7 @@ test: multi_foreign_key # ---------- # multi_upgrade_reference_table tests for upgrade_reference_table UDF +# multi_replicate_reference_table tests replicating reference tables to new nodes after we add new nodes # ---------- test: multi_upgrade_reference_table +test: multi_replicate_reference_table diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql new file mode 100644 index 000000000..6f793cac8 --- /dev/null +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -0,0 +1,387 @@ +-- +-- MULTI_REPLICATE_REFERENCE_TABLE +-- +-- Tests that check the metadata returned by the master node. + + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000; + + +-- remove a node for testing purposes +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT master_remove_node('localhost', :worker_2_port); + + +-- test adding new node with no reference tables + +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT master_add_node('localhost', :worker_2_port); + +-- verify node is added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + + +-- test adding new node with a reference table which does not have any healthy placement +SELECT master_remove_node('localhost', :worker_2_port); + +-- verify there is no node with nodeport = :worker_2_port before adding the node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +CREATE TABLE replicate_reference_table_unhealthy(column1 int); +SELECT create_reference_table('replicate_reference_table_unhealthy'); +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000; + +SELECT master_add_node('localhost', :worker_2_port); + +-- verify node is not added +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +-- verify nothing is replicated to the new node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +DROP TABLE replicate_reference_table_unhealthy; + + +-- test replicating a reference table when a new node added +CREATE TABLE replicate_reference_table_valid(column1 int); +SELECT create_reference_table('replicate_reference_table_valid'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + +SELECT master_add_node('localhost', :worker_2_port); + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + + +-- test add same node twice + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + +SELECT master_add_node('localhost', :worker_2_port); + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_valid'::regclass); + +DROP TABLE replicate_reference_table_valid; + + +-- test replicating a reference table when a new node added in TRANSACTION + ROLLBACK +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE replicate_reference_table_rollback(column1 int); +SELECT create_reference_table('replicate_reference_table_rollback'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_rollback'::regclass); + +DROP TABLE replicate_reference_table_rollback; + + +-- test replicating a reference table when a new node added in TRANSACTION + COMMIT +CREATE TABLE replicate_reference_table_commit(column1 int); +SELECT create_reference_table('replicate_reference_table_commit'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +COMMIT; + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_commit'::regclass); + +DROP TABLE replicate_reference_table_commit; + + +-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE replicate_reference_table_reference_one(column1 int); +SELECT create_reference_table('replicate_reference_table_reference_one'); + +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE replicate_reference_table_hash(column1 int); +SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); + +CREATE TABLE replicate_reference_table_reference_two(column1 int); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +SELECT upgrade_to_reference_table('replicate_reference_table_hash'); +SELECT create_reference_table('replicate_reference_table_reference_two'); +COMMIT; + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); + +SELECT + logicalrelid, partmethod, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + +DROP TABLE replicate_reference_table_reference_one; +DROP TABLE replicate_reference_table_hash; +DROP TABLE replicate_reference_table_reference_two; + + +-- test inserting a value then adding a new node in a transaction +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE replicate_reference_table_insert(column1 int); +SELECT create_reference_table('replicate_reference_table_insert'); + +BEGIN; +INSERT INTO replicate_reference_table_insert VALUES(1); +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +DROP TABLE replicate_reference_table_insert; + + +-- test COPY then adding a new node in a transaction +CREATE TABLE replicate_reference_table_copy(column1 int); +SELECT create_reference_table('replicate_reference_table_copy'); + +BEGIN; +COPY replicate_reference_table_copy FROM STDIN; +1 +2 +3 +4 +5 +\. +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +DROP TABLE replicate_reference_table_copy; + + +-- test executing DDL command then adding a new node in a transaction +CREATE TABLE replicate_reference_table_ddl(column1 int); +SELECT create_reference_table('replicate_reference_table_ddl'); + +BEGIN; +ALTER TABLE replicate_reference_table_ddl ADD column2 int; +SELECT master_add_node('localhost', :worker_2_port); +ROLLBACK; + +DROP TABLE replicate_reference_table_ddl; + + +-- test DROP table after adding new node in a transaction +CREATE TABLE replicate_reference_table_drop(column1 int); +SELECT create_reference_table('replicate_reference_table_drop'); + +BEGIN; +SELECT master_add_node('localhost', :worker_2_port); +DROP TABLE replicate_reference_table_drop; +ROLLBACK; + +DROP TABLE replicate_reference_table_drop; + +-- test adding a node while there is a reference table at another schema +CREATE SCHEMA replicate_reference_table_schema; +CREATE TABLE replicate_reference_table_schema.table1(column1 int); +SELECT create_reference_table('replicate_reference_table_schema.table1'); + +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + +SELECT master_add_node('localhost', :worker_2_port); + +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_schema.table1'::regclass); + +DROP TABLE replicate_reference_table_schema.table1; +DROP SCHEMA replicate_reference_table_schema CASCADE; + + +-- reload pg_dist_shard_placement table +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement;