diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 82f4fdf61..c266da0fa 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -366,28 +366,17 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, createMetadataCommandList); - /* - * Send the snapshot recreation commands in a single remote transaction and - * if requested, error out in any kind of failure. Note that it is not - * required to send createMetadataSnapshotCommandList in the same transaction - * that we send nodeDeleteCommand and nodeInsertCommand commands below. - */ - if (raiseOnError) + const char *command = NULL; + foreach_ptr(command, recreateMetadataSnapshotCommandList) { - SendCommandListToWorkerInSingleTransaction(workerNode->workerName, - workerNode->workerPort, - currentUser, - recreateMetadataSnapshotCommandList); - return true; - } - else - { - bool success = - SendOptionalCommandListToWorkerInTransaction(workerNode->workerName, - workerNode->workerPort, - currentUser, - recreateMetadataSnapshotCommandList); - return success; + if (raiseOnError) + { + SendCommandToWorkersWithMetadata(command); + } + else + { + SendOptionalCommandToWorkersWithMetadata(command); + } } } @@ -407,10 +396,11 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) dropMetadataCommandList = lappend(dropMetadataCommandList, LocalGroupIdUpdateCommand(0)); - SendOptionalCommandListToWorkerInTransaction(workerNode->workerName, - workerNode->workerPort, - userName, - dropMetadataCommandList); + const char *command = NULL; + foreach_ptr(command, dropMetadataCommandList) + { + SendOptionalCommandToWorkersWithMetadata(command); + } } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index a10530ea6..4d70b21dd 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -36,12 +36,14 @@ static void SendCommandToMetadataWorkersParams(const char *command, const char *user, int parameterCount, const Oid *parameterTypes, - const char *const *parameterValues); + const char *const *parameterValues, + bool raiseInterrupts); static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command, const char *user, int parameterCount, const Oid *parameterTypes, - const char *const *parameterValues); + const char *const *parameterValues, + bool raiseInterrupts); static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList); static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char *user); @@ -112,14 +114,29 @@ SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *node /* * SendCommandToWorkers sends a command to all workers in * parallel. Commands are committed on the workers when the local - * transaction commits. The connection are made as the extension - * owner to ensure write access to the Citus metadata tables. + * transaction commits. Failures abort the current transaction. */ void SendCommandToWorkersWithMetadata(const char *command) { + bool raiseInterrupts = true; SendCommandToMetadataWorkersParams(command, CurrentUserName(), - 0, NULL, NULL); + 0, NULL, NULL, raiseInterrupts); +} + + +/* + * SendOptionalCommandToWorkersWithMetadata sends a command to + * all workers in parallel. Commands are committed on the workers + * when the local transaction commits. Failures do not abort + * the current transaction. + */ +void +SendOptionalCommandToWorkersWithMetadata(const char *command) +{ + bool raiseInterrupts = false; + SendCommandToMetadataWorkersParams(command, CurrentUserName(), + 0, NULL, NULL, raiseInterrupts); } @@ -205,7 +222,8 @@ static void SendCommandToMetadataWorkersParams(const char *command, const char *user, int parameterCount, const Oid *parameterTypes, - const char *const *parameterValues) + const char *const *parameterValues, + bool raiseInterrupts) { List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, ShareLock); @@ -214,7 +232,7 @@ SendCommandToMetadataWorkersParams(const char *command, SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user, parameterCount, parameterTypes, - parameterValues); + parameterValues, raiseInterrupts); } @@ -353,7 +371,8 @@ static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command, const char *user, int parameterCount, const Oid *parameterTypes, - const char *const *parameterValues) + const char *const *parameterValues, + bool raiseInterrupts) { List *connectionList = NIL; List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); @@ -361,6 +380,8 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * UseCoordinatedTransaction(); Use2PCForCoordinatedTransaction(); + int logLevel = raiseInterrupts ? ERROR : WARNING; + /* open connections in parallel */ WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -397,7 +418,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * parameterTypes, parameterValues, false); if (querySent == 0) { - ReportConnectionError(connection, ERROR); + ReportConnectionError(connection, logLevel); } } @@ -407,7 +428,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * PGresult *result = GetRemoteCommandResult(connection, true); if (!IsResponseOK(result)) { - ReportResultError(connection, result, ERROR); + ReportResultError(connection, result, logLevel); } PQclear(result); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index a0f2bede0..7e0ec0e56 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -42,6 +42,7 @@ extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, i const char *nodeUser, List *commandList); extern void SendCommandToWorkersWithMetadata(const char *command); +extern void SendOptionalCommandToWorkersWithMetadata(const char *command); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName,