diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index c266da0fa..a566736a2 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -349,7 +349,7 @@ ShouldSyncTableMetadata(Oid relationId) static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) { - char *currentUser = CurrentUserName(); + bool failed = false; /* generate and add the local group id's update query */ char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId); @@ -375,9 +375,14 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) } else { - SendOptionalCommandToWorkersWithMetadata(command); + bool commandFailed = + SendOptionalCommandToWorkersWithMetadata(command); + + failed |= commandFailed; } } + + return failed; } @@ -388,8 +393,6 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) static void DropMetadataSnapshotOnNode(WorkerNode *workerNode) { - char *userName = CurrentUserName(); - /* generate the queries which drop the metadata */ List *dropMetadataCommandList = MetadataDropCommands(); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 4d70b21dd..f2a44bbba 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -33,12 +33,12 @@ #include "utils/memutils.h" -static void SendCommandToMetadataWorkersParams(const char *command, +static bool SendCommandToMetadataWorkersParams(const char *command, const char *user, int parameterCount, const Oid *parameterTypes, const char *const *parameterValues, bool raiseInterrupts); -static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, +static bool SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command, const char *user, int parameterCount, const Oid *parameterTypes, @@ -129,13 +129,14 @@ SendCommandToWorkersWithMetadata(const char *command) * SendOptionalCommandToWorkersWithMetadata sends a command to * all workers in parallel. Commands are committed on the workers * when the local transaction commits. Failures do not abort - * the current transaction. + * the current transaction. Instead, the function returns false + * if any command fails. */ -void +bool SendOptionalCommandToWorkersWithMetadata(const char *command) { bool raiseInterrupts = false; - SendCommandToMetadataWorkersParams(command, CurrentUserName(), + return SendCommandToMetadataWorkersParams(command, CurrentUserName(), 0, NULL, NULL, raiseInterrupts); } @@ -218,7 +219,7 @@ SendBareCommandListToMetadataWorkers(List *commandList) * SendCommandToMetadataWorkersParams is a wrapper around * SendCommandToWorkersParamsInternal() enforcing some extra checks. */ -static void +static bool SendCommandToMetadataWorkersParams(const char *command, const char *user, int parameterCount, const Oid *parameterTypes, @@ -230,7 +231,7 @@ SendCommandToMetadataWorkersParams(const char *command, ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); - SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user, + return SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user, parameterCount, parameterTypes, parameterValues, raiseInterrupts); } @@ -367,7 +368,7 @@ GetConnectionsResults(List *connectionList, bool failOnError) * paramLengths, paramFormats and resultFormat are hard-coded to NULL, NULL and 0 * respectively. */ -static void +static bool SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command, const char *user, int parameterCount, const Oid *parameterTypes, @@ -381,6 +382,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * Use2PCForCoordinatedTransaction(); int logLevel = raiseInterrupts ? ERROR : WARNING; + bool failed = false; /* open connections in parallel */ WorkerNode *workerNode = NULL; @@ -419,6 +421,8 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * if (querySent == 0) { ReportConnectionError(connection, logLevel); + + failed = true; } } @@ -429,12 +433,16 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * if (!IsResponseOK(result)) { ReportResultError(connection, result, logLevel); + + failed = true; } PQclear(result); ForgetResults(connection); } + + return failed; } diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 7e0ec0e56..9a8ed9238 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -42,7 +42,7 @@ extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, i const char *nodeUser, List *commandList); extern void SendCommandToWorkersWithMetadata(const char *command); -extern void SendOptionalCommandToWorkersWithMetadata(const char *command); +extern bool SendOptionalCommandToWorkersWithMetadata(const char *command); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName,