From 82d22b34fe87f865992c14cc63ff65e60174c7fb Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 26 Feb 2020 16:20:08 +0300 Subject: [PATCH] create temp schemas in parallel (#3540) --- .../executor/repartition_join_execution.c | 3 +- .../transaction/worker_transaction.c | 58 ++++++++++++++++--- src/include/distributed/worker_transaction.h | 3 + 3 files changed, 56 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 3f5f36273..6202dd1e2 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -83,7 +83,8 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob) { List *jobIds = ExtractJobsInJobTree(topLeveLJob); char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); - SendCommandToAllWorkers(createSchemasCommand, CitusExtensionOwnerName()); + SendCommandToWorkersInParallel(ALL_WORKERS, createSchemasCommand, + CitusExtensionOwnerName()); return jobIds; } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 3c0f0835e..1de3e9e79 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -47,7 +47,10 @@ static void SendCommandListToAllWorkersInternal(List *commandList, bool failOnEr char *superuser); static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char *user); -static void GetConnectionsResultsOptional(List *connectionList); +static void GetConnectionsResults(List *connectionList, bool failOnError); +static void SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet, const + char *command, const char *user, bool + failOnError); /* * SendCommandToWorker sends a command to a particular worker as part of the @@ -340,6 +343,36 @@ void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const char *command, const char *user) +{ + bool failOnError = false; + SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user, + failOnError); +} + + +/* + * SendCommandToWorkersInParallel sends the given command to workers in parallel. + * It does error if there is a problem while sending the query, it errors if there + * was any problem when sending/receiving. + */ +void +SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet, const + char *command, + const char *user) +{ + bool failOnError = true; + SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user, + failOnError); +} + + +/* + * SendCommandToWorkersOutsideTransaction sends the given command to workers in parallel. + */ +static void +SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet, const + char *command, const char *user, bool + failOnError) { ListCell *connectionCell = NULL; @@ -353,10 +386,14 @@ SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const { MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); - SendRemoteCommand(connection, command); + int querySent = SendRemoteCommand(connection, command); + if (failOnError && querySent == 0) + { + ReportConnectionError(connection, ERROR); + } } - GetConnectionsResultsOptional(connectionList); + GetConnectionsResults(connectionList, failOnError); } @@ -389,11 +426,11 @@ OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char * /* - * GetConnectionsResultsOptional gets remote command results - * for the given connections. It doesn't raise any error. + * GetConnectionsResults gets remote command results + * for the given connections. It raises any error if failOnError is true. */ static void -GetConnectionsResultsOptional(List *connectionList) +GetConnectionsResults(List *connectionList, bool failOnError) { ListCell *connectionCell = NULL; @@ -402,9 +439,16 @@ GetConnectionsResultsOptional(List *connectionList) MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); bool raiseInterrupt = false; PGresult *result = GetRemoteCommandResult(connection, raiseInterrupt); + + bool isResponseOK = result != NULL && IsResponseOK(result); + if (failOnError && !isResponseOK) + { + ReportResultError(connection, result, ERROR); + } + PQclear(result); - if (result != NULL && IsResponseOK(result)) + if (isResponseOK) { ForgetResults(connection); } diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index b61203eaa..dac399361 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -51,6 +51,9 @@ extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName, extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const char *command, const char *user); +void SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet, const + char *command, + const char *user); extern void RemoveWorkerTransaction(char *nodeName, int32 nodePort); /* helper functions for worker transactions */