mirror of https://github.com/citusdata/citus.git
Remove dependency on placement caches from DROP
parent 81dcddd1ef
commit 48a9d68d75
@@ -1054,6 +1054,61 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
 }
+
+
+/*
+ * ResolveGroupShardPlacementViaCatalog takes a GroupShardPlacement and adds additional
+ * data to it, such as the node we should consider it to be on.
+ *
+ * Resides here because it shares code with ResolveGroupShardPlacement in this file.
+ * Unlike ResolveGroupShardPlacement, this method does not access cache entries for Citus
+ * tables.
+ */
+ShardPlacement *
+ResolveGroupShardPlacementViaCatalog(GroupShardPlacement *groupShardPlacement,
+									 Oid relationId,
+									 ShardInterval *shardInterval)
+{
+	ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
+	int32 groupId = groupShardPlacement->groupId;
+	WorkerNode *workerNode = LookupNodeForGroup(groupId);
+
+	/* copy everything into shardPlacement but preserve the header */
+	CitusNode header = shardPlacement->type;
+	GroupShardPlacement *shardPlacementAsGroupPlacement =
+		(GroupShardPlacement *) shardPlacement;
+	*shardPlacementAsGroupPlacement = *groupShardPlacement;
+	shardPlacement->type = header;
+
+	SetPlacementNodeMetadata(shardPlacement, workerNode);
+
+	/* fill in remaining fields */
+	char partitionMethod = PartitionMethodViaCatalog(relationId);
+	Assert(partitionMethod != 0);
+	shardPlacement->partitionMethod = partitionMethod;
+
+	uint32 colocationId = ColocationIdViaCatalog(relationId);
+	shardPlacement->colocationGroupId = colocationId;
+
+	if (partitionMethod == DISTRIBUTE_BY_HASH)
+	{
+		Assert(shardInterval->minValueExists);
+		Assert(shardInterval->valueTypeId == INT4OID);
+
+		/*
+		 * Use the lower boundary of the interval's range to identify
+		 * it for colocation purposes. That remains meaningful even if
+		 * a concurrent session splits a shard.
+		 */
+		shardPlacement->representativeValue = DatumGetInt32(shardInterval->minValue);
+	}
+	else
+	{
+		shardPlacement->representativeValue = 0;
+	}
+
+	return shardPlacement;
+}
+
+
 /*
  * HasAnyNodes returns whether there are any nodes in pg_dist_node.
  */
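
The "copy everything into shardPlacement but preserve the header" block works because GroupShardPlacement's fields are a prefix of ShardPlacement's, so the freshly allocated node can be cast down, bulk-assigned from the catalog struct, and then have only its node tag restored. The following stand-alone model illustrates the pattern; the Mock* types and tag values are invented for illustration and are not the real Citus definitions.

#include <stdio.h>

/* stand-in for the CitusNode header every Citus node type begins with */
typedef struct MockNodeHeader
{
	int tag;
} MockNodeHeader;

/* the catalog-level struct: header plus group-level fields */
typedef struct MockGroupPlacement
{
	MockNodeHeader type;
	long shardId;
	int groupId;
} MockGroupPlacement;

/* the resolved struct: same leading fields, plus node-level data */
typedef struct MockShardPlacement
{
	MockNodeHeader type;
	long shardId;
	int groupId;
	const char *nodeName;
} MockShardPlacement;

int
main(void)
{
	MockGroupPlacement source = { { 1 }, 102008L, 3 };
	MockShardPlacement target = { { 2 }, 0, 0, NULL };

	/* copy everything but preserve the header, as in the diff above */
	MockNodeHeader header = target.type;
	MockGroupPlacement *targetAsGroup = (MockGroupPlacement *) &target;
	*targetAsGroup = source;   /* overwrites only the shared prefix */
	target.type = header;      /* restore the target's own node tag */

	/* prints: tag=2 shardId=102008 groupId=3 */
	printf("tag=%d shardId=%ld groupId=%d\n",
		   target.type.tag, target.shardId, target.groupId);
	return 0;
}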
@@ -1892,7 +1947,7 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
 		cacheEntry->shardIntervalArrayLength++;

 		/* build list of shard placements */
-		List *placementList = BuildShardPlacementList(shardId);
+		List *placementList = BuildGroupShardPlacementList(shardId);
 		int numberOfPlacements = list_length(placementList);

 		/* and copy that list into the cache entry */
@@ -1466,7 +1466,7 @@ ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId)

 /*
  * ActiveShardPlacementList finds shard placements for the given shardId from
- * system catalogs, chooses placements that are in active state, and returns
+ * metadata cache, chooses placements that are in active state, and returns
  * these shard placements in a new list.
  */
 List *
@@ -1545,7 +1545,7 @@ ActiveShardPlacementWorkerNode(uint64 shardId)


 /*
- * BuildShardPlacementList finds shard placements for the given shardId from
+ * BuildGroupShardPlacementList finds shard placements for the given shardId from
  * system catalogs, converts these placements to their in-memory
  * representation, and returns the converted shard placements in a new list.
  *
@@ -1553,9 +1553,9 @@ ActiveShardPlacementWorkerNode(uint64 shardId)
  * because it shares code with other routines in this file.
  */
 List *
-BuildShardPlacementList(int64 shardId)
+BuildGroupShardPlacementList(int64 shardId)
 {
-	List *shardPlacementList = NIL;
+	List *groupShardPlacementList = NIL;
 	ScanKeyData scanKey[1];
 	int scanKeyCount = 1;
 	bool indexOK = true;
@@ -1575,10 +1575,10 @@ BuildShardPlacementList(int64 shardId)
 	{
 		TupleDesc tupleDescriptor = RelationGetDescr(pgPlacement);

-		GroupShardPlacement *placement =
+		GroupShardPlacement *groupShardPlacement =
 			TupleToGroupShardPlacement(tupleDescriptor, heapTuple);

-		shardPlacementList = lappend(shardPlacementList, placement);
+		groupShardPlacementList = lappend(groupShardPlacementList, groupShardPlacement);

 		heapTuple = systable_getnext(scanDescriptor);
 	}
@@ -1586,12 +1586,12 @@ BuildShardPlacementList(int64 shardId)
 	systable_endscan(scanDescriptor);
 	table_close(pgPlacement, NoLock);

-	return shardPlacementList;
+	return groupShardPlacementList;
 }


 /*
- * BuildShardPlacementListForGroup finds shard placements for the given groupId
+ * AllShardPlacementsOnNodeGroup finds shard placements for the given groupId
  * from system catalogs, converts these placements to their in-memory
  * representation, and returns the converted shard placements in a new list.
  */
@@ -365,6 +365,18 @@ DropTaskList(Oid relationId, char *schemaName, char *relationName,
 			CreateDropShardPlacementCommand(schemaName, shardRelationName,
 											storageType);

+		/* build shard placement list using catalog only */
+		List *groupShardPlacementList = BuildGroupShardPlacementList(shardId);
+		List *shardPlacementList = NIL;
+		GroupShardPlacement *groupShardPlacement = NULL;
+		foreach_ptr(groupShardPlacement, groupShardPlacementList)
+		{
+			ShardPlacement *placement =
+				ResolveGroupShardPlacementViaCatalog(groupShardPlacement, relationId,
+													 shardInterval);
+			shardPlacementList = lappend(shardPlacementList, placement);
+		}
+
 		Task *task = CitusMakeNode(Task);
 		task->jobId = INVALID_JOB_ID;
 		task->taskId = taskId++;
@@ -373,7 +385,7 @@ DropTaskList(Oid relationId, char *schemaName, char *relationName,
 		task->dependentTaskList = NULL;
 		task->replicationModel = REPLICATION_MODEL_INVALID;
 		task->anchorShardId = shardId;
-		task->taskPlacementList = ShardPlacementList(shardId);
+		task->taskPlacementList = shardPlacementList;

 		taskList = lappend(taskList, task);
 	}
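
Before this change the task placement list came from ShardPlacementList(shardId), which depends on the placement caches the commit title says DROP should no longer rely on; the new code reads group placements straight from the catalog via BuildGroupShardPlacementList and resolves each one against the node metadata with ResolveGroupShardPlacementViaCatalog. A stand-alone model of that two-step resolution follows; every type and helper in it is an invented stand-in, not the real Citus API.

#include <stdio.h>

/* stand-ins for GroupShardPlacement and ShardPlacement */
typedef struct GroupPlacementModel { int groupId; long shardId; } GroupPlacementModel;
typedef struct NodePlacementModel { long shardId; const char *nodeName; int nodePort; } NodePlacementModel;

/* stand-in for LookupNodeForGroup(): map a group to its current node */
static const char *
NodeNameForGroup(int groupId)
{
	static const char *nodes[] = { "10.0.0.1", "10.0.0.2", "10.0.0.3" };
	return nodes[groupId % 3];
}

/* stand-in for ResolveGroupShardPlacementViaCatalog() */
static NodePlacementModel
ResolveViaCatalog(GroupPlacementModel group)
{
	NodePlacementModel resolved = { group.shardId, NodeNameForGroup(group.groupId), 5432 };
	return resolved;
}

int
main(void)
{
	/* stand-in for the list returned by BuildGroupShardPlacementList() */
	GroupPlacementModel groupPlacements[] = { { 1, 102008L }, { 2, 102008L } };
	int placementCount = 2;

	/* the loop DropTaskList now runs instead of consulting the cache */
	for (int i = 0; i < placementCount; i++)
	{
		NodePlacementModel placement = ResolveViaCatalog(groupPlacements[i]);
		printf("shard %ld on %s:%d\n",
			   placement.shardId, placement.nodeName, placement.nodePort);
	}
	return 0;
}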
@@ -170,6 +170,10 @@ extern CitusTableCacheEntry * GetCitusTableCacheEntry(Oid distributedRelationId)
 extern CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
 extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32
 														 objsubid);
+extern ShardPlacement * ResolveGroupShardPlacementViaCatalog(
+	GroupShardPlacement *groupShardPlacement,
+	Oid relationId,
+	ShardInterval *shardInterval);
 extern int32 GetLocalGroupId(void);
 extern int32 GetLocalNodeId(void);
 extern void CitusTableCacheFlushInvalidatedEntries(void);
@@ -296,7 +296,7 @@ extern List * ActiveShardPlacementList(uint64 shardId);
 extern List * ShardPlacementListSortedByWorker(uint64 shardId);
 extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk);
 extern WorkerNode * ActiveShardPlacementWorkerNode(uint64 shardId);
-extern List * BuildShardPlacementList(int64 shardId);
+extern List * BuildGroupShardPlacementList(int64 shardId);
 extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
 extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
 extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,