diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c
index 329a7b51f..f9db5e872 100644
--- a/src/backend/distributed/master/master_repair_shards.c
+++ b/src/backend/distributed/master/master_repair_shards.c
@@ -50,15 +50,26 @@
 /* local function forward declarations */
 static char LookupShardTransferMode(Oid shardReplicationModeOid);
+static void ErrorIfTableCannotBeReplicated(Oid relationId);
 static void RepairShardPlacement(int64 shardId, const char *sourceNodeName,
 								 int32 sourceNodePort, const char *targetNodeName,
 								 int32 targetNodePort);
+static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
+											 int32 sourceNodePort, char *targetNodeName,
+											 int32 targetNodePort,
+											 char shardReplicationMode);
+static void CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName,
+										int32 sourceNodePort, char *targetNodeName,
+										int32 targetNodePort);
 static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
 											 const char *sourceNodeName,
 											 int32 sourceNodePort);
 static void EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName,
 									 int32 sourceNodePort, const char *targetNodeName,
 									 int32 targetNodePort);
+static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
+								   int32 sourceNodePort, const char *targetNodeName,
+								   int32 targetNodePort);
 static List * RecreateTableDDLCommandList(Oid relationId);
 static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId);
@@ -87,32 +98,38 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
 	int32 targetNodePort = PG_GETARG_INT32(4);
 	bool doRepair = PG_GETARG_BOOL(5);
 	Oid shardReplicationModeOid = PG_GETARG_OID(6);
-	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
 
 	char *sourceNodeName = text_to_cstring(sourceNodeNameText);
 	char *targetNodeName = text_to_cstring(targetNodeNameText);
 
+	CheckCitusVersion(ERROR);
+	EnsureCoordinator();
+
+	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
+	if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("using logical replication in "
+							   "master_copy_shard_placement() requires Citus "
+							   "Enterprise")));
+	}
+
+	ShardInterval *shardInterval = LoadShardInterval(shardId);
+	ErrorIfTableCannotBeReplicated(shardInterval->relationId);
+
 	if (!doRepair)
 	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("master_copy_shard_placement() "
-							   "with do not repair functionality "
-							   "is only supported on Citus Enterprise")));
+		ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
+										 targetNodeName, targetNodePort,
+										 shardReplicationMode);
 	}
-	else if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
+	else
 	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("using logical replication with repair functionality "
-							   "is currently not supported")));
+		/* RepairShardPlacement function repairs only given shard */
+		RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
+							 targetNodePort);
 	}
 
-	EnsureCoordinator();
-	CheckCitusVersion(ERROR);
-
-	/* RepairShardPlacement function repairs only given shard */
-	RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
-						 targetNodePort);
-
 	PG_RETURN_VOID();
 }
@@ -171,6 +188,47 @@ BlockWritesToShardList(List *shardList)
 }
 
+/*
+ * ErrorIfTableCannotBeReplicated function errors out if the given table is not suitable
+ * for its shard being replicated. There are 2 cases in which shard replication is not
+ * allowed:
+ *
+ * 1) MX tables, since RF=1 is a must for MX tables
+ * 2) Reference tables, since the shard should already exist in all workers
+ */
+static void
+ErrorIfTableCannotBeReplicated(Oid relationId)
+{
+	bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
+
+	if (shouldSyncMetadata)
+	{
+		CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
+		char *relationName = get_rel_name(relationId);
+		StringInfo errorDetailString = makeStringInfo();
+
+		if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
+		{
+			appendStringInfo(errorDetailString,
+							 "Table %s is streaming replicated. Shards "
+							 "of streaming replicated tables cannot "
+							 "be copied", relationName);
+		}
+		else if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
+		{
+			appendStringInfo(errorDetailString, "Table %s is a reference table. Shards "
+												"of reference tables cannot be copied",
+							 relationName);
+			return;
+		}
+
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("cannot copy shard"),
+						errdetail("%s", errorDetailString->data)));
+	}
+}
+
+
 /*
  * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
  * values to a char.
  */
@@ -217,6 +275,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
 	char relationKind = get_rel_relkind(distributedTableId);
 	char *tableOwner = TableOwner(shardInterval->relationId);
+	bool missingOk = false;
 
 	/* prevent table from being dropped */
