From 48a9d68d75af5d0bae5902ded58c16e19e957687 Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Fri, 27 Jan 2023 12:03:18 +0300 Subject: [PATCH] Remove dependency on placement caches from DROP --- .../distributed/metadata/metadata_cache.c | 57 ++++++++++++++++++- .../distributed/metadata/metadata_utility.c | 16 +++--- .../distributed/operations/delete_protocol.c | 14 ++++- src/include/distributed/metadata_cache.h | 4 ++ src/include/distributed/metadata_utility.h | 2 +- 5 files changed, 82 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 8fd4c5de6..a0ac0d341 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -1054,6 +1054,61 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, } +/* + * ResolveGroupShardPlacementViaCatalog takes a GroupShardPlacement and adds additional + * data to it, such as the node we should consider it to be on. + * + * Resides here because it shares code with ResolveGroupShardPlacement in this file. + * Unlike ResolveGroupShardPlacement, this method does not access cache entries for Citus + * tables. + */ +ShardPlacement * +ResolveGroupShardPlacementViaCatalog(GroupShardPlacement *groupShardPlacement, + Oid relationId, + ShardInterval *shardInterval) +{ + ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); + int32 groupId = groupShardPlacement->groupId; + WorkerNode *workerNode = LookupNodeForGroup(groupId); + + /* copy everything into shardPlacement but preserve the header */ + CitusNode header = shardPlacement->type; + GroupShardPlacement *shardPlacementAsGroupPlacement = + (GroupShardPlacement *) shardPlacement; + *shardPlacementAsGroupPlacement = *groupShardPlacement; + shardPlacement->type = header; + + SetPlacementNodeMetadata(shardPlacement, workerNode); + + /* fill in remaining fields */ + char partitionMethod = PartitionMethodViaCatalog(relationId); + Assert(partitionMethod != 0); + shardPlacement->partitionMethod = partitionMethod; + + uint32 colocationId = ColocationIdViaCatalog(relationId); + shardPlacement->colocationGroupId = colocationId; + + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + Assert(shardInterval->minValueExists); + Assert(shardInterval->valueTypeId == INT4OID); + + /* + * Use the lower boundary of the interval's range to identify + * it for colocation purposes. That remains meaningful even if + * a concurrent session splits a shard. + */ + shardPlacement->representativeValue = DatumGetInt32(shardInterval->minValue); + } + else + { + shardPlacement->representativeValue = 0; + } + + return shardPlacement; +} + + /* * HasAnyNodes returns whether there are any nodes in pg_dist_node. */ @@ -1892,7 +1947,7 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry) cacheEntry->shardIntervalArrayLength++; /* build list of shard placements */ - List *placementList = BuildShardPlacementList(shardId); + List *placementList = BuildGroupShardPlacementList(shardId); int numberOfPlacements = list_length(placementList); /* and copy that list into the cache entry */ diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index dba509681..9c423c30e 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1466,7 +1466,7 @@ ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId) /* * ActiveShardPlacementList finds shard placements for the given shardId from - * system catalogs, chooses placements that are in active state, and returns + * metadata cache, chooses placements that are in active state, and returns * these shard placements in a new list. */ List * @@ -1545,7 +1545,7 @@ ActiveShardPlacementWorkerNode(uint64 shardId) /* - * BuildShardPlacementList finds shard placements for the given shardId from + * BuildGroupShardPlacementList finds shard placements for the given shardId from * system catalogs, converts these placements to their in-memory * representation, and returns the converted shard placements in a new list. * @@ -1553,9 +1553,9 @@ ActiveShardPlacementWorkerNode(uint64 shardId) * because it shares code with other routines in this file. */ List * -BuildShardPlacementList(int64 shardId) +BuildGroupShardPlacementList(int64 shardId) { - List *shardPlacementList = NIL; + List *groupShardPlacementList = NIL; ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; @@ -1575,10 +1575,10 @@ BuildShardPlacementList(int64 shardId) { TupleDesc tupleDescriptor = RelationGetDescr(pgPlacement); - GroupShardPlacement *placement = + GroupShardPlacement *groupShardPlacement = TupleToGroupShardPlacement(tupleDescriptor, heapTuple); - shardPlacementList = lappend(shardPlacementList, placement); + groupShardPlacementList = lappend(groupShardPlacementList, groupShardPlacement); heapTuple = systable_getnext(scanDescriptor); } @@ -1586,12 +1586,12 @@ BuildShardPlacementList(int64 shardId) systable_endscan(scanDescriptor); table_close(pgPlacement, NoLock); - return shardPlacementList; + return groupShardPlacementList; } /* - * BuildShardPlacementListForGroup finds shard placements for the given groupId + * AllShardPlacementsOnNodeGroup finds shard placements for the given groupId * from system catalogs, converts these placements to their in-memory * representation, and returns the converted shard placements in a new list. */ diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index abed39272..a9785be8c 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -365,6 +365,18 @@ DropTaskList(Oid relationId, char *schemaName, char *relationName, CreateDropShardPlacementCommand(schemaName, shardRelationName, storageType); + /* build shard placement list using catalog only */ + List *groupShardPlacementList = BuildGroupShardPlacementList(shardId); + List *shardPlacementList = NIL; + GroupShardPlacement *groupShardPlacement = NULL; + foreach_ptr(groupShardPlacement, groupShardPlacementList) + { + ShardPlacement *placement = + ResolveGroupShardPlacementViaCatalog(groupShardPlacement, relationId, + shardInterval); + shardPlacementList = lappend(shardPlacementList, placement); + } + Task *task = CitusMakeNode(Task); task->jobId = INVALID_JOB_ID; task->taskId = taskId++; @@ -373,7 +385,7 @@ DropTaskList(Oid relationId, char *schemaName, char *relationName, task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; - task->taskPlacementList = ShardPlacementList(shardId); + task->taskPlacementList = shardPlacementList; taskList = lappend(taskList, task); } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 07fa50e64..5f409b369 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -170,6 +170,10 @@ extern CitusTableCacheEntry * GetCitusTableCacheEntry(Oid distributedRelationId) extern CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId); extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid); +extern ShardPlacement * ResolveGroupShardPlacementViaCatalog( + GroupShardPlacement *groupShardPlacement, + Oid relationId, + ShardInterval *shardInterval); extern int32 GetLocalGroupId(void); extern int32 GetLocalNodeId(void); extern void CitusTableCacheFlushInvalidatedEntries(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 82576d681..97472d037 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -296,7 +296,7 @@ extern List * ActiveShardPlacementList(uint64 shardId); extern List * ShardPlacementListSortedByWorker(uint64 shardId); extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk); extern WorkerNode * ActiveShardPlacementWorkerNode(uint64 shardId); -extern List * BuildShardPlacementList(int64 shardId); +extern List * BuildGroupShardPlacementList(int64 shardId); extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,