16k in 1:42:xx

moonshot/60k-shards
Nils Dijk 2020-04-02 14:05:38 +02:00
parent d80baa3557
commit cc8c8cdef9
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
5 changed files with 192 additions and 12 deletions

View File

@ -108,7 +108,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
bool colocatedShard = false;
List *insertedShardPlacements = NIL;
/* make sure table is hash partitioned */
CheckHashPartitionedTable(distributedTableId);
@ -193,9 +192,13 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* set shard storage type according to relation type */
char shardStorageType = ShardStorageType(distributedTableId);
uint64 *shardIds = palloc0(shardCount * sizeof(uint64));
text **minHashTokenTexts = palloc0(shardCount * sizeof(text*));
text **maxHashTokenTexts = palloc0(shardCount * sizeof(text*));
for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
{
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
CHECK_FOR_INTERRUPTS();
/* initialize the hash token space for this shard */
int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
@ -212,19 +215,22 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken);
InsertShardRow(distributedTableId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText);
shardIds[shardIndex] = shardId;
minHashTokenTexts[shardIndex] = minHashTokenText;
maxHashTokenTexts[shardIndex] = maxHashTokenText;
List *currentInsertedShardPlacements = InsertShardPlacementRows(
distributedTableId,
shardId,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
insertedShardPlacements = list_concat(insertedShardPlacements,
currentInsertedShardPlacements);
}
InsertShardRows(distributedTableId, shardIds, shardStorageType, minHashTokenTexts, maxHashTokenTexts, shardCount);
List *insertedShardPlacements = InsertShardPlacementRowsBatch(
distributedTableId,
shardIds,
workerNodeList,
0,
replicationFactor,
shardCount);
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnections, colocatedShard);
}

View File

@ -916,6 +916,55 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
}
void
InsertShardRows(Oid relationId, uint64* shardIds, char storageType,
text *shardMinValues[], text *shardMaxValues[], int count)
{
Datum values[Natts_pg_dist_shard];
bool isNulls[Natts_pg_dist_shard];
/* open shard relation and insert new tuple */
Relation pgDistShard = heap_open(DistShardRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard);
for (int i = 0; i < count; i++)
{
/* form new shard tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_shard_logicalrelid - 1] = ObjectIdGetDatum(relationId);
values[Anum_pg_dist_shard_shardstorage - 1] = CharGetDatum(storageType);
values[Anum_pg_dist_shard_shardid - 1] = Int64GetDatum(shardIds[i]);
/* dropped shardalias column must also be set; it is still part of the tuple */
isNulls[Anum_pg_dist_shard_shardalias_DROPPED - 1] = true;
/* check if shard min/max values are null */
if (shardMinValues != NULL && shardMaxValues != NULL)
{
values[Anum_pg_dist_shard_shardminvalue - 1] = PointerGetDatum(shardMinValues[i]);
values[Anum_pg_dist_shard_shardmaxvalue - 1] = PointerGetDatum(shardMaxValues[i]);
}
else
{
isNulls[Anum_pg_dist_shard_shardminvalue - 1] = true;
isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
}
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistShard, heapTuple);
}
/* invalidate previous cache entry and close relation */
CitusInvalidateRelcacheByRelid(relationId);
CommandCounterIncrement();
heap_close(pgDistShard, NoLock);
}
/*
* InsertShardPlacementRow opens the shard placement system catalog, and inserts
* a new row with the given values into that system catalog. If placementId is
@ -961,6 +1010,66 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
}
/*
* InsertShardPlacementRow opens the shard placement system catalog, and inserts
* a new row with the given values into that system catalog. If placementId is
* INVALID_PLACEMENT_ID, a new placement id will be assigned.Then, returns the
* placement id of the added shard placement.
*/
uint64 *
InsertShardPlacementRowBatch(uint64 *shardIds, uint64 *placementIds,
char shardState, uint64 shardLength,
int32 *groupIds, int count)
{
Datum values[Natts_pg_dist_placement];
bool isNulls[Natts_pg_dist_placement];
/* open shard placement relation and insert new tuple */
Relation pgDistPlacement = heap_open(DistPlacementRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
List *invalidateShardIds = NIL;
for (int i = 0; i < count; i++)
{
CHECK_FOR_INTERRUPTS();
/* form new shard placement tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
if (placementIds[i] == INVALID_PLACEMENT_ID)
{
placementIds[i] = master_get_new_placementid(NULL);
}
values[Anum_pg_dist_placement_placementid - 1] = Int64GetDatum(placementIds[i]);
values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardIds[i]);
values[Anum_pg_dist_placement_shardstate - 1] = CharGetDatum(shardState);
values[Anum_pg_dist_placement_shardlength - 1] = Int64GetDatum(shardLength);
values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupIds[i]);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistPlacement, heapTuple);
invalidateShardIds = list_append_unique_int(invalidateShardIds, shardIds[i]);
}
int shardId = 0;
foreach_int(shardId, invalidateShardIds)
{
CHECK_FOR_INTERRUPTS();
CitusInvalidateRelcacheByShardId(shardId);
}
list_free(invalidateShardIds);
CommandCounterIncrement();
heap_close(pgDistPlacement, NoLock);
return placementIds;
}
/*
* InsertIntoPgDistPartition inserts a new tuple into pg_dist_partition.
*/

View File

@ -472,6 +472,63 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
}
/*
* InsertShardPlacementRows inserts shard placements to the metadata table on
* the coordinator node. Then, returns the list of added shard placements.
*/
List *
InsertShardPlacementRowsBatch(Oid relationId, uint64 *shardIds, List *workerNodeList,
int workerStartIndex, int replicationFactor, int count)
{
int workerNodeCount = list_length(workerNodeList);
List *insertedShardPlacements = NIL;
int insertRowCount = count * replicationFactor;
uint64 *insertShardIds = palloc0(insertRowCount * sizeof(uint64));
uint64 *insertPlacementsIds = palloc0(insertRowCount * sizeof(uint64));
int32 *insertGroupIds = palloc0(insertRowCount * sizeof(int32));
for (int i = 0; i < count; i++)
{
CHECK_FOR_INTERRUPTS();
uint64 shardId = shardIds[i];
for (int attemptNumber = 0; attemptNumber < replicationFactor; attemptNumber++)
{
int insertIndex = i*replicationFactor + attemptNumber;
int workerNodeIndex = (workerStartIndex + i + attemptNumber) % workerNodeCount;
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex);
uint32 nodeGroupId = workerNode->groupId;
insertShardIds[insertIndex] = shardId;
insertPlacementsIds[insertIndex] = INVALID_PLACEMENT_ID;
insertGroupIds[insertIndex] = nodeGroupId;
}
}
uint64 *shardPlacementIds = InsertShardPlacementRowBatch(insertShardIds,
insertPlacementsIds,
SHARD_STATE_ACTIVE, 0,
insertGroupIds,
insertRowCount);
for (int i = 0; i < insertRowCount; i++)
{
CHECK_FOR_INTERRUPTS();
ShardPlacement *shardPlacement = LoadShardPlacement(insertShardIds[i],
shardPlacementIds[i]);
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
}
return insertedShardPlacements;
}
/*
* CreateShardsOnWorkers creates shards on worker nodes given the shard placements
* as a parameter The function creates the shards via the executor. This means

View File

@ -118,10 +118,15 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue);
extern void InsertShardRows(Oid relationId, uint64 *shardIds, char storageType,
text *shardMinValues[], text *shardMaxValues[], int count);
extern void DeleteShardRow(uint64 shardId);
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength,
int32 groupId);
extern uint64 *InsertShardPlacementRowBatch(uint64 *shardIds, uint64 *placementIds,
char shardState, uint64 shardLength,
int32 *groupIds, int count);
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId,
char replicationModel);

View File

@ -111,6 +111,9 @@ extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacemen
extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern List *InsertShardPlacementRowsBatch(Oid relationId, uint64 *shardIds,
List *workerNodeList, int workerStartIndex,
int replicationFactor, int count);
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor,