diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 0d2c9e745..e02a900f4 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -39,17 +39,15 @@ /* local function forward declarations */ +static void RepairShardPlacement(int64 shardId, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort); static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); -static void EnsureShardCanBeMoved(int64 shardId, char *sourceNodeName, - int32 sourceNodePort); static ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 nodePort, bool missingOk); -static void CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort, - bool doRepair); static List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, int32 sourceNodePort); static List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval); @@ -79,87 +77,13 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) int32 sourceNodePort = PG_GETARG_INT32(2); text *targetNodeNameText = PG_GETARG_TEXT_P(3); int32 targetNodePort = PG_GETARG_INT32(4); - bool doRepair = true; char *sourceNodeName = text_to_cstring(sourceNodeNameText); char *targetNodeName = text_to_cstring(targetNodeNameText); - ShardInterval *shardInterval = LoadShardInterval(shardId); - Oid distributedTableId = shardInterval->relationId; - List *colocatedTableList = ColocatedTableList(distributedTableId); - ListCell *colocatedTableCell = NULL; - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - ListCell *colocatedShardCell = NULL; - foreach(colocatedTableCell, colocatedTableList) - { - Oid colocatedTableId = lfirst_oid(colocatedTableCell); - char relationKind = '\0'; - - /* check that user has owner rights in all co-located tables */ - EnsureTableOwner(colocatedTableId); - - relationKind = get_rel_relkind(colocatedTableId); - if (relationKind == RELKIND_FOREIGN_TABLE) - { - char *relationName = get_rel_name(colocatedTableId); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot repair shard"), - errdetail("Table %s is a foreign table. Repairing " - "shards backed by foreign tables is " - "not supported.", relationName))); - } - } - - /* we sort colocatedShardList so that lock operations will not cause any deadlocks */ - colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); - foreach(colocatedShardCell, colocatedShardList) - { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); - uint64 colocatedShardId = colocatedShard->shardId; - - /* - * We've stopped data modifications of this shard, but we plan to move - * a placement to the healthy state, so we need to grab a shard metadata - * lock (in exclusive mode) as well. - */ - LockShardDistributionMetadata(colocatedShardId, ExclusiveLock); - - /* - * If our aim is repairing, we should be sure that there is an unhealthy - * placement in target node. We use EnsureShardCanBeRepaired function - * to be sure that there is an unhealthy placement in target node. If - * we just want to copy the shard without any repair, it is enough to use - * EnsureShardCanBeCopied which just checks there is a placement in source - * and no placement in target node. - */ - if (doRepair) - { - /* - * After #810 is fixed, we should remove this check and call EnsureShardCanBeRepaired - * for all shard ids - */ - if (colocatedShardId == shardId) - { - EnsureShardCanBeRepaired(colocatedShardId, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); - } - else - { - EnsureShardCanBeMoved(colocatedShardId, sourceNodeName, sourceNodePort); - } - } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("master_copy_shard_placement() without repair option " - "is only supported on Citus Enterprise"))); - } - } - - - /* CopyShardPlacement function copies given shard with its co-located shards */ - CopyShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort, doRepair); + /* RepairShardPlacement function repairs only given shard */ + RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, + targetNodePort); PG_RETURN_VOID(); } @@ -178,6 +102,66 @@ master_move_shard_placement(PG_FUNCTION_ARGS) } +/* + * RepairShardPlacement repairs given shard from a source node to target node. + * This function is not co-location aware. It only repairs given shard. + */ +static void +RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort) +{ + ShardInterval *shardInterval = LoadShardInterval(shardId); + Oid distributedTableId = shardInterval->relationId; + + char relationKind = get_rel_relkind(distributedTableId); + char *tableOwner = TableOwner(shardInterval->relationId); + bool missingOk = false; + + List *ddlCommandList = NIL; + List *foreignConstraintCommandList = NIL; + List *placementList = NIL; + ShardPlacement *placement = NULL; + + EnsureTableOwner(distributedTableId); + + if (relationKind == RELKIND_FOREIGN_TABLE) + { + char *relationName = get_rel_name(distributedTableId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot repair shard"), + errdetail("Table %s is a foreign table. Repairing " + "shards backed by foreign tables is " + "not supported.", relationName))); + } + + /* + * We plan to move the placement to the healthy state, so we need to grab a shard + * metadata lock (in exclusive mode). + */ + LockShardDistributionMetadata(shardId, ExclusiveLock); + + /* + * For shard repair, there should be healthy placement in source node and unhealthy + * placement in the target node. + */ + EnsureShardCanBeRepaired(shardId, sourceNodeName, sourceNodePort, targetNodeName, + targetNodePort); + + /* we generate necessary commands to recreate the shard in target node */ + ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort); + foreignConstraintCommandList = CopyShardForeignConstraintCommandList(shardInterval); + ddlCommandList = list_concat(ddlCommandList, foreignConstraintCommandList); + SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, + ddlCommandList); + + /* after successful repair, we update shard state as healthy*/ + placementList = ShardPlacementList(shardId); + placement = SearchShardPlacementInList(placementList, targetNodeName, targetNodePort, + missingOk); + UpdateShardPlacementState(placement->placementId, FILE_FINALIZED); +} + + /* * EnsureShardCanBeRepaired checks if the given shard has a healthy placement in the source * node and inactive node on the target node. @@ -210,24 +194,6 @@ EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePo } -/* - * EnsureShardCanBeMoved checks if the given shard has a placement in the source node but - * not on the target node. It is important to note that SearchShardPlacementInList - * function already generates error if given shard does not have a placement in the - * source node. Therefore we do not perform extra check. - */ -static void -EnsureShardCanBeMoved(int64 shardId, char *sourceNodeName, int32 sourceNodePort) -{ - List *shardPlacementList = ShardPlacementList(shardId); - bool missingSourceOk = false; - - /* Actual check is done in SearchShardPlacementInList */ - SearchShardPlacementInList(shardPlacementList, sourceNodeName, sourceNodePort, - missingSourceOk); -} - - /* * SearchShardPlacementInList searches a provided list for a shard placement with the * specified node name and port. If missingOk is set to true, this function returns NULL @@ -269,74 +235,6 @@ SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 node } -/* - * CopyShardPlacement copies a shard along with its co-located shards from a source node - * to target node. CopyShardPlacement does not make any checks about state of the shards. - * It is caller's responsibility to make those checks if they are necessary. - */ -static void -CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort, bool doRepair) -{ - ShardInterval *shardInterval = LoadShardInterval(shardId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - ListCell *colocatedShardCell = NULL; - - foreach(colocatedShardCell, colocatedShardList) - { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); - List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(colocatedShard->relationId); - - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - } - - foreach(colocatedShardCell, colocatedShardList) - { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); - List *foreignConstraintCommandList = CopyShardForeignConstraintCommandList( - colocatedShard); - char *tableOwner = TableOwner(colocatedShard->relationId); - - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, - tableOwner, - foreignConstraintCommandList); - } - - foreach(colocatedShardCell, colocatedShardList) - { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); - uint64 colocatedShardId = colocatedShard->shardId; - - /* - * If we call this function for repair purposes, the caller should have - * removed the old shard placement metadata. - */ - if (doRepair) - { - List *shardPlacementList = ShardPlacementList(colocatedShardId); - bool missingSourceOk = false; - - ShardPlacement *placement = SearchShardPlacementInList(shardPlacementList, - targetNodeName, - targetNodePort, - missingSourceOk); - - UpdateShardPlacementState(placement->placementId, FILE_FINALIZED); - } - else - { - InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID, - FILE_FINALIZED, ShardLength(colocatedShardId), - targetNodeName, - targetNodePort); - } - } -} - - /* * CopyShardCommandList generates command list to copy the given shard placement * from the source node to the target node. diff --git a/src/test/regress/expected/multi_colocated_shard_transfer.out b/src/test/regress/expected/multi_colocated_shard_transfer.out index 0cda8e584..a1193b4fd 100644 --- a/src/test/regress/expected/multi_colocated_shard_transfer.out +++ b/src/test/regress/expected/multi_colocated_shard_transfer.out @@ -63,7 +63,7 @@ ORDER BY s.shardid; 1300002 | table1_group1 | 57637 | 1000 | 1 1300003 | table1_group1 | 57637 | 1000 | 1 1300003 | table1_group1 | 57638 | 1000 | 1 - 1300004 | table2_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 3 1300004 | table2_group1 | 57637 | 1000 | 1 1300005 | table2_group1 | 57637 | 1000 | 1 1300005 | table2_group1 | 57638 | 1000 | 1 @@ -187,7 +187,7 @@ ORDER BY s.shardid; 1300002 | table1_group1 | 57637 | 1000 | 1 1300003 | table1_group1 | 57637 | 1000 | 1 1300003 | table1_group1 | 57638 | 1000 | 1 - 1300004 | table2_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 3 1300004 | table2_group1 | 57637 | 1000 | 1 1300005 | table2_group1 | 57637 | 1000 | 1 1300005 | table2_group1 | 57638 | 1000 | 1 @@ -219,7 +219,7 @@ ORDER BY s.shardid; 1300002 | table1_group1 | 57637 | 1000 | 1 1300003 | table1_group1 | 57637 | 1000 | 1 1300003 | table1_group1 | 57638 | 1000 | 1 - 1300004 | table2_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 3 1300004 | table2_group1 | 57637 | 1000 | 1 1300005 | table2_group1 | 57637 | 1000 | 1 1300005 | table2_group1 | 57638 | 1000 | 1