From e7ed16c2964c3720bc6d7c69fbc8d04cc6ea79b1 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Fri, 21 May 2021 11:21:22 +0300 Subject: [PATCH] Do not include to-be-deleted shards while finding shard placements Ignore orphaned shards in more places Only use active shard placements in RouterInsertTaskList Use IncludingOrphanedPlacements in some more places Fix comment Add tests --- src/backend/distributed/commands/multi_copy.c | 2 +- .../locally_reserved_shared_connections.c | 2 +- .../distributed/executor/placement_access.c | 4 +- .../distributed/metadata/metadata_cache.c | 38 +- .../distributed/metadata/metadata_utility.c | 55 +-- .../distributed/operations/create_shards.c | 8 +- .../distributed/operations/delete_protocol.c | 3 +- .../distributed/operations/repair_shards.c | 21 +- .../planner/multi_physical_planner.c | 6 +- .../planner/multi_router_planner.c | 5 +- .../distributed/test/distribution_metadata.c | 2 +- .../distributed/utils/colocation_utils.c | 6 +- .../distributed/utils/reference_table_utils.c | 2 +- .../distributed/utils/shardinterval_utils.c | 4 +- .../distributed/worker/worker_drop_protocol.c | 2 +- src/include/distributed/metadata_cache.h | 6 +- src/include/distributed/metadata_utility.h | 2 +- .../expected/ignoring_orphaned_shards.out | 355 ++++++++++++++++++ src/test/regress/operations_schedule | 1 + .../regress/sql/ignoring_orphaned_shards.sql | 147 ++++++++ 20 files changed, 605 insertions(+), 66 deletions(-) create mode 100644 src/test/regress/expected/ignoring_orphaned_shards.out create mode 100644 src/test/regress/sql/ignoring_orphaned_shards.sql diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 7d5370389..98449d0d3 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2139,7 +2139,7 @@ ShardIntervalListHasLocalPlacements(List *shardIntervalList) ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, shardIntervalList) { - if (FindShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL) + if (ActiveShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL) { return true; } diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index efe14b2ad..f9c04be47 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -377,7 +377,7 @@ EnsureConnectionPossibilityForNodeList(List *nodeList) /* * EnsureConnectionPossibilityForNode reserves a shared connection * counter per node in the nodeList unless: - * - Reservation is possible/allowed (see IsReservationPossible()) + * - Reservation is not possible/allowed (see IsReservationPossible()) * - there is at least one connection to the node so that we are guranteed * to get a connection * - An earlier call already reserved a connection (e.g., we allow only a diff --git a/src/backend/distributed/executor/placement_access.c b/src/backend/distributed/executor/placement_access.c index 557dbcad6..df5143a54 100644 --- a/src/backend/distributed/executor/placement_access.c +++ b/src/backend/distributed/executor/placement_access.c @@ -128,8 +128,8 @@ BuildPlacementAccessList(int32 groupId, List *relationShardList, RelationShard *relationShard = NULL; foreach_ptr(relationShard, relationShardList) { - ShardPlacement *placement = FindShardPlacementOnGroup(groupId,
relationShard->shardId); + ShardPlacement *placement = ActiveShardPlacementOnGroup(groupId, + relationShard->shardId); if (placement == NULL) { continue; } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 2d7dab121..2ca01f449 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -594,12 +594,14 @@ LoadShardPlacement(uint64 shardId, uint64 placementId) /* - * FindShardPlacementOnGroup returns the shard placement for the given shard - * on the given group, or returns NULL if no placement for the shard exists - * on the group. + * ShardPlacementOnGroupIncludingOrphanedPlacements returns the shard placement + * for the given shard on the given group, or returns NULL if no placement for + * the shard exists on the group. + * + * NOTE: This can return inactive or orphaned placements. */ ShardPlacement * -FindShardPlacementOnGroup(int32 groupId, uint64 shardId) +ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, uint64 shardId) { ShardPlacement *placementOnNode = NULL; @@ -614,7 +616,6 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId) for (int placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) { GroupShardPlacement *placement = &placementArray[placementIndex]; - if (placement->groupId == groupId) { placementOnNode = ResolveGroupShardPlacement(placement, tableEntry, @@ -627,6 +628,28 @@ FindShardPlacementOnGroup(int32 groupId, uint64 shardId) } +/* + * ActiveShardPlacementOnGroup returns the active shard placement for the + * given shard on the given group, or returns NULL if no active placement for + * the shard exists on the group. + */ +ShardPlacement * +ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId) +{ + ShardPlacement *placement = + ShardPlacementOnGroupIncludingOrphanedPlacements(groupId, shardId); + if (placement == NULL) + { + return NULL; + } + if (placement->shardState != SHARD_STATE_ACTIVE) + { + return NULL; + } + return placement; +} + + /* * ResolveGroupShardPlacement takes a GroupShardPlacement and adds additional data to it, * such as the node we should consider it to be on. @@ -791,13 +814,14 @@ LookupNodeForGroup(int32 groupId) /* * ShardPlacementList returns the list of placements for the given shard from - * the cache. + * the cache. This list includes placements that are orphaned, because + * their deletion is postponed to a later point (shardstate = 4). * * The returned list is deep copied from the cache and thus can be modified and pfree()d freely.
*/ List * -ShardPlacementList(uint64 shardId) +ShardPlacementListIncludingOrphanedPlacements(uint64 shardId) { List *placementList = NIL; diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 9b4a7025a..552ca1833 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1092,7 +1092,7 @@ TableShardReplicationFactor(Oid relationId) { uint64 shardId = shardInterval->shardId; - List *shardPlacementList = ShardPlacementList(shardId); + List *shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId); uint32 shardPlacementCount = list_length(shardPlacementList); /* @@ -1392,7 +1392,8 @@ List * ActiveShardPlacementList(uint64 shardId) { List *activePlacementList = NIL; - List *shardPlacementList = ShardPlacementList(shardId); + List *shardPlacementList = + ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *shardPlacement = NULL; foreach_ptr(shardPlacement, shardPlacementList) @@ -1407,6 +1408,31 @@ ActiveShardPlacementList(uint64 shardId) } +/* + * ShardPlacementListWithoutOrphanedPlacements returns shard placements excluding + * the ones that are orphaned, because they are marked to be deleted at a later + * point (shardstate = 4). + */ +List * +ShardPlacementListWithoutOrphanedPlacements(uint64 shardId) +{ + List *activePlacementList = NIL; + List *shardPlacementList = + ShardPlacementListIncludingOrphanedPlacements(shardId); + + ShardPlacement *shardPlacement = NULL; + foreach_ptr(shardPlacement, shardPlacementList) + { + if (shardPlacement->shardState != SHARD_STATE_TO_DELETE) + { + activePlacementList = lappend(activePlacementList, shardPlacement); + } + } + + return SortList(activePlacementList, CompareShardPlacementsByWorker); +} + + /* * ActiveShardPlacement finds a shard placement for the given shardId from * system catalog, chooses a placement that is in active state and returns @@ -1944,7 +1970,8 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s ColocatedShardIdInRelation(partitionOid, parentShardInterval->shardIndex); ShardPlacement *partitionPlacement = - ShardPlacementOnGroup(partitionShardId, parentShardPlacement->groupId); + ShardPlacementOnGroupIncludingOrphanedPlacements( + parentShardPlacement->groupId, partitionShardId); /* the partition should have a placement with the same group */ Assert(partitionPlacement != NULL); @@ -1954,28 +1981,6 @@ UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlacement, char s } -/* - * ShardPlacementOnGroup gets a shardInterval and a groupId, returns a placement - * of the shard on the given group. If no such placement exists, the function - * return NULL. - */ -ShardPlacement * -ShardPlacementOnGroup(uint64 shardId, int groupId) -{ - List *placementList = ShardPlacementList(shardId); - ShardPlacement *placement = NULL; - foreach_ptr(placement, placementList) - { - if (placement->groupId == groupId) - { - return placement; - } - } - - return NULL; -} - - /* * MarkShardPlacementInactive is a wrapper around UpdateShardPlacementState where * the state is set to SHARD_STATE_INACTIVE.
It also marks partitions of the diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 65cba434e..c43da76aa 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -287,7 +287,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); text *shardMinValueText = IntegerToText(shardMinValue); text *shardMaxValueText = IntegerToText(shardMaxValue); - List *sourceShardPlacementList = ShardPlacementList(sourceShardId); + List *sourceShardPlacementList = ShardPlacementListWithoutOrphanedPlacements( + sourceShardId); InsertShardRow(targetRelationId, newShardId, targetShardStorageType, shardMinValueText, shardMaxValueText); @@ -295,11 +296,6 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool ShardPlacement *sourcePlacement = NULL; foreach_ptr(sourcePlacement, sourceShardPlacementList) { - if (sourcePlacement->shardState == SHARD_STATE_TO_DELETE) - { - continue; - } - int32 groupId = sourcePlacement->groupId; const ShardState shardState = SHARD_STATE_ACTIVE; const uint64 shardSize = 0; diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index 7543beaa6..ab5ff7be6 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -450,7 +450,8 @@ DropTaskList(Oid relationId, char *schemaName, char *relationName, task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; - task->taskPlacementList = ShardPlacementList(shardId); + task->taskPlacementList = + ShardPlacementListIncludingOrphanedPlacements(shardId); taskList = lappend(taskList, task); } diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 548218cfd..6eea51964 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -748,7 +748,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode ddlCommandList); /* after successful repair, we update shard state as healthy*/ - List *placementList = ShardPlacementList(shardId); + List *placementList = ShardPlacementListWithoutOrphanedPlacements(shardId); ShardPlacement *placement = SearchShardPlacementInListOrError(placementList, targetNodeName, targetNodePort); @@ -1029,7 +1029,8 @@ static void EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort) { - List *shardPlacementList = ShardPlacementList(shardId); + List *shardPlacementList = + ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError( shardPlacementList, @@ -1061,7 +1062,7 @@ static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort) { - List *shardPlacementList = ShardPlacementList(shardId); + List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError( shardPlacementList, @@ -1085,7 +1086,7 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo * the shard. 
*/ DropOrphanedShardsInSeparateTransaction(); - shardPlacementList = ShardPlacementList(shardId); + shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); targetPlacement = SearchShardPlacementInList(shardPlacementList, targetNodeName, targetNodePort); @@ -1429,7 +1430,8 @@ DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32 char *qualifiedTableName = ConstructQualifiedShardName(colocatedShard); StringInfo dropQuery = makeStringInfo(); uint64 shardId = colocatedShard->shardId; - List *shardPlacementList = ShardPlacementList(shardId); + List *shardPlacementList = + ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *placement = SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort); @@ -1442,9 +1444,9 @@ DropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32 /* - * MarkForDropColocatedShardPlacement marks the shard placement metadata for the given - * shard placement to be deleted in pg_dist_placement. The function does this for all - * colocated placements. + * MarkForDropColocatedShardPlacement marks the shard placement metadata for + * the given shard placement to be deleted in pg_dist_placement. The function + * does this for all colocated placements. */ static void MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, int32 @@ -1457,7 +1459,8 @@ MarkForDropColocatedShardPlacement(ShardInterval *shardInterval, char *nodeName, { ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); uint64 shardId = colocatedShard->shardId; - List *shardPlacementList = ShardPlacementList(shardId); + List *shardPlacementList = + ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *placement = SearchShardPlacementInListOrError(shardPlacementList, nodeName, nodePort); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 07180aa58..73ec0c3b7 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2767,8 +2767,10 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) static bool CoPlacedShardIntervals(ShardInterval *firstInterval, ShardInterval *secondInterval) { - List *firstShardPlacementList = ShardPlacementList(firstInterval->shardId); - List *secondShardPlacementList = ShardPlacementList(secondInterval->shardId); + List *firstShardPlacementList = ShardPlacementListWithoutOrphanedPlacements( + firstInterval->shardId); + List *secondShardPlacementList = ShardPlacementListWithoutOrphanedPlacements( + secondInterval->shardId); ListCell *firstShardPlacementCell = NULL; ListCell *secondShardPlacementCell = NULL; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index ee7268214..ba1003c03 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -834,7 +834,7 @@ IsLocallyAccessibleCitusLocalTable(Oid relationId) ShardInterval *shardInterval = linitial(shardIntervalList); uint64 shardId = shardInterval->shardId; ShardPlacement *localShardPlacement = - ShardPlacementOnGroup(shardId, GetLocalGroupId()); + ActiveShardPlacementOnGroup(GetLocalGroupId(), shardId); return localShardPlacement != NULL; } @@ -1666,7 +1666,8 @@ RouterInsertTaskList(Query *query, bool parametersInQueryResolved, relationShard->relationId = 
distributedTableId; modifyTask->relationShardList = list_make1(relationShard); - modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId); + modifyTask->taskPlacementList = ActiveShardPlacementList( + modifyRoute->shardId); modifyTask->parametersInQueryStringResolved = parametersInQueryResolved; insertTaskList = lappend(insertTaskList, modifyTask); diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c index e99bd405e..97809c02f 100644 --- a/src/backend/distributed/test/distribution_metadata.c +++ b/src/backend/distributed/test/distribution_metadata.c @@ -132,7 +132,7 @@ load_shard_placement_array(PG_FUNCTION_ARGS) } else { - placementList = ShardPlacementList(shardId); + placementList = ShardPlacementListIncludingOrphanedPlacements(shardId); } placementList = SortList(placementList, CompareShardPlacementsByWorker); diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 91c9c2882..979f24b8f 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -350,8 +350,10 @@ ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId) leftRelationName, rightRelationName))); } - List *leftPlacementList = ShardPlacementList(leftShardId); - List *rightPlacementList = ShardPlacementList(rightShardId); + List *leftPlacementList = ShardPlacementListWithoutOrphanedPlacements( + leftShardId); + List *rightPlacementList = ShardPlacementListWithoutOrphanedPlacements( + rightShardId); if (list_length(leftPlacementList) != list_length(rightPlacementList)) { diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 671346f36..cafa2b328 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -347,7 +347,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData); - List *shardPlacementList = ShardPlacementList(shardId); + List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, nodeName, nodePort); char *tableOwner = TableOwner(shardInterval->relationId); diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index 6b429e366..87391e9b1 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -476,7 +476,7 @@ SingleReplicatedTable(Oid relationId) /* checking only for the first shard id should suffice */ uint64 shardId = *(uint64 *) linitial(shardList); - shardPlacementList = ShardPlacementList(shardId); + shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId); if (list_length(shardPlacementList) != 1) { return false; @@ -489,7 +489,7 @@ SingleReplicatedTable(Oid relationId) foreach_ptr(shardIdPointer, shardIntervalList) { uint64 shardId = *shardIdPointer; - shardPlacementList = ShardPlacementList(shardId); + shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId); if (list_length(shardPlacementList) != 1) { diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 165eb13d1..2a1d431c0 
100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -120,7 +120,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS) { uint64 shardId = *shardIdPointer; - List *shardPlacementList = ShardPlacementList(shardId); + List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *placement = NULL; foreach_ptr(placement, shardPlacementList) { diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index c90af0554..43af99a27 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -148,7 +148,9 @@ extern List * CitusTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); extern Oid RelationIdForShard(uint64 shardId); extern bool ReferenceTableShardId(uint64 shardId); -extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId); +extern ShardPlacement * ShardPlacementOnGroupIncludingOrphanedPlacements(int32 groupId, + uint64 shardId); +extern ShardPlacement * ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId); extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); extern CitusTableCacheEntry * GetCitusTableCacheEntry(Oid distributedRelationId); @@ -158,7 +160,7 @@ extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, extern int32 GetLocalGroupId(void); extern void CitusTableCacheFlushInvalidatedEntries(void); extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok); -extern List * ShardPlacementList(uint64 shardId); +extern List * ShardPlacementListIncludingOrphanedPlacements(uint64 shardId); extern bool ShardExists(int64 shardId); extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 306d414f9..76f3dd65e 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -214,6 +214,7 @@ extern bool NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements); extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId); extern List * ActiveShardPlacementList(uint64 shardId); +extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId); extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk); extern List * BuildShardPlacementList(ShardInterval *shardInterval); extern List * AllShardPlacementsOnNodeGroup(int32 groupId); @@ -223,7 +224,6 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, SizeQueryType sizeQueryType, bool optimizePartitionCalculations); extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); -extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, diff --git a/src/test/regress/expected/ignoring_orphaned_shards.out b/src/test/regress/expected/ignoring_orphaned_shards.out new file mode 100644 index 000000000..a73dc8906 --- /dev/null +++ b/src/test/regress/expected/ignoring_orphaned_shards.out @@ -0,0 +1,355 @@ +CREATE SCHEMA ignoring_orphaned_shards; +SET search_path TO 
ignoring_orphaned_shards; +-- Use a weird shard count that we don't use in any other tests +SET citus.shard_count TO 13; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 92448000; +CREATE TABLE ref(id int PRIMARY KEY); +SELECT * FROM create_reference_table('ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SET citus.next_shard_id TO 92448100; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448100; +CREATE TABLE dist1(id int); +SELECT * FROM create_distributed_table('dist1', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1; + logicalrelid +--------------------------------------------------------------------- + dist1 +(1 row) + +-- Move first shard, so that the first shard now has 2 placements. One that's +-- active and one that's orphaned. +SELECT citus_move_shard_placement(92448100, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448100 ORDER BY placementid; + shardid | shardstate | nodeport +--------------------------------------------------------------------- + 92448100 | 4 | 57637 + 92448100 | 1 | 57638 +(2 rows) + +-- Add a new table that should get colocated with dist1 automatically, but +-- should not get a shard for the orphaned placement. +SET citus.next_shard_id TO 92448200; +CREATE TABLE dist2(id int); +SELECT * FROM create_distributed_table('dist2', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1; + logicalrelid +--------------------------------------------------------------------- + dist1 + dist2 +(2 rows) + +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448200 ORDER BY placementid; + shardid | shardstate | nodeport +--------------------------------------------------------------------- + 92448200 | 1 | 57638 +(1 row) + +-- uncolocate it +SELECT update_distributed_table_colocation('dist2', 'none'); + update_distributed_table_colocation +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1; + logicalrelid +--------------------------------------------------------------------- + dist1 +(1 row) + +-- Make sure we can add it back to the colocation, even though it has a +-- different number of shard placements for the first shard. +SELECT update_distributed_table_colocation('dist2', 'dist1'); + update_distributed_table_colocation +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1; + logicalrelid +--------------------------------------------------------------------- + dist1 + dist2 +(2 rows) + +-- Make sure that replication count check in FOR UPDATE ignores orphaned +-- shards. 
+SELECT * FROM dist1 WHERE id = 1 FOR UPDATE; + id +--------------------------------------------------------------------- +(0 rows) + +-- Make sure we don't send a query to the orphaned shard +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +INSERT INTO dist1 VALUES (1); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing INSERT INTO ignoring_orphaned_shards.dist1_92448100 (id) VALUES (1) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +ROLLBACK; +NOTICE: issuing ROLLBACK +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +-- Make sure we can create a foreign key on community edition, because +-- replication factor is 1 +ALTER TABLE dist1 +ADD CONSTRAINT dist1_ref_fk +FOREIGN KEY (id) +REFERENCES ref(id); +SET citus.shard_replication_factor TO 2; +SET citus.next_shard_id TO 92448300; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448300; +CREATE TABLE rep1(id int); +SELECT * FROM create_distributed_table('rep1', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Add the coordinator, so we can have a replicated shard +SELECT 1 FROM citus_add_node('localhost', :master_port, 0); +NOTICE: Replicating reference table "ref" to the node localhost:xxxxx + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1; + logicalrelid +--------------------------------------------------------------------- + rep1 +(1 row) + +SELECT citus_move_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + shardid | shardstate | nodeport +--------------------------------------------------------------------- + 92448300 | 4 | 57637 + 92448300 | 1 | 57638 + 92448300 | 1 | 57636 +(3 rows) + +-- Add a new table that should get colocated with rep1 automatically, but +-- should not get a shard for the orphaned placement. 
+SET citus.next_shard_id TO 92448400; +CREATE TABLE rep2(id int); +SELECT * FROM create_distributed_table('rep2', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1; + logicalrelid +--------------------------------------------------------------------- + rep1 + rep2 +(2 rows) + +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448400 ORDER BY placementid; + shardid | shardstate | nodeport +--------------------------------------------------------------------- + 92448400 | 1 | 57636 + 92448400 | 1 | 57638 +(2 rows) + +-- uncolocate it +SELECT update_distributed_table_colocation('rep2', 'none'); + update_distributed_table_colocation +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1; + logicalrelid +--------------------------------------------------------------------- + rep1 +(1 row) + +-- Make sure we can add it back to the colocation, even though it has a +-- different number of shard placements for the first shard. +SELECT update_distributed_table_colocation('rep2', 'rep1'); + update_distributed_table_colocation +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1; + logicalrelid +--------------------------------------------------------------------- + rep1 + rep2 +(2 rows) + +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = 92448300 AND groupid = 0; +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + shardid | shardstate | nodeport +--------------------------------------------------------------------- + 92448300 | 4 | 57637 + 92448300 | 1 | 57638 + 92448300 | 3 | 57636 +(3 rows) + +-- cannot copy from an orphaned shard +SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port); +ERROR: source placement must be in active state +-- cannot copy to an orphaned shard +SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :worker_1_port); +ERROR: target placement must be in inactive state +-- can still copy to an inactive shard +SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :master_port); + citus_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + shardid | shardstate | nodeport +--------------------------------------------------------------------- + 92448300 | 4 | 57637 + 92448300 | 1 | 57638 + 92448300 | 1 | 57636 +(3 rows) + +-- Make sure we don't send a query to the orphaned shard +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +SET LOCAL citus.log_local_commands TO ON; +INSERT INTO rep1 VALUES (1); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: executing the command locally: INSERT INTO 
ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1) +ROLLBACK; +NOTICE: issuing ROLLBACK +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +-- Cause the orphaned shard to be local +SELECT 1 FROM citus_drain_node('localhost', :master_port); +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + shardid | shardstate | nodeport +--------------------------------------------------------------------- + 92448300 | 1 | 57638 + 92448300 | 4 | 57636 + 92448300 | 1 | 57637 +(3 rows) + +-- Make sure we don't send a query to the orphaned shard if it's local +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +SET LOCAL citus.log_local_commands TO ON; +INSERT INTO rep1 VALUES (1); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing INSERT INTO ignoring_orphaned_shards.rep1_92448300 (id) VALUES (1) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +ROLLBACK; +NOTICE: issuing ROLLBACK +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ROLLBACK +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 92448500; +CREATE TABLE range1(id int); +SELECT create_distributed_table('range1', 'id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}'); +-- Move shard placement and clean it up +SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 3 orphaned shards +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + shardid | shardstate | nodeport +--------------------------------------------------------------------- + 92448300 | 1 | 57638 + 92448300 | 1 | 57637 +(2 rows) + +SET citus.next_shard_id TO 92448600; +CREATE TABLE range2(id int); +SELECT create_distributed_table('range2', 'id', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}'); +-- Move shard placement and DON'T clean it up, now range1 and range2 are +-- colocated, but only range2 has an orphaned shard. 
+SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid; + shardid | shardstate | nodeport +--------------------------------------------------------------------- + 92448600 | 4 | 57638 + 92448600 | 1 | 57637 +(2 rows) + +-- Make sure that tables are detected as colocated +SELECT * FROM range1 JOIN range2 ON range1.id = range2.id; + id | id +--------------------------------------------------------------------- +(0 rows) + +-- Make sure we can create a foreign key on community edition, because +-- replication factor is 1 +ALTER TABLE range1 +ADD CONSTRAINT range1_ref_fk +FOREIGN KEY (id) +REFERENCES ref(id); +SET client_min_messages TO WARNING; +DROP SCHEMA ignoring_orphaned_shards CASCADE; diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule index 4e526e19d..8b9032eeb 100644 --- a/src/test/regress/operations_schedule +++ b/src/test/regress/operations_schedule @@ -7,3 +7,4 @@ test: foreign_key_to_reference_shard_rebalance test: multi_move_mx test: shard_move_deferred_delete test: multi_colocated_shard_rebalance +test: ignoring_orphaned_shards diff --git a/src/test/regress/sql/ignoring_orphaned_shards.sql b/src/test/regress/sql/ignoring_orphaned_shards.sql new file mode 100644 index 000000000..a3aa74db9 --- /dev/null +++ b/src/test/regress/sql/ignoring_orphaned_shards.sql @@ -0,0 +1,147 @@ +CREATE SCHEMA ignoring_orphaned_shards; +SET search_path TO ignoring_orphaned_shards; +-- Use a weird shard count that we don't use in any other tests +SET citus.shard_count TO 13; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 92448000; + +CREATE TABLE ref(id int PRIMARY KEY); +SELECT * FROM create_reference_table('ref'); + +SET citus.next_shard_id TO 92448100; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448100; + +CREATE TABLE dist1(id int); +SELECT * FROM create_distributed_table('dist1', 'id'); +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1; + +-- Move first shard, so that the first shard now has 2 placements. One that's +-- active and one that's orphaned. +SELECT citus_move_shard_placement(92448100, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes'); +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448100 ORDER BY placementid; + +-- Add a new table that should get colocated with dist1 automatically, but +-- should not get a shard for the orphaned placement. +SET citus.next_shard_id TO 92448200; +CREATE TABLE dist2(id int); +SELECT * FROM create_distributed_table('dist2', 'id'); +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1; +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448200 ORDER BY placementid; + +-- uncolocate it +SELECT update_distributed_table_colocation('dist2', 'none'); +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1; +-- Make sure we can add it back to the colocation, even though it has a +-- different number of shard placements for the first shard. 
+SELECT update_distributed_table_colocation('dist2', 'dist1'); +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448100 ORDER BY 1; + +-- Make sure that replication count check in FOR UPDATE ignores orphaned +-- shards. +SELECT * FROM dist1 WHERE id = 1 FOR UPDATE; +-- Make sure we don't send a query to the orphaned shard +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +INSERT INTO dist1 VALUES (1); +ROLLBACK; + +-- Make sure we can create a foreign key on community edition, because +-- replication factor is 1 +ALTER TABLE dist1 +ADD CONSTRAINT dist1_ref_fk +FOREIGN KEY (id) +REFERENCES ref(id); + +SET citus.shard_replication_factor TO 2; +SET citus.next_shard_id TO 92448300; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 92448300; +CREATE TABLE rep1(id int); +SELECT * FROM create_distributed_table('rep1', 'id'); + +-- Add the coordinator, so we can have a replicated shard +SELECT 1 FROM citus_add_node('localhost', :master_port, 0); +SELECT 1 FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true); +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1; + +SELECT citus_move_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port); +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + +-- Add a new table that should get colocated with rep1 automatically, but +-- should not get a shard for the orphaned placement. +SET citus.next_shard_id TO 92448400; +CREATE TABLE rep2(id int); +SELECT * FROM create_distributed_table('rep2', 'id'); +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1; +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448400 ORDER BY placementid; + +-- uncolocate it +SELECT update_distributed_table_colocation('rep2', 'none'); +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1; +-- Make sure we can add it back to the colocation, even though it has a +-- different number of shard placements for the first shard. 
+SELECT update_distributed_table_colocation('rep2', 'rep1'); +SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1; + +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = 92448300 AND groupid = 0; +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + +-- cannot copy from an orphaned shard +SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port); +-- cannot copy to an orphaned shard +SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :worker_1_port); +-- can still copy to an inactive shard +SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_2_port, 'localhost', :master_port); +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + +-- Make sure we don't send a query to the orphaned shard +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +SET LOCAL citus.log_local_commands TO ON; +INSERT INTO rep1 VALUES (1); +ROLLBACK; + +-- Cause the orphaned shard to be local +SELECT 1 FROM citus_drain_node('localhost', :master_port); +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + +-- Make sure we don't send a query to the orphaned shard if it's local +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +SET LOCAL citus.log_local_commands TO ON; +INSERT INTO rep1 VALUES (1); +ROLLBACK; + + +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 92448500; +CREATE TABLE range1(id int); +SELECT create_distributed_table('range1', 'id', 'range'); +CALL public.create_range_partitioned_shards('range1', '{0,3}','{2,5}'); + +-- Move shard placement and clean it up +SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); +CALL citus_cleanup_orphaned_shards(); +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; + +SET citus.next_shard_id TO 92448600; +CREATE TABLE range2(id int); +SELECT create_distributed_table('range2', 'id', 'range'); +CALL public.create_range_partitioned_shards('range2', '{0,3}','{2,5}'); + +-- Move shard placement and DON'T clean it up, now range1 and range2 are +-- colocated, but only range2 has an orphaned shard. +SELECT citus_move_shard_placement(92448600, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); +SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448600 ORDER BY placementid; + +-- Make sure that tables are detected as colocated +SELECT * FROM range1 JOIN range2 ON range1.id = range2.id; + +-- Make sure we can create a foreign key on community edition, because +-- replication factor is 1 +ALTER TABLE range1 +ADD CONSTRAINT range1_ref_fk +FOREIGN KEY (id) +REFERENCES ref(id); + +SET client_min_messages TO WARNING; +DROP SCHEMA ignoring_orphaned_shards CASCADE;