mirror of https://github.com/citusdata/citus.git
Harden ReplicateShardToNode to unexpected placements (#5071)
Originally ReplicateShardToNode was meant for `upgrade_to_reference_table`, which required handling of existing inactive placements. These days `upgrade_to_reference_table` is deprecated and cannot be used anymore. Now that we have SHARD_STATE_TO_DELETE too, this left over code seemed error prone. So this removes support for activating inactive reference table placemements, since these should not be possible. If it finds a non active reference table placement anyway it now errors out. This also removes a few outdated comments related to `upgrade_to_refeference_table`.pull/5073/head^2
parent
d1d386a904
commit
f4a2d99ce9
|
@ -327,9 +327,12 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
|
|||
|
||||
/*
|
||||
* ReplicateShardToNode function replicates given shard to the given worker node
|
||||
* in a separate transaction. While replicating, it only replicates the shard to the
|
||||
* workers which does not have a healthy replica of the shard. This function also modifies
|
||||
* metadata by inserting/updating related rows in pg_dist_placement.
|
||||
* in a separate transaction. If the worker already has
|
||||
* a replica of the shard this is a no-op. This function also modifies metadata
|
||||
* by inserting/updating related rows in pg_dist_placement.
|
||||
*
|
||||
* IMPORTANT: This should only be used to replicate shards of a reference
|
||||
* table.
|
||||
*/
|
||||
static void
|
||||
ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
|
||||
|
@ -349,54 +352,39 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
|
|||
nodeName, nodePort);
|
||||
char *tableOwner = TableOwner(shardInterval->relationId);
|
||||
|
||||
/*
|
||||
* Although this function is used for reference tables, and reference table shard
|
||||
* placements always have shardState = SHARD_STATE_ACTIVE, in case of an upgrade
|
||||
* of a non-reference table to reference table, unhealty placements may exist.
|
||||
* In this case, repair the shard placement and update its state in pg_dist_placement.
|
||||
*/
|
||||
if (targetPlacement == NULL || targetPlacement->shardState != SHARD_STATE_ACTIVE)
|
||||
if (targetPlacement != NULL)
|
||||
{
|
||||
uint64 placementId = 0;
|
||||
int32 groupId = 0;
|
||||
|
||||
ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d",
|
||||
get_rel_name(shardInterval->relationId), nodeName,
|
||||
nodePort)));
|
||||
|
||||
EnsureNoModificationsHaveBeenDone();
|
||||
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
|
||||
ddlCommandList);
|
||||
if (targetPlacement == NULL)
|
||||
if (targetPlacement->shardState == SHARD_STATE_ACTIVE)
|
||||
{
|
||||
groupId = GroupForNode(nodeName, nodePort);
|
||||
|
||||
placementId = GetNextPlacementId();
|
||||
InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0,
|
||||
groupId);
|
||||
}
|
||||
else
|
||||
{
|
||||
groupId = targetPlacement->groupId;
|
||||
placementId = targetPlacement->placementId;
|
||||
UpdateShardPlacementState(placementId, SHARD_STATE_ACTIVE);
|
||||
/* We already have the shard, nothing to do */
|
||||
return;
|
||||
}
|
||||
ereport(ERROR, (errmsg(
|
||||
"Placement for reference table \"%s\" on node %s:%d is not active. This should not be possible, please report this as a bug",
|
||||
get_rel_name(shardInterval->relationId), nodeName,
|
||||
nodePort)));
|
||||
}
|
||||
|
||||
/*
|
||||
* Although ReplicateShardToAllNodes is used only for reference tables,
|
||||
* during the upgrade phase, the placements are created before the table is
|
||||
* marked as a reference table. All metadata (including the placement
|
||||
* metadata) will be copied to workers after all reference table changed
|
||||
* are finished.
|
||||
*/
|
||||
if (ShouldSyncTableMetadata(shardInterval->relationId))
|
||||
{
|
||||
char *placementCommand = PlacementUpsertCommand(shardId, placementId,
|
||||
SHARD_STATE_ACTIVE, 0,
|
||||
groupId);
|
||||
ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d",
|
||||
get_rel_name(shardInterval->relationId), nodeName,
|
||||
nodePort)));
|
||||
|
||||
SendCommandToWorkersWithMetadata(placementCommand);
|
||||
}
|
||||
EnsureNoModificationsHaveBeenDone();
|
||||
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
|
||||
ddlCommandList);
|
||||
int32 groupId = GroupForNode(nodeName, nodePort);
|
||||
|
||||
uint64 placementId = GetNextPlacementId();
|
||||
InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0,
|
||||
groupId);
|
||||
|
||||
if (ShouldSyncTableMetadata(shardInterval->relationId))
|
||||
{
|
||||
char *placementCommand = PlacementUpsertCommand(shardId, placementId,
|
||||
SHARD_STATE_ACTIVE, 0,
|
||||
groupId);
|
||||
|
||||
SendCommandToWorkersWithMetadata(placementCommand);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue