Parallel PK creation

parallelize-pk-creation
Jelte Fennema 2022-09-22 17:05:35 +02:00
parent bae4b47c2f
commit 12b6bbd77a
2 changed files with 47 additions and 4 deletions

View File

@ -643,15 +643,55 @@ PrepareReplicationSubscriptionList(List *shardList)
void
CreateReplicaIdentities(List *logicalRepTargetList)
{
List *taskList = NIL;
LogicalRepTarget *target = NULL;
foreach_ptr(target, logicalRepTargetList)
{
MultiConnection *superuserConnection = target->superuserConnection;
CreateReplicaIdentitiesOnNode(
target->newShards,
superuserConnection->hostname,
superuserConnection->port);
taskList = list_concat(taskList, CreateReplicaIdentitiesOnNodeTasks(
target->newShards,
superuserConnection->hostname,
superuserConnection->port));
}
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, taskList,
MaxAdaptiveExecutorPoolSize,
NIL);
}
/*
* CreateReplicaIdentitiesOnNode gets a shardList and creates all the replica
* identities on the shards in the given node.
*/
List *
CreateReplicaIdentitiesOnNodeTasks(List *shardList, char *nodeName, int32 nodePort)
{
List *taskList = NIL;
ShardInterval *shardInterval;
foreach_ptr(shardInterval, shardList)
{
uint64 shardId = shardInterval->shardId;
Oid relationId = shardInterval->relationId;
List *backingIndexCommandList =
GetIndexCommandListForShardBackingReplicaIdentity(relationId, shardId);
List *replicaIdentityShardCommandList =
GetReplicaIdentityCommandListForShard(relationId, shardId);
List *commandList =
list_concat(backingIndexCommandList, replicaIdentityShardCommandList);
List *taskListForShard =
ConvertNonExistingPlacementDDLCommandsToTasks(
commandList,
nodeName,
nodePort);
taskList = list_concat(taskList, taskListForShard);
}
return taskList;
}

View File

@ -133,6 +133,9 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
extern void ConflictOnlyWithIsolationTesting(void);
extern void CreateReplicaIdentities(List *subscriptionInfoList);
extern List * CreateReplicaIdentitiesOnNodeTasks(List *shardList,
char *nodeName,
int32 nodePort);
extern void CreateReplicaIdentitiesOnNode(List *shardList,
char *nodeName,
int32 nodePort);