From 6c0c3faebdc028826fb742b9a093fdbe059bd7da Mon Sep 17 00:00:00 2001 From: zhjwpku Date: Mon, 4 Sep 2023 15:57:25 +0800 Subject: [PATCH] avoid rebuilding MetadataCache for each placement insertion (#7163) --- .../commands/create_distributed_table.c | 6 +- .../distributed/operations/create_shards.c | 66 +++++++++++++------ .../distributed/operations/stage_protocol.c | 15 ++--- .../distributed/coordinator_protocol.h | 6 +- 4 files changed, 60 insertions(+), 33 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 086d9360e..3b993250f 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1262,19 +1262,21 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, CreateTruncateTrigger(relationId); } - /* create shards for hash distributed and reference tables */ if (tableType == HASH_DISTRIBUTED) { + /* create shards for hash distributed table */ CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount, colocatedTableId, localTableEmpty); } else if (tableType == REFERENCE_TABLE) { + /* create shards for reference table */ CreateReferenceTableShard(relationId); } else if (tableType == SINGLE_SHARD_DISTRIBUTED) { + /* create the shard of given single-shard distributed table */ CreateSingleShardTableShard(relationId, colocatedTableId, colocationId); } @@ -1900,7 +1902,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount, /* - * CreateHashDistributedTableShards creates the shard of given single-shard + * CreateSingleShardTableShard creates the shard of given single-shard * distributed table. */ static void diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 26946515b..d0fcc9612 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -83,6 +83,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); List *insertedShardPlacements = NIL; + List *insertedShardIds = NIL; /* make sure table is hash partitioned */ CheckHashPartitionedTable(distributedTableId); @@ -174,7 +175,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, /* initialize the hash token space for this shard */ int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement); int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); - uint64 shardId = GetNextShardId(); + uint64 *shardIdPtr = (uint64 *) palloc0(sizeof(uint64)); + *shardIdPtr = GetNextShardId(); + insertedShardIds = lappend(insertedShardIds, shardIdPtr); /* if we are at the last shard, make sure the max token value is INT_MAX */ if (shardIndex == (shardCount - 1)) @@ -186,17 +189,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, text *minHashTokenText = IntegerToText(shardMinHashToken); text *maxHashTokenText = IntegerToText(shardMaxHashToken); - InsertShardRow(distributedTableId, shardId, shardStorageType, + InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType, minHashTokenText, maxHashTokenText); - List *currentInsertedShardPlacements = InsertShardPlacementRows( - distributedTableId, - shardId, - workerNodeList, - roundRobinNodeIndex, - replicationFactor); + InsertShardPlacementRows(distributedTableId, + *shardIdPtr, + workerNodeList, + roundRobinNodeIndex, + replicationFactor); + } + + /* + * load shard placements for the shard at once after all placement insertions + * finished. This prevents MetadataCache from rebuilding unnecessarily after + * each placement insertion. + */ + uint64 *shardIdPtr; + foreach_ptr(shardIdPtr, insertedShardIds) + { + List *placementsForShard = ShardPlacementList(*shardIdPtr); insertedShardPlacements = list_concat(insertedShardPlacements, - currentInsertedShardPlacements); + placementsForShard); } CreateShardsOnWorkers(distributedTableId, insertedShardPlacements, @@ -292,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool /* * load shard placements for the shard at once after all placement insertions - * finished. That prevents MetadataCache from rebuilding unnecessarily after + * finished. This prevents MetadataCache from rebuilding unnecessarily after * each placement insertion. */ uint64 *shardIdPtr; @@ -360,9 +373,18 @@ CreateReferenceTableShard(Oid distributedTableId) InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue, shardMaxValue); - List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId, - nodeList, workerStartIndex, - replicationFactor); + InsertShardPlacementRows(distributedTableId, + shardId, + nodeList, + workerStartIndex, + replicationFactor); + + /* + * load shard placements for the shard at once after all placement insertions + * finished. This prevents MetadataCache from rebuilding unnecessarily after + * each placement insertion. + */ + List *insertedShardPlacements = ShardPlacementList(shardId); CreateShardsOnWorkers(distributedTableId, insertedShardPlacements, useExclusiveConnection); @@ -408,12 +430,18 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio minHashTokenText, maxHashTokenText); int replicationFactor = 1; - List *insertedShardPlacements = InsertShardPlacementRows( - relationId, - shardId, - workerNodeList, - roundRobinNodeIdx, - replicationFactor); + InsertShardPlacementRows(relationId, + shardId, + workerNodeList, + roundRobinNodeIdx, + replicationFactor); + + /* + * load shard placements for the shard at once after all placement insertions + * finished. This prevents MetadataCache from rebuilding unnecessarily after + * each placement insertion. + */ + List *insertedShardPlacements = ShardPlacementList(shardId); /* * We don't need to force using exclusive connections because we're anyway diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index 402f949b2..421593c66 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -383,14 +383,13 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, /* * InsertShardPlacementRows inserts shard placements to the metadata table on - * the coordinator node. Then, returns the list of added shard placements. + * the coordinator node. */ -List * +void InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList, int workerStartIndex, int replicationFactor) { int workerNodeCount = list_length(workerNodeList); - List *insertedShardPlacements = NIL; for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++) { @@ -399,13 +398,11 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList, uint32 nodeGroupId = workerNode->groupId; const uint64 shardSize = 0; - uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, - shardSize, nodeGroupId); - ShardPlacement *shardPlacement = LoadShardPlacement(shardId, shardPlacementId); - insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement); + InsertShardPlacementRow(shardId, + INVALID_PLACEMENT_ID, + shardSize, + nodeGroupId); } - - return insertedShardPlacements; } diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 4f8f12580..0dcc66141 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -251,9 +251,9 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId replicationFactor); extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, bool useExclusiveConnection); -extern List * InsertShardPlacementRows(Oid relationId, int64 shardId, - List *workerNodeList, int workerStartIndex, - int replicationFactor); +extern void InsertShardPlacementRows(Oid relationId, int64 shardId, + List *workerNodeList, int workerStartIndex, + int replicationFactor); extern uint64 UpdateShardStatistics(int64 shardId); extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, int32 replicationFactor,