diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 66a15baa9..3f5f36273 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -167,9 +167,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand) void DoRepartitionCleanup(List *jobIds) { - SendOptionalCommandListToAllWorkers(list_make1(GenerateDeleteJobsCommand( - jobIds)), - CitusExtensionOwnerName()); + SendCommandToWorkersOptionalInParallel(ALL_WORKERS, GenerateDeleteJobsCommand(jobIds), + CitusExtensionOwnerName()); } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 37aca6898..3c0f0835e 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -45,7 +45,9 @@ static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList); static void SendCommandListToAllWorkersInternal(List *commandList, bool failOnError, char *superuser); - +static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const + char *user); +static void GetConnectionsResultsOptional(List *connectionList); /* * SendCommandToWorker sends a command to a particular worker as part of the @@ -90,7 +92,7 @@ void SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser, const char *command) { - uint connectionFlags = 0; + uint32 connectionFlags = 0; UseCoordinatedTransaction(); CoordinatedTransactionUse2PC(); @@ -329,6 +331,87 @@ SendCommandToMetadataWorkersParams(const char *command, } +/* + * SendCommandToWorkersOptionalInParallel sends the given command to workers in parallel. + * It does error if there is a problem while sending the query, but it doesn't error + * if there is a problem while executing the query. + */ +void +SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const + char *command, + const char *user) +{ + ListCell *connectionCell = NULL; + + List *connectionList = OpenConnectionsToWorkersInParallel(targetWorkerSet, user); + + /* finish opening connections */ + FinishConnectionListEstablishment(connectionList); + + /* send commands in parallel */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + SendRemoteCommand(connection, command); + } + + GetConnectionsResultsOptional(connectionList); +} + + +/* + * OpenConnectionsToWorkersInParallel opens connections to the given target worker set in parallel, + * as the given user. + */ +static List * +OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char *user) +{ + ListCell *workerNodeCell = NULL; + List *connectionList = NIL; + + List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + int32 connectionFlags = OUTSIDE_TRANSACTION; + + MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + user, NULL); + connectionList = lappend(connectionList, connection); + } + return connectionList; +} + + +/* + * GetConnectionsResultsOptional gets remote command results + * for the given connections. It doesn't raise any error. + */ +static void +GetConnectionsResultsOptional(List *connectionList) +{ + ListCell *connectionCell = NULL; + + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + bool raiseInterrupt = false; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupt); + PQclear(result); + + if (result != NULL && IsResponseOK(result)) + { + ForgetResults(connection); + } + } +} + + /* * SendCommandToWorkersParamsInternal sends a command to all workers in parallel. * Commands are committed on the workers when the local transaction commits. The diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 91774822a..b61203eaa 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -48,6 +48,9 @@ extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); +extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const + char *command, + const char *user); extern void RemoveWorkerTransaction(char *nodeName, int32 nodePort); /* helper functions for worker transactions */