start & stop metadata operations commit when the main transaction commits

do_not_have_single_tx_command
Onder Kalaci 2021-07-27 17:24:13 +02:00
parent 2aa67421a7
commit bb381e7eee
3 changed files with 47 additions and 35 deletions

View File

@ -366,28 +366,17 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
createMetadataCommandList);
/*
* Send the snapshot recreation commands in a single remote transaction and
* if requested, error out in any kind of failure. Note that it is not
* required to send createMetadataSnapshotCommandList in the same transaction
* that we send nodeDeleteCommand and nodeInsertCommand commands below.
*/
if (raiseOnError)
const char *command = NULL;
foreach_ptr(command, recreateMetadataSnapshotCommandList)
{
SendCommandListToWorkerInSingleTransaction(workerNode->workerName,
workerNode->workerPort,
currentUser,
recreateMetadataSnapshotCommandList);
return true;
}
else
{
bool success =
SendOptionalCommandListToWorkerInTransaction(workerNode->workerName,
workerNode->workerPort,
currentUser,
recreateMetadataSnapshotCommandList);
return success;
if (raiseOnError)
{
SendCommandToWorkersWithMetadata(command);
}
else
{
SendOptionalCommandToWorkersWithMetadata(command);
}
}
}
@ -407,10 +396,11 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
dropMetadataCommandList = lappend(dropMetadataCommandList,
LocalGroupIdUpdateCommand(0));
SendOptionalCommandListToWorkerInTransaction(workerNode->workerName,
workerNode->workerPort,
userName,
dropMetadataCommandList);
const char *command = NULL;
foreach_ptr(command, dropMetadataCommandList)
{
SendOptionalCommandToWorkersWithMetadata(command);
}
}

View File

@ -36,12 +36,14 @@
static void SendCommandToMetadataWorkersParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues);
const char *const *parameterValues,
bool raiseInterrupts);
static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
const char *command, const char *user,
int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues);
const char *const *parameterValues,
bool raiseInterrupts);
static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList);
static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet,
const char *user);
@ -112,14 +114,29 @@ SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *node
/*
* SendCommandToWorkers sends a command to all workers in
* parallel. Commands are committed on the workers when the local
* transaction commits. The connection are made as the extension
* owner to ensure write access to the Citus metadata tables.
* transaction commits. Failures abort the current transaction.
*/
void
SendCommandToWorkersWithMetadata(const char *command)
{
bool raiseInterrupts = true;
SendCommandToMetadataWorkersParams(command, CurrentUserName(),
0, NULL, NULL);
0, NULL, NULL, raiseInterrupts);
}
/*
* SendOptionalCommandToWorkersWithMetadata sends a command to
* all workers in parallel. Commands are committed on the workers
* when the local transaction commits. Failures do not abort
* the current transaction.
*/
void
SendOptionalCommandToWorkersWithMetadata(const char *command)
{
bool raiseInterrupts = false;
SendCommandToMetadataWorkersParams(command, CurrentUserName(),
0, NULL, NULL, raiseInterrupts);
}
@ -205,7 +222,8 @@ static void
SendCommandToMetadataWorkersParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues)
const char *const *parameterValues,
bool raiseInterrupts)
{
List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
ShareLock);
@ -214,7 +232,7 @@ SendCommandToMetadataWorkersParams(const char *command,
SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
parameterCount, parameterTypes,
parameterValues);
parameterValues, raiseInterrupts);
}
@ -353,7 +371,8 @@ static void
SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues)
const char *const *parameterValues,
bool raiseInterrupts)
{
List *connectionList = NIL;
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
@ -361,6 +380,8 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *
UseCoordinatedTransaction();
Use2PCForCoordinatedTransaction();
int logLevel = raiseInterrupts ? ERROR : WARNING;
/* open connections in parallel */
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
@ -397,7 +418,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *
parameterTypes, parameterValues, false);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
ReportConnectionError(connection, logLevel);
}
}
@ -407,7 +428,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *
PGresult *result = GetRemoteCommandResult(connection, true);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
ReportResultError(connection, result, logLevel);
}
PQclear(result);

View File

@ -42,6 +42,7 @@ extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, i
const char *nodeUser,
List *commandList);
extern void SendCommandToWorkersWithMetadata(const char *command);
extern void SendOptionalCommandToWorkersWithMetadata(const char *command);
extern void SendBareCommandListToMetadataWorkers(List *commandList);
extern void EnsureNoModificationsHaveBeenDone(void);
extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName,