implement shardgroups on reference tables;

backup/feature/shardgroup
Nils Dijk 2022-11-25 14:54:12 +01:00
parent d2c73bcbbf
commit acac3090ae
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
3 changed files with 91 additions and 17 deletions

View File

@ -1185,7 +1185,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
}
else if (tableType == REFERENCE_TABLE)
{
CreateReferenceTableShard(relationId);
CreateReferenceTableShard(relationId, colocatedTableId, colocationId);
}
if (ShouldSyncTableMetadata(relationId))

View File

@ -350,7 +350,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
* Also, the shard is replicated to the all active nodes in the cluster.
*/
void
CreateReferenceTableShard(Oid distributedTableId)
CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId,
uint32 colocationId)
{
int workerStartIndex = 0;
text *shardMinValue = NULL;
@ -382,24 +383,96 @@ CreateReferenceTableShard(Oid distributedTableId)
tableName)));
}
/*
* load and sort the worker node list for deterministic placements
* create_reference_table has already acquired pg_dist_node lock
*/
List *nodeList = ReferenceTablePlacementNodeList(ShareLock);
nodeList = SortList(nodeList, CompareWorkerNodes);
List *insertedShardPlacements = NIL;
if (!OidIsValid(colocatedTableId))
{
/* create first reference table, place them on all active nodes */
int replicationFactor = list_length(nodeList);
/*
* load and sort the worker node list for deterministic placements
* create_reference_table has already acquired pg_dist_node lock
*/
List *nodeList = ReferenceTablePlacementNodeList(ShareLock);
nodeList = SortList(nodeList, CompareWorkerNodes);
/* get the next shard id */
uint64 shardId = GetNextShardId();
int replicationFactor = list_length(nodeList);
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue, NULL);
/* get the next shard id */
uint64 shardId = GetNextShardId();
uint64 shardGroupId = shardId;
List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
nodeList, workerStartIndex,
replicationFactor);
StringInfoData shardgroupQuery = { 0 };
initStringInfo(&shardgroupQuery);
appendStringInfoString(&shardgroupQuery,
"WITH shardgroup_data(shardgroupid, colocationid, "
"shardminvalue, shardmaxvalue) AS (VALUES ");
InsertShardGroupRow(shardGroupId, colocationId, shardMinValue, shardMaxValue);
appendStringInfo(&shardgroupQuery, "(%ld, %d, %s, %s)",
shardGroupId,
colocationId,
TextToSQLLiteral(shardMinValue),
TextToSQLLiteral(shardMaxValue));
appendStringInfoString(&shardgroupQuery, ") ");
appendStringInfoString(&shardgroupQuery,
"SELECT pg_catalog.citus_internal_add_shardgroup_metadata("
"shardgroupid, colocationid, shardminvalue, shardmaxvalue)"
"FROM shardgroup_data;");
SendCommandToWorkersWithMetadata(shardgroupQuery.data);
InsertShardRow(distributedTableId, shardId, shardStorageType,
shardMinValue, shardMaxValue, &shardGroupId);
insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
nodeList, workerStartIndex,
replicationFactor);
}
else
{
/* add reference table as colocated table to already existing reference table */
/* prevent placement changes of the source relation until we colocate with them */
List *sourceShardIntervalList = LoadShardIntervalList(colocatedTableId);
LockShardListMetadata(sourceShardIntervalList, ShareLock);
if (list_length(sourceShardIntervalList) != 1)
{
elog(ERROR, "colocating a reference table to a table with shardcount > 1");
}
ShardInterval *sourceInterval =
(ShardInterval *) linitial(sourceShardIntervalList);
uint64 newShardId = GetNextShardId();
InsertShardRow(distributedTableId, newShardId, shardStorageType,
shardMinValue, shardMaxValue, &sourceInterval->shardGroupId);
List *sourceShardPlacementList =
ShardPlacementList(sourceInterval->shardId);
ShardPlacement *sourceShardPlacement = NULL;
foreach_ptr(sourceShardPlacement, sourceShardPlacementList)
{
int32 groupId = sourceShardPlacement->groupId;
const uint64 shardSize = 0;
/*
* Optimistically add shard placement row the pg_dist_shard_placement, in case
* of any error it will be roll-backed.
*/
uint64 shardPlacementId = InsertShardPlacementRow(newShardId,
INVALID_PLACEMENT_ID,
shardSize,
groupId);
ShardPlacement *shardPlacement =
LoadShardPlacement(newShardId, shardPlacementId);
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
}
}
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnection, colocatedShard);

View File

@ -262,7 +262,8 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard
bool useExclusiveConnections);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
bool useExclusiveConnections);
extern void CreateReferenceTableShard(Oid distributedTableId);
extern void CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId,
uint32 colocationId);
extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList,
List *foreignConstraintCommandList);