From 12b6bbd77a44a371a571bb3bdf390af07a7be324 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Thu, 22 Sep 2022 17:05:35 +0200 Subject: [PATCH] Parallel PK creation --- .../replication/multi_logical_replication.c | 48 +++++++++++++++++-- .../distributed/multi_logical_replication.h | 3 ++ 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 3d82afdc9..d0847b28d 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -643,15 +643,55 @@ PrepareReplicationSubscriptionList(List *shardList) void CreateReplicaIdentities(List *logicalRepTargetList) { + List *taskList = NIL; LogicalRepTarget *target = NULL; foreach_ptr(target, logicalRepTargetList) { MultiConnection *superuserConnection = target->superuserConnection; - CreateReplicaIdentitiesOnNode( - target->newShards, - superuserConnection->hostname, - superuserConnection->port); + taskList = list_concat(taskList, CreateReplicaIdentitiesOnNodeTasks( + target->newShards, + superuserConnection->hostname, + superuserConnection->port)); } + + ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, taskList, + MaxAdaptiveExecutorPoolSize, + NIL); +} + + +/* + * CreateReplicaIdentitiesOnNode gets a shardList and creates all the replica + * identities on the shards in the given node. + */ +List * +CreateReplicaIdentitiesOnNodeTasks(List *shardList, char *nodeName, int32 nodePort) +{ + List *taskList = NIL; + + ShardInterval *shardInterval; + foreach_ptr(shardInterval, shardList) + { + uint64 shardId = shardInterval->shardId; + Oid relationId = shardInterval->relationId; + + List *backingIndexCommandList = + GetIndexCommandListForShardBackingReplicaIdentity(relationId, shardId); + + List *replicaIdentityShardCommandList = + GetReplicaIdentityCommandListForShard(relationId, shardId); + + List *commandList = + list_concat(backingIndexCommandList, replicaIdentityShardCommandList); + + List *taskListForShard = + ConvertNonExistingPlacementDDLCommandsToTasks( + commandList, + nodeName, + nodePort); + taskList = list_concat(taskList, taskListForShard); + } + return taskList; } diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 994650568..3dc407412 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -133,6 +133,9 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, extern void ConflictOnlyWithIsolationTesting(void); extern void CreateReplicaIdentities(List *subscriptionInfoList); +extern List * CreateReplicaIdentitiesOnNodeTasks(List *shardList, + char *nodeName, + int32 nodePort); extern void CreateReplicaIdentitiesOnNode(List *shardList, char *nodeName, int32 nodePort);