diff --git a/src/backend/distributed/operations/isolate_shards.c b/src/backend/distributed/operations/isolate_shards.c index 5e4681fc5..ff01a5ac6 100644 --- a/src/backend/distributed/operations/isolate_shards.c +++ b/src/backend/distributed/operations/isolate_shards.c @@ -57,6 +57,7 @@ static ShardInterval * CreateSplitOffShardFromTemplate(ShardInterval *shardTempl Oid relationId); static List * SplitOffCommandList(ShardInterval *sourceShard, ShardInterval *splitOffShard); +static void ExecuteCommandListOnPlacements(List *commandList, List *placementList); static void InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList); static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList); @@ -551,6 +552,95 @@ SplitOffCommandList(ShardInterval *sourceShard, ShardInterval *splitOffShard) } +/* + * ExecuteCommandListOnPlacements runs the given command list on the nodes of + * the given shard placement list. First, it creates connections. Then it sends + * commands one by one. For every command, first it send the command to all + * connections and then checks the results. This helps to run long running + * commands in parallel. Finally, it sends commit messages to all connections + * and close them. + */ +void +ExecuteCommandListOnPlacements(List *commandList, List *placementList) +{ + List *workerConnectionList = NIL; + ListCell *workerConnectionCell = NULL; + ListCell *shardPlacementCell = NULL; + ListCell *commandCell = NULL; + + /* create connections and start transactions */ + foreach(shardPlacementCell, placementList) + { + ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); + char *nodeName = shardPlacement->nodeName; + int32 nodePort = shardPlacement->nodePort; + + int connectionFlags = FORCE_NEW_CONNECTION; + char *currentUser = CurrentUserName(); + + /* create a new connection */ + MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, + nodePort, + currentUser, + NULL); + + /* mark connection as critical ans start transaction */ + MarkRemoteTransactionCritical(workerConnection); + RemoteTransactionBegin(workerConnection); + + /* add connection to the list */ + workerConnectionList = lappend(workerConnectionList, workerConnection); + } + + /* send and check results for every command one by one */ + foreach(commandCell, commandList) + { + char *command = lfirst(commandCell); + + /* first only send the command */ + foreach(workerConnectionCell, workerConnectionList) + { + MultiConnection *workerConnection = + (MultiConnection *) lfirst(workerConnectionCell); + + int querySent = SendRemoteCommand(workerConnection, command); + if (querySent == 0) + { + ReportConnectionError(workerConnection, ERROR); + } + } + + /* then check the result separately to run long running commands in parallel */ + foreach(workerConnectionCell, workerConnectionList) + { + MultiConnection *workerConnection = + (MultiConnection *) lfirst(workerConnectionCell); + bool raiseInterrupts = true; + + PGresult *result = GetRemoteCommandResult(workerConnection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(workerConnection, result, ERROR); + } + + PQclear(result); + ForgetResults(workerConnection); + } + } + + /* finally commit each transaction and close connections */ + foreach(workerConnectionCell, workerConnectionList) + { + MultiConnection *workerConnection = + (MultiConnection *) lfirst(workerConnectionCell); + + RemoteTransactionCommit(workerConnection); + CloseConnection(workerConnection); + } +} + + /* * InsertSplitOffShardMetadata inserts new shard and shard placement data into * catolog tables both the coordinator and mx nodes. diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 7cd5d12af..69eb20bcf 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -823,92 +823,3 @@ DropShardList(List *shardIntervalList) DeleteShardRow(oldShardId); } } - - -/* - * ExecuteCommandListOnPlacements runs the given command list on the nodes of - * the given shard placement list. First, it creates connections. Then it sends - * commands one by one. For every command, first it send the command to all - * connections and then checks the results. This helps to run long running - * commands in parallel. Finally, it sends commit messages to all connections - * and close them. - */ -void -ExecuteCommandListOnPlacements(List *commandList, List *placementList) -{ - List *workerConnectionList = NIL; - ListCell *workerConnectionCell = NULL; - ListCell *shardPlacementCell = NULL; - ListCell *commandCell = NULL; - - /* create connections and start transactions */ - foreach(shardPlacementCell, placementList) - { - ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); - char *nodeName = shardPlacement->nodeName; - int32 nodePort = shardPlacement->nodePort; - - int connectionFlags = FORCE_NEW_CONNECTION; - char *currentUser = CurrentUserName(); - - /* create a new connection */ - MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, - nodeName, - nodePort, - currentUser, - NULL); - - /* mark connection as critical ans start transaction */ - MarkRemoteTransactionCritical(workerConnection); - RemoteTransactionBegin(workerConnection); - - /* add connection to the list */ - workerConnectionList = lappend(workerConnectionList, workerConnection); - } - - /* send and check results for every command one by one */ - foreach(commandCell, commandList) - { - char *command = lfirst(commandCell); - - /* first only send the command */ - foreach(workerConnectionCell, workerConnectionList) - { - MultiConnection *workerConnection = - (MultiConnection *) lfirst(workerConnectionCell); - - int querySent = SendRemoteCommand(workerConnection, command); - if (querySent == 0) - { - ReportConnectionError(workerConnection, ERROR); - } - } - - /* then check the result separately to run long running commands in parallel */ - foreach(workerConnectionCell, workerConnectionList) - { - MultiConnection *workerConnection = - (MultiConnection *) lfirst(workerConnectionCell); - bool raiseInterrupts = true; - - PGresult *result = GetRemoteCommandResult(workerConnection, raiseInterrupts); - if (!IsResponseOK(result)) - { - ReportResultError(workerConnection, result, ERROR); - } - - PQclear(result); - ForgetResults(workerConnection); - } - } - - /* finally commit each transaction and close connections */ - foreach(workerConnectionCell, workerConnectionList) - { - MultiConnection *workerConnection = - (MultiConnection *) lfirst(workerConnectionCell); - - RemoteTransactionCommit(workerConnection); - CloseConnection(workerConnection); - } -} diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h index cccd5efe9..790e3d612 100644 --- a/src/include/distributed/shard_split.h +++ b/src/include/distributed/shard_split.h @@ -42,6 +42,5 @@ extern void SplitShard(SplitMode splitMode, extern void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard); extern void DropShardList(List *shardIntervalList); -extern void ExecuteCommandListOnPlacements(List *commandList, List *placementList); #endif /* SHARDSPLIT_H_ */