avoid rebuilding MetadataCache for each placement insertion (#7163)

pg16_grant_inherit_set
zhjwpku 2023-09-04 15:57:25 +08:00 committed by francisjodi
parent a62e6f615d
commit 6c0c3faebd
4 changed files with 60 additions and 33 deletions

View File

@ -1262,19 +1262,21 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
CreateTruncateTrigger(relationId); CreateTruncateTrigger(relationId);
} }
/* create shards for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED) if (tableType == HASH_DISTRIBUTED)
{ {
/* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount, CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
colocatedTableId, colocatedTableId,
localTableEmpty); localTableEmpty);
} }
else if (tableType == REFERENCE_TABLE) else if (tableType == REFERENCE_TABLE)
{ {
/* create shards for reference table */
CreateReferenceTableShard(relationId); CreateReferenceTableShard(relationId);
} }
else if (tableType == SINGLE_SHARD_DISTRIBUTED) else if (tableType == SINGLE_SHARD_DISTRIBUTED)
{ {
/* create the shard of given single-shard distributed table */
CreateSingleShardTableShard(relationId, colocatedTableId, CreateSingleShardTableShard(relationId, colocatedTableId,
colocationId); colocationId);
} }
@ -1900,7 +1902,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
/* /*
* CreateHashDistributedTableShards creates the shard of given single-shard * CreateSingleShardTableShard creates the shard of given single-shard
* distributed table. * distributed table.
*/ */
static void static void

View File

@ -83,6 +83,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL; List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL;
/* make sure table is hash partitioned */ /* make sure table is hash partitioned */
CheckHashPartitionedTable(distributedTableId); CheckHashPartitionedTable(distributedTableId);
@ -174,7 +175,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* initialize the hash token space for this shard */ /* initialize the hash token space for this shard */
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement); int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
uint64 shardId = GetNextShardId(); uint64 *shardIdPtr = (uint64 *) palloc0(sizeof(uint64));
*shardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, shardIdPtr);
/* if we are at the last shard, make sure the max token value is INT_MAX */ /* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1)) if (shardIndex == (shardCount - 1))
@ -186,17 +189,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken); text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken); text *maxHashTokenText = IntegerToText(shardMaxHashToken);
InsertShardRow(distributedTableId, shardId, shardStorageType, InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
minHashTokenText, maxHashTokenText); minHashTokenText, maxHashTokenText);
List *currentInsertedShardPlacements = InsertShardPlacementRows( InsertShardPlacementRows(distributedTableId,
distributedTableId, *shardIdPtr,
shardId,
workerNodeList, workerNodeList,
roundRobinNodeIndex, roundRobinNodeIndex,
replicationFactor); replicationFactor);
}
/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
foreach_ptr(shardIdPtr, insertedShardIds)
{
List *placementsForShard = ShardPlacementList(*shardIdPtr);
insertedShardPlacements = list_concat(insertedShardPlacements, insertedShardPlacements = list_concat(insertedShardPlacements,
currentInsertedShardPlacements); placementsForShard);
} }
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements, CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
@ -292,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
/* /*
* load shard placements for the shard at once after all placement insertions * load shard placements for the shard at once after all placement insertions
* finished. That prevents MetadataCache from rebuilding unnecessarily after * finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion. * each placement insertion.
*/ */
uint64 *shardIdPtr; uint64 *shardIdPtr;
@ -360,10 +373,19 @@ CreateReferenceTableShard(Oid distributedTableId)
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue, InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue); shardMaxValue);
List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId, InsertShardPlacementRows(distributedTableId,
nodeList, workerStartIndex, shardId,
nodeList,
workerStartIndex,
replicationFactor); replicationFactor);
/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements, CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnection); useExclusiveConnection);
} }
@ -408,13 +430,19 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
minHashTokenText, maxHashTokenText); minHashTokenText, maxHashTokenText);
int replicationFactor = 1; int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows( InsertShardPlacementRows(relationId,
relationId,
shardId, shardId,
workerNodeList, workerNodeList,
roundRobinNodeIdx, roundRobinNodeIdx,
replicationFactor); replicationFactor);
/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);
/* /*
* We don't need to force using exclusive connections because we're anyway * We don't need to force using exclusive connections because we're anyway
* creating a single shard. * creating a single shard.

View File

@ -383,14 +383,13 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
/* /*
* InsertShardPlacementRows inserts shard placements to the metadata table on * InsertShardPlacementRows inserts shard placements to the metadata table on
* the coordinator node. Then, returns the list of added shard placements. * the coordinator node.
*/ */
List * void
InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList, InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
int workerStartIndex, int replicationFactor) int workerStartIndex, int replicationFactor)
{ {
int workerNodeCount = list_length(workerNodeList); int workerNodeCount = list_length(workerNodeList);
List *insertedShardPlacements = NIL;
for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++) for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++)
{ {
@ -399,13 +398,11 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
uint32 nodeGroupId = workerNode->groupId; uint32 nodeGroupId = workerNode->groupId;
const uint64 shardSize = 0; const uint64 shardSize = 0;
uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, InsertShardPlacementRow(shardId,
shardSize, nodeGroupId); INVALID_PLACEMENT_ID,
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, shardPlacementId); shardSize,
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement); nodeGroupId);
} }
return insertedShardPlacements;
} }

View File

@ -251,7 +251,7 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId
replicationFactor); replicationFactor);
extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection); bool useExclusiveConnection);
extern List * InsertShardPlacementRows(Oid relationId, int64 shardId, extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex, List *workerNodeList, int workerStartIndex,
int replicationFactor); int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId); extern uint64 UpdateShardStatistics(int64 shardId);