@@ -314,13 +373,190 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
 
 	/* after successful repair, we update shard state as healthy*/
 	List *placementList = ShardPlacementList(shardId);
-	ShardPlacement *placement = ForceSearchShardPlacementInList(placementList,
-																 targetNodeName,
-																 targetNodePort);
+	ShardPlacement *placement = SearchShardPlacementInList(placementList, targetNodeName,
+															targetNodePort,
+															missingOk);
 	UpdateShardPlacementState(placement->placementId, SHARD_STATE_ACTIVE);
 }
 
 
+/*
+ * ReplicateColocatedShardPlacement replicates the given shard and its colocated
+ * shards from the source node to the target node.
+ */
+static void
+ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
+								 int32 sourceNodePort, char *targetNodeName,
+								 int32 targetNodePort, char shardReplicationMode)
+{
+	ShardInterval *shardInterval = LoadShardInterval(shardId);
+	Oid distributedTableId = shardInterval->relationId;
+
+	List *colocatedTableList = ColocatedTableList(distributedTableId);
+	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
+	ListCell *colocatedTableCell = NULL;
+	ListCell *colocatedShardCell = NULL;
+
+
+	foreach(colocatedTableCell, colocatedTableList)
+	{
+		Oid colocatedTableId = lfirst_oid(colocatedTableCell);
+		char relationKind = '\0';
+
+		/* check that user has owner rights in all co-located tables */
+		EnsureTableOwner(colocatedTableId);
+
+		relationKind = get_rel_relkind(colocatedTableId);
+		if (relationKind == RELKIND_FOREIGN_TABLE)
+		{
+			char *relationName = get_rel_name(colocatedTableId);
+			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("cannot copy shard"),
+							errdetail("Table %s is a foreign table. Copying "
+									  "shards backed by foreign tables is "
+									  "not supported.", relationName)));
+		}
+
+		List *foreignConstraintCommandList = GetTableForeignConstraintCommands(
+			colocatedTableId);
+
+		if (foreignConstraintCommandList != NIL &&
+			PartitionMethod(colocatedTableId) != DISTRIBUTE_BY_NONE)
+		{
+			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("cannot create foreign key constraint"),
+							errdetail("This shard has foreign constraints on it. "
+									  "Citus currently supports "
+									  "foreign key constraints only for "
+									  "\"citus.shard_replication_factor = 1\"."),
+							errhint("Please change \"citus.shard_replication_factor\" "
+									"to 1. To learn more about using foreign keys with "
+									"other replication factors, please contact us at "
+									"https://citusdata.com/about/contact_us.")));
+		}
+	}
+
+	/*
+	 * We sort colocatedShardList so that lock operations will not cause any
+	 * deadlocks.
+	 */
+	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
+	foreach(colocatedShardCell, colocatedShardList)
+	{
+		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
+		uint64 colocatedShardId = colocatedShard->shardId;
+
+		/*
+		 * For shard copy, there should be a healthy placement in the source node
+		 * and no placement in the target node.
+		 */
+		EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
+							   targetNodeName, targetNodePort);
+	}
+
+	BlockWritesToShardList(colocatedShardList);
+
+	/*
+	 * CopyColocatedShardPlacement function copies the given shard with its
+	 * co-located shards.
+	 */
+	CopyColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
+								targetNodeName, targetNodePort);
+}
+
+
+/*
+ * CopyColocatedShardPlacement copies a shard along with its co-located shards from
+ * the source node to the target node. It does not make any checks about the state
+ * of the shards; it is the caller's responsibility to make those checks if they are
+ * necessary.
+ */
+static void
+CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
+							char *targetNodeName, int32 targetNodePort)
+{
+	ShardInterval *shardInterval = LoadShardInterval(shardId);
+	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
+	ListCell *colocatedShardCell = NULL;
+
+	/* iterate through the colocated shards and copy each */
+	foreach(colocatedShardCell, colocatedShardList)
+	{
+		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
+		bool includeDataCopy = true;
+
+		if (PartitionedTable(colocatedShard->relationId))
+		{
+			/* partitioned tables contain no data */
+			includeDataCopy = false;
+		}
+
+		List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName,
+													sourceNodePort, includeDataCopy);
+		char *tableOwner = TableOwner(colocatedShard->relationId);
+
+		SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
+												   tableOwner, ddlCommandList);
+	}
+
+
+	/*
+	 * Once all shards are created, we can recreate relationships between shards.
+	 *
+	 * Iterate through the colocated shards and create the foreign constraints and
+	 * attach child tables to their parents in a partitioning hierarchy.
+	 *
+	 * Note: After implementing foreign constraints from distributed to reference
+	 * tables, we have decided to not create foreign constraints from hash
+	 * distributed to reference tables at this stage for the nonblocking rebalancer.
+	 * We just create the co-located ones here. We add the foreign constraints
+	 * from hash distributed to reference tables after being completely done with
+	 * the copy procedure inside LogicallyReplicateShards. The reason is that
+	 * the reference tables have placements in both source and target workers and
+	 * the copied shard would get updated twice because of a cascading DML coming
+	 * from both of the placements.
+	 */
+	foreach(colocatedShardCell, colocatedShardList)
+	{
+		List *colocatedShardForeignConstraintCommandList = NIL;
+		List *referenceTableForeignConstraintList = NIL;
+		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
+		char *tableOwner = TableOwner(colocatedShard->relationId);
+
+		CopyShardForeignConstraintCommandListGrouped(
+			colocatedShard,
+			&colocatedShardForeignConstraintCommandList,
+			&referenceTableForeignConstraintList);
+
+		List *commandList = list_concat(colocatedShardForeignConstraintCommandList,
+										referenceTableForeignConstraintList);
+
+		if (PartitionTable(colocatedShard->relationId))
+		{
+			char *attachPartitionCommand =
+				GenerateAttachShardPartitionCommand(colocatedShard);
+
+			commandList = lappend(commandList, attachPartitionCommand);
+		}
+
+		SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
+												   tableOwner, commandList);
+	}
+
+	/* finally insert the placements to pg_dist_placement */
+	foreach(colocatedShardCell, colocatedShardList)
+	{
+		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
+		uint64 colocatedShardId = colocatedShard->shardId;
+		uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
+
+		InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID,
+								SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
+								groupId);
+	}
+}
+
+
 /*
  * CopyPartitionShardsCommandList gets a shardInterval which is a shard that
  * belongs to partitioned table (this is asserted).
@@ -369,19 +605,23 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source
 						 const char *targetNodeName, int32 targetNodePort)
 {
 	List *shardPlacementList = ShardPlacementList(shardId);
+	bool missingSourceOk = false;
+	bool missingTargetOk = false;
 
-	ShardPlacement *sourcePlacement = ForceSearchShardPlacementInList(shardPlacementList,
-																	  sourceNodeName,
-																	  sourceNodePort);
+	ShardPlacement *sourcePlacement = SearchShardPlacementInList(shardPlacementList,
+																 sourceNodeName,
+																 sourceNodePort,
+																 missingSourceOk);
 	if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
 	{
 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("source placement must be in active state")));
 	}
 
-	ShardPlacement *targetPlacement = ForceSearchShardPlacementInList(shardPlacementList,
-																	  targetNodeName,
-																	  targetNodePort);
+	ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
+																 targetNodeName,
+																 targetNodePort,
+																 missingTargetOk);
 	if (targetPlacement->shardState != SHARD_STATE_INACTIVE)
 	{
 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -391,46 +631,77 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source
 
 
 /*
- * SearchShardPlacementInList searches a provided list for a shard placement with the
- * specified node name and port. This function returns NULL if no such
- * placement exists in the provided list.
+ * EnsureShardCanBeCopied checks if the given shard has a healthy placement in the source
+ * node and no placements in the target node.
  */
-ShardPlacement *
-SearchShardPlacementInList(List *shardPlacementList, const char *nodeName,
-						   uint32 nodePort)
+static void
+EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
+					   const char *targetNodeName, int32 targetNodePort)
 {
-	ShardPlacement *shardPlacement = NULL;
-	foreach_ptr(shardPlacement, shardPlacementList)
+	List *shardPlacementList = ShardPlacementList(shardId);
+	bool missingSourceOk = false;
+	bool missingTargetOk = true;
+
+	ShardPlacement *sourcePlacement = SearchShardPlacementInList(shardPlacementList,
+																 sourceNodeName,
+																 sourceNodePort,
+																 missingSourceOk);
+	if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
 	{
-		if (strncmp(nodeName, shardPlacement->nodeName, MAX_NODE_LENGTH) == 0 &&
-			nodePort == shardPlacement->nodePort)
-		{
-			return shardPlacement;
-		}
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("source placement must be in active state")));
+	}
+
+	ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
+																 targetNodeName,
+																 targetNodePort,
+																 missingTargetOk);
+	if (targetPlacement != NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("shard %ld already exists in target placement", shardId)));
 	}
-	return NULL;
 }
 
 
 /*
- * ForceSearchShardPlacementInList searches a provided list for a shard
- * placement with the specified node name and port. This function throws an
- * error if no such placement exists in the provided list.
+ * SearchShardPlacementInList searches a provided list for a shard placement with the
+ * specified node name and port. If missingOk is set to true, this function returns NULL
+ * if no such placement exists in the provided list, otherwise it throws an error.
  */
 ShardPlacement *
