diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c
index 128957c18..42012d43b 100644
--- a/src/backend/distributed/master/master_create_shards.c
+++ b/src/backend/distributed/master/master_create_shards.c
@@ -108,7 +108,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 {
 	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
 	bool colocatedShard = false;
-	List *insertedShardPlacements = NIL;
 
 	/* make sure table is hash partitioned */
 	CheckHashPartitionedTable(distributedTableId);
@@ -193,9 +192,14 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 	/* set shard storage type according to relation type */
 	char shardStorageType = ShardStorageType(distributedTableId);
 
+	/* collect per-shard metadata so the catalog rows can be inserted in batches */
+	uint64 *shardIds = palloc0(shardCount * sizeof(uint64));
+	text **minHashTokenTexts = palloc0(shardCount * sizeof(text *));
+	text **maxHashTokenTexts = palloc0(shardCount * sizeof(text *));
+
 	for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
 	{
-		uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
+		CHECK_FOR_INTERRUPTS();
 
 		/* initialize the hash token space for this shard */
 		int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
@@ -212,19 +216,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 		text *minHashTokenText = IntegerToText(shardMinHashToken);
 		text *maxHashTokenText = IntegerToText(shardMaxHashToken);
 
-		InsertShardRow(distributedTableId, shardId, shardStorageType,
-					   minHashTokenText, maxHashTokenText);
-
-		List *currentInsertedShardPlacements = InsertShardPlacementRows(
-			distributedTableId,
-			shardId,
-			workerNodeList,
-			roundRobinNodeIndex,
-			replicationFactor);
-		insertedShardPlacements = list_concat(insertedShardPlacements,
-											  currentInsertedShardPlacements);
+		shardIds[shardIndex] = shardId;
+		minHashTokenTexts[shardIndex] = minHashTokenText;
+		maxHashTokenTexts[shardIndex] = maxHashTokenText;
 	}
 
+	/* insert all shard rows first, then all of their placement rows */
+	InsertShardRows(distributedTableId, shardIds, shardStorageType,
+					minHashTokenTexts, maxHashTokenTexts, shardCount);
+
+	/*
+	 * Pass 0 as the start index: the batch function adds each shard's position
+	 * in shardIds to it, so shard i starts at worker (i % workerNodeCount).
+	 */
+	List *insertedShardPlacements = InsertShardPlacementRowsBatch(
+		distributedTableId,
+		shardIds,
+		workerNodeList,
+		0,
+		replicationFactor,
+		shardCount);
+
 	CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
 						  useExclusiveConnections, colocatedShard);
 }
diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c
index dabac087f..4a02078aa 100644
--- a/src/backend/distributed/master/master_metadata_utility.c
+++ b/src/backend/distributed/master/master_metadata_utility.c
@@ -916,6 +916,67 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
 }
 
 
