avoid rebuilding MetadataCache for each placement insertion (#7163)

pull/7161/head
zhjwpku 2023-09-04 15:57:25 +08:00 committed by GitHub
parent 5034f8eba5
commit 9fd4ef042f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 33 deletions

View File

@ -1262,19 +1262,21 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
CreateTruncateTrigger(relationId);
}
/* create shards for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED)
{
/* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
colocatedTableId,
localTableEmpty);
}
else if (tableType == REFERENCE_TABLE)
{
/* create shards for reference table */
CreateReferenceTableShard(relationId);
}
else if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
/* create the shard of given single-shard distributed table */
CreateSingleShardTableShard(relationId, colocatedTableId,
colocationId);
}
@ -1900,7 +1902,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
/*
* CreateHashDistributedTableShards creates the shard of given single-shard
* CreateSingleShardTableShard creates the shard of given single-shard
* distributed table.
*/
static void

View File

@ -83,6 +83,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL;
/* make sure table is hash partitioned */
CheckHashPartitionedTable(distributedTableId);
@ -174,7 +175,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* initialize the hash token space for this shard */
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
uint64 shardId = GetNextShardId();
uint64 *shardIdPtr = (uint64 *) palloc0(sizeof(uint64));
*shardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, shardIdPtr);
/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
@ -186,17 +189,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken);
InsertShardRow(distributedTableId, shardId, shardStorageType,
InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
minHashTokenText, maxHashTokenText);
List *currentInsertedShardPlacements = InsertShardPlacementRows(
distributedTableId,
shardId,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
*shardIdPtr,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
}
/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
foreach_ptr(shardIdPtr, insertedShardIds)
{
List *placementsForShard = ShardPlacementList(*shardIdPtr);
insertedShardPlacements = list_concat(insertedShardPlacements,
currentInsertedShardPlacements);
placementsForShard);
}
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
@ -292,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
/*
* load shard placements for the shard at once after all placement insertions
* finished. That prevents MetadataCache from rebuilding unnecessarily after
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
@ -360,9 +373,18 @@ CreateReferenceTableShard(Oid distributedTableId)
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue);
List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
nodeList, workerStartIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
shardId,
nodeList,
workerStartIndex,
replicationFactor);
/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnection);
@ -408,12 +430,18 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
minHashTokenText, maxHashTokenText);
int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows(
relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);
InsertShardPlacementRows(relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);
/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);
/*
* We don't need to force using exclusive connections because we're anyway

View File

@ -383,14 +383,13 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
/*
* InsertShardPlacementRows inserts shard placements to the metadata table on
* the coordinator node. Then, returns the list of added shard placements.
* the coordinator node.
*/
List *
void
InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
int workerStartIndex, int replicationFactor)
{
int workerNodeCount = list_length(workerNodeList);
List *insertedShardPlacements = NIL;
for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++)
{
@ -399,13 +398,11 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
uint32 nodeGroupId = workerNode->groupId;
const uint64 shardSize = 0;
uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID,
shardSize, nodeGroupId);
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, shardPlacementId);
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
InsertShardPlacementRow(shardId,
INVALID_PLACEMENT_ID,
shardSize,
nodeGroupId);
}
return insertedShardPlacements;
}

View File

@ -251,9 +251,9 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId
replicationFactor);
extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection);
extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor,