Refactor relation shard list creation during placement creation

This change is to make further refactoring even simpler such as
using the executor for shard creation.
pull/2769/head
Onder Kalaci 2019-06-18 11:57:21 +02:00
parent 4fd1fcbbef
commit 3a04374a9e
1 changed files with 101 additions and 10 deletions

View File

@ -58,6 +58,7 @@
/* Local functions forward declarations */ /* Local functions forward declarations */
static List * RelationShardListForShardCreate(ShardInterval *shardInterval);
static bool WorkerShardStats(ShardPlacement *placement, Oid relationId, static bool WorkerShardStats(ShardPlacement *placement, Oid relationId,
char *shardName, uint64 *shardSize, char *shardName, uint64 *shardSize,
text **shardMinValue, text **shardMaxValue); text **shardMinValue, text **shardMaxValue);
@ -533,6 +534,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 shardId = shardPlacement->shardId; uint64 shardId = shardPlacement->shardId;
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
List *relationShardList = NIL;
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
int shardIndex = -1; int shardIndex = -1;
List *commandList = NIL; List *commandList = NIL;
@ -549,18 +551,14 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
*/ */
if (ShouldRecordRelationAccess() && useExclusiveConnection && partitionTable) if (ShouldRecordRelationAccess() && useExclusiveConnection && partitionTable)
{ {
RelationShard *parentRelationShard = CitusMakeNode(RelationShard);
RelationShard *partitionRelationShard = CitusMakeNode(RelationShard);
List *relationShardList = NIL;
List *placementAccessList = NIL; List *placementAccessList = NIL;
parentRelationShard->relationId = PartitionParentOid(distributedRelationId); /* we only need to calculate this once per shardInterval, not placement */
parentRelationShard->shardId = if (relationShardList == NIL)
ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex); {
partitionRelationShard->relationId = distributedRelationId; relationShardList = RelationShardListForShardCreate(shardInterval);
partitionRelationShard->shardId = shardId; }
relationShardList = list_make2(parentRelationShard, partitionRelationShard);
placementAccessList = BuildPlacementDDLList(shardPlacement->groupId, placementAccessList = BuildPlacementDDLList(shardPlacement->groupId,
relationShardList); relationShardList);
@ -603,10 +601,103 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
} }
/*
* RelationShardListForShardCreate gets a shard interval and returns the placement
* accesses that would happen when a placement of the shard interval is created.
*/
static List *
RelationShardListForShardCreate(ShardInterval *shardInterval)
{
List *relationShardList = NIL;
RelationShard *relationShard = NULL;
Oid relationId = shardInterval->relationId;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey;
List *referencingRelationList = cacheEntry->referencingRelationsViaForeignKey;
List *allForeignKeyRelations =
list_concat_unique_oid(referencedRelationList, referencingRelationList);
int shardIndex = -1;
/* record the placement access of the shard itself */
relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = relationId;
relationShard->shardId = shardInterval->shardId;
relationShardList = list_make1(relationShard);
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->colocationId != INVALID_COLOCATION_ID)
{
shardIndex = ShardIndex(shardInterval);
}
/* all foregin constraint relations */
if (allForeignKeyRelations != NIL)
{
ListCell *relationIdCell = NULL;
foreach(relationIdCell, allForeignKeyRelations)
{
Oid fkeyRelationid = lfirst_oid(relationIdCell);
RelationShard *fkeyRelationShard = NULL;
uint64 fkeyShardId = INVALID_SHARD_ID;
if (!IsDistributedTable(fkeyRelationid))
{
/* we're not interested in local tables */
continue;
}
if (PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_NONE)
{
fkeyShardId = GetFirstShardId(fkeyRelationid);
}
else if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_HASH)
{
/* hash distributed tables should be colocated to have fkey */
Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId);
fkeyShardId =
ColocatedShardIdInRelation(fkeyRelationid, shardIndex);
}
else
{
/*
* We currently do not support foreign keys from/to local tables or
* non-colocated tables when creating shards. Also note that shard
* creation via shard moves doesn't happen in a transaction block,
* so not relevant here.
*/
continue;
}
fkeyRelationShard = CitusMakeNode(RelationShard);
fkeyRelationShard->relationId = fkeyRelationid;
fkeyRelationShard->shardId = fkeyShardId;
relationShardList = lappend(relationShardList, fkeyRelationShard);
}
}
/* if partitioned table, make sure to record the parent table */
if (PartitionTable(relationId))
{
RelationShard *parentRelationShard = CitusMakeNode(RelationShard);
parentRelationShard->relationId = PartitionParentOid(relationId);
parentRelationShard->shardId =
ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex);
relationShardList = lappend(relationShardList, parentRelationShard);
}
return relationShardList;
}
/* /*
* WorkerCreateShardCommandList returns a list of DDL commands for the given * WorkerCreateShardCommandList returns a list of DDL commands for the given
* shardId to create the shard on the worker node. * shardId to create the shard on the worker node.
*
*/ */
List * List *
WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,