create temp schemas in parallel (#3540)

pull/3543/head
SaitTalhaNisanci 2020-02-26 16:20:08 +03:00 committed by GitHub
parent d94c3fd43d
commit 82d22b34fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 8 deletions

View File

@ -83,7 +83,8 @@ CreateTemporarySchemasForMergeTasks(Job *topLeveLJob)
{
List *jobIds = ExtractJobsInJobTree(topLeveLJob);
char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName());
SendCommandToAllWorkers(createSchemasCommand, CitusExtensionOwnerName());
SendCommandToWorkersInParallel(ALL_WORKERS, createSchemasCommand,
CitusExtensionOwnerName());
return jobIds;
}

View File

@ -47,7 +47,10 @@ static void SendCommandListToAllWorkersInternal(List *commandList, bool failOnEr
char *superuser);
static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const
char *user);
static void GetConnectionsResultsOptional(List *connectionList);
static void GetConnectionsResults(List *connectionList, bool failOnError);
static void SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet, const
char *command, const char *user, bool
failOnError);
/*
* SendCommandToWorker sends a command to a particular worker as part of the
@ -340,6 +343,36 @@ void
SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const
char *command,
const char *user)
{
bool failOnError = false;
SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
failOnError);
}
/*
* SendCommandToWorkersInParallel sends the given command to workers in parallel.
* It does error if there is a problem while sending the query, it errors if there
* was any problem when sending/receiving.
*/
void
SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet, const
char *command,
const char *user)
{
bool failOnError = true;
SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
failOnError);
}
/*
* SendCommandToWorkersOutsideTransaction sends the given command to workers in parallel.
*/
static void
SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet, const
char *command, const char *user, bool
failOnError)
{
ListCell *connectionCell = NULL;
@ -353,10 +386,14 @@ SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
SendRemoteCommand(connection, command);
int querySent = SendRemoteCommand(connection, command);
if (failOnError && querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
GetConnectionsResultsOptional(connectionList);
GetConnectionsResults(connectionList, failOnError);
}
@ -389,11 +426,11 @@ OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char *
/*
* GetConnectionsResultsOptional gets remote command results
* for the given connections. It doesn't raise any error.
* GetConnectionsResults gets remote command results
* for the given connections. It raises any error if failOnError is true.
*/
static void
GetConnectionsResultsOptional(List *connectionList)
GetConnectionsResults(List *connectionList, bool failOnError)
{
ListCell *connectionCell = NULL;
@ -402,9 +439,16 @@ GetConnectionsResultsOptional(List *connectionList)
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
bool raiseInterrupt = false;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupt);
bool isResponseOK = result != NULL && IsResponseOK(result);
if (failOnError && !isResponseOK)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
if (result != NULL && IsResponseOK(result))
if (isResponseOK)
{
ForgetResults(connection);
}

View File

@ -51,6 +51,9 @@ extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName,
extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const
char *command,
const char *user);
void SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet, const
char *command,
const char *user);
extern void RemoveWorkerTransaction(char *nodeName, int32 nodePort);
/* helper functions for worker transactions */