Do not include to-be-deleted shards when finding shard placements

Ignore orphaned shards in more places

Only use active shard placements in RouterInsertTaskList

Use IncludingOrphanedPlacements in some more places

Fix comment

Add tests

(cherry picked from commit e7ed16c296)

Conflicts:
	src/backend/distributed/planner/multi_router_planner.c

Quite trivial conflict that was easy to resolve
pull/5152/head
Sait Talha Nisanci 2021-05-21 11:21:22 +03:00 committed by Hanefi Onaldi
parent 6f400dab58
commit 6fee3068e3
20 changed files with 606 additions and 72 deletions
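
Throughout this change, an "orphaned" placement is one whose shardstate is 4 (SHARD_STATE_TO_DELETE in the C code): the shard data has already been moved elsewhere and the old placement is only kept around until the deferred cleanup drops it. As a rough sketch, such placements can be spotted at the catalog level with a query like the one below, which only assumes the pg_dist_shard_placement view that the new regression test also queries:

-- sketch: list placements that are orphaned, i.e. waiting for deferred deletion
SELECT shardid, shardstate, nodeport
FROM pg_dist_shard_placement
WHERE shardstate = 4
ORDER BY shardid, placementid;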


@ -2139,7 +2139,7 @@ ShardIntervalListHasLocalPlacements(List *shardIntervalList)
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
if (FindShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL)
if (ActiveShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL)
{
return true;
}


@ -377,7 +377,7 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
/*
* EnsureConnectionPossibilityForNode reserves a shared connection
* counter per node in the nodeList unless:
* - Reservation is possible/allowed (see IsReservationPossible())
* - Reservation is not possible/allowed (see IsReservationPossible())
* - there is at least one connection to the node so that we are guaranteed
* to get a connection
* - An earlier call already reserved a connection (e.g., we allow only a


@ -128,8 +128,8 @@ BuildPlacementAccessList(int32 groupId, List *relationShardList,
RelationShard *relationShard = NULL;
foreach_ptr(relationShard, relationShardList)
{
ShardPlacement *placement = FindShardPlacementOnGroup(groupId,
relationShard->shardId);
ShardPlacement *placement = ActiveShardPlacementOnGroup(groupId,
relationShard->shardId);
if (placement == NULL)
{
continue;


@ -594,12 +594,14 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
/*
* FindShardPlacementOnGroup returns the shard placement for the given shard
* on the given group, or returns NULL if no placement for the shard exists
* on the group.
* ShardPlacementOnGroupIncludingOrphanedPlacements returns the shard placement
* for the given shard on the given group, or returns NULL if no placement for
* the shard exists on the group.
*
* NOTE: This can return inactive or orphaned placements.
*/
ShardPlacement *
FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId)
{
ShardPlacement *placementOnNode = NULL;
@ -614,7 +616,6 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
{
GroupShardPlacement *placement = &placementArray[placementIndex];
if (placement->groupId == groupId)
{
placementOnNode = ResolveGroupShardPlacement(placement, tableEntry,
@ -627,6 +628,28 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
}
/*
* ActiveShardPlacementOnGroup returns the active shard placement for the
* given shard on the given group, or returns NULL if no active placement for
* the shard exists on the group.
*/
ShardPlacement *
ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId)
{
ShardPlacement *placement =
ShardPlacementOnGroupIncludingOrphanedPlacements(groupId, shardId);
if (placement == NULL)
{
return NULL;
}
if (placement->shardState != SHARD_STATE_ACTIVE)
{
return NULL;
}
return placement;
}
/*
* ResolveGroupShardPlacement takes a GroupShardPlacement and adds additional data to it,
* such as the node we should consider it to be on.
@ -791,13 +814,14 @@ LookupNodeForGroup(int32 groupId)
/*
* ShardPlacementList returns the list of placements for the given shard from
* the cache.
* the cache. This list includes placements that are orphaned, because
* their deletion is postponed to a later point (shardstate = 4).
*
* The returned list is deep copied from the cache and thus can be modified
* and pfree()d freely.
*/
List *
ShardPlacementList(uint64 shardId)
ShardPlacementListIncludingOrphanedPlacements(uint64 shardId)
{
List *placementList = NIL;
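
The renames in this file make the orphaned-placement behaviour explicit: ShardPlacementOnGroupIncludingOrphanedPlacements and ShardPlacementListIncludingOrphanedPlacements still return shardstate = 4 rows, while the new ActiveShardPlacementOnGroup only returns a placement in SHARD_STATE_ACTIVE. A rough catalog-level analogue of the latter, using example ids taken from the new regression test, would be:

-- sketch: catalog-level analogue of ActiveShardPlacementOnGroup(groupId, shardId)
SELECT placementid, shardstate
FROM pg_dist_placement
WHERE shardid = 92448300 -- example shard id from the test below
  AND groupid = 0        -- example group id (the coordinator in the test)
  AND shardstate = 1;    -- 1 = SHARD_STATE_ACTIVE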


@ -1092,7 +1092,7 @@ TableShardReplicationFactor(Oid relationId)
{
uint64 shardId = shardInterval->shardId;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
uint32 shardPlacementCount = list_length(shardPlacementList);
/*
@ -1392,7 +1392,8 @@ List *
ActiveShardPlacementList(uint64 shardId)
{
List *activePlacementList = NIL;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
@ -1407,6 +1408,31 @@ ActiveShardPlacementList(uint64 shardId)
}
/*
* ShardPlacementListWithoutOrphanedPlacements returns shard placements excluding
* the ones that are orphaned, because they are marked to be deleted at a later
* point (shardstate = 4).
*/
List *
ShardPlacementListWithoutOrphanedPlacements(uint64 shardId)
{
List *activePlacementList = NIL;
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
if (shardPlacement->shardState != SHARD_STATE_TO_DELETE)
{
activePlacementList = lappend(activePlacementList, shardPlacement);
}
}
return SortList(activePlacementList, CompareShardPlacementsByWorker);
}
/*
* ActiveShardPlacement finds a shard placement for the given shardId from
* system catalog, chooses a placement that is in active state and returns
@ -1944,7 +1970,8 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s
ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex);
ShardPlacement *partitionPlacement =
ShardPlacementOnGroup(partitionShardId, parentShardPlacement->groupId);
ShardPlacementOnGroupIncludingOrphanedPlacements(
parentShardPlacement->groupId, partitionShardId);
/* the partition should have a placement with the same group */
Assert(partitionPlacement != NULL);
@ -1954,28 +1981,6 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s
}
/*
* ShardPlacementOnGroup gets a shardInterval and a groupId, returns a placement
* of the shard on the given group. If no such placement exists, the function
* return NULL.
*/
ShardPlacement *
ShardPlacementOnGroup(uint64 shardId, int groupId)
{
List *placementList = ShardPlacementList(shardId);
ShardPlacement *placement = NULL;
foreach_ptr(placement, placementList)
{
if (placement->groupId == groupId)
{
return placement;
}
}
return NULL;
}
/*
* MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where
* the state is set to SHARD_STATE_INACTIVE. It also marks partitions of the

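In this file, TableShardReplicationFactor and the new ShardPlacementListWithoutOrphanedPlacements both skip placements with shardstate = 4, so an orphaned copy left behind by a shard move no longer counts toward a shard's replication. A rough catalog-level equivalent of that count (a sketch only):

-- sketch: per-shard placement counts with orphaned placements ignored
SELECT shardid, count(*) AS placement_count
FROM pg_dist_placement
WHERE shardstate != 4
GROUP BY shardid
ORDER BY shardid;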

@ -287,7 +287,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue);
List *sourceShardPlacementList = ShardPlacementList(sourceShardId);
List *sourceShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
sourceShardId);
InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
shardMinValueText, shardMaxValueText);
@ -295,11 +296,6 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
ShardPlacement *sourcePlacement = NULL;
foreach_ptr(sourcePlacement, sourceShardPlacementList)
{
if (sourcePlacement->shardState == SHARD_STATE_TO_DELETE)
{
continue;
}
int32 groupId = sourcePlacement->groupId;
const ShardState shardState = SHARD_STATE_ACTIVE;
const uint64 shardSize = 0;


@ -450,7 +450,8 @@ DropTaskList(Oid relationId, char *schemaName, char *relationName,
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;
task->taskPlacementList = ShardPlacementList(shardId);
task->taskPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
taskList = lappend(taskList, task);
}


@ -748,7 +748,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
ddlCommandList);
/* after successful repair, we update shard state as healthy*/
List *placementList = ShardPlacementList(shardId);
List *placementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
ShardPlacement *placement = SearchShardPlacementInListOrError(placementList,
targetNodeName,
targetNodePort);
@ -1029,7 +1029,8 @@ static void
EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
const char *targetNodeName, int32 targetNodePort)
{
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
shardPlacementList,
@ -1061,7 +1062,7 @@ static void
EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
const char *targetNodeName, int32 targetNodePort)
{
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
shardPlacementList,
@ -1085,7 +1086,7 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo
* the shard.
*/
DropOrphanedShardsInSeparateTransaction();
shardPlacementList = ShardPlacementList(shardId);
shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
targetPlacement = SearchShardPlacementInList(shardPlacementList,
targetNodeName,
targetNodePort);
@ -1429,7 +1430,8 @@ DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
char *qualifiedTableName = ConstructQualifiedShardName(colocatedShard);
StringInfo dropQuery = makeStringInfo();
uint64 shardId = colocatedShard->shardId;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *placement =
SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);
@ -1442,9 +1444,9 @@ DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
/*
* MarkForDropColocatedShardPlacement marks the shard placement metadata for the given
* shard placement to be deleted in pg_dist_placement. The function does this for all
* colocated placements.
* MarkForDropColocatedShardPlacement marks the shard placement metadata for
* the given shard placement to be deleted in pg_dist_placement. The function
* does this for all colocated placements.
*/
static void
MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32
@ -1457,7 +1459,8 @@ MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName,
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 shardId = colocatedShard->shardId;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *placement =
SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort);


@ -2767,8 +2767,10 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
static bool
CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval)
{
List *firstShardPlacementList = ShardPlacementList(firstInterval->shardId);
List *secondShardPlacementList = ShardPlacementList(secondInterval->shardId);
List *firstShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
firstInterval->shardId);
List *secondShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
secondInterval->shardId);
ListCell *firstShardPlacementCell = NULL;
ListCell *secondShardPlacementCell = NULL;


@ -830,13 +830,8 @@ IsTableLocallyAccessible(Oid relationId)
ShardInterval *shardInterval = linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
ShardPlacement *localShardPlacement =
ShardPlacementOnGroup(shardId, GetLocalGroupId());
if (localShardPlacement != NULL)
{
/* the table has a placement on this node */
return true;
}
return false;
ActiveShardPlacementOnGroup(GetLocalGroupId(), shardId);
return localShardPlacement != NULL;
}
@ -1667,7 +1662,8 @@ RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
relationShard->relationId = distributedTableId;
modifyTask->relationShardList = list_make1(relationShard);
modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId);
modifyTask->taskPlacementList = ActiveShardPlacementList(
modifyRoute->shardId);
modifyTask->parametersInQueryStringResolved = parametersInQueryResolved;
insertTaskList = lappend(insertTaskList, modifyTask);


@ -132,7 +132,7 @@ load_shard_placement_array(PG_FUNCTION_ARGS)
}
else
{
placementList = ShardPlacementList(shardId);
placementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
}
placementList = SortList(placementList, CompareShardPlacementsByWorker);


@ -350,8 +350,10 @@ ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId)
leftRelationName, rightRelationName)));
}
List *leftPlacementList = ShardPlacementList(leftShardId);
List *rightPlacementList = ShardPlacementList(rightShardId);
List *leftPlacementList = ShardPlacementListWithoutOrphanedPlacements(
leftShardId);
List *rightPlacementList = ShardPlacementListWithoutOrphanedPlacements(
rightShardId);
if (list_length(leftPlacementList) != list_length(rightPlacementList))
{


@ -344,7 +344,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
List *ddlCommandList =
CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort);
char *tableOwner = TableOwner(shardInterval->relationId);


@ -476,7 +476,7 @@ SingleReplicatedTable(Oid relationId)
/* checking only for the first shard id should suffice */
uint64 shardId = *(uint64 *) linitial(shardList);
shardPlacementList = ShardPlacementList(shardId);
shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
if (list_length(shardPlacementList) != 1)
{
return false;
@ -489,7 +489,7 @@ SingleReplicatedTable(Oid relationId)
foreach_ptr(shardIdPointer, shardIntervalList)
{
uint64 shardId = *shardIdPointer;
shardPlacementList = ShardPlacementList(shardId);
shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
if (list_length(shardPlacementList) != 1)
{


@ -120,7 +120,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
{
uint64 shardId = *shardIdPointer;
List *shardPlacementList = ShardPlacementList(shardId);
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *placement = NULL;
foreach_ptr(placement, shardPlacementList)
{


@ -148,7 +148,9 @@ extern List * CitusTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern Oid RelationIdForShard(uint64 shardId);
extern bool ReferenceTableShardId(uint64 shardId);
extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId);
extern ShardPlacement * ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId,
uint64 shardId);
extern ShardPlacement * ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId);
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern CitusTableCacheEntry * GetCitusTableCacheEntry(Oid distributedRelationId);
@ -158,7 +160,7 @@ extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid,
extern int32 GetLocalGroupId(void);
extern void CitusTableCacheFlushInvalidatedEntries(void);
extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok);
extern List * ShardPlacementList(uint64 shardId);
extern List * ShardPlacementListIncludingOrphanedPlacements(uint64 shardId);
extern bool ShardExists(int64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);


@ -214,6 +214,7 @@ extern bool NodeGroupHasShardPlacements(int32 groupId,
bool onlyConsiderActivePlacements);
extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
extern List * ActiveShardPlacementList(uint64 shardId);
extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId);
extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk);
extern List * BuildShardPlacementList(ShardInterval *shardInterval);
extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
@ -223,7 +224,6 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
SizeQueryType sizeQueryType,
bool optimizePartitionCalculations);
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,


@ -0,0 +1,355 @@
CREATE SCHEMA ignoring_orphaned_shards;
SET search_path TO ignoring_orphaned_shards;
-- Use a weird shard count that we don't use in any other tests
SET citus.shard_count TO 13;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 92448000;
CREATE TABLE ref(id int PRIMARY KEY);
SELECT * FROM create_reference_table('ref');
create_reference_table
---------------------------------------------------------------------
(1 row)
SET citus.next_shard_id TO 92448100;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448100;
CREATE TABLE dist1(id int);
SELECT * FROM create_distributed_table('dist1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
dist1
(1 row)
-- Move first shard, so that the first shard now has 2 placements. One that's
-- active and one that's orphaned.
SELECT citus_move_shard_placement(92448100, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448100 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448100 | 4 | 57637
92448100 | 1 | 57638
(2 rows)
-- Add a new table that should get colocated with dist1 automatically, but
-- should not get a shard for the orphaned placement.
SET citus.next_shard_id TO 92448200;
CREATE TABLE dist2(id int);
SELECT * FROM create_distributed_table('dist2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
dist1
dist2
(2 rows)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448200 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448200 | 1 | 57638
(1 row)
-- uncolocate it
SELECT update_distributed_table_colocation('dist2', 'none');
update_distributed_table_colocation
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
dist1
(1 row)
-- Make sure we can add it back to the colocation, even though it has a
-- different number of shard placements for the first shard.
SELECT update_distributed_table_colocation('dist2', 'dist1');
update_distributed_table_colocation
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
dist1
dist2
(2 rows)
-- Make sure that replication count check in FOR UPDATE ignores orphaned
-- shards.
SELECT * FROM dist1 WHERE id = 1 FOR UPDATE;
id
---------------------------------------------------------------------
(0 rows)
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
INSERT INTO dist1 VALUES (1);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.dist1_92448100 (id) VALUES (1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
ROLLBACK;
NOTICE: issuing ROLLBACK
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-- Make sure we can create a foreign key on community edition, because
-- replication factor is 1
ALTER TABLE dist1
ADD CONSTRAINT dist1_ref_fk
FOREIGN KEY (id)
REFERENCES ref(id);
SET citus.shard_replication_factor TO 2;
SET citus.next_shard_id TO 92448300;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448300;
CREATE TABLE rep1(id int);
SELECT * FROM create_distributed_table('rep1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Add the coordinator, so we can have a replicated shard
SELECT 1 FROM citus_add_node('localhost', :master_port, 0);
NOTICE: Replicating reference table "ref" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
rep1
(1 row)
SELECT citus_move_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 4 | 57637
92448300 | 1 | 57638
92448300 | 1 | 57636
(3 rows)
-- Add a new table that should get colocated with rep1 automatically, but
-- should not get a shard for the orphaned placement.
SET citus.next_shard_id TO 92448400;
CREATE TABLE rep2(id int);
SELECT * FROM create_distributed_table('rep2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
rep1
rep2
(2 rows)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448400 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448400 | 1 | 57636
92448400 | 1 | 57638
(2 rows)
-- uncolocate it
SELECT update_distributed_table_colocation('rep2', 'none');
update_distributed_table_colocation
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
rep1
(1 row)
-- Make sure we can add it back to the colocation, even though it has a
-- different number of shard placements for the first shard.
SELECT update_distributed_table_colocation('rep2', 'rep1');
update_distributed_table_colocation
---------------------------------------------------------------------
(1 row)
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
logicalrelid
---------------------------------------------------------------------
rep1
rep2
(2 rows)
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = 92448300 AND groupid = 0;
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 4 | 57637
92448300 | 1 | 57638
92448300 | 3 | 57636
(3 rows)
-- cannot copy from an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
ERROR: source placement must be in active state
-- cannot copy to an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
ERROR: target placement must be in inactive state
-- can still copy to an inactive shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :master_port);
citus_copy_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 4 | 57637
92448300 | 1 | 57638
92448300 | 1 | 57636
(3 rows)
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.log_local_commands TO ON;
INSERT INTO rep1 VALUES (1);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: executing the command locally: INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
ROLLBACK;
NOTICE: issuing ROLLBACK
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-- Cause the orphaned shard to be local
SELECT 1 FROM citus_drain_node('localhost', :master_port);
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 1 | 57638
92448300 | 4 | 57636
92448300 | 1 | 57637
(3 rows)
-- Make sure we don't send a query to the orphaned shard if it's local
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.log_local_commands TO ON;
INSERT INTO rep1 VALUES (1);
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
ROLLBACK;
NOTICE: issuing ROLLBACK
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ROLLBACK
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 92448500;
CREATE TABLE range1(id int);
SELECT create_distributed_table('range1', 'id', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
-- Move shard placement and clean it up
SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 3 orphaned shards
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448300 | 1 | 57638
92448300 | 1 | 57637
(2 rows)
SET citus.next_shard_id TO 92448600;
CREATE TABLE range2(id int);
SELECT create_distributed_table('range2', 'id', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
-- Move shard placement and DON'T clean it up, now range1 and range2 are
-- colocated, but only range2 has an orphaned shard.
SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------
92448600 | 4 | 57638
92448600 | 1 | 57637
(2 rows)
-- Make sure that tables are detected as colocated
SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
id | id
---------------------------------------------------------------------
(0 rows)
-- Make sure we can create a foreign key on community edition, because
-- replication factor is 1
ALTER TABLE range1
ADD CONSTRAINT range1_ref_fk
FOREIGN KEY (id)
REFERENCES ref(id);
SET client_min_messages TO WARNING;
DROP SCHEMA ignoring_orphaned_shards CASCADE;


@ -7,3 +7,4 @@ test: foreign_key_to_reference_shard_rebalance
test: multi_move_mx
test: shard_move_deferred_delete
test: multi_colocated_shard_rebalance
test: ignoring_orphaned_shards


@ -0,0 +1,147 @@
CREATE SCHEMA ignoring_orphaned_shards;
SET search_path TO ignoring_orphaned_shards;
-- Use a weird shard count that we don't use in any other tests
SET citus.shard_count TO 13;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 92448000;
CREATE TABLE ref(id int PRIMARY KEY);
SELECT * FROM create_reference_table('ref');
SET citus.next_shard_id TO 92448100;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448100;
CREATE TABLE dist1(id int);
SELECT * FROM create_distributed_table('dist1', 'id');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
-- Move first shard, so that the first shard now has 2 placements. One that's
-- active and one that's orphaned.
SELECT citus_move_shard_placement(92448100, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448100 ORDER BY placementid;
-- Add a new table that should get colocated with dist1 automatically, but
-- should not get a shard for the orphaned placement.
SET citus.next_shard_id TO 92448200;
CREATE TABLE dist2(id int);
SELECT * FROM create_distributed_table('dist2', 'id');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448200 ORDER BY placementid;
-- uncolocate it
SELECT update_distributed_table_colocation('dist2', 'none');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
-- Make sure we can add it back to the colocation, even though it has a
-- different number of shard placements for the first shard.
SELECT update_distributed_table_colocation('dist2', 'dist1');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1;
-- Make sure that replication count check in FOR UPDATE ignores orphaned
-- shards.
SELECT * FROM dist1 WHERE id = 1 FOR UPDATE;
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
INSERT INTO dist1 VALUES (1);
ROLLBACK;
-- Make sure we can create a foreign key on community edition, because
-- replication factor is 1
ALTER TABLE dist1
ADD CONSTRAINT dist1_ref_fk
FOREIGN KEY (id)
REFERENCES ref(id);
SET citus.shard_replication_factor TO 2;
SET citus.next_shard_id TO 92448300;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448300;
CREATE TABLE rep1(id int);
SELECT * FROM create_distributed_table('rep1', 'id');
-- Add the coordinator, so we can have a replicated shard
SELECT 1 FROM citus_add_node('localhost', :master_port, 0);
SELECT 1 FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
SELECT citus_move_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-- Add a new table that should get colocated with rep1 automatically, but
-- should not get a shard for the orphaned placement.
SET citus.next_shard_id TO 92448400;
CREATE TABLE rep2(id int);
SELECT * FROM create_distributed_table('rep2', 'id');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448400 ORDER BY placementid;
-- uncolocate it
SELECT update_distributed_table_colocation('rep2', 'none');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
-- Make sure we can add it back to the colocation, even though it has a
-- different number of shard placements for the first shard.
SELECT update_distributed_table_colocation('rep2', 'rep1');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = 92448300 AND groupid = 0;
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-- cannot copy from an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
-- cannot copy to an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
-- can still copy to an inactive shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :master_port);
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.log_local_commands TO ON;
INSERT INTO rep1 VALUES (1);
ROLLBACK;
-- Cause the orphaned shard to be local
SELECT 1 FROM citus_drain_node('localhost', :master_port);
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
-- Make sure we don't send a query to the orphaned shard if it's local
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.log_local_commands TO ON;
INSERT INTO rep1 VALUES (1);
ROLLBACK;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 92448500;
CREATE TABLE range1(id int);
SELECT create_distributed_table('range1', 'id', 'range');
CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}');
-- Move shard placement and clean it up
SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
CALL citus_cleanup_orphaned_shards();
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
SET citus.next_shard_id TO 92448600;
CREATE TABLE range2(id int);
SELECT create_distributed_table('range2', 'id', 'range');
CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}');
-- Move shard placement and DON'T clean it up, now range1 and range2 are
-- colocated, but only range2 has an orphaned shard.
SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid;
-- Make sure that tables are detected as colocated
SELECT * FROM range1 JOIN range2 ON range1.id = range2.id;
-- Make sure we can create a foreign key on community edition, because
-- replication factor is 1
ALTER TABLE range1
ADD CONSTRAINT range1_ref_fk
FOREIGN KEY (id)
REFERENCES ref(id);
SET client_min_messages TO WARNING;
DROP SCHEMA ignoring_orphaned_shards CASCADE;