+/*
+ * InsertShardRows opens the shard system catalog once and inserts one row per
+ * entry of shardIds for the given relation. shardIds, shardMinValues and
+ * shardMaxValues are parallel arrays with count entries; every row gets the
+ * same storageType. If either value array is NULL, min/max are stored as NULL
+ * for all rows. The relation's cache entry is invalidated once, after the loop.
+ */
+void
+InsertShardRows(Oid relationId, uint64 *shardIds, char storageType,
+				text *shardMinValues[], text *shardMaxValues[], int count)
+{
+	Datum values[Natts_pg_dist_shard];
+	bool isNulls[Natts_pg_dist_shard];
+
+	/* open shard relation once and insert all new tuples */
+	Relation pgDistShard = heap_open(DistShardRelationId(), RowExclusiveLock);
+
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard);
+
+	for (int i = 0; i < count; i++)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/* form new shard tuple */
+		memset(values, 0, sizeof(values));
+		memset(isNulls, false, sizeof(isNulls));
+
+		values[Anum_pg_dist_shard_logicalrelid - 1] = ObjectIdGetDatum(relationId);
+		values[Anum_pg_dist_shard_shardstorage - 1] = CharGetDatum(storageType);
+		values[Anum_pg_dist_shard_shardid - 1] = Int64GetDatum(shardIds[i]);
+
+		/* dropped shardalias column must also be set; it is still part of the tuple */
+		isNulls[Anum_pg_dist_shard_shardalias_DROPPED - 1] = true;
+
+		/* check if shard min/max values are null */
+		if (shardMinValues != NULL && shardMaxValues != NULL)
+		{
+			values[Anum_pg_dist_shard_shardminvalue - 1] =
+				PointerGetDatum(shardMinValues[i]);
+			values[Anum_pg_dist_shard_shardmaxvalue - 1] =
+				PointerGetDatum(shardMaxValues[i]);
+		}
+		else
+		{
+			isNulls[Anum_pg_dist_shard_shardminvalue - 1] = true;
+			isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
+		}
+
+		HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
+		CatalogTupleInsert(pgDistShard, heapTuple);
+		heap_freetuple(heapTuple);
+	}
+
+	/* invalidate previous cache entry and close relation */
+	CitusInvalidateRelcacheByRelid(relationId);
+
+	CommandCounterIncrement();
+	heap_close(pgDistShard, NoLock);
+}
+
+
 /*
  * InsertShardPlacementRow opens the shard placement system catalog, and inserts
  * a new row with the given values into that system catalog. If placementId is
@@ -961,6 +1022,75 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
 }
 
 
+/*
+ * InsertShardPlacementRowBatch opens the shard placement system catalog once
+ * and inserts count rows into it. shardIds, placementIds and groupIds are
+ * parallel arrays with count entries; every row gets the same shardState and
+ * shardLength. Entries of placementIds equal to INVALID_PLACEMENT_ID are
+ * overwritten in place with newly assigned placement ids. Returns the
+ * placementIds array that was passed in.
+ */
+uint64 *
+InsertShardPlacementRowBatch(uint64 *shardIds, uint64 *placementIds,
+							 char shardState, uint64 shardLength,
+							 int32 *groupIds, int count)
+{
+	Datum values[Natts_pg_dist_placement];
+	bool isNulls[Natts_pg_dist_placement];
+
+	/* open shard placement relation once and insert all new tuples */
+	Relation pgDistPlacement = heap_open(DistPlacementRelationId(), RowExclusiveLock);
+
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
+
+	for (int i = 0; i < count; i++)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/* form new shard placement tuple */
+		memset(values, 0, sizeof(values));
+		memset(isNulls, false, sizeof(isNulls));
+
+		if (placementIds[i] == INVALID_PLACEMENT_ID)
+		{
+			placementIds[i] = master_get_new_placementid(NULL);
+		}
+		values[Anum_pg_dist_placement_placementid - 1] = Int64GetDatum(placementIds[i]);
+		values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardIds[i]);
+		values[Anum_pg_dist_placement_shardstate - 1] = CharGetDatum(shardState);
+		values[Anum_pg_dist_placement_shardlength - 1] = Int64GetDatum(shardLength);
+		values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupIds[i]);
+
+		HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
+		CatalogTupleInsert(pgDistPlacement, heapTuple);
+		heap_freetuple(heapTuple);
+	}
+
+	/*
+	 * Invalidate the cache after all rows are in. Shard ids are 64-bit, so they
+	 * are compared directly rather than collected in an int list, which would
+	 * truncate them. Only adjacent duplicates are skipped, which covers callers
+	 * that group all placements of a shard together.
+	 */
+	for (int i = 0; i < count; i++)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		if (i > 0 && shardIds[i] == shardIds[i - 1])
+		{
+			continue;
+		}
+
+		CitusInvalidateRelcacheByShardId(shardIds[i]);
+	}
+
+	CommandCounterIncrement();
+	heap_close(pgDistPlacement, NoLock);
+
+	return placementIds;
+}
+
+
 /*
  * InsertIntoPgDistPartition inserts a new tuple into pg_dist_partition.
  */
diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c
index 32d8d805a..75ae190f8 100644
--- a/src/backend/distributed/master/master_stage_protocol.c
+++ b/src/backend/distributed/master/master_stage_protocol.c
@@ -472,6 +472,66 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
 }
 
 
