mirror of https://github.com/citusdata/citus.git
Create ExecuteOptionalRemoteCommand
A small refactor which pulls some code out of `RecoverWorkerTransactions` and into `remote_commands.c`. This code block currently only occurs in `RecoverWorkerTransactions` but will be useful to other functions shortly. Unfortunately we couldn't call it `ExecuteRemoteCommand`, that name was already taken.pull/1139/head
parent
539a205462
commit
b1b2b4fadf
|
@ -229,6 +229,42 @@ ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecuteOptionalRemoteCommand executes a remote command. If the command fails a WARNING
|
||||||
|
* is emitted but execution continues.
|
||||||
|
*
|
||||||
|
* could return 0, QUERY_SEND_FAILED, or RESPONSE_NOT_OKAY
|
||||||
|
* result is only set if there was no error
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
|
||||||
|
PGresult **result)
|
||||||
|
{
|
||||||
|
int querySent = 0;
|
||||||
|
PGresult *localResult = NULL;
|
||||||
|
bool raiseInterrupts = true;
|
||||||
|
|
||||||
|
querySent = SendRemoteCommand(connection, command);
|
||||||
|
if (querySent == 0)
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, WARNING);
|
||||||
|
return QUERY_SEND_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
localResult = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
|
if (!IsResponseOK(localResult))
|
||||||
|
{
|
||||||
|
ReportResultError(connection, localResult, WARNING);
|
||||||
|
PQclear(localResult);
|
||||||
|
ForgetResults(connection);
|
||||||
|
return RESPONSE_NOT_OKAY;
|
||||||
|
}
|
||||||
|
|
||||||
|
*result = localResult;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
|
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
|
||||||
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
|
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
|
||||||
|
|
|
@ -210,9 +210,8 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
||||||
{
|
{
|
||||||
char *transactionName = (char *) lfirst(pendingTransactionCell);
|
char *transactionName = (char *) lfirst(pendingTransactionCell);
|
||||||
StringInfo command = makeStringInfo();
|
StringInfo command = makeStringInfo();
|
||||||
int querySent = 0;
|
int executeCommand = 0;
|
||||||
PGresult *result = NULL;
|
PGresult *result = NULL;
|
||||||
bool raiseInterrupts = true;
|
|
||||||
|
|
||||||
bool shouldCommit = FindMatchingName(unconfirmedTransactionArray,
|
bool shouldCommit = FindMatchingName(unconfirmedTransactionArray,
|
||||||
unconfirmedTransactionCount,
|
unconfirmedTransactionCount,
|
||||||
|
@ -230,20 +229,13 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
||||||
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName);
|
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
querySent = SendRemoteCommand(connection, command->data);
|
executeCommand = ExecuteOptionalRemoteCommand(connection, command->data, &result);
|
||||||
if (querySent == 0)
|
if (executeCommand == QUERY_SEND_FAILED)
|
||||||
{
|
{
|
||||||
ReportConnectionError(connection, WARNING);
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (executeCommand == RESPONSE_NOT_OKAY)
|
||||||
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
|
||||||
if (!IsResponseOK(result))
|
|
||||||
{
|
{
|
||||||
ReportResultError(connection, result, WARNING);
|
|
||||||
PQclear(result);
|
|
||||||
|
|
||||||
/* cannot recover this transaction right now */
|
/* cannot recover this transaction right now */
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,9 @@
|
||||||
|
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
|
|
||||||
|
/* errors which ExecuteRemoteCommand might return */
|
||||||
|
#define QUERY_SEND_FAILED 1
|
||||||
|
#define RESPONSE_NOT_OKAY 2
|
||||||
|
|
||||||
struct pg_result; /* target of the PGresult typedef */
|
struct pg_result; /* target of the PGresult typedef */
|
||||||
|
|
||||||
|
@ -34,6 +37,9 @@ extern void LogRemoteCommand(MultiConnection *connection, const char *command);
|
||||||
/* wrappers around libpq functions, with command logging support */
|
/* wrappers around libpq functions, with command logging support */
|
||||||
extern void ExecuteCriticalRemoteCommand(MultiConnection *connection,
|
extern void ExecuteCriticalRemoteCommand(MultiConnection *connection,
|
||||||
const char *command);
|
const char *command);
|
||||||
|
extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,
|
||||||
|
const char *command,
|
||||||
|
PGresult **result);
|
||||||
extern int SendRemoteCommand(MultiConnection *connection, const char *command);
|
extern int SendRemoteCommand(MultiConnection *connection, const char *command);
|
||||||
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
|
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
|
||||||
int parameterCount, const Oid *parameterTypes,
|
int parameterCount, const Oid *parameterTypes,
|
||||||
|
|
Loading…
Reference in New Issue