Blocking Split workflow works

pull/6029/head
Nitish Upreti 2022-06-23 18:35:34 -07:00
parent aa047bda16
commit 686ce21e80
3 changed files with 223 additions and 124 deletions

View File

@ -57,6 +57,10 @@ static ShardInterval * CreateSplitOffShardFromTemplate(ShardInterval *shardTempl
Oid relationId);
static List * SplitOffCommandList(ShardInterval *sourceShard,
ShardInterval *splitOffShard);
static void InsertSplitOffShardMetadata(List *splitOffShardList,
List *sourcePlacementList);
static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
static void ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList);
/*
* isolate_tenant_to_new_shard isolates a tenant to its own shard by spliting
@ -277,6 +281,94 @@ SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum)
}
/*
* CreateForeignConstraints creates the foreign constraints on the newly
* created shards via the tenant isolation.
*
* The function treats foreign keys to reference tables and foreign keys to
* co-located distributed tables differently. The former one needs to be
* executed over a single connection to prevent self-deadlocks. The latter
* one can be executed in parallel if there are multiple replicas.
*/
static void
CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
{
ListCell *splitOffShardCell = NULL;
List *colocatedShardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
foreach(splitOffShardCell, splitOffShardList)
{
ShardInterval *splitOffShard = (ShardInterval *) lfirst(splitOffShardCell);
List *currentColocatedForeignKeyList = NIL;
List *currentReferenceForeignKeyList = NIL;
CopyShardForeignConstraintCommandListGrouped(splitOffShard,
&currentColocatedForeignKeyList,
&currentReferenceForeignKeyList);
colocatedShardForeignConstraintCommandList =
list_concat(colocatedShardForeignConstraintCommandList,
currentColocatedForeignKeyList);
referenceTableForeignConstraintList =
list_concat(referenceTableForeignConstraintList,
currentReferenceForeignKeyList);
}
/*
* We can use parallel connections to while creating co-located foreign keys
* if the source placement .
* However, foreign keys to reference tables need to be created using a single
* connection per worker to prevent self-deadlocks.
*/
if (colocatedShardForeignConstraintCommandList != NIL)
{
ExecuteCommandListOnPlacements(colocatedShardForeignConstraintCommandList,
sourcePlacementList);
}
if (referenceTableForeignConstraintList != NIL)
{
ListCell *shardPlacementCell = NULL;
foreach(shardPlacementCell, sourcePlacementList)
{
ShardPlacement *shardPlacement =
(ShardPlacement *) lfirst(shardPlacementCell);
char *nodeName = shardPlacement->nodeName;
int32 nodePort = shardPlacement->nodePort;
/*
* We're using the connections that we've used for dropping the
* source placements within the same coordinated transaction.
*/
ExecuteCommandListOnWorker(nodeName, nodePort,
referenceTableForeignConstraintList);
}
}
}
/*
* ExecuteCommandListOnWorker executes the command on the given node within
* the coordinated 2PC.
*/
static void
ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList)
{
ListCell *commandCell = NULL;
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorker(nodeName, nodePort, command);
}
}
/*
* CreateSplitOffShards gets a shard and a hashed value to pick the split point.
* First, it creates templates to create new shards. Then, for every colocated
@ -457,3 +549,57 @@ SplitOffCommandList(ShardInterval *sourceShard, ShardInterval *splitOffShard)
return splitOffCommandList;
}
/*
* InsertSplitOffShardMetadata inserts new shard and shard placement data into
* catolog tables both the coordinator and mx nodes.
*/
static void
InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
{
List *syncedShardList = NIL;
ListCell *shardCell = NULL;
ListCell *commandCell = NULL;
/* add new metadata */
foreach(shardCell, splitOffShardList)
{
ShardInterval *splitOffShard = (ShardInterval *) lfirst(shardCell);
Oid relationId = splitOffShard->relationId;
uint64 shardId = splitOffShard->shardId;
char storageType = splitOffShard->storageType;
ListCell *shardPlacementCell = NULL;
int32 shardMinValue = DatumGetInt32(splitOffShard->minValue);
int32 shardMaxValue = DatumGetInt32(splitOffShard->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue);
InsertShardRow(relationId, shardId, storageType, shardMinValueText,
shardMaxValueText);
/* split off shard placement metadata */
foreach(shardPlacementCell, sourcePlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 shardSize = 0;
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, SHARD_STATE_ACTIVE,
shardSize, placement->groupId);
}
if (ShouldSyncTableMetadata(relationId))
{
syncedShardList = lappend(syncedShardList, splitOffShard);
}
}
/* send commands to synced nodes one by one */
List *splitOffShardMetadataCommandList = ShardListInsertCommand(syncedShardList);
foreach(commandCell, splitOffShardMetadataCommandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkersWithMetadata(command);
}
}