+/*
+ * InsertShardPlacementRowsBatch inserts replicationFactor placements for each
+ * of the count shards in shardIds into the metadata table on the coordinator
+ * node. The k-th placement of the i-th shard is assigned to the worker at
+ * index (workerStartIndex + i + k) modulo the number of workers. Returns the
+ * list of added shard placements. relationId is currently unused.
+ */
+List *
+InsertShardPlacementRowsBatch(Oid relationId, uint64 *shardIds, List *workerNodeList,
+							  int workerStartIndex, int replicationFactor, int count)
+{
+	int workerNodeCount = list_length(workerNodeList);
+	List *insertedShardPlacements = NIL;
+
+	int insertRowCount = count * replicationFactor;
+	uint64 *insertShardIds = palloc0(insertRowCount * sizeof(uint64));
+	uint64 *insertPlacementsIds = palloc0(insertRowCount * sizeof(uint64));
+	int32 *insertGroupIds = palloc0(insertRowCount * sizeof(int32));
+
+	/* build one entry per placement, keeping the placements of a shard adjacent */
+	for (int i = 0; i < count; i++)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		uint64 shardId = shardIds[i];
+
+		for (int attemptNumber = 0; attemptNumber < replicationFactor; attemptNumber++)
+		{
+			int insertIndex = i * replicationFactor + attemptNumber;
+			int workerNodeIndex =
+				(workerStartIndex + i + attemptNumber) % workerNodeCount;
+			WorkerNode *workerNode =
+				(WorkerNode *) list_nth(workerNodeList, workerNodeIndex);
+
+			insertShardIds[insertIndex] = shardId;
+			insertPlacementsIds[insertIndex] = INVALID_PLACEMENT_ID;
+			insertGroupIds[insertIndex] = workerNode->groupId;
+		}
+	}
+
+	uint64 *shardPlacementIds = InsertShardPlacementRowBatch(insertShardIds,
+															 insertPlacementsIds,
+															 SHARD_STATE_ACTIVE, 0,
+															 insertGroupIds,
+															 insertRowCount);
+
+	/* load each inserted placement back, in insertion order */
+	for (int i = 0; i < insertRowCount; i++)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		ShardPlacement *shardPlacement = LoadShardPlacement(insertShardIds[i],
+															shardPlacementIds[i]);
+		insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
+	}
+
+	return insertedShardPlacements;
+}
+
+
 /*
  * CreateShardsOnWorkers creates shards on worker nodes given the shard placements
  * as a parameter The function creates the shards via the executor. This means
diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h
index 4c3c2b5ac..c5e2fb385 100644
--- a/src/include/distributed/master_metadata_utility.h
+++ b/src/include/distributed/master_metadata_utility.h
@@ -118,10 +118,15 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
 /* Function declarations to modify shard and shard placement data */
 extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
 						   text *shardMinValue, text *shardMaxValue);
+extern void InsertShardRows(Oid relationId, uint64 *shardIds, char storageType,
+							text *shardMinValues[], text *shardMaxValues[], int count);
 extern void DeleteShardRow(uint64 shardId);
 extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
 									  char shardState, uint64 shardLength,
 									  int32 groupId);
+extern uint64 *InsertShardPlacementRowBatch(uint64 *shardIds, uint64 *placementIds,
+											char shardState, uint64 shardLength,
+											int32 *groupIds, int count);
 extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
 									  Var *distributionColumn, uint32 colocationId,
 									  char replicationModel);
diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h
index b9f5b2671..7c6703b4e 100644
--- a/src/include/distributed/master_protocol.h
+++ b/src/include/distributed/master_protocol.h
@@ -111,6 +111,9 @@ extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacemen
 extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
 									   List *workerNodeList, int workerStartIndex,
 									   int replicationFactor);
+extern List *InsertShardPlacementRowsBatch(Oid relationId, uint64 *shardIds,
+										   List *workerNodeList, int workerStartIndex,
+										   int replicationFactor, int count);
 extern uint64 UpdateShardStatistics(int64 shardId);
 extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 											 int32 replicationFactor,