From b46b9a68aed71ec0e30332ef695502628e7f2d00 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 16 Mar 2020 16:24:44 -0700 Subject: [PATCH] Tests for master_copy_shard_placement --- .../distributed/master/master_repair_shards.c | 84 +- ...olation_copy_placement_vs_modification.out | 724 ++++++++++++++---- .../expected/master_copy_shard_placement.out | 63 +- ...lation_copy_placement_vs_modification.spec | 82 +- .../sql/master_copy_shard_placement.sql | 44 +- 5 files changed, 771 insertions(+), 226 deletions(-) diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index f9db5e872..a9275924e 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -117,18 +117,17 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) ShardInterval *shardInterval = LoadShardInterval(shardId); ErrorIfTableCannotBeReplicated(shardInterval->relationId); - if (!doRepair) + if (doRepair) + { + RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, + targetNodePort); + } + else { ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, shardReplicationMode); } - else - { - /* RepairShardPlacement function repairs only given shard */ - RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort); - } PG_RETURN_VOID(); } @@ -190,41 +189,38 @@ BlockWritesToShardList(List *shardList) /* * ErrorIfTableCannotBeReplicated function errors out if the given table is not suitable - * for its shard being replicated. There are 2 cases in which shard replication is not - * allowed: - * - * 1) MX tables, since RF=1 is a must MX tables - * 2) Reference tables, since the shard should already exist in all workers + * for its shard being replicated. Shard replications is not allowed only for MX tables, + * since RF=1 is a must MX tables. */ static void ErrorIfTableCannotBeReplicated(Oid relationId) { + /* + * Note that ShouldSyncTableMetadata() returns true for both MX tables + * and reference tables. + */ bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId); - - if (shouldSyncMetadata) + if (!shouldSyncMetadata) { - CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); - char *relationName = get_rel_name(relationId); - StringInfo errorDetailString = makeStringInfo(); + return; + } - if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING) - { - appendStringInfo(errorDetailString, - "Table %s is streaming replicated. Shards " - "of streaming replicated tables cannot " - "be copied", relationName); - } - else if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE) - { - appendStringInfo(errorDetailString, "Table %s is a reference table. Shards " - "of reference tables cannot be copied", - relationName); - return; - } + CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); + char *relationName = get_rel_name(relationId); + /* + * ShouldSyncTableMetadata() returns true also for reference table, + * we don't want to error in that case since reference tables aren't + * automatically replicated to active nodes with no shards, and + * master_copy_shard_placement() can be used to create placements in + * such nodes. + */ + if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING) + { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot copy shard"), - errdetail("%s", errorDetailString->data))); + (errmsg("Table %s is streaming replicated. Shards " + "of streaming replicated tables cannot " + "be copied", quote_literal_cstr(relationName))))); } } @@ -381,8 +377,8 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode /* - * ReplicateColocatedShardPlacement replicated given shard and its colocated shards - * from a source node to target node. + * ReplicateColocatedShardPlacement replicates the given shard and its + * colocated shards from a source node to target node. */ static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, @@ -394,13 +390,11 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, List *colocatedTableList = ColocatedTableList(distributedTableId); List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - ListCell *colocatedTableCell = NULL; + Oid colocatedTableId = InvalidOid; ListCell *colocatedShardCell = NULL; - - foreach(colocatedTableCell, colocatedTableList) + foreach_oid(colocatedTableId, colocatedTableList) { - Oid colocatedTableId = lfirst_oid(colocatedTableCell); char relationKind = '\0'; /* check that user has owner rights in all co-located tables */ @@ -411,8 +405,8 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, { char *relationName = get_rel_name(colocatedTableId); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot repair shard"), - errdetail("Table %s is a foreign table. Repairing " + errmsg("cannot replicate shard"), + errdetail("Table %s is a foreign table. Replicating " "shards backed by foreign tables is " "not supported.", relationName))); } @@ -441,6 +435,9 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, * deadlocks. */ colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + + BlockWritesToShardList(colocatedShardList); + foreach(colocatedShardCell, colocatedShardList) { ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); @@ -454,8 +451,6 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, targetNodeName, targetNodePort); } - BlockWritesToShardList(colocatedShardList); - /* * CopyColocatedShardPlacement function copies given shard with its co-located * shards. @@ -659,7 +654,8 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo if (targetPlacement != NULL) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("shard %ld already exist in target placement", shardId))); + errmsg("shard " INT64_FORMAT " already exists in the target node", + shardId))); } } diff --git a/src/test/regress/expected/isolation_copy_placement_vs_modification.out b/src/test/regress/expected/isolation_copy_placement_vs_modification.out index 24abf82b1..803311cbc 100644 --- a/src/test/regress/expected/isolation_copy_placement_vs_modification.out +++ b/src/test/regress/expected/isolation_copy_placement_vs_modification.out @@ -1,53 +1,56 @@ Parsed test spec with 2 sessions starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 1 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-update: - UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5; step s2-commit: - COMMIT; + COMMIT; step s1-update: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -55,53 +58,56 @@ nodeport success result 57638 t 5 starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 1 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-delete: - DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + DELETE FROM test_repair_placement_vs_modification WHERE x = 5; step s2-commit: - COMMIT; + COMMIT; step s1-delete: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -109,50 +115,53 @@ nodeport success result 57638 t starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s2-commit: - COMMIT; + COMMIT; step s1-insert: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -160,50 +169,53 @@ nodeport success result 57638 t 10 starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-copy: - COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; + COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; step s2-commit: - COMMIT; + COMMIT; step s1-copy: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -211,48 +223,51 @@ nodeport success result 57638 t 5 starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-ddl: - CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x); step s2-commit: - COMMIT; + COMMIT; step s1-ddl: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-index-count: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; nodeport success result @@ -262,50 +277,53 @@ nodeport success result 57638 t 1 starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 1 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-update: - UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5; step s2-commit: - COMMIT; + COMMIT; step s1-update: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -313,50 +331,53 @@ nodeport success result 57638 t 5 starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 1 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-delete: - DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + DELETE FROM test_repair_placement_vs_modification WHERE x = 5; step s2-commit: - COMMIT; + COMMIT; step s1-delete: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -364,47 +385,50 @@ nodeport success result 57638 t starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s2-commit: - COMMIT; + COMMIT; step s1-insert: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -412,47 +436,50 @@ nodeport success result 57638 t 10 starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-copy: - COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; + COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; step s2-commit: - COMMIT; + COMMIT; step s1-copy: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -460,45 +487,48 @@ nodeport success result 57638 t 5 starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count +create_distributed_table + + step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-ddl: - CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x); step s2-commit: - COMMIT; + COMMIT; step s1-ddl: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-index-count: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; nodeport success result @@ -506,3 +536,367 @@ nodeport success result 57637 t 1 57638 t 1 57638 t 1 + +starting permutation: s1-begin s2-begin s2-copy-placement s1-update-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-update-copy-table: + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + +step s2-commit: + COMMIT; + +step s1-update-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-delete-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-delete-copy-table: + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + +step s2-commit: + COMMIT; + +step s1-delete-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-insert-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-insert-copy-table: + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + +step s2-commit: + COMMIT; + +step s1-insert-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-copy-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-copy-copy-table: + COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; + +step s2-commit: + COMMIT; + +step s1-copy-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-ddl-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-ddl-copy-table: + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + +step s2-commit: + COMMIT; + +step s1-ddl-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-select-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-select-copy-table: + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + +count + +0 +step s2-commit: + COMMIT; + +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-update-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-update-copy-table: + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-delete-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-delete-copy-table: + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-insert-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-insert-copy-table: + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-copy-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-copy-copy-table: + COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-ddl-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-ddl-copy-table: + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-select-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-select-copy-table: + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + +count + +0 +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-commit: + COMMIT; + +step s2-commit: + COMMIT; + diff --git a/src/test/regress/expected/master_copy_shard_placement.out b/src/test/regress/expected/master_copy_shard_placement.out index ff109dbae..b39be6d82 100644 --- a/src/test/regress/expected/master_copy_shard_placement.out +++ b/src/test/regress/expected/master_copy_shard_placement.out @@ -3,7 +3,14 @@ CREATE SCHEMA mcsp; SET search_path TO mcsp; SET citus.next_shard_id TO 8139000; SET citus.shard_replication_factor TO 1; -SET citus.replicatiOn_model TO 'statement'; +SET citus.replication_model TO 'statement'; +CREATE TABLE ref_table(a int); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + CREATE TABLE data ( key text primary key, value text not null, @@ -33,6 +40,28 @@ INSERT INTO data VALUES ('key-1', 'value-1'); INSERT INTO data VALUES ('key-2', 'value-2'); INSERT INTO history VALUES ('key-1', '2020-02-01', 'old'); INSERT INTO history VALUES ('key-1', '2019-10-01', 'older'); +-- verify we error out if no healthy placement exists at source +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); +ERROR: could not find placement matching "localhost:xxxxx" +HINT: Confirm the placement still exists and try again. +-- verify we error out if source and destination are the same +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_2_port, + 'localhost', :worker_2_port, + do_repair := false); +ERROR: shard xxxxx already exists in the target node +-- verify we error out if target already contains a healthy placement +SELECT master_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); +ERROR: shard xxxxx already exists in the target node -- replicate shard that contains key-1 SELECT master_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), @@ -62,7 +91,33 @@ SELECT count(*) FROM history; 2 (1 row) +-- test we can not replicate MX tables +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE mx_table(a int); +SELECT create_distributed_table('mx_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('mx_table', '1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); +ERROR: Table 'mx_table' is streaming replicated. Shards of streaming replicated tables cannot be copied +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO ERROR; DROP SCHEMA mcsp CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to table data -drop cascades to table history diff --git a/src/test/regress/spec/isolation_copy_placement_vs_modification.spec b/src/test/regress/spec/isolation_copy_placement_vs_modification.spec index 7efc53240..fc3f5eb8b 100644 --- a/src/test/regress/spec/isolation_copy_placement_vs_modification.spec +++ b/src/test/regress/spec/isolation_copy_placement_vs_modification.spec @@ -4,16 +4,21 @@ setup { SET citus.shard_count TO 2; SET citus.shard_replication_factor TO 2; + CREATE TABLE test_repair_placement_vs_modification (x int, y int); + SELECT create_distributed_table('test_repair_placement_vs_modification', 'x'); + + SELECT get_shard_id_for_distribution_column('test_repair_placement_vs_modification', 5) INTO selected_shard; + + SET citus.shard_replication_factor TO 1; CREATE TABLE test_copy_placement_vs_modification (x int, y int); SELECT create_distributed_table('test_copy_placement_vs_modification', 'x'); - - SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5) INTO selected_shard; } teardown { - DROP TABLE test_copy_placement_vs_modification; + DROP TABLE test_repair_placement_vs_modification; DROP TABLE selected_shard; + DROP TABLE test_copy_placement_vs_modification; } session "s1" @@ -24,39 +29,69 @@ step "s1-begin" SET LOCAL citus.select_opens_transaction_block TO off; } -// since test_copy_placement_vs_modification has rep > 1 simple select query doesn't hit all placements +// since test_repair_placement_vs_modification has rep > 1 simple select query doesn't hit all placements // hence not all placements are cached step "s1-load-cache" { - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; } step "s1-insert" { - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); } step "s1-update" { - UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5; } step "s1-delete" { - DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + DELETE FROM test_repair_placement_vs_modification WHERE x = 5; } step "s1-select" { - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; } step "s1-ddl" { - CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x); } step "s1-copy" +{ + COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; +} + +step "s1-insert-copy-table" +{ + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); +} + +step "s1-update-copy-table" +{ + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; +} + +step "s1-delete-copy-table" +{ + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; +} + +step "s1-select-copy-table" +{ + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; +} + +step "s1-ddl-copy-table" +{ + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); +} + +step "s1-copy-copy-table" { COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; } @@ -83,6 +118,13 @@ step "s2-repair-placement" SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); } +step "s2-copy-placement" +{ + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); +} + step "s2-commit" { COMMIT; @@ -93,7 +135,7 @@ step "s2-print-content" SELECT nodeport, success, result FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -105,7 +147,7 @@ step "s2-print-index-count" SELECT nodeport, success, result FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') ORDER BY nodeport; } @@ -126,3 +168,19 @@ permutation "s1-insert" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-b permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-insert" "s2-commit" "s1-commit" "s2-print-content" permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-copy" "s2-commit" "s1-commit" "s2-print-content" permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-ddl" "s2-commit" "s1-commit" "s2-print-index-count" + +// verify that copy placement (do_repair := false) blocks other operations, except SELECT +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-update-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-delete-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-insert-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-copy-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-ddl-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-select-copy-table" "s2-commit" "s1-commit" + +// verify that copy placement (do_repair := false) is blocked by other operations, except SELECT +permutation "s1-begin" "s2-begin" "s1-update-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-delete-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-insert-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-copy-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-ddl-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-select-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" diff --git a/src/test/regress/sql/master_copy_shard_placement.sql b/src/test/regress/sql/master_copy_shard_placement.sql index 233642f05..dee374809 100644 --- a/src/test/regress/sql/master_copy_shard_placement.sql +++ b/src/test/regress/sql/master_copy_shard_placement.sql @@ -3,7 +3,10 @@ CREATE SCHEMA mcsp; SET search_path TO mcsp; SET citus.next_shard_id TO 8139000; SET citus.shard_replication_factor TO 1; -SET citus.replicatiOn_model TO 'statement'; +SET citus.replication_model TO 'statement'; + +CREATE TABLE ref_table(a int); +SELECT create_reference_table('ref_table'); CREATE TABLE data ( key text primary key, @@ -28,6 +31,27 @@ INSERT INTO data VALUES ('key-2', 'value-2'); INSERT INTO history VALUES ('key-1', '2020-02-01', 'old'); INSERT INTO history VALUES ('key-1', '2019-10-01', 'older'); +-- verify we error out if no healthy placement exists at source +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); + +-- verify we error out if source and destination are the same +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_2_port, + 'localhost', :worker_2_port, + do_repair := false); + +-- verify we error out if target already contains a healthy placement +SELECT master_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); + -- replicate shard that contains key-1 SELECT master_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), @@ -46,4 +70,22 @@ WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nod SELECT count(*) FROM data; SELECT count(*) FROM history; +-- test we can not replicate MX tables +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +CREATE TABLE mx_table(a int); +SELECT create_distributed_table('mx_table', 'a'); + +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('mx_table', '1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); + +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + +SET client_min_messages TO ERROR; DROP SCHEMA mcsp CASCADE;