View File

@ -41,8 +41,7 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
static void CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
List **splitOffShardList);
List *workersForPlacementList);
static void CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerNode);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
@ -61,6 +60,8 @@ static void DoSplitCopy(WorkerNode *sourceShardNode,
static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
List *splitChildrenShardIntervalList,
List *workersForPlacementList);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList);
/* Customize error message strings based on operation type */
static const char *const SplitOperationName[] =
@ -413,13 +414,11 @@ BlockingShardSplit(SplitOperation splitOperation,
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, false /* missingOk */);
/* Physically create split children and perform split copy */
List *splitOffShardList = NULL;
CreateSplitShardsForShardGroup(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList,
&splitOffShardList);
workersForPlacementList);
/*
* Drop old shards and delete related metadata. Have to do that before
@ -428,15 +427,15 @@ BlockingShardSplit(SplitOperation splitOperation,
*/
DropShardList(sourceColocatedShardIntervalList);
/* insert new metadata */
InsertSplitOffShardMetadata(splitOffShardList, sourcePlacementList);
/* Insert new shard and placement metdata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList);
/*
* Create foreign keys if exists after the metadata changes happening in
* DropShardList() and InsertSplitOffShardMetadata() because the foreign
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
CreateForeignConstraints(splitOffShardList, sourcePlacementList);
CreateForeignKeyConstraints(shardGroupSplitIntervalListList, workersForPlacementList);
CitusInvalidateRelcacheByRelid(DistShardRelationId());
}
@ -447,8 +446,7 @@ static void
CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
List **splitOffShardList)
List *workersForPlacementList)
{
/* Iterate on shard intervals for shard group */
List *shardIntervalList = NULL;
@ -470,8 +468,6 @@ CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
/* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
(*splitOffShardList) = lappend(*splitOffShardList, shardInterval);
}
}
@ -479,7 +475,10 @@ CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList);
/* TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely? */
/*
* Create Indexes post copy.
* TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely
*/
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *shardInterval = NULL;
@ -677,59 +676,89 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
/*
* InsertSplitOffShardMetadata inserts new shard and shard placement data into
* catolog tables both the coordinator and mx nodes.
* Insert new shard and placement metadata.
*/
void
InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
static void
InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList)
{
List *syncedShardList = NIL;
ListCell *shardCell = NULL;
ListCell *commandCell = NULL;
/* add new metadata */
foreach(shardCell, splitOffShardList)
/* Iterate on shard intervals for shard group */
List *shardIntervalList = NULL;
List *syncedShardList = NULL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *splitOffShard = (ShardInterval *) lfirst(shardCell);
Oid relationId = splitOffShard->relationId;
uint64 shardId = splitOffShard->shardId;
char storageType = splitOffShard->storageType;
ListCell *shardPlacementCell = NULL;
int32 shardMinValue = DatumGetInt32(splitOffShard->minValue);
int32 shardMaxValue = DatumGetInt32(splitOffShard->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue);
InsertShardRow(relationId, shardId, storageType, shardMinValueText,
shardMaxValueText);
/* split off shard placement metadata */
foreach(shardPlacementCell, sourcePlacementList)
/* Iterate on split children shards along with the respective placement workers */
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 shardSize = 0;
InsertShardRow(
shardInterval->relationId,
shardInterval->shardId,
shardInterval->storageType,
IntegerToText(DatumGetInt32(shardInterval->minValue)),
IntegerToText(DatumGetInt32(shardInterval->maxValue)));
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, SHARD_STATE_ACTIVE,
shardSize, placement->groupId);
InsertShardPlacementRow(
shardInterval->shardId,
INVALID_PLACEMENT_ID, /* triggers generation of new id */
SHARD_STATE_ACTIVE,
0, /* shard length */
workerPlacementNode->groupId);
}
if (ShouldSyncTableMetadata(relationId))
if (ShouldSyncTableMetadata(shardInterval->relationId))
{
syncedShardList = lappend(syncedShardList, splitOffShard);
syncedShardList = lappend(syncedShardList, shardInterval);
}
}
/* send commands to synced nodes one by one */
List *splitOffShardMetadataCommandList = ShardListInsertCommand(syncedShardList);
foreach(commandCell, splitOffShardMetadataCommandList)
char *command = NULL;
foreach_ptr(command, splitOffShardMetadataCommandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkersWithMetadata(command);
}
}
/*
* Create foreign key constraints on the split children shards.
*/
static void
CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList)
{
/* Create constraints between shards */
List* shardIntervalList = NULL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
List *shardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
CopyShardForeignConstraintCommandListGrouped(shardInterval,
&shardForeignConstraintCommandList,
&referenceTableForeignConstraintList);
List *commandList = NIL;
commandList = list_concat(commandList, shardForeignConstraintCommandList);
commandList = list_concat(commandList, referenceTableForeignConstraintList);
SendCommandListToWorkerOutsideTransaction(
workerPlacementNode->workerName,
workerPlacementNode->workerPort,
TableOwner(shardInterval->relationId),
commandList);
}
}
}
/*
* DropShardList drops shards and their metadata from both the coordinator and
* mx nodes.
@ -796,79 +825,6 @@ DropShardList(List *shardIntervalList)
}
/*
* CreateForeignConstraints creates the foreign constraints on the newly
* created shards via the tenant isolation.
*
* The function treats foreign keys to reference tables and foreign keys to
* co-located distributed tables differently. The former one needs to be
* executed over a single connection to prevent self-deadlocks. The latter
* one can be executed in parallel if there are multiple replicas.
*/
void
CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
{
ListCell *splitOffShardCell = NULL;
List *colocatedShardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
foreach(splitOffShardCell, splitOffShardList)
{
ShardInterval *splitOffShard = (ShardInterval *) lfirst(splitOffShardCell);
List *currentColocatedForeignKeyList = NIL;
List *currentReferenceForeignKeyList = NIL;
CopyShardForeignConstraintCommandListGrouped(splitOffShard,
&currentColocatedForeignKeyList,
&currentReferenceForeignKeyList);
colocatedShardForeignConstraintCommandList =
list_concat(colocatedShardForeignConstraintCommandList,
currentColocatedForeignKeyList);
referenceTableForeignConstraintList =
list_concat(referenceTableForeignConstraintList,
currentReferenceForeignKeyList);
}
/*
* We can use parallel connections to while creating co-located foreign keys
* if the source placement .
* However, foreign keys to reference tables need to be created using a single
* connection per worker to prevent self-deadlocks.
*/
if (colocatedShardForeignConstraintCommandList != NIL)
{
ExecuteCommandListOnPlacements(colocatedShardForeignConstraintCommandList,
sourcePlacementList);
}
if (referenceTableForeignConstraintList != NIL)
{
ListCell *shardPlacementCell = NULL;
foreach(shardPlacementCell, sourcePlacementList)
{
ShardPlacement *shardPlacement =
(ShardPlacement *) lfirst(shardPlacementCell);
char *nodeName = shardPlacement->nodeName;
int32 nodePort = shardPlacement->nodePort;
/*
* We're using the connections that we've used for dropping the
* source placements within the same coordinated transaction.
*/
char *command = NULL;
foreach_ptr(command, referenceTableForeignConstraintList)
{
SendCommandToWorker(nodeName, nodePort, command);
}
}
}
}
/*
* ExecuteCommandListOnPlacements runs the given command list on the nodes of
* the given shard placement list. First, it creates connections. Then it sends

View File

@ -41,10 +41,7 @@ extern void SplitShard(SplitMode splitMode,
/* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. */
extern void ErrorIfCannotSplitShard(SplitOperation splitOperation,
ShardInterval *sourceShard);
extern void InsertSplitOffShardMetadata(List *splitOffShardList,
List *sourcePlacementList);
extern void DropShardList(List *shardIntervalList);
extern void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
extern void ExecuteCommandListOnPlacements(List *commandList, List *placementList);
#endif /* SHARDSPLIT_H_ */