send repartition cleanup jobs in parallel to all workers (#3485)

* send repartition cleanup jobs in parallel to all workers

* add review items
pull/3540/head
SaitTalhaNisanci 2020-02-26 13:44:06 +03:00 committed by GitHub
parent 1b6020e2d6
commit d94c3fd43d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 5 deletions

View File

@ -167,9 +167,8 @@ GenerateJobCommands(List *jobIds, char *templateCommand)
void void
DoRepartitionCleanup(List *jobIds) DoRepartitionCleanup(List *jobIds)
{ {
SendOptionalCommandListToAllWorkers(list_make1(GenerateDeleteJobsCommand( SendCommandToWorkersOptionalInParallel(ALL_WORKERS, GenerateDeleteJobsCommand(jobIds),
jobIds)), CitusExtensionOwnerName());
CitusExtensionOwnerName());
} }

View File

@ -45,7 +45,9 @@ static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList); static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList);
static void SendCommandListToAllWorkersInternal(List *commandList, bool failOnError, static void SendCommandListToAllWorkersInternal(List *commandList, bool failOnError,
char *superuser); char *superuser);
static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const
char *user);
static void GetConnectionsResultsOptional(List *connectionList);
/* /*
* SendCommandToWorker sends a command to a particular worker as part of the * SendCommandToWorker sends a command to a particular worker as part of the
@ -90,7 +92,7 @@ void
SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser, SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser,
const char *command) const char *command)
{ {
uint connectionFlags = 0; uint32 connectionFlags = 0;
UseCoordinatedTransaction(); UseCoordinatedTransaction();
CoordinatedTransactionUse2PC(); CoordinatedTransactionUse2PC();
@ -329,6 +331,87 @@ SendCommandToMetadataWorkersParams(const char *command,
} }
/*
* SendCommandToWorkersOptionalInParallel sends the given command to workers in parallel.
* It does error if there is a problem while sending the query, but it doesn't error
* if there is a problem while executing the query.
*/
void
SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const
char *command,
const char *user)
{
ListCell *connectionCell = NULL;
List *connectionList = OpenConnectionsToWorkersInParallel(targetWorkerSet, user);
/* finish opening connections */
FinishConnectionListEstablishment(connectionList);
/* send commands in parallel */
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
SendRemoteCommand(connection, command);
}
GetConnectionsResultsOptional(connectionList);
}
/*
* OpenConnectionsToWorkersInParallel opens connections to the given target worker set in parallel,
* as the given user.
*/
static List *
OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char *user)
{
ListCell *workerNodeCell = NULL;
List *connectionList = NIL;
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
int32 connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
user, NULL);
connectionList = lappend(connectionList, connection);
}
return connectionList;
}
/*
* GetConnectionsResultsOptional gets remote command results
* for the given connections. It doesn't raise any error.
*/
static void
GetConnectionsResultsOptional(List *connectionList)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
bool raiseInterrupt = false;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupt);
PQclear(result);
if (result != NULL && IsResponseOK(result))
{
ForgetResults(connection);
}
}
}
/* /*
* SendCommandToWorkersParamsInternal sends a command to all workers in parallel. * SendCommandToWorkersParamsInternal sends a command to all workers in parallel.
* Commands are committed on the workers when the local transaction commits. The * Commands are committed on the workers when the local transaction commits. The

View File

@ -48,6 +48,9 @@ extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName,
int32 nodePort, int32 nodePort,
const char *nodeUser, const char *nodeUser,
List *commandList); List *commandList);
extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const
char *command,
const char *user);
extern void RemoveWorkerTransaction(char *nodeName, int32 nodePort); extern void RemoveWorkerTransaction(char *nodeName, int32 nodePort);
/* helper functions for worker transactions */ /* helper functions for worker transactions */