mirror of https://github.com/citusdata/citus.git
Do not include to-be-deleted shards while finding shard placements
- Ignore orphaned shards in more places
- Only use active shard placements in RouterInsertTaskList
- Use IncludingOrphanedPlacements in some more places
- Fix comment
- Add tests

pull/4994/head
parent 802225940e
commit e7ed16c296
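A shard placement becomes orphaned when a shard move defers dropping the old copy: the placement stays in the metadata with shardstate = 4 (to be deleted) until deferred cleanup runs, and the helpers renamed below make explicit whether callers see such placements. The following SQL is an illustrative sketch only, not part of the diff; it borrows the shard id used by the new ignoring_orphaned_shards regression test to show how an orphaned placement appears and how it is cleaned up:

-- Illustrative sketch: after citus_move_shard_placement the source copy of the
-- shard is kept with shardstate = 4 next to the active placement (shardstate = 1).
SELECT shardid, shardstate, nodeport
FROM pg_dist_shard_placement
WHERE shardid = 92448100
ORDER BY placementid;

-- Deferred deletion: physically drop placements whose removal was postponed.
CALL citus_cleanup_orphaned_shards();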
@@ -2139,7 +2139,7 @@ ShardIntervalListHasLocalPlacements(List *shardIntervalList)
 ShardInterval *shardInterval = NULL;
 foreach_ptr(shardInterval, shardIntervalList)
 {
-if (FindShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL)
+if (ActiveShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL)
 {
 return true;
 }

@@ -377,7 +377,7 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
 /*
  * EnsureConnectionPossibilityForNode reserves a shared connection
  * counter per node in the nodeList unless:
- * - Reservation is possible/allowed (see IsReservationPossible())
+ * - Reservation is not possible/allowed (see IsReservationPossible())
  * - there is at least one connection to the node so that we are guranteed
  * to get a connection
  * - An earlier call already reserved a connection (e.g., we allow only a

@@ -128,8 +128,8 @@ BuildPlacementAccessList(int32 groupId, List *relationShardList,
 RelationShard *relationShard = NULL;
 foreach_ptr(relationShard, relationShardList)
 {
-ShardPlacement *placement = FindShardPlacementOnGroup(groupId,
+ShardPlacement *placement = ActiveShardPlacementOnGroup(groupId,
 relationShard->shardId);
 if (placement == NULL)
 {
 continue;

@@ -594,12 +594,14 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
 
 
 /*
- * FindShardPlacementOnGroup returns the shard placement for the given shard
- * on the given group, or returns NULL if no placement for the shard exists
- * on the group.
+ * ShardPlacementOnGroupIncludingOrphanedPlacements returns the shard placement
+ * for the given shard on the given group, or returns NULL if no placement for
+ * the shard exists on the group.
+ *
+ * NOTE: This can return inactive or orphaned placements.
  */
 ShardPlacement *
-FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
+ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId)
 {
 ShardPlacement *placementOnNode = NULL;
 

@@ -614,7 +616,6 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
 for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
 {
 GroupShardPlacement *placement = &placementArray[placementIndex];
 
 if (placement->groupId == groupId)
 {
 placementOnNode = ResolveGroupShardPlacement(placement, tableEntry,

@@ -627,6 +628,28 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
 }
 
 
+/*
+ * ActiveShardPlacementOnGroup returns the active shard placement for the
+ * given shard on the given group, or returns NULL if no active placement for
+ * the shard exists on the group.
+ */
+ShardPlacement *
+ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId)
+{
+ShardPlacement *placement =
+ShardPlacementOnGroupIncludingOrphanedPlacements(groupId, shardId);
+if (placement == NULL)
+{
+return NULL;
+}
+if (placement->shardState != SHARD_STATE_ACTIVE)
+{
+return NULL;
+}
+return placement;
+}
+
+
 /*
  * ResolveGroupShardPlacement takes a GroupShardPlacement and adds additional data to it,
  * such as the node we should consider it to be on.
@@ -791,13 +814,14 @@ LookupNodeForGroup(int32 groupId)
 
 /*
  * ShardPlacementList returns the list of placements for the given shard from
- * the cache.
+ * the cache. This list includes placements that are orphaned, because they
+ * their deletion is postponed to a later point (shardstate = 4).
  *
  * The returned list is deep copied from the cache and thus can be modified
  * and pfree()d freely.
  */
 List *
-ShardPlacementList(uint64 shardId)
+ShardPlacementListIncludingOrphanedPlacements(uint64 shardId)
 {
 List *placementList = NIL;
 

@@ -1092,7 +1092,7 @@ TableShardReplicationFactor(Oid relationId)
 {
 uint64 shardId = shardInterval->shardId;
 
-List *shardPlacementList = ShardPlacementList(shardId);
+List *shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
 uint32 shardPlacementCount = list_length(shardPlacementList);
 
 /*

@@ -1392,7 +1392,8 @@ List *
 ActiveShardPlacementList(uint64 shardId)
 {
 List *activePlacementList = NIL;
-List *shardPlacementList = ShardPlacementList(shardId);
+List *shardPlacementList =
+ShardPlacementListIncludingOrphanedPlacements(shardId);
 
 ShardPlacement *shardPlacement = NULL;
 foreach_ptr(shardPlacement, shardPlacementList)

@@ -1407,6 +1408,31 @@ ActiveShardPlacementList(uint64 shardId)
 }
 
 
+/*
+ * ShardPlacementListWithoutOrphanedPlacements returns shard placements exluding
+ * the ones that are orphaned, because they are marked to be deleted at a later
+ * point (shardstate = 4).
+ */
+List *
+ShardPlacementListWithoutOrphanedPlacements(uint64 shardId)
+{
+List *activePlacementList = NIL;
+List *shardPlacementList =
+ShardPlacementListIncludingOrphanedPlacements(shardId);
+
+ShardPlacement *shardPlacement = NULL;
+foreach_ptr(shardPlacement, shardPlacementList)
+{
+if (shardPlacement->shardState != SHARD_STATE_TO_DELETE)
+{
+activePlacementList = lappend(activePlacementList, shardPlacement);
+}
+}
+
+return SortList(activePlacementList, CompareShardPlacementsByWorker);
+}
+
+
 /*
  * ActiveShardPlacement finds a shard placement for the given shardId from
  * system catalog, chooses a placement that is in active state and returns

@@ -1944,7 +1970,8 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s
 ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex);
 
 ShardPlacement *partitionPlacement =
-ShardPlacementOnGroup(partitionShardId, parentShardPlacement->groupId);
+ShardPlacementOnGroupIncludingOrphanedPlacements(
+parentShardPlacement->groupId, partitionShardId);
 
 /* the partition should have a placement with the same group */
 Assert(partitionPlacement != NULL);

@@ -1954,28 +1981,6 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s
 }
 
 
-/*
- * ShardPlacementOnGroup gets a shardInterval and a groupId, returns a placement
- * of the shard on the given group. If no such placement exists, the function
- * return NULL.
- */
-ShardPlacement *
-ShardPlacementOnGroup(uint64 shardId, int groupId)
-{
-List *placementList = ShardPlacementList(shardId);
-ShardPlacement *placement = NULL;
-foreach_ptr(placement, placementList)
-{
-if (placement->groupId == groupId)
-{
-return placement;
-}
-}
-
-return NULL;
-}
-
-
 /*
  * MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where
  * the state is set to SHARD_STATE_INACTIVE. It also marks partitions of the
@@ -287,7 +287,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
 text *shardMinValueText = IntegerToText(shardMinValue);
 text *shardMaxValueText = IntegerToText(shardMaxValue);
-List *sourceShardPlacementList = ShardPlacementList(sourceShardId);
+List *sourceShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
+sourceShardId);
 
 InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
 shardMinValueText, shardMaxValueText);

@@ -295,11 +296,6 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 ShardPlacement *sourcePlacement = NULL;
 foreach_ptr(sourcePlacement, sourceShardPlacementList)
 {
-if (sourcePlacement->shardState == SHARD_STATE_TO_DELETE)
-{
-continue;
-}
-
 int32 groupId = sourcePlacement->groupId;
 const ShardState shardState = SHARD_STATE_ACTIVE;
 const uint64 shardSize = 0;

@@ -450,7 +450,8 @@ DropTaskList(Oid relationId, char *schemaName, char *relationName,
 task->dependentTaskList = NULL;
 task->replicationModel = REPLICATION_MODEL_INVALID;
 task->anchorShardId = shardId;
-task->taskPlacementList = ShardPlacementList(shardId);
+task->taskPlacementList =
+ShardPlacementListIncludingOrphanedPlacements(shardId);
 
 taskList = lappend(taskList, task);
 }

@@ -748,7 +748,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
 ddlCommandList);
 
 /* after successful repair, we update shard state as healthy*/
-List *placementList = ShardPlacementList(shardId);
+List *placementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
 ShardPlacement *placement = SearchShardPlacementInListOrError(placementList,
 targetNodeName,
 targetNodePort);

@@ -1029,7 +1029,8 @@ static void
 EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
 const char *targetNodeName, int32 targetNodePort)
 {
-List *shardPlacementList = ShardPlacementList(shardId);
+List *shardPlacementList =
+ShardPlacementListIncludingOrphanedPlacements(shardId);
 
 ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
 shardPlacementList,

@@ -1061,7 +1062,7 @@ static void
 EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
 const char *targetNodeName, int32 targetNodePort)
 {
-List *shardPlacementList = ShardPlacementList(shardId);
+List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
 
 ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
 shardPlacementList,

@@ -1085,7 +1086,7 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo
 * the shard.
 */
 DropOrphanedShardsInSeparateTransaction();
-shardPlacementList = ShardPlacementList(shardId);
+shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
 targetPlacement = SearchShardPlacementInList(shardPlacementList,
 targetNodeName,
 targetNodePort);

@@ -1429,7 +1430,8 @@ DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
 char *qualifiedTableName = ConstructQualifiedShardName(colocatedShard);
 StringInfo dropQuery = makeStringInfo();
 uint64 shardId = colocatedShard->shardId;
-List *shardPlacementList = ShardPlacementList(shardId);
+List *shardPlacementList =
+ShardPlacementListIncludingOrphanedPlacements(shardId);
 ShardPlacement *placement =
 SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);
 

@@ -1442,9 +1444,9 @@ DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
 
 
 /*
- * MarkForDropColocatedShardPlacement marks the shard placement metadata for the given
- * shard placement to be deleted in pg_dist_placement. The function does this for all
- * colocated placements.
+ * MarkForDropColocatedShardPlacement marks the shard placement metadata for
+ * the given shard placement to be deleted in pg_dist_placement. The function
+ * does this for all colocated placements.
  */
 static void
 MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32

@@ -1457,7 +1459,8 @@ MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName,
 {
 ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
 uint64 shardId = colocatedShard->shardId;
-List *shardPlacementList = ShardPlacementList(shardId);
+List *shardPlacementList =
+ShardPlacementListIncludingOrphanedPlacements(shardId);
 ShardPlacement *placement =
 SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);
 

@@ -2767,8 +2767,10 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
 static bool
 CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval)
 {
-List *firstShardPlacementList = ShardPlacementList(firstInterval->shardId);
-List *secondShardPlacementList = ShardPlacementList(secondInterval->shardId);
+List *firstShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
+firstInterval->shardId);
+List *secondShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
+secondInterval->shardId);
 ListCell *firstShardPlacementCell = NULL;
 ListCell *secondShardPlacementCell = NULL;
 
@@ -834,7 +834,7 @@ IsLocallyAccessibleCitusLocalTable(Oid relationId)
 ShardInterval *shardInterval = linitial(shardIntervalList);
 uint64 shardId = shardInterval->shardId;
 ShardPlacement *localShardPlacement =
-ShardPlacementOnGroup(shardId, GetLocalGroupId());
+ActiveShardPlacementOnGroup(GetLocalGroupId(), shardId);
 return localShardPlacement != NULL;
 }
 

@@ -1666,7 +1666,8 @@ RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
 relationShard->relationId = distributedTableId;
 
 modifyTask->relationShardList = list_make1(relationShard);
-modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId);
+modifyTask->taskPlacementList = ActiveShardPlacementList(
+modifyRoute->shardId);
 modifyTask->parametersInQueryStringResolved = parametersInQueryResolved;
 
 insertTaskList = lappend(insertTaskList, modifyTask);

@@ -132,7 +132,7 @@ load_shard_placement_array(PG_FUNCTION_ARGS)
 }
 else
 {
-placementList = ShardPlacementList(shardId);
+placementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
 }
 
 placementList = SortList(placementList, CompareShardPlacementsByWorker);

@@ -350,8 +350,10 @@ ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId)
 leftRelationName, rightRelationName)));
 }
 
-List *leftPlacementList = ShardPlacementList(leftShardId);
-List *rightPlacementList = ShardPlacementList(rightShardId);
+List *leftPlacementList = ShardPlacementListWithoutOrphanedPlacements(
+leftShardId);
+List *rightPlacementList = ShardPlacementListWithoutOrphanedPlacements(
+rightShardId);
 
 if (list_length(leftPlacementList) != list_length(rightPlacementList))
 {

@@ -347,7 +347,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
 List *ddlCommandList =
 CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);
 
-List *shardPlacementList = ShardPlacementList(shardId);
+List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
 ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
 nodeName, nodePort);
 char *tableOwner = TableOwner(shardInterval->relationId);

@@ -476,7 +476,7 @@ SingleReplicatedTable(Oid relationId)
 /* checking only for the first shard id should suffice */
 uint64 shardId = *(uint64 *) linitial(shardList);
 
-shardPlacementList = ShardPlacementList(shardId);
+shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
 if (list_length(shardPlacementList) != 1)
 {
 return false;

@@ -489,7 +489,7 @@ SingleReplicatedTable(Oid relationId)
 foreach_ptr(shardIdPointer, shardIntervalList)
 {
 uint64 shardId = *shardIdPointer;
-shardPlacementList = ShardPlacementList(shardId);
+shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
 
 if (list_length(shardPlacementList) != 1)
 {

@@ -120,7 +120,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
 {
 uint64 shardId = *shardIdPointer;
 
-List *shardPlacementList = ShardPlacementList(shardId);
+List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
 ShardPlacement *placement = NULL;
 foreach_ptr(placement, shardPlacementList)
 {

@@ -148,7 +148,9 @@ extern List * CitusTableList(void);
 extern ShardInterval * LoadShardInterval(uint64 shardId);
 extern Oid RelationIdForShard(uint64 shardId);
 extern bool ReferenceTableShardId(uint64 shardId);
-extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId);
+extern ShardPlacement * ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId,
+uint64 shardId);
+extern ShardPlacement * ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId);
 extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
 extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
 extern CitusTableCacheEntry * GetCitusTableCacheEntry(Oid distributedRelationId);

@@ -158,7 +160,7 @@ extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid,
 extern int32 GetLocalGroupId(void);
 extern void CitusTableCacheFlushInvalidatedEntries(void);
 extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok);
-extern List * ShardPlacementList(uint64 shardId);
+extern List * ShardPlacementListIncludingOrphanedPlacements(uint64 shardId);
 extern bool ShardExists(int64 shardId);
 extern void CitusInvalidateRelcacheByRelid(Oid relationId);
 extern void CitusInvalidateRelcacheByShardId(int64 shardId);

@@ -214,6 +214,7 @@ extern bool NodeGroupHasShardPlacements(int32 groupId,
 bool onlyConsiderActivePlacements);
 extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
 extern List * ActiveShardPlacementList(uint64 shardId);
+extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId);
 extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk);
 extern List * BuildShardPlacementList(ShardInterval *shardInterval);
 extern List * AllShardPlacementsOnNodeGroup(int32 groupId);

@@ -223,7 +224,6 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
 SizeQueryType sizeQueryType,
 bool optimizePartitionCalculations);
 extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
-extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
 
 /* Function declarations to modify shard and shard placement data */
 extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
@ -0,0 +1,355 @@
|
||||||
|
CREATE SCHEMA ignoring_orphaned_shards;
|
||||||
|
SET search_path TO ignoring_orphaned_shards;
|
||||||
|
-- Use a weird shard count that we don't use in any other tests
|
||||||
|
SET citus.shard_count TO 13;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.next_shard_id TO 92448000;
|
||||||
|
CREATE TABLE ref(id int PRIMARY KEY);
|
||||||
|
SELECT * FROM create_reference_table('ref');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 92448100;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448100;
|
||||||
|
CREATE TABLE dist1(id int);
|
||||||
|
SELECT * FROM create_distributed_table('dist1', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
|
||||||
|
logicalrelid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
dist1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Move first shard, so that the first shard now has 2 placements. One that's
|
||||||
|
-- active and one that's orphaned.
|
||||||
|
SELECT citus_move_shard_placement(92448100, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448100 ORDER BY placementid;
|
||||||
|
shardid | shardstate | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
92448100 | 4 | 57637
|
||||||
|
92448100 | 1 | 57638
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Add a new table that should get colocated with dist1 automatically, but
|
||||||
|
-- should not get a shard for the orphaned placement.
|
||||||
|
SET citus.next_shard_id TO 92448200;
|
||||||
|
CREATE TABLE dist2(id int);
|
||||||
|
SELECT * FROM create_distributed_table('dist2', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
|
||||||
|
logicalrelid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
dist1
|
||||||
|
dist2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448200 ORDER BY placementid;
|
||||||
|
shardid | shardstate | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
92448200 | 1 | 57638
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- uncolocate it
|
||||||
|
SELECT update_distributed_table_colocation('dist2', 'none');
|
||||||
|
update_distributed_table_colocation
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
|
||||||
|
logicalrelid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
dist1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Make sure we can add it back to the colocation, even though it has a
|
||||||
|
-- different number of shard placements for the first shard.
|
||||||
|
SELECT update_distributed_table_colocation('dist2', 'dist1');
|
||||||
|
update_distributed_table_colocation
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
|
||||||
|
logicalrelid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
dist1
|
||||||
|
dist2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Make sure that replication count check in FOR UPDATE ignores orphaned
|
||||||
|
-- shards.
|
||||||
|
SELECT * FROM dist1 WHERE id = 1 FOR UPDATE;
|
||||||
|
id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- Make sure we don't send a query to the orphaned shard
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.log_remote_commands TO ON;
|
||||||
|
INSERT INTO dist1 VALUES (1);
|
||||||
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.dist1_92448100 (id) VALUES (1)
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
ROLLBACK;
|
||||||
|
NOTICE: issuing ROLLBACK
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
-- Make sure we can create a foreign key on community edition, because
|
||||||
|
-- replication factor is 1
|
||||||
|
ALTER TABLE dist1
|
||||||
|
ADD CONSTRAINT dist1_ref_fk
|
||||||
|
FOREIGN KEY (id)
|
||||||
|
REFERENCES ref(id);
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
SET citus.next_shard_id TO 92448300;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448300;
|
||||||
|
CREATE TABLE rep1(id int);
|
||||||
|
SELECT * FROM create_distributed_table('rep1', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Add the coordinator, so we can have a replicated shard
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :master_port, 0);
|
||||||
|
NOTICE: Replicating reference table "ref" to the node localhost:xxxxx
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT 1 FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
|
||||||
|
logicalrelid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
rep1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_move_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
|
||||||
|
shardid | shardstate | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
92448300 | 4 | 57637
|
||||||
|
92448300 | 1 | 57638
|
||||||
|
92448300 | 1 | 57636
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- Add a new table that should get colocated with rep1 automatically, but
|
||||||
|
-- should not get a shard for the orphaned placement.
|
||||||
|
SET citus.next_shard_id TO 92448400;
|
||||||
|
CREATE TABLE rep2(id int);
|
||||||
|
SELECT * FROM create_distributed_table('rep2', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
|
||||||
|
logicalrelid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
rep1
|
||||||
|
rep2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448400 ORDER BY placementid;
|
||||||
|
shardid | shardstate | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
92448400 | 1 | 57636
|
||||||
|
92448400 | 1 | 57638
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- uncolocate it
|
||||||
|
SELECT update_distributed_table_colocation('rep2', 'none');
|
||||||
|
update_distributed_table_colocation
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
|
||||||
|
logicalrelid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
rep1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Make sure we can add it back to the colocation, even though it has a
|
||||||
|
-- different number of shard placements for the first shard.
|
||||||
|
SELECT update_distributed_table_colocation('rep2', 'rep1');
|
||||||
|
update_distributed_table_colocation
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
|
||||||
|
logicalrelid
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
rep1
|
||||||
|
rep2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = 92448300 AND groupid = 0;
|
||||||
|
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
|
||||||
|
shardid | shardstate | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
92448300 | 4 | 57637
|
||||||
|
92448300 | 1 | 57638
|
||||||
|
92448300 | 3 | 57636
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- cannot copy from an orphaned shard
|
||||||
|
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
|
||||||
|
ERROR: source placement must be in active state
|
||||||
|
-- cannot copy to an orphaned shard
|
||||||
|
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
|
||||||
|
ERROR: target placement must be in inactive state
|
||||||
|
-- can still copy to an inactive shard
|
||||||
|
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :master_port);
|
||||||
|
citus_copy_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
|
||||||
|
shardid | shardstate | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
92448300 | 4 | 57637
|
||||||
|
92448300 | 1 | 57638
|
||||||
|
92448300 | 1 | 57636
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- Make sure we don't send a query to the orphaned shard
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.log_remote_commands TO ON;
|
||||||
|
SET LOCAL citus.log_local_commands TO ON;
|
||||||
|
INSERT INTO rep1 VALUES (1);
|
||||||
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: executing the command locally: INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
|
||||||
|
ROLLBACK;
|
||||||
|
NOTICE: issuing ROLLBACK
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
-- Cause the orphaned shard to be local
|
||||||
|
SELECT 1 FROM citus_drain_node('localhost', :master_port);
|
||||||
|
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
|
||||||
|
shardid | shardstate | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
92448300 | 1 | 57638
|
||||||
|
92448300 | 4 | 57636
|
||||||
|
92448300 | 1 | 57637
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- Make sure we don't send a query to the orphaned shard if it's local
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.log_remote_commands TO ON;
|
||||||
|
SET LOCAL citus.log_local_commands TO ON;
|
||||||
|
INSERT INTO rep1 VALUES (1);
|
||||||
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
ROLLBACK;
|
||||||
|
NOTICE: issuing ROLLBACK
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
NOTICE: issuing ROLLBACK
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.next_shard_id TO 92448500;
|
||||||
|
CREATE TABLE range1(id int);
|
||||||
|
SELECT create_distributed_table('range1', 'id', 'range');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
|
||||||
|
-- Move shard placement and clean it up
|
||||||
|
SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CALL citus_cleanup_orphaned_shards();
|
||||||
|
NOTICE: cleaned up 3 orphaned shards
|
||||||
|
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
|
||||||
|
shardid | shardstate | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
92448300 | 1 | 57638
|
||||||
|
92448300 | 1 | 57637
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 92448600;
|
||||||
|
CREATE TABLE range2(id int);
|
||||||
|
SELECT create_distributed_table('range2', 'id', 'range');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
|
||||||
|
-- Move shard placement and DON'T clean it up, now range1 and range2 are
|
||||||
|
-- colocated, but only range2 has an orphaned shard.
|
||||||
|
SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
|
||||||
|
shardid | shardstate | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
92448600 | 4 | 57638
|
||||||
|
92448600 | 1 | 57637
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Make sure that tables are detected as colocated
|
||||||
|
SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
|
||||||
|
id | id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- Make sure we can create a foreign key on community edition, because
|
||||||
|
-- replication factor is 1
|
||||||
|
ALTER TABLE range1
|
||||||
|
ADD CONSTRAINT range1_ref_fk
|
||||||
|
FOREIGN KEY (id)
|
||||||
|
REFERENCES ref(id);
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA ignoring_orphaned_shards CASCADE;
|
|
@@ -7,3 +7,4 @@ test: foreign_key_to_reference_shard_rebalance
 test: multi_move_mx
 test: shard_move_deferred_delete
 test: multi_colocated_shard_rebalance
+test: ignoring_orphaned_shards
@@ -0,0 +1,147 @@
+CREATE SCHEMA ignoring_orphaned_shards;
+SET search_path TO ignoring_orphaned_shards;
+-- Use a weird shard count that we don't use in any other tests
+SET citus.shard_count TO 13;
+SET citus.shard_replication_factor TO 1;
+SET citus.next_shard_id TO 92448000;
+
+CREATE TABLE ref(id int PRIMARY KEY);
+SELECT * FROM create_reference_table('ref');
+
+SET citus.next_shard_id TO 92448100;
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448100;
+
+CREATE TABLE dist1(id int);
+SELECT * FROM create_distributed_table('dist1', 'id');
+SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
+
+-- Move first shard, so that the first shard now has 2 placements. One that's
+-- active and one that's orphaned.
+SELECT citus_move_shard_placement(92448100, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
+SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448100 ORDER BY placementid;
+
+-- Add a new table that should get colocated with dist1 automatically, but
+-- should not get a shard for the orphaned placement.
+SET citus.next_shard_id TO 92448200;
+CREATE TABLE dist2(id int);
+SELECT * FROM create_distributed_table('dist2', 'id');
+SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
+SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448200 ORDER BY placementid;
+
+-- uncolocate it
+SELECT update_distributed_table_colocation('dist2', 'none');
+SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
+-- Make sure we can add it back to the colocation, even though it has a
+-- different number of shard placements for the first shard.
+SELECT update_distributed_table_colocation('dist2', 'dist1');
+SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
+
+-- Make sure that replication count check in FOR UPDATE ignores orphaned
+-- shards.
+SELECT * FROM dist1 WHERE id = 1 FOR UPDATE;
+-- Make sure we don't send a query to the orphaned shard
+BEGIN;
+SET LOCAL citus.log_remote_commands TO ON;
+INSERT INTO dist1 VALUES (1);
+ROLLBACK;
+
+-- Make sure we can create a foreign key on community edition, because
+-- replication factor is 1
+ALTER TABLE dist1
+ADD CONSTRAINT dist1_ref_fk
+FOREIGN KEY (id)
+REFERENCES ref(id);
+
+SET citus.shard_replication_factor TO 2;
+SET citus.next_shard_id TO 92448300;
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448300;
+CREATE TABLE rep1(id int);
+SELECT * FROM create_distributed_table('rep1', 'id');
+
+-- Add the coordinator, so we can have a replicated shard
+SELECT 1 FROM citus_add_node('localhost', :master_port, 0);
+SELECT 1 FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
+SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
+
+SELECT citus_move_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
+SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
+
+-- Add a new table that should get colocated with rep1 automatically, but
+-- should not get a shard for the orphaned placement.
+SET citus.next_shard_id TO 92448400;
+CREATE TABLE rep2(id int);
+SELECT * FROM create_distributed_table('rep2', 'id');
+SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
+SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448400 ORDER BY placementid;
+
+-- uncolocate it
+SELECT update_distributed_table_colocation('rep2', 'none');
+SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
+-- Make sure we can add it back to the colocation, even though it has a
+-- different number of shard placements for the first shard.
+SELECT update_distributed_table_colocation('rep2', 'rep1');
+SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
+
+UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = 92448300 AND groupid = 0;
+SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
+
+-- cannot copy from an orphaned shard
+SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
+-- cannot copy to an orphaned shard
+SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
+-- can still copy to an inactive shard
+SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :master_port);
+SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
+
+-- Make sure we don't send a query to the orphaned shard
+BEGIN;
+SET LOCAL citus.log_remote_commands TO ON;
+SET LOCAL citus.log_local_commands TO ON;
+INSERT INTO rep1 VALUES (1);
+ROLLBACK;
+
+-- Cause the orphaned shard to be local
+SELECT 1 FROM citus_drain_node('localhost', :master_port);
+SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
+
+-- Make sure we don't send a query to the orphaned shard if it's local
+BEGIN;
+SET LOCAL citus.log_remote_commands TO ON;
+SET LOCAL citus.log_local_commands TO ON;
+INSERT INTO rep1 VALUES (1);
+ROLLBACK;
+
+
+SET citus.shard_replication_factor TO 1;
+SET citus.next_shard_id TO 92448500;
+CREATE TABLE range1(id int);
+SELECT create_distributed_table('range1', 'id', 'range');
+CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
+
+-- Move shard placement and clean it up
+SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
+CALL citus_cleanup_orphaned_shards();
+SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
+
+SET citus.next_shard_id TO 92448600;
+CREATE TABLE range2(id int);
+SELECT create_distributed_table('range2', 'id', 'range');
+CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
+
+-- Move shard placement and DON'T clean it up, now range1 and range2 are
+-- colocated, but only range2 has an orphaned shard.
+SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
+SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
+
+-- Make sure that tables are detected as colocated
+SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
+
+-- Make sure we can create a foreign key on community edition, because
+-- replication factor is 1
+ALTER TABLE range1
+ADD CONSTRAINT range1_ref_fk
+FOREIGN KEY (id)
+REFERENCES ref(id);
+
+SET client_min_messages TO WARNING;
+DROP SCHEMA ignoring_orphaned_shards CASCADE;