diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index de0c20d4e..70dba5e2a 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -53,9 +53,9 @@ static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, char shardReplicationMode); -static void CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, - int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort); +static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort); static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval, const char *sourceNodeName, int32 sourceNodePort); @@ -67,6 +67,8 @@ static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 targetNodePort); static List * RecreateTableDDLCommandList(Oid relationId); static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId); +static void EnsureTableListOwner(List *tableIdList); +static void EnsureTableListSuitableForReplication(List *tableIdList); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_copy_shard_placement); @@ -383,57 +385,21 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, List *colocatedTableList = ColocatedTableList(distributedTableId); List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - Oid colocatedTableId = InvalidOid; - ListCell *colocatedShardCell = NULL; - foreach_oid(colocatedTableId, colocatedTableList) - { - char relationKind = '\0'; - - /* check that user has owner rights in all co-located tables */ - EnsureTableOwner(colocatedTableId); - - relationKind = get_rel_relkind(colocatedTableId); - if (relationKind == RELKIND_FOREIGN_TABLE) - { - char *relationName = get_rel_name(colocatedTableId); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot replicate shard"), - errdetail("Table %s is a foreign table. Replicating " - "shards backed by foreign tables is " - "not supported.", relationName))); - } - - List *foreignConstraintCommandList = GetTableForeignConstraintCommands( - colocatedTableId); - - if (foreignConstraintCommandList != NIL && - PartitionMethod(colocatedTableId) != DISTRIBUTE_BY_NONE) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot create foreign key constraint"), - errdetail("This shard has foreign constraints on it. " - "Citus currently supports " - "foreign key constraints only for " - "\"citus.shard_replication_factor = 1\"."), - errhint("Please change \"citus.shard_replication_factor to " - "1\". To learn more about using foreign keys with " - "other replication factors, please contact us at " - "https://citusdata.com/about/contact_us."))); - } - } + EnsureTableListOwner(colocatedTableList); + EnsureTableListSuitableForReplication(colocatedTableList); /* - * We sort colocatedShardList so that lock operations will not cause any + * We sort shardIntervalList so that lock operations will not cause any * deadlocks. */ colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); BlockWritesToShardList(colocatedShardList); - foreach(colocatedShardCell, colocatedShardList) + ShardInterval *colocatedShard = NULL; + foreach_ptr(colocatedShard, colocatedShardList) { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); uint64 colocatedShardId = colocatedShard->shardId; /* @@ -457,12 +423,76 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, EnsureReferenceTablesExistOnAllNodes(); } - /* - * CopyColocatedShardPlacement function copies given shard with its co-located - * shards. - */ - CopyColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); + + /* finally insert the placements to pg_dist_placement */ + foreach_ptr(colocatedShard, colocatedShardList) + { + uint64 colocatedShardId = colocatedShard->shardId; + uint32 groupId = GroupForNode(targetNodeName, targetNodePort); + + InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID, + SHARD_STATE_ACTIVE, ShardLength(colocatedShardId), + groupId); + } +} + + +/* + * EnsureTableListOwner ensures current user owns given tables. Superusers + * are regarded as owners. + */ +static void +EnsureTableListOwner(List *tableIdList) +{ + Oid tableId = InvalidOid; + foreach_oid(tableId, tableIdList) + { + EnsureTableOwner(tableId); + } +} + + +/* + * EnsureTableListSuitableForReplication errors out if given tables are not + * suitable for replication. + */ +static void +EnsureTableListSuitableForReplication(List *tableIdList) +{ + Oid tableId = InvalidOid; + foreach_oid(tableId, tableIdList) + { + char relationKind = get_rel_relkind(tableId); + if (relationKind == RELKIND_FOREIGN_TABLE) + { + char *relationName = get_rel_name(tableId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate shard"), + errdetail("Table %s is a foreign table. Replicating " + "shards backed by foreign tables is " + "not supported.", relationName))); + } + + List *foreignConstraintCommandList = + GetTableForeignConstraintCommands(tableId); + + if (foreignConstraintCommandList != NIL && + PartitionMethod(tableId) != DISTRIBUTE_BY_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create foreign key constraint"), + errdetail("This shard has foreign constraints on it. " + "Citus currently supports " + "foreign key constraints only for " + "\"citus.shard_replication_factor = 1\"."), + errhint("Please change \"citus.shard_replication_factor to " + "1\". To learn more about using foreign keys with " + "other replication factors, please contact us at " + "https://citusdata.com/about/contact_us."))); + } + } } @@ -473,28 +503,25 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, * necessary. */ static void -CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort) +CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort) { - ShardInterval *shardInterval = LoadShardInterval(shardId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - ListCell *colocatedShardCell = NULL; + ShardInterval *shardInterval = NULL; /* iterate through the colocated shards and copy each */ - foreach(colocatedShardCell, colocatedShardList) + foreach_ptr(shardInterval, shardIntervalList) { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); bool includeDataCopy = true; - if (PartitionedTable(colocatedShard->relationId)) + if (PartitionedTable(shardInterval->relationId)) { /* partitioned tables contain no data */ includeDataCopy = false; } - List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName, + List *ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort, includeDataCopy); - char *tableOwner = TableOwner(colocatedShard->relationId); + char *tableOwner = TableOwner(shardInterval->relationId); SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); @@ -517,25 +544,23 @@ CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNod * the copied shard would get updated twice because of a cascading DML coming * from both of the placements. */ - foreach(colocatedShardCell, colocatedShardList) + foreach_ptr(shardInterval, shardIntervalList) { - List *colocatedShardForeignConstraintCommandList = NIL; + List *shardForeignConstraintCommandList = NIL; List *referenceTableForeignConstraintList = NIL; - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); - char *tableOwner = TableOwner(colocatedShard->relationId); + char *tableOwner = TableOwner(shardInterval->relationId); - CopyShardForeignConstraintCommandListGrouped(colocatedShard, - & - colocatedShardForeignConstraintCommandList, + CopyShardForeignConstraintCommandListGrouped(shardInterval, + &shardForeignConstraintCommandList, &referenceTableForeignConstraintList); - List *commandList = list_concat(colocatedShardForeignConstraintCommandList, + List *commandList = list_concat(shardForeignConstraintCommandList, referenceTableForeignConstraintList); - if (PartitionTable(colocatedShard->relationId)) + if (PartitionTable(shardInterval->relationId)) { char *attachPartitionCommand = - GenerateAttachShardPartitionCommand(colocatedShard); + GenerateAttachShardPartitionCommand(shardInterval); commandList = lappend(commandList, attachPartitionCommand); } @@ -543,18 +568,6 @@ CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNod SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, commandList); } - - /* finally insert the placements to pg_dist_placement */ - foreach(colocatedShardCell, colocatedShardList) - { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); - uint64 colocatedShardId = colocatedShard->shardId; - uint32 groupId = GroupForNode(targetNodeName, targetNodePort); - - InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID, - SHARD_STATE_ACTIVE, ShardLength(colocatedShardId), - groupId); - } }