-ForceSearchShardPlacementInList(List *shardPlacementList, const char *nodeName,
-								uint32 nodePort)
+SearchShardPlacementInList(List *shardPlacementList, const char *nodeName,
+						   uint32 nodePort, bool missingOk)
 {
-	ShardPlacement *placement = SearchShardPlacementInList(shardPlacementList, nodeName,
-															nodePort);
-	if (placement == NULL)
+	ListCell *shardPlacementCell = NULL;
+	ShardPlacement *matchingPlacement = NULL;
+
+	foreach(shardPlacementCell, shardPlacementList)
 	{
+		ShardPlacement *shardPlacement = lfirst(shardPlacementCell);
+
+		if (strncmp(nodeName, shardPlacement->nodeName, MAX_NODE_LENGTH) == 0 &&
+			nodePort == shardPlacement->nodePort)
+		{
+			matchingPlacement = shardPlacement;
+			break;
+		}
+	}
+
+	if (matchingPlacement == NULL)
+	{
+		if (missingOk)
+		{
+			return NULL;
+		}
+
 		ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
 						errmsg("could not find placement matching \"%s:%d\"",
 							   nodeName, nodePort),
 						errhint("Confirm the placement still exists and try again.")));
 	}
-	return placement;
+
+	return matchingPlacement;
 }
@@ -456,10 +727,6 @@ CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
 	copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
 											  tableRecreationCommandList);
 
-	/*
-	 * The caller doesn't want to include the COPY command, perhaps using
-	 * logical replication to copy the data.
-	 */
 	if (includeDataCopy)
 	{
 		appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index ebf5350dd..8f43d5d3a 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -270,8 +270,10 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
 		CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);
 
 	List *shardPlacementList = ShardPlacementList(shardId);
+	bool missingWorkerOk = true;
 	ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
-																 nodeName, nodePort);
+																 nodeName, nodePort,
+																 missingWorkerOk);
 	char *tableOwner = TableOwner(shardInterval->relationId);
 
 	/*
diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h
index b9f5b2671..8b9978657 100644
--- a/src/include/distributed/master_protocol.h
+++ b/src/include/distributed/master_protocol.h
@@ -165,9 +165,7 @@ extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInt
 														 List **
 														 referenceTableForeignConstraintList);
 extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList,
-												   const char *nodeName, uint32 nodePort);
-extern ShardPlacement * ForceSearchShardPlacementInList(List *shardPlacementList,
-														const char *nodeName,
-														uint32 nodePort);
+												   const char *nodeName, uint32 nodePort,
+												   bool missingOk);
 
 #endif /* MASTER_PROTOCOL_H */
diff --git a/src/test/regress/expected/master_copy_shard_placement.out b/src/test/regress/expected/master_copy_shard_placement.out
new file mode 100644
index 000000000..ff109dbae
--- /dev/null
+++ b/src/test/regress/expected/master_copy_shard_placement.out
@@ -0,0 +1,68 @@
+-- Tests for master_copy_shard_placement, which can be used for adding replicas in statement-based replication
+CREATE SCHEMA mcsp;
+SET search_path TO mcsp;
+SET citus.next_shard_id TO 8139000;
+SET citus.shard_replication_factor TO 1;
+SET citus.replication_model TO 'statement';
+CREATE TABLE data (
+    key text primary key,
+    value text not null,
+    check (value <> '')
+);
+CREATE INDEX ON data (value);
+SELECT create_distributed_table('data','key');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE history (
+    key text not null,
+    t timestamptz not null,
+    value text not null
+) PARTITION BY RANGE (t);
+CREATE TABLE history_p1 PARTITION OF history FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
+CREATE TABLE history_p2 PARTITION OF history FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
+SELECT create_distributed_table('history','key');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO data VALUES ('key-1', 'value-1');
+INSERT INTO data VALUES ('key-2', 'value-2');
+INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
+INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
+-- replicate shard that contains key-1
+SELECT master_copy_shard_placement(
+  get_shard_id_for_distribution_column('data', 'key-1'),
+  'localhost', :worker_2_port,
+  'localhost', :worker_1_port,
+  do_repair := false);
+ master_copy_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+-- forcefully mark the old replica as inactive
+UPDATE pg_dist_shard_placement SET shardstate = 3
+WHERE shardid = get_shard_id_for_distribution_column('data', 'key-1') AND nodeport = :worker_2_port;
+UPDATE pg_dist_shard_placement SET shardstate = 3
+WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nodeport = :worker_2_port;
+-- should still have all data available thanks to new replica
+SELECT count(*) FROM data;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+SELECT count(*) FROM history;
+ count
+---------------------------------------------------------------------
+     2
+(1 row)
+
+DROP SCHEMA mcsp CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to table data
+drop cascades to table history
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 87c519b9d..78098d721 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -200,7 +200,7 @@ test: multi_complex_count_distinct multi_select_distinct
 test: multi_modifications
 test: multi_distribution_metadata
 test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list multi_repair_shards
-test: multi_upsert multi_simple_queries multi_data_types
+test: multi_upsert multi_simple_queries multi_data_types master_copy_shard_placement
 # multi_utilities cannot be run in parallel with other tests because it checks
 # global locks
 test: multi_utilities
diff --git a/src/test/regress/sql/master_copy_shard_placement.sql b/src/test/regress/sql/master_copy_shard_placement.sql
new file mode 100644
index 000000000..233642f05
--- /dev/null
+++ b/src/test/regress/sql/master_copy_shard_placement.sql
@@ -0,0 +1,49 @@
+-- Tests for master_copy_shard_placement, which can be used for adding replicas in statement-based replication
+CREATE SCHEMA mcsp;
+SET search_path TO mcsp;
+SET citus.next_shard_id TO 8139000;
+SET citus.shard_replication_factor TO 1;
+SET citus.replication_model TO 'statement';
+
+CREATE TABLE data (
+    key text primary key,
+    value text not null,
+    check (value <> '')
+);
+CREATE INDEX ON data (value);
+SELECT create_distributed_table('data','key');
+
+CREATE TABLE history (
+    key text not null,
+    t timestamptz not null,
+    value text not null
+) PARTITION BY RANGE (t);
+CREATE TABLE history_p1 PARTITION OF history FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
+CREATE TABLE history_p2 PARTITION OF history FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
+SELECT create_distributed_table('history','key');
+
+INSERT INTO data VALUES ('key-1', 'value-1');
+INSERT INTO data VALUES ('key-2', 'value-2');
+
+INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
+INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
+
+-- replicate shard that contains key-1
+SELECT master_copy_shard_placement(
+  get_shard_id_for_distribution_column('data', 'key-1'),
+  'localhost', :worker_2_port,
+  'localhost', :worker_1_port,
+  do_repair := false);
+
+-- forcefully mark the old replica as inactive
+UPDATE pg_dist_shard_placement SET shardstate = 3
+WHERE shardid = get_shard_id_for_distribution_column('data', 'key-1') AND nodeport = :worker_2_port;
+
+UPDATE pg_dist_shard_placement SET shardstate = 3
+WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nodeport = :worker_2_port;
+
+-- should still have all data available thanks to new replica
+SELECT count(*) FROM data;
+SELECT count(*) FROM history;
+
+DROP SCHEMA mcsp CASCADE;