diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 635ee2120..1b716e017 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -229,6 +229,42 @@ ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command) } +/* + * ExecuteOptionalRemoteCommand executes a remote command. If the command fails a WARNING + * is emitted but execution continues. + * + * could return 0, QUERY_SEND_FAILED, or RESPONSE_NOT_OKAY + * result is only set if there was no error + */ +int +ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command, + PGresult **result) +{ + int querySent = 0; + PGresult *localResult = NULL; + bool raiseInterrupts = true; + + querySent = SendRemoteCommand(connection, command); + if (querySent == 0) + { + ReportConnectionError(connection, WARNING); + return QUERY_SEND_FAILED; + } + + localResult = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(localResult)) + { + ReportResultError(connection, localResult, WARNING); + PQclear(localResult); + ForgetResults(connection); + return RESPONSE_NOT_OKAY; + } + + *result = localResult; + return 0; +} + + /* * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and * accepts a MultiConnection instead of a plain PGconn. It makes sure it can diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 1a7140f9b..8a95eb8b2 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -210,9 +210,8 @@ RecoverWorkerTransactions(WorkerNode *workerNode) { char *transactionName = (char *) lfirst(pendingTransactionCell); StringInfo command = makeStringInfo(); - int querySent = 0; + int executeCommand = 0; PGresult *result = NULL; - bool raiseInterrupts = true; bool shouldCommit = FindMatchingName(unconfirmedTransactionArray, unconfirmedTransactionCount, @@ -230,20 +229,13 @@ RecoverWorkerTransactions(WorkerNode *workerNode) appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName); } - querySent = SendRemoteCommand(connection, command->data); - if (querySent == 0) + executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result); + if (executeCommand == QUERY_SEND_FAILED) { - ReportConnectionError(connection, WARNING); - break; } - - result = GetRemoteCommandResult(connection, raiseInterrupts); - if (!IsResponseOK(result)) + if (executeCommand == RESPONSE_NOT_OKAY) { - ReportResultError(connection, result, WARNING); - PQclear(result); - /* cannot recover this transaction right now */ continue; } diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index e4b9ee78c..b95454947 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -13,6 +13,9 @@ #include "distributed/connection_management.h" +/* errors which ExecuteRemoteCommand might return */ +#define QUERY_SEND_FAILED 1 +#define RESPONSE_NOT_OKAY 2 struct pg_result; /* target of the PGresult typedef */ @@ -34,6 +37,9 @@ extern void LogRemoteCommand(MultiConnection *connection, const char *command); /* wrappers around libpq functions, with command logging support */ extern void ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command); +extern int ExecuteOptionalRemoteCommand(MultiConnection *connection, + const char *command, + PGresult **result); extern int SendRemoteCommand(MultiConnection *connection, const char *command); extern int SendRemoteCommandParams(MultiConnection *connection, const char *command, int parameterCount, const Oid *parameterTypes,