diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 329a7b51f..a9275924e 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -50,15 +50,26 @@ /* local function forward declarations */ static char LookupShardTransferMode(Oid shardReplicationModeOid); +static void ErrorIfTableCannotBeReplicated(Oid relationId); static void RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); +static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort, + char shardReplicationMode); +static void CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort); static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval, const char *sourceNodeName, int32 sourceNodePort); static void EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); +static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, + int32 sourceNodePort, const char *targetNodeName, + int32 targetNodePort); static List * RecreateTableDDLCommandList(Oid relationId); static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId); @@ -87,31 +98,36 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) int32 targetNodePort = PG_GETARG_INT32(4); bool doRepair = PG_GETARG_BOOL(5); Oid shardReplicationModeOid = PG_GETARG_OID(6); - char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); char *sourceNodeName = text_to_cstring(sourceNodeNameText); char *targetNodeName = text_to_cstring(targetNodeNameText); - if (!doRepair) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("master_copy_shard_placement() " - "with do not repair functionality " - "is only supported on Citus Enterprise"))); - } - else if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("using logical replication with repair functionality " - "is currently not supported"))); - } - - EnsureCoordinator(); CheckCitusVersion(ERROR); + EnsureCoordinator(); - /* RepairShardPlacement function repairs only given shard */ - RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort); + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("using logical replication in " + "master_copy_shard_placement() requires Citus " + "Enterprise"))); + } + + ShardInterval *shardInterval = LoadShardInterval(shardId); + ErrorIfTableCannotBeReplicated(shardInterval->relationId); + + if (doRepair) + { + RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, + targetNodePort); + } + else + { + ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + shardReplicationMode); + } PG_RETURN_VOID(); } @@ -171,6 +187,44 @@ BlockWritesToShardList(List *shardList) } +/* + * ErrorIfTableCannotBeReplicated function errors out if the given table is not suitable + * for its shard being replicated. Shard replications is not allowed only for MX tables, + * since RF=1 is a must MX tables. + */ +static void +ErrorIfTableCannotBeReplicated(Oid relationId) +{ + /* + * Note that ShouldSyncTableMetadata() returns true for both MX tables + * and reference tables. + */ + bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId); + if (!shouldSyncMetadata) + { + return; + } + + CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); + char *relationName = get_rel_name(relationId); + + /* + * ShouldSyncTableMetadata() returns true also for reference table, + * we don't want to error in that case since reference tables aren't + * automatically replicated to active nodes with no shards, and + * master_copy_shard_placement() can be used to create placements in + * such nodes. + */ + if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + (errmsg("Table %s is streaming replicated. Shards " + "of streaming replicated tables cannot " + "be copied", quote_literal_cstr(relationName))))); + } +} + + /* * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum * values to a char. @@ -217,6 +271,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode char relationKind = get_rel_relkind(distributedTableId); char *tableOwner = TableOwner(shardInterval->relationId); + bool missingOk = false; /* prevent table from being dropped */ @@ -314,13 +369,189 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode /* after successful repair, we update shard state as healthy*/ List *placementList = ShardPlacementList(shardId); - ShardPlacement *placement = ForceSearchShardPlacementInList(placementList, - targetNodeName, - targetNodePort); + ShardPlacement *placement = SearchShardPlacementInList(placementList, targetNodeName, + targetNodePort, + missingOk); UpdateShardPlacementState(placement->placementId, SHARD_STATE_ACTIVE); } +/* + * ReplicateColocatedShardPlacement replicates the given shard and its + * colocated shards from a source node to target node. + */ +static void +ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort, char shardReplicationMode) +{ + ShardInterval *shardInterval = LoadShardInterval(shardId); + Oid distributedTableId = shardInterval->relationId; + + List *colocatedTableList = ColocatedTableList(distributedTableId); + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + Oid colocatedTableId = InvalidOid; + ListCell *colocatedShardCell = NULL; + + foreach_oid(colocatedTableId, colocatedTableList) + { + char relationKind = '\0'; + + /* check that user has owner rights in all co-located tables */ + EnsureTableOwner(colocatedTableId); + + relationKind = get_rel_relkind(colocatedTableId); + if (relationKind == RELKIND_FOREIGN_TABLE) + { + char *relationName = get_rel_name(colocatedTableId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate shard"), + errdetail("Table %s is a foreign table. Replicating " + "shards backed by foreign tables is " + "not supported.", relationName))); + } + + List *foreignConstraintCommandList = GetTableForeignConstraintCommands( + colocatedTableId); + + if (foreignConstraintCommandList != NIL && + PartitionMethod(colocatedTableId) != DISTRIBUTE_BY_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create foreign key constraint"), + errdetail("This shard has foreign constraints on it. " + "Citus currently supports " + "foreign key constraints only for " + "\"citus.shard_replication_factor = 1\"."), + errhint("Please change \"citus.shard_replication_factor to " + "1\". To learn more about using foreign keys with " + "other replication factors, please contact us at " + "https://citusdata.com/about/contact_us."))); + } + } + + /* + * We sort colocatedShardList so that lock operations will not cause any + * deadlocks. + */ + colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + + BlockWritesToShardList(colocatedShardList); + + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); + uint64 colocatedShardId = colocatedShard->shardId; + + /* + * For shard copy, there should be healthy placement in source node and no + * placement in the target node. + */ + EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); + } + + /* + * CopyColocatedShardPlacement function copies given shard with its co-located + * shards. + */ + CopyColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); +} + + +/* + * CopyColocatedShardPlacement copies a shard along with its co-located shards from a + * source node to target node. CopyShardPlacement does not make any checks about state + * of the shards. It is caller's responsibility to make those checks if they are + * necessary. + */ +static void +CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort) +{ + ShardInterval *shardInterval = LoadShardInterval(shardId); + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + ListCell *colocatedShardCell = NULL; + + /* iterate through the colocated shards and copy each */ + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); + bool includeDataCopy = true; + + if (PartitionedTable(colocatedShard->relationId)) + { + /* partitioned tables contain no data */ + includeDataCopy = false; + } + + List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName, + sourceNodePort, includeDataCopy); + char *tableOwner = TableOwner(colocatedShard->relationId); + + SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + } + + + /* + * Once all shards are created, we can recreate relationships between shards. + * + * Iterate through the colocated shards and create the foreign constraints and + * attach child tables to their parents in a partitioning hierarchy. + * + * Note: After implementing foreign constraints from distributed to reference + * tables, we have decided to not create foreign constraints from hash + * distributed to reference tables at this stage for nonblocking rebalancer. + * We just create the co-located ones here. We add the foreign constraints + * from hash distributed to reference tables after being completely done with + * the copy procedure inside LogicallyReplicateShards. The reason is that, + * the reference tables have placements in both source and target workers and + * the copied shard would get updated twice because of a cascading DML coming + * from both of the placements. + */ + foreach(colocatedShardCell, colocatedShardList) + { + List *colocatedShardForeignConstraintCommandList = NIL; + List *referenceTableForeignConstraintList = NIL; + ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); + char *tableOwner = TableOwner(colocatedShard->relationId); + + CopyShardForeignConstraintCommandListGrouped(colocatedShard, + & + colocatedShardForeignConstraintCommandList, + &referenceTableForeignConstraintList); + + List *commandList = list_concat(colocatedShardForeignConstraintCommandList, + referenceTableForeignConstraintList); + + if (PartitionTable(colocatedShard->relationId)) + { + char *attachPartitionCommand = + GenerateAttachShardPartitionCommand(colocatedShard); + + commandList = lappend(commandList, attachPartitionCommand); + } + + SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, + tableOwner, commandList); + } + + /* finally insert the placements to pg_dist_placement */ + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); + uint64 colocatedShardId = colocatedShard->shardId; + uint32 groupId = GroupForNode(targetNodeName, targetNodePort); + + InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID, + SHARD_STATE_ACTIVE, ShardLength(colocatedShardId), + groupId); + } +} + + /* * CopyPartitionShardsCommandList gets a shardInterval which is a shard that * belongs to partitioned table (this is asserted). @@ -369,19 +600,23 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source const char *targetNodeName, int32 targetNodePort) { List *shardPlacementList = ShardPlacementList(shardId); + bool missingSourceOk = false; + bool missingTargetOk = false; - ShardPlacement *sourcePlacement = ForceSearchShardPlacementInList(shardPlacementList, - sourceNodeName, - sourceNodePort); + ShardPlacement *sourcePlacement = SearchShardPlacementInList(shardPlacementList, + sourceNodeName, + sourceNodePort, + missingSourceOk); if (sourcePlacement->shardState != SHARD_STATE_ACTIVE) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("source placement must be in active state"))); } - ShardPlacement *targetPlacement = ForceSearchShardPlacementInList(shardPlacementList, - targetNodeName, - targetNodePort); + ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, + targetNodeName, + targetNodePort, + missingTargetOk); if (targetPlacement->shardState != SHARD_STATE_INACTIVE) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -391,46 +626,78 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source /* - * SearchShardPlacementInList searches a provided list for a shard placement with the - * specified node name and port. This function returns NULL if no such - * placement exists in the provided list. + * EnsureShardCanBeCopied checks if the given shard has a healthy placement in the source + * node and no placements in the target node. */ -ShardPlacement * -SearchShardPlacementInList(List *shardPlacementList, const char *nodeName, - uint32 nodePort) +static void +EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, + const char *targetNodeName, int32 targetNodePort) { - ShardPlacement *shardPlacement = NULL; - foreach_ptr(shardPlacement, shardPlacementList) + List *shardPlacementList = ShardPlacementList(shardId); + bool missingSourceOk = false; + bool missingTargetOk = true; + + ShardPlacement *sourcePlacement = SearchShardPlacementInList(shardPlacementList, + sourceNodeName, + sourceNodePort, + missingSourceOk); + if (sourcePlacement->shardState != SHARD_STATE_ACTIVE) { - if (strncmp(nodeName, shardPlacement->nodeName, MAX_NODE_LENGTH) == 0 && - nodePort == shardPlacement->nodePort) - { - return shardPlacement; - } + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("source placement must be in active state"))); + } + + ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, + targetNodeName, + targetNodePort, + missingTargetOk); + if (targetPlacement != NULL) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("shard " INT64_FORMAT " already exists in the target node", + shardId))); } - return NULL; } /* - * ForceSearchShardPlacementInList searches a provided list for a shard - * placement with the specified node name and port. This function throws an - * error if no such placement exists in the provided list. + * SearchShardPlacementInList searches a provided list for a shard placement with the + * specified node name and port. If missingOk is set to true, this function returns NULL + * if no such placement exists in the provided list, otherwise it throws an error. */ ShardPlacement * -ForceSearchShardPlacementInList(List *shardPlacementList, const char *nodeName, - uint32 nodePort) +SearchShardPlacementInList(List *shardPlacementList, const char *nodeName, + uint32 nodePort, bool missingOk) { - ShardPlacement *placement = SearchShardPlacementInList(shardPlacementList, nodeName, - nodePort); - if (placement == NULL) + ListCell *shardPlacementCell = NULL; + ShardPlacement *matchingPlacement = NULL; + + foreach(shardPlacementCell, shardPlacementList) { + ShardPlacement *shardPlacement = lfirst(shardPlacementCell); + + if (strncmp(nodeName, shardPlacement->nodeName, MAX_NODE_LENGTH) == 0 && + nodePort == shardPlacement->nodePort) + { + matchingPlacement = shardPlacement; + break; + } + } + + if (matchingPlacement == NULL) + { + if (missingOk) + { + return NULL; + } + ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION), errmsg("could not find placement matching \"%s:%d\"", nodeName, nodePort), errhint("Confirm the placement still exists and try again."))); } - return placement; + + return matchingPlacement; } @@ -456,10 +723,6 @@ CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName, copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList, tableRecreationCommandList); - /* - * The caller doesn't want to include the COPY command, perhaps using - * logical replication to copy the data. - */ if (includeDataCopy) { appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index ebf5350dd..8f43d5d3a 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -270,8 +270,10 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData); List *shardPlacementList = ShardPlacementList(shardId); + bool missingWorkerOk = true; ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, - nodeName, nodePort); + nodeName, nodePort, + missingWorkerOk); char *tableOwner = TableOwner(shardInterval->relationId); /* diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index b9f5b2671..8b9978657 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -165,9 +165,7 @@ extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInt List ** referenceTableForeignConstraintList); extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, - const char *nodeName, uint32 nodePort); -extern ShardPlacement * ForceSearchShardPlacementInList(List *shardPlacementList, - const char *nodeName, - uint32 nodePort); + const char *nodeName, uint32 nodePort, + bool missingOk); #endif /* MASTER_PROTOCOL_H */ diff --git a/src/test/regress/expected/isolation_copy_placement_vs_modification.out b/src/test/regress/expected/isolation_copy_placement_vs_modification.out index 24abf82b1..803311cbc 100644 --- a/src/test/regress/expected/isolation_copy_placement_vs_modification.out +++ b/src/test/regress/expected/isolation_copy_placement_vs_modification.out @@ -1,53 +1,56 @@ Parsed test spec with 2 sessions starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 1 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-update: - UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5; step s2-commit: - COMMIT; + COMMIT; step s1-update: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -55,53 +58,56 @@ nodeport success result 57638 t 5 starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 1 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-delete: - DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + DELETE FROM test_repair_placement_vs_modification WHERE x = 5; step s2-commit: - COMMIT; + COMMIT; step s1-delete: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -109,50 +115,53 @@ nodeport success result 57638 t starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s2-commit: - COMMIT; + COMMIT; step s1-insert: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -160,50 +169,53 @@ nodeport success result 57638 t 10 starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-copy: - COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; + COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; step s2-commit: - COMMIT; + COMMIT; step s1-copy: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -211,48 +223,51 @@ nodeport success result 57638 t 5 starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count +create_distributed_table + + step s1-load-cache: - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-ddl: - CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x); step s2-commit: - COMMIT; + COMMIT; step s1-ddl: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-index-count: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; nodeport success result @@ -262,50 +277,53 @@ nodeport success result 57638 t 1 starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 1 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-update: - UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5; step s2-commit: - COMMIT; + COMMIT; step s1-update: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -313,50 +331,53 @@ nodeport success result 57638 t 5 starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 1 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-delete: - DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + DELETE FROM test_repair_placement_vs_modification WHERE x = 5; step s2-commit: - COMMIT; + COMMIT; step s1-delete: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -364,47 +385,50 @@ nodeport success result 57638 t starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-insert: - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); step s2-commit: - COMMIT; + COMMIT; step s1-insert: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -412,47 +436,50 @@ nodeport success result 57638 t 10 starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content +create_distributed_table + + step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-copy: - COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; + COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; step s2-commit: - COMMIT; + COMMIT; step s1-copy: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-content: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') - WHERE - shardid IN (SELECT * FROM selected_shard) - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') + WHERE + shardid IN (SELECT * FROM selected_shard) + ORDER BY + nodeport; nodeport success result @@ -460,45 +487,48 @@ nodeport success result 57638 t 5 starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count +create_distributed_table + + step s1-begin: BEGIN; - SET LOCAL citus.select_opens_transaction_block TO off; + SET LOCAL citus.select_opens_transaction_block TO off; step s1-select: - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; count 0 step s2-set-placement-inactive: - UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard) AND nodeport = 57638; step s2-begin: - BEGIN; + BEGIN; step s2-repair-placement: - SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); master_copy_shard_placement step s1-ddl: - CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x); step s2-commit: - COMMIT; + COMMIT; step s1-ddl: <... completed> step s1-commit: - COMMIT; + COMMIT; step s2-print-index-count: - SELECT - nodeport, success, result - FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') - ORDER BY - nodeport; + SELECT + nodeport, success, result + FROM + run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + ORDER BY + nodeport; nodeport success result @@ -506,3 +536,367 @@ nodeport success result 57637 t 1 57638 t 1 57638 t 1 + +starting permutation: s1-begin s2-begin s2-copy-placement s1-update-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-update-copy-table: + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + +step s2-commit: + COMMIT; + +step s1-update-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-delete-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-delete-copy-table: + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + +step s2-commit: + COMMIT; + +step s1-delete-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-insert-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-insert-copy-table: + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + +step s2-commit: + COMMIT; + +step s1-insert-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-copy-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-copy-copy-table: + COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; + +step s2-commit: + COMMIT; + +step s1-copy-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-ddl-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-ddl-copy-table: + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + +step s2-commit: + COMMIT; + +step s1-ddl-copy-table: <... completed> +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s2-copy-placement s1-select-copy-table s2-commit s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-select-copy-table: + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + +count + +0 +step s2-commit: + COMMIT; + +step s1-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-update-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-update-copy-table: + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-delete-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-delete-copy-table: + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-insert-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-insert-copy-table: + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-copy-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-copy-copy-table: + COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-ddl-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-ddl-copy-table: + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +step s1-commit: + COMMIT; + +step s2-copy-placement: <... completed> +master_copy_shard_placement + + +step s2-commit: + COMMIT; + + +starting permutation: s1-begin s2-begin s1-select-copy-table s2-copy-placement s1-commit s2-commit +create_distributed_table + + +step s1-begin: + BEGIN; + SET LOCAL citus.select_opens_transaction_block TO off; + +step s2-begin: + BEGIN; + +step s1-select-copy-table: + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + +count + +0 +step s2-copy-placement: + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); + +master_copy_shard_placement + + +step s1-commit: + COMMIT; + +step s2-commit: + COMMIT; + diff --git a/src/test/regress/expected/master_copy_shard_placement.out b/src/test/regress/expected/master_copy_shard_placement.out new file mode 100644 index 000000000..b39be6d82 --- /dev/null +++ b/src/test/regress/expected/master_copy_shard_placement.out @@ -0,0 +1,123 @@ +-- Tests for master_copy_shard_placement, which can be used for adding replicas in statement-based replication +CREATE SCHEMA mcsp; +SET search_path TO mcsp; +SET citus.next_shard_id TO 8139000; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'statement'; +CREATE TABLE ref_table(a int); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE data ( + key text primary key, + value text not null, + check (value <> '') +); +CREATE INDEX ON data (value); +SELECT create_distributed_table('data','key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE history ( + key text not null, + t timestamptz not null, + value text not null +) PARTITION BY RANGE (t); +CREATE TABLE history_p1 PARTITION OF history FOR VALUES FROM ('2019-01-01') TO ('2020-01-01'); +CREATE TABLE history_p2 PARTITION OF history FOR VALUES FROM ('2020-01-01') TO ('2021-01-01'); +SELECT create_distributed_table('history','key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO data VALUES ('key-1', 'value-1'); +INSERT INTO data VALUES ('key-2', 'value-2'); +INSERT INTO history VALUES ('key-1', '2020-02-01', 'old'); +INSERT INTO history VALUES ('key-1', '2019-10-01', 'older'); +-- verify we error out if no healthy placement exists at source +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); +ERROR: could not find placement matching "localhost:xxxxx" +HINT: Confirm the placement still exists and try again. +-- verify we error out if source and destination are the same +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_2_port, + 'localhost', :worker_2_port, + do_repair := false); +ERROR: shard xxxxx already exists in the target node +-- verify we error out if target already contains a healthy placement +SELECT master_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); +ERROR: shard xxxxx already exists in the target node +-- replicate shard that contains key-1 +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_2_port, + 'localhost', :worker_1_port, + do_repair := false); + master_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- forcefully mark the old replica as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 +WHERE shardid = get_shard_id_for_distribution_column('data', 'key-1') AND nodeport = :worker_2_port; +UPDATE pg_dist_shard_placement SET shardstate = 3 +WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nodeport = :worker_2_port; +-- should still have all data available thanks to new replica +SELECT count(*) FROM data; + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT count(*) FROM history; + count +--------------------------------------------------------------------- + 2 +(1 row) + +-- test we can not replicate MX tables +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE mx_table(a int); +SELECT create_distributed_table('mx_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('mx_table', '1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); +ERROR: Table 'mx_table' is streaming replicated. Shards of streaming replicated tables cannot be copied +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO ERROR; +DROP SCHEMA mcsp CASCADE; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 87c519b9d..78098d721 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -200,7 +200,7 @@ test: multi_complex_count_distinct multi_select_distinct test: multi_modifications test: multi_distribution_metadata test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list multi_repair_shards -test: multi_upsert multi_simple_queries multi_data_types +test: multi_upsert multi_simple_queries multi_data_types master_copy_shard_placement # multi_utilities cannot be run in parallel with other tests because it checks # global locks test: multi_utilities diff --git a/src/test/regress/spec/isolation_copy_placement_vs_modification.spec b/src/test/regress/spec/isolation_copy_placement_vs_modification.spec index 7efc53240..fc3f5eb8b 100644 --- a/src/test/regress/spec/isolation_copy_placement_vs_modification.spec +++ b/src/test/regress/spec/isolation_copy_placement_vs_modification.spec @@ -4,16 +4,21 @@ setup { SET citus.shard_count TO 2; SET citus.shard_replication_factor TO 2; + CREATE TABLE test_repair_placement_vs_modification (x int, y int); + SELECT create_distributed_table('test_repair_placement_vs_modification', 'x'); + + SELECT get_shard_id_for_distribution_column('test_repair_placement_vs_modification', 5) INTO selected_shard; + + SET citus.shard_replication_factor TO 1; CREATE TABLE test_copy_placement_vs_modification (x int, y int); SELECT create_distributed_table('test_copy_placement_vs_modification', 'x'); - - SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5) INTO selected_shard; } teardown { - DROP TABLE test_copy_placement_vs_modification; + DROP TABLE test_repair_placement_vs_modification; DROP TABLE selected_shard; + DROP TABLE test_copy_placement_vs_modification; } session "s1" @@ -24,39 +29,69 @@ step "s1-begin" SET LOCAL citus.select_opens_transaction_block TO off; } -// since test_copy_placement_vs_modification has rep > 1 simple select query doesn't hit all placements +// since test_repair_placement_vs_modification has rep > 1 simple select query doesn't hit all placements // hence not all placements are cached step "s1-load-cache" { - TRUNCATE test_copy_placement_vs_modification; + TRUNCATE test_repair_placement_vs_modification; } step "s1-insert" { - INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); + INSERT INTO test_repair_placement_vs_modification VALUES (5, 10); } step "s1-update" { - UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; + UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5; } step "s1-delete" { - DELETE FROM test_copy_placement_vs_modification WHERE x = 5; + DELETE FROM test_repair_placement_vs_modification WHERE x = 5; } step "s1-select" { - SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; + SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5; } step "s1-ddl" { - CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); + CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x); } step "s1-copy" +{ + COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; +} + +step "s1-insert-copy-table" +{ + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); +} + +step "s1-update-copy-table" +{ + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; +} + +step "s1-delete-copy-table" +{ + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; +} + +step "s1-select-copy-table" +{ + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; +} + +step "s1-ddl-copy-table" +{ + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); +} + +step "s1-copy-copy-table" { COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV; } @@ -83,6 +118,13 @@ step "s2-repair-placement" SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); } +step "s2-copy-placement" +{ + SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)), + 'localhost', 57637, 'localhost', 57638, + do_repair := false, transfer_mode := 'block_writes'); +} + step "s2-commit" { COMMIT; @@ -93,7 +135,7 @@ step "s2-print-content" SELECT nodeport, success, result FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') + run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -105,7 +147,7 @@ step "s2-print-index-count" SELECT nodeport, success, result FROM - run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') + run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''') ORDER BY nodeport; } @@ -126,3 +168,19 @@ permutation "s1-insert" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-b permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-insert" "s2-commit" "s1-commit" "s2-print-content" permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-copy" "s2-commit" "s1-commit" "s2-print-content" permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-ddl" "s2-commit" "s1-commit" "s2-print-index-count" + +// verify that copy placement (do_repair := false) blocks other operations, except SELECT +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-update-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-delete-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-insert-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-copy-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-ddl-copy-table" "s2-commit" "s1-commit" +permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-select-copy-table" "s2-commit" "s1-commit" + +// verify that copy placement (do_repair := false) is blocked by other operations, except SELECT +permutation "s1-begin" "s2-begin" "s1-update-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-delete-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-insert-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-copy-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-ddl-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s1-select-copy-table" "s2-copy-placement" "s1-commit" "s2-commit" diff --git a/src/test/regress/sql/master_copy_shard_placement.sql b/src/test/regress/sql/master_copy_shard_placement.sql new file mode 100644 index 000000000..dee374809 --- /dev/null +++ b/src/test/regress/sql/master_copy_shard_placement.sql @@ -0,0 +1,91 @@ +-- Tests for master_copy_shard_placement, which can be used for adding replicas in statement-based replication +CREATE SCHEMA mcsp; +SET search_path TO mcsp; +SET citus.next_shard_id TO 8139000; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'statement'; + +CREATE TABLE ref_table(a int); +SELECT create_reference_table('ref_table'); + +CREATE TABLE data ( + key text primary key, + value text not null, + check (value <> '') +); +CREATE INDEX ON data (value); +SELECT create_distributed_table('data','key'); + +CREATE TABLE history ( + key text not null, + t timestamptz not null, + value text not null +) PARTITION BY RANGE (t); +CREATE TABLE history_p1 PARTITION OF history FOR VALUES FROM ('2019-01-01') TO ('2020-01-01'); +CREATE TABLE history_p2 PARTITION OF history FOR VALUES FROM ('2020-01-01') TO ('2021-01-01'); +SELECT create_distributed_table('history','key'); + +INSERT INTO data VALUES ('key-1', 'value-1'); +INSERT INTO data VALUES ('key-2', 'value-2'); + +INSERT INTO history VALUES ('key-1', '2020-02-01', 'old'); +INSERT INTO history VALUES ('key-1', '2019-10-01', 'older'); + +-- verify we error out if no healthy placement exists at source +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); + +-- verify we error out if source and destination are the same +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_2_port, + 'localhost', :worker_2_port, + do_repair := false); + +-- verify we error out if target already contains a healthy placement +SELECT master_copy_shard_placement( + (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); + +-- replicate shard that contains key-1 +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_2_port, + 'localhost', :worker_1_port, + do_repair := false); + +-- forcefully mark the old replica as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 +WHERE shardid = get_shard_id_for_distribution_column('data', 'key-1') AND nodeport = :worker_2_port; + +UPDATE pg_dist_shard_placement SET shardstate = 3 +WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nodeport = :worker_2_port; + +-- should still have all data available thanks to new replica +SELECT count(*) FROM data; +SELECT count(*) FROM history; + +-- test we can not replicate MX tables +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +CREATE TABLE mx_table(a int); +SELECT create_distributed_table('mx_table', 'a'); + +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('mx_table', '1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); + +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + +SET client_min_messages TO ERROR; +DROP SCHEMA mcsp CASCADE;