diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 47e9a99df..671346f36 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -327,9 +327,12 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) /* * ReplicateShardToNode function replicates given shard to the given worker node - * in a separate transaction. While replicating, it only replicates the shard to the - * workers which does not have a healthy replica of the shard. This function also modifies - * metadata by inserting/updating related rows in pg_dist_placement. + * in a separate transaction. If the worker already has + * a replica of the shard this is a no-op. This function also modifies metadata + * by inserting/updating related rows in pg_dist_placement. + * + * IMPORTANT: This should only be used to replicate shards of a reference + * table. */ static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) @@ -349,54 +352,39 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) nodeName, nodePort); char *tableOwner = TableOwner(shardInterval->relationId); - /* - * Although this function is used for reference tables, and reference table shard - * placements always have shardState = SHARD_STATE_ACTIVE, in case of an upgrade - * of a non-reference table to reference table, unhealty placements may exist. - * In this case, repair the shard placement and update its state in pg_dist_placement. - */ - if (targetPlacement == NULL || targetPlacement->shardState != SHARD_STATE_ACTIVE) + if (targetPlacement != NULL) { - uint64 placementId = 0; - int32 groupId = 0; - - ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d", - get_rel_name(shardInterval->relationId), nodeName, - nodePort))); - - EnsureNoModificationsHaveBeenDone(); - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, - ddlCommandList); - if (targetPlacement == NULL) + if (targetPlacement->shardState == SHARD_STATE_ACTIVE) { - groupId = GroupForNode(nodeName, nodePort); - - placementId = GetNextPlacementId(); - InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0, - groupId); - } - else - { - groupId = targetPlacement->groupId; - placementId = targetPlacement->placementId; - UpdateShardPlacementState(placementId, SHARD_STATE_ACTIVE); + /* We already have the shard, nothing to do */ + return; } + ereport(ERROR, (errmsg( + "Placement for reference table \"%s\" on node %s:%d is not active. This should not be possible, please report this as a bug", + get_rel_name(shardInterval->relationId), nodeName, + nodePort))); + } - /* - * Although ReplicateShardToAllNodes is used only for reference tables, - * during the upgrade phase, the placements are created before the table is - * marked as a reference table. All metadata (including the placement - * metadata) will be copied to workers after all reference table changed - * are finished. - */ - if (ShouldSyncTableMetadata(shardInterval->relationId)) - { - char *placementCommand = PlacementUpsertCommand(shardId, placementId, - SHARD_STATE_ACTIVE, 0, - groupId); + ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d", + get_rel_name(shardInterval->relationId), nodeName, + nodePort))); - SendCommandToWorkersWithMetadata(placementCommand); - } + EnsureNoModificationsHaveBeenDone(); + SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, + ddlCommandList); + int32 groupId = GroupForNode(nodeName, nodePort); + + uint64 placementId = GetNextPlacementId(); + InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0, + groupId); + + if (ShouldSyncTableMetadata(shardInterval->relationId)) + { + char *placementCommand = PlacementUpsertCommand(shardId, placementId, + SHARD_STATE_ACTIVE, 0, + groupId); + + SendCommandToWorkersWithMetadata(placementCommand); } }