Merge pull request #4994 from citusdata/fix/shardPlacementList

Exclude orphaned shards while finding shard placements
SaitTalhaNisanci 2021-06-28 18:02:30 +03:00 committed by GitHub
commit c932642e3b
20 changed files with 605 additions and 66 deletions
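
Reviewer note: this PR replaces the ambiguously named ShardPlacementList(), FindShardPlacementOnGroup() and ShardPlacementOnGroup() entry points with variants whose names state whether orphaned (shardstate = 4) placements are returned. Below is a minimal sketch of how a caller might choose between the three list variants; ChoosePlacementList() is a hypothetical wrapper and the include paths are assumptions, only the three list functions come from the diff itself.

/*
 * Illustration only: pick the placement-list variant based on whether the
 * caller needs to see orphaned (shardstate = 4) placements. Hypothetical
 * wrapper; include paths are assumed.
 */
#include "postgres.h"

#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"

static List *
ChoosePlacementList(uint64 shardId, bool needOrphanedPlacements, bool activeOnly)
{
	if (needOrphanedPlacements)
	{
		/* e.g. DROP and repair paths that must also clean up orphaned rows */
		return ShardPlacementListIncludingOrphanedPlacements(shardId);
	}

	if (activeOnly)
	{
		/* e.g. query routing, which only targets active placements */
		return ActiveShardPlacementList(shardId);
	}

	/* e.g. colocation and replication-factor checks, which ignore orphans */
	return ShardPlacementListWithoutOrphanedPlacements(shardId);
}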

View File

@ -2139,7 +2139,7 @@ ShardIntervalListHasLocalPlacements(List *shardIntervalList)
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
if (FindShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL)
if (ActiveShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL)
{
return true;
}

View File

@ -377,7 +377,7 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
/*
* EnsureConnectionPossibilityForNode reserves a shared connection
* counter per node in the nodeList unless:
* - Reservation is possible/allowed (see IsReservationPossible())
* - Reservation is not possible/allowed (see IsReservationPossible())
* - there is at least one connection to the node so that we are guaranteed
* to get a connection
* - An earlier call already reserved a connection (e.g., we allow only a

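Reviewer note: the comment fix above lists the cases in which EnsureConnectionPossibilityForNode() skips reserving a shared connection counter. A rough sketch of that guard order follows, for illustration only: IsReservationPossible() appears in the diff, while the other helper names are hypothetical stand-ins for the real bookkeeping in the connection-reservation code.

/*
 * Illustration only: the early-exit conditions described in the comment
 * above. ConnectionAvailableToNode(), ReservationAlreadyMade() and
 * ReserveSharedConnectionCounter() are hypothetical names.
 */
static void
MaybeReserveConnection(WorkerNode *workerNode)
{
	if (!IsReservationPossible())
	{
		/* reservation is not possible/allowed for this backend */
		return;
	}

	if (ConnectionAvailableToNode(workerNode))
	{
		/* an existing connection already guarantees us a slot */
		return;
	}

	if (ReservationAlreadyMade(workerNode))
	{
		/* an earlier call already reserved a connection for this node */
		return;
	}

	ReserveSharedConnectionCounter(workerNode);
}
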
View File

@ -128,8 +128,8 @@ BuildPlacementAccessList(int32 groupId, List *relationShardList,
RelationShard *relationShard = NULL;
foreach_ptr(relationShard, relationShardList)
{
ShardPlacement *placement = FindShardPlacementOnGroup(groupId,
relationShard->shardId);
ShardPlacement *placement = ActiveShardPlacementOnGroup(groupId,
relationShard->shardId);
if (placement == NULL)
{
continue;

View File

@ -594,12 +594,14 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
/*
* FindShardPlacementOnGroup returns the shard placement for the given shard
* on the given group, or returns NULL if no placement for the shard exists
* on the group.
* ShardPlacementOnGroupIncludingOrphanedPlacements returns the shard placement
* for the given shard on the given group, or returns NULL if no placement for
* the shard exists on the group.
*
* NOTE: This can return inactive or orphaned placements.
*/
ShardPlacement *
FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId)
{
ShardPlacement *placementOnNode = NULL;
@ -614,7 +616,6 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
{
GroupShardPlacement *placement = &placementArray[placementIndex];
if (placement->groupId == groupId)
{
placementOnNode = ResolveGroupShardPlacement(placement, tableEntry,
@ -627,6 +628,28 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
}
/*
* ActiveShardPlacementOnGroup returns the active shard placement for the
* given shard on the given group, or returns NULL if no active placement for
* the shard exists on the group.
*/
ShardPlacement *
ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId)
{
ShardPlacement *placement =
ShardPlacementOnGroupIncludingOrphanedPlacements(groupId, shardId);
if (placement == NULL)
{
return NULL;
}
if (placement->shardState != SHARD_STATE_ACTIVE)
{
return NULL;
}
return placement;
}
/*
* ResolveGroupShardPlacement takes a GroupShardPlacement and adds additional data to it,
* such as the node we should consider it to be on.
@ -791,13 +814,14 @@ LookupNodeForGroup(int32 groupId)
/*
* ShardPlacementList returns the list of placements for the given shard from
* the cache.
* the cache. This list includes placements that are orphaned, because
* their deletion is postponed to a later point (shardstate = 4).
*
* The returned list is deep copied from the cache and thus can be modified
* and pfree()d freely.
*/
List *
ShardPlacementList(uint64 shardId)
ShardPlacementListIncludingOrphanedPlacements(uint64 shardId)
{
List *placementList = NIL;

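Reviewer note: ActiveShardPlacementOnGroup() added above is a thin filter over ShardPlacementOnGroupIncludingOrphanedPlacements(). The sketch below shows the caller pattern this PR switches to in ShardIntervalListHasLocalPlacements() and IsLocallyAccessibleCitusLocalTable(); ShardHasActiveLocalPlacement() is a hypothetical helper name.

/*
 * Illustration only: a shard counts as locally usable only when the local
 * group holds an ACTIVE placement, so orphaned (shardstate = 4) placements
 * left behind by deferred shard moves are ignored.
 */
static bool
ShardHasActiveLocalPlacement(uint64 shardId)
{
	int32 localGroupId = GetLocalGroupId();
	ShardPlacement *placement = ActiveShardPlacementOnGroup(localGroupId, shardId);

	return placement != NULL;
}
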
View File

@ -1092,7 +1092,7 @@ TableShardReplicationFactor(Oid relationId)
{
uint64 shardId = shardInterval->shardId;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
uint32 shardPlacementCount = list_length(shardPlacementList);
/*
@ -1392,7 +1392,8 @@ List *
ActiveShardPlacementList(uint64 shardId)
{
List *activePlacementList = NIL;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
@ -1407,6 +1408,31 @@ ActiveShardPlacementList(uint64 shardId)
}
/*
* ShardPlacementListWithoutOrphanedPlacements returns shard placements excluding
* the ones that are orphaned, because they are marked to be deleted at a later
* point (shardstate = 4).
*/
List *
ShardPlacementListWithoutOrphanedPlacements(uint64 shardId)
{
List *activePlacementList = NIL;
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
if (shardPlacement->shardState != SHARD_STATE_TO_DELETE)
{
activePlacementList = lappend(activePlacementList, shardPlacement);
}
}
return SortList(activePlacementList, CompareShardPlacementsByWorker);
}
/*
* ActiveShardPlacement finds a shard placement for the given shardId from
* system catalog, chooses a placement that is in active state and returns
@ -1944,7 +1970,8 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s
ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex);
ShardPlacement *partitionPlacement =
ShardPlacementOnGroup(partitionShardId, parentShardPlacement->groupId);
ShardPlacementOnGroupIncludingOrphanedPlacements(
parentShardPlacement->groupId, partitionShardId);
/* the partition should have a placement with the same group */
Assert(partitionPlacement != NULL);
@ -1954,28 +1981,6 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s
}
/*
* ShardPlacementOnGroup gets a shardInterval and a groupId, returns a placement
* of the shard on the given group. If no such placement exists, the function
* returns NULL.
*/
ShardPlacement *
ShardPlacementOnGroup(uint64 shardId, int groupId)
{
List *placementList = ShardPlacementList(shardId);
ShardPlacement *placement = NULL;
foreach_ptr(placement, placementList)
{
if (placement->groupId == groupId)
{
return placement;
}
}
return NULL;
}
/*
* MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where
* the state is set to SHARD_STATE_INACTIVE. It also marks partitions of the

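Reviewer note: with ShardPlacementListWithoutOrphanedPlacements() added above, callers such as TableShardReplicationFactor() no longer count deferred-delete copies toward the replication factor. A minimal sketch of that counting pattern follows; NonOrphanedPlacementCount() is a hypothetical name.

/*
 * Illustration only: count replicas the way TableShardReplicationFactor()
 * does after this PR, excluding orphaned (shardstate = 4) placements so a
 * not-yet-cleaned-up copy does not inflate the replication factor.
 */
static uint32
NonOrphanedPlacementCount(uint64 shardId)
{
	List *placementList = ShardPlacementListWithoutOrphanedPlacements(shardId);

	return (uint32) list_length(placementList);
}
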
View File

@ -287,7 +287,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue);
List *sourceShardPlacementList = ShardPlacementList(sourceShardId);
List *sourceShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
sourceShardId);
InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
shardMinValueText, shardMaxValueText);
@ -295,11 +296,6 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
ShardPlacement *sourcePlacement = NULL;
foreach_ptr(sourcePlacement, sourceShardPlacementList)
{
if (sourcePlacement->shardState == SHARD_STATE_TO_DELETE)
{
continue;
}
int32 groupId = sourcePlacement->groupId;
const ShardState shardState = SHARD_STATE_ACTIVE;
const uint64 shardSize = 0;

View File

@ -450,7 +450,8 @@ DropTaskList(Oid relationId, char *schemaName, char *relationName,
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;
task->taskPlacementList = ShardPlacementList(shardId);
task->taskPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
taskList = lappend(taskList, task);
}

View File

@ -748,7 +748,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
ddlCommandList);
/* after successful repair, we update shard state as healthy*/
List *placementList = ShardPlacementList(shardId);
List *placementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
ShardPlacement *placement = SearchShardPlacementInListOrError(placementList,
targetNodeName,
targetNodePort);
@ -1029,7 +1029,8 @@ static void
EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
const char *targetNodeName, int32 targetNodePort)
{
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
shardPlacementList,
@ -1061,7 +1062,7 @@ static void
EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
const char *targetNodeName, int32 targetNodePort)
{
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
shardPlacementList,
@ -1085,7 +1086,7 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo
* the shard.
*/
DropOrphanedShardsInSeparateTransaction();
shardPlacementList = ShardPlacementList(shardId);
shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
targetPlacement = SearchShardPlacementInList(shardPlacementList,
targetNodeName,
targetNodePort);
@ -1429,7 +1430,8 @@ DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
char *qualifiedTableName = ConstructQualifiedShardName(colocatedShard);
StringInfo dropQuery = makeStringInfo();
uint64 shardId = colocatedShard->shardId;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *placement =
SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);
@ -1442,9 +1444,9 @@ DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
/*
* MarkForDropColocatedShardPlacement marks the shard placement metadata for the given
* shard placement to be deleted in pg_dist_placement. The function does this for all
* colocated placements.
* MarkForDropColocatedShardPlacement marks the shard placement metadata for
* the given shard placement to be deleted in pg_dist_placement. The function
* does this for all colocated placements.
*/
static void
MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
@ -1457,7 +1459,8 @@ MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName,
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 shardId = colocatedShard->shardId;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *placement =
SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);

View File

@ -2767,8 +2767,10 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
static bool
CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval)
{
List *firstShardPlacementList = ShardPlacementList(firstInterval->shardId);
List *secondShardPlacementList = ShardPlacementList(secondInterval->shardId);
List *firstShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
firstInterval->shardId);
List *secondShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
secondInterval->shardId);
ListCell *firstShardPlacementCell = NULL;
ListCell *secondShardPlacementCell = NULL;

View File

@ -834,7 +834,7 @@ IsLocallyAccessibleCitusLocalTable(Oid relationId)
ShardInterval *shardInterval = linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
ShardPlacement *localShardPlacement =
ShardPlacementOnGroup(shardId, GetLocalGroupId());
ActiveShardPlacementOnGroup(GetLocalGroupId(), shardId);
return localShardPlacement != NULL;
}
@ -1666,7 +1666,8 @@ RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
relationShard->relationId = distributedTableId;
modifyTask->relationShardList = list_make1(relationShard);
modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId);
modifyTask->taskPlacementList = ActiveShardPlacementList(
modifyRoute->shardId);
modifyTask->parametersInQueryStringResolved = parametersInQueryResolved;
insertTaskList = lappend(insertTaskList, modifyTask);

View File

@ -132,7 +132,7 @@ load_shard_placement_array(PG_FUNCTION_ARGS)
}
else
{
placementList = ShardPlacementList(shardId);
placementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
}
placementList = SortList(placementList, CompareShardPlacementsByWorker);

View File

@ -350,8 +350,10 @@ ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId)
leftRelationName, rightRelationName)));
}
List *leftPlacementList = ShardPlacementList(leftShardId);
List *rightPlacementList = ShardPlacementList(rightShardId);
List *leftPlacementList = ShardPlacementListWithoutOrphanedPlacements(
leftShardId);
List *rightPlacementList = ShardPlacementListWithoutOrphanedPlacements(
rightShardId);
if (list_length(leftPlacementList) != list_length(rightPlacementList))
{

View File

@ -347,7 +347,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
List *ddlCommandList =
CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort);
char *tableOwner = TableOwner(shardInterval->relationId);

View File

@ -476,7 +476,7 @@ SingleReplicatedTable(Oid relationId)
/* checking only for the first shard id should suffice */
uint64 shardId = *(uint64 *) linitial(shardList);
shardPlacementList = ShardPlacementList(shardId);
shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
if (list_length(shardPlacementList) != 1)
{
return false;
@ -489,7 +489,7 @@ SingleReplicatedTable(Oid relationId)
foreach_ptr(shardIdPointer, shardIntervalList)
{
uint64 shardId = *shardIdPointer;
shardPlacementList = ShardPlacementList(shardId);
shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
if (list_length(shardPlacementList) != 1)
{

View File

@ -120,7 +120,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
{
uint64 shardId = *shardIdPointer;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *placement = NULL;
foreach_ptr(placement, shardPlacementList)
{

View File

@ -148,7 +148,9 @@ extern List * CitusTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern Oid RelationIdForShard(uint64 shardId);
extern bool ReferenceTableShardId(uint64 shardId);
extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId);
extern ShardPlacement * ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId,
uint64 shardId);
extern ShardPlacement * ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId);
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern CitusTableCacheEntry * GetCitusTableCacheEntry(Oid distributedRelationId);
@ -158,7 +160,7 @@ extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid,
extern int32 GetLocalGroupId(void);
extern void CitusTableCacheFlushInvalidatedEntries(void);
extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok);
extern List * ShardPlacementList(uint64 shardId);
extern List * ShardPlacementListIncludingOrphanedPlacements(uint64 shardId);
extern bool ShardExists(int64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);

View File

@ -214,6 +214,7 @@ extern bool NodeGroupHasShardPlacements(int32 groupId,
bool onlyConsiderActivePlacements);
extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
extern List * ActiveShardPlacementList(uint64 shardId);
extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId);
extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk);
extern List * BuildShardPlacementList(ShardInterval *shardInterval);
extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
@ -223,7 +224,6 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
SizeQueryType sizeQueryType,
bool optimizePartitionCalculations);
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,

View File

@ -0,0 +1,355 @@
CREATE SCHEMA ignoring_orphaned_shards;
SET search_path TO ignoring_orphaned_shards;
-- Use a weird shard count that we don't use in any other tests
SET citus.shard_count TO 13;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 92448000;
CREATE TABLE ref(id int PRIMARY KEY);
SELECT * FROM create_reference_table('ref');
create_reference_table
---------------------------------------------------------------------
(1 row)
SET citus.next_shard_id TO 92448100;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448100;
CREATE TABLE dist1(id int);
SELECT * FROM create_distributed_table('dist1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
dist1
(1 row)
-- Move first shard, so that the first shard now has 2 placements. One that's
-- active and one that's orphaned.
SELECT citus_move_shard_placement(92448100, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448100 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448100 | 4 | 57637
92448100 | 1 | 57638
(2 rows)
-- Add a new table that should get colocated with dist1 automatically, but
-- should not get a shard for the orphaned placement.
SET citus.next_shard_id TO 92448200;
CREATE TABLE dist2(id int);
SELECT * FROM create_distributed_table('dist2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
dist1
dist2
(2 rows)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448200 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448200 | 1 | 57638
(1 row)
-- uncolocate it
SELECT update_distributed_table_colocation('dist2', 'none');
update_distributed_table_colocation
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
dist1
(1 row)
-- Make sure we can add it back to the colocation, even though it has a
-- different number of shard placements for the first shard.
SELECT update_distributed_table_colocation('dist2', 'dist1');
update_distributed_table_colocation
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
dist1
dist2
(2 rows)
-- Make sure that replication count check in FOR UPDATE ignores orphaned
-- shards.
SELECT * FROM dist1 WHERE id = 1 FOR UPDATE;
id
---------------------------------------------------------------------
(0 rows)
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
INSERT INTO dist1 VALUES (1);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.dist1_92448100 (id) VALUES (1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
ROLLBACK;
NOTICE: issuing ROLLBACK
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-- Make sure we can create a foreign key on community edition, because
-- replication factor is 1
ALTER TABLE dist1
ADD CONSTRAINT dist1_ref_fk
FOREIGN KEY (id)
REFERENCES ref(id);
SET citus.shard_replication_factor TO 2;
SET citus.next_shard_id TO 92448300;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448300;
CREATE TABLE rep1(id int);
SELECT * FROM create_distributed_table('rep1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Add the coordinator, so we can have a replicated shard
SELECT 1 FROM citus_add_node('localhost', :master_port, 0);
NOTICE: Replicating reference table "ref" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
rep1
(1 row)
SELECT citus_move_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 4 | 57637
92448300 | 1 | 57638
92448300 | 1 | 57636
(3 rows)
-- Add a new table that should get colocated with rep1 automatically, but
-- should not get a shard for the orphaned placement.
SET citus.next_shard_id TO 92448400;
CREATE TABLE rep2(id int);
SELECT * FROM create_distributed_table('rep2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
rep1
rep2
(2 rows)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448400 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448400 | 1 | 57636
92448400 | 1 | 57638
(2 rows)
-- uncolocate it
SELECT update_distributed_table_colocation('rep2', 'none');
update_distributed_table_colocation
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
rep1
(1 row)
-- Make sure we can add it back to the colocation, even though it has a
-- different number of shard placements for the first shard.
SELECT update_distributed_table_colocation('rep2', 'rep1');
update_distributed_table_colocation
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
rep1
rep2
(2 rows)
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = 92448300 AND groupid = 0;
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 4 | 57637
92448300 | 1 | 57638
92448300 | 3 | 57636
(3 rows)
-- cannot copy from an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
ERROR: source placement must be in active state
-- cannot copy to an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
ERROR: target placement must be in inactive state
-- can still copy to an inactive shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :master_port);
citus_copy_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 4 | 57637
92448300 | 1 | 57638
92448300 | 1 | 57636
(3 rows)
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.log_local_commands TO ON;
INSERT INTO rep1 VALUES (1);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: executing the command locally: INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
ROLLBACK;
NOTICE: issuing ROLLBACK
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-- Cause the orphaned shard to be local
SELECT 1 FROM citus_drain_node('localhost', :master_port);
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 1 | 57638
92448300 | 4 | 57636
92448300 | 1 | 57637
(3 rows)
-- Make sure we don't send a query to the orphaned shard if it's local
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.log_local_commands TO ON;
INSERT INTO rep1 VALUES (1);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
ROLLBACK;
NOTICE: issuing ROLLBACK
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ROLLBACK
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 92448500;
CREATE TABLE range1(id int);
SELECT create_distributed_table('range1', 'id', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
-- Move shard placement and clean it up
SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 3 orphaned shards
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 1 | 57638
92448300 | 1 | 57637
(2 rows)
SET citus.next_shard_id TO 92448600;
CREATE TABLE range2(id int);
SELECT create_distributed_table('range2', 'id', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
-- Move shard placement and DON'T clean it up, now range1 and range2 are
-- colocated, but only range2 has an orphaned shard.
SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448600 | 4 | 57638
92448600 | 1 | 57637
(2 rows)
-- Make sure that tables are detected as colocated
SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
id | id
---------------------------------------------------------------------
(0 rows)
-- Make sure we can create a foreign key on community edition, because
-- replication factor is 1
ALTER TABLE range1
ADD CONSTRAINT range1_ref_fk
FOREIGN KEY (id)
REFERENCES ref(id);
SET client_min_messages TO WARNING;
DROP SCHEMA ignoring_orphaned_shards CASCADE;

View File

@ -7,3 +7,4 @@ test: foreign_key_to_reference_shard_rebalance
test: multi_move_mx
test: shard_move_deferred_delete
test: multi_colocated_shard_rebalance
test: ignoring_orphaned_shards

View File

@ -0,0 +1,147 @@
CREATE SCHEMA ignoring_orphaned_shards;
SET search_path TO ignoring_orphaned_shards;
-- Use a weird shard count that we don't use in any other tests
SET citus.shard_count TO 13;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 92448000;
CREATE TABLE ref(id int PRIMARY KEY);
SELECT * FROM create_reference_table('ref');
SET citus.next_shard_id TO 92448100;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448100;
CREATE TABLE dist1(id int);
SELECT * FROM create_distributed_table('dist1', 'id');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
-- Move first shard, so that the first shard now has 2 placements. One that's
-- active and one that's orphaned.
SELECT citus_move_shard_placement(92448100, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448100 ORDER BY placementid;
-- Add a new table that should get colocated with dist1 automatically, but
-- should not get a shard for the orphaned placement.
SET citus.next_shard_id TO 92448200;
CREATE TABLE dist2(id int);
SELECT * FROM create_distributed_table('dist2', 'id');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448200 ORDER BY placementid;
-- uncolocate it
SELECT update_distributed_table_colocation('dist2', 'none');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
-- Make sure we can add it back to the colocation, even though it has a
-- different number of shard placements for the first shard.
SELECT update_distributed_table_colocation('dist2', 'dist1');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
-- Make sure that replication count check in FOR UPDATE ignores orphaned
-- shards.
SELECT * FROM dist1 WHERE id = 1 FOR UPDATE;
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
INSERT INTO dist1 VALUES (1);
ROLLBACK;
-- Make sure we can create a foreign key on community edition, because
-- replication factor is 1
ALTER TABLE dist1
ADD CONSTRAINT dist1_ref_fk
FOREIGN KEY (id)
REFERENCES ref(id);
SET citus.shard_replication_factor TO 2;
SET citus.next_shard_id TO 92448300;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448300;
CREATE TABLE rep1(id int);
SELECT * FROM create_distributed_table('rep1', 'id');
-- Add the coordinator, so we can have a replicated shard
SELECT 1 FROM citus_add_node('localhost', :master_port, 0);
SELECT 1 FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
SELECT citus_move_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-- Add a new table that should get colocated with rep1 automatically, but
-- should not get a shard for the orphaned placement.
SET citus.next_shard_id TO 92448400;
CREATE TABLE rep2(id int);
SELECT * FROM create_distributed_table('rep2', 'id');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448400 ORDER BY placementid;
-- uncolocate it
SELECT update_distributed_table_colocation('rep2', 'none');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
-- Make sure we can add it back to the colocation, even though it has a
-- different number of shard placements for the first shard.
SELECT update_distributed_table_colocation('rep2', 'rep1');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = 92448300 AND groupid = 0;
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-- cannot copy from an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
-- cannot copy to an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
-- can still copy to an inactive shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :master_port);
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.log_local_commands TO ON;
INSERT INTO rep1 VALUES (1);
ROLLBACK;
-- Cause the orphaned shard to be local
SELECT 1 FROM citus_drain_node('localhost', :master_port);
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-- Make sure we don't send a query to the orphaned shard if it's local
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.log_local_commands TO ON;
INSERT INTO rep1 VALUES (1);
ROLLBACK;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 92448500;
CREATE TABLE range1(id int);
SELECT create_distributed_table('range1', 'id', 'range');
CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
-- Move shard placement and clean it up
SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
CALL citus_cleanup_orphaned_shards();
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
SET citus.next_shard_id TO 92448600;
CREATE TABLE range2(id int);
SELECT create_distributed_table('range2', 'id', 'range');
CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
-- Move shard placement and DON'T clean it up, now range1 and range2 are
-- colocated, but only range2 has an orphaned shard.
SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
-- Make sure that tables are detected as colocated
SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
-- Make sure we can create a foreign key on community edition, because
-- replication factor is 1
ALTER TABLE range1
ADD CONSTRAINT range1_ref_fk
FOREIGN KEY (id)
REFERENCES ref(id);
SET client_min_messages TO WARNING;
DROP SCHEMA ignoring_orphaned_shards CASCADE;