mirror of https://github.com/citusdata/citus.git
Merge pull request #1109 from citusdata/transaction_recovery_connection_api
Use GetNodeConnection to establish a connection in transaction recoverypull/1110/head
commit
16608a3259
|
@ -125,46 +125,59 @@ ReportConnectionError(MultiConnection *connection, int elevel)
|
|||
void
|
||||
ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
|
||||
{
|
||||
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
|
||||
char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
|
||||
char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
|
||||
char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT);
|
||||
char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT);
|
||||
|
||||
char *nodeName = connection->hostname;
|
||||
int nodePort = connection->port;
|
||||
int sqlState = ERRCODE_INTERNAL_ERROR;
|
||||
|
||||
if (sqlStateString != NULL)
|
||||
/* we release PQresult when throwing an error because the caller can't */
|
||||
PG_TRY();
|
||||
{
|
||||
sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
|
||||
sqlStateString[3], sqlStateString[4]);
|
||||
}
|
||||
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
|
||||
char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
|
||||
char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
|
||||
char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT);
|
||||
char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT);
|
||||
|
||||
/*
|
||||
* If the PGresult did not contain a message, the connection may provide a
|
||||
* suitable top level one. At worst, this is an empty string.
|
||||
*/
|
||||
if (messagePrimary == NULL)
|
||||
{
|
||||
char *lastNewlineIndex = NULL;
|
||||
char *nodeName = connection->hostname;
|
||||
int nodePort = connection->port;
|
||||
int sqlState = ERRCODE_INTERNAL_ERROR;
|
||||
|
||||
messagePrimary = PQerrorMessage(connection->pgConn);
|
||||
lastNewlineIndex = strrchr(messagePrimary, '\n');
|
||||
|
||||
/* trim trailing newline, if any */
|
||||
if (lastNewlineIndex != NULL)
|
||||
if (sqlStateString != NULL)
|
||||
{
|
||||
*lastNewlineIndex = '\0';
|
||||
sqlState = MAKE_SQLSTATE(sqlStateString[0],
|
||||
sqlStateString[1],
|
||||
sqlStateString[2],
|
||||
sqlStateString[3],
|
||||
sqlStateString[4]);
|
||||
}
|
||||
}
|
||||
|
||||
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
|
||||
messageDetail ? errdetail("%s", messageDetail) : 0,
|
||||
messageHint ? errhint("%s", messageHint) : 0,
|
||||
messageContext ? errcontext("%s", messageContext) : 0,
|
||||
errcontext("while executing command on %s:%d",
|
||||
nodeName, nodePort)));
|
||||
/*
|
||||
* If the PGresult did not contain a message, the connection may provide a
|
||||
* suitable top level one. At worst, this is an empty string.
|
||||
*/
|
||||
if (messagePrimary == NULL)
|
||||
{
|
||||
char *lastNewlineIndex = NULL;
|
||||
|
||||
messagePrimary = PQerrorMessage(connection->pgConn);
|
||||
lastNewlineIndex = strrchr(messagePrimary, '\n');
|
||||
|
||||
/* trim trailing newline, if any */
|
||||
if (lastNewlineIndex != NULL)
|
||||
{
|
||||
*lastNewlineIndex = '\0';
|
||||
}
|
||||
}
|
||||
|
||||
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
|
||||
messageDetail ? errdetail("%s", messageDetail) : 0,
|
||||
messageHint ? errhint("%s", messageHint) : 0,
|
||||
messageContext ? errcontext("%s", messageContext) : 0,
|
||||
errcontext("while executing command on %s:%d",
|
||||
nodeName, nodePort)));
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
PQclear(result);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
#include "postgres.h"
|
||||
#include "miscadmin.h"
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
|
@ -23,10 +24,11 @@
|
|||
#include "access/relscan.h"
|
||||
#include "access/xact.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/pg_dist_transaction.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/transaction_recovery.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "lib/stringinfo.h"
|
||||
|
@ -49,7 +51,7 @@ static List * NameListDifference(List *nameList, List *subtractList);
|
|||
static int CompareNames(const void *leftPointer, const void *rightPointer);
|
||||
static bool FindMatchingName(char **nameArray, int nameCount, char *needle,
|
||||
int *matchIndex);
|
||||
static List * PendingWorkerTransactionList(PGconn *connection);
|
||||
static List * PendingWorkerTransactionList(MultiConnection *connection);
|
||||
static List * UnconfirmedWorkerTransactionsList(int groupId);
|
||||
static void DeleteTransactionRecord(int32 groupId, char *transactionName);
|
||||
|
||||
|
@ -163,8 +165,9 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
|||
MemoryContext localContext = NULL;
|
||||
MemoryContext oldContext = NULL;
|
||||
|
||||
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
|
||||
if (connection == NULL)
|
||||
int connectionFlags = SESSION_LIFESPAN;
|
||||
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
|
||||
if (connection->pgConn == NULL)
|
||||
{
|
||||
/* cannot recover transactions on this worker right now */
|
||||
return 0;
|
||||
|
@ -207,7 +210,9 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
|||
{
|
||||
char *transactionName = (char *) lfirst(pendingTransactionCell);
|
||||
StringInfo command = makeStringInfo();
|
||||
int querySent = 0;
|
||||
PGresult *result = NULL;
|
||||
bool raiseInterrupts = true;
|
||||
|
||||
bool shouldCommit = FindMatchingName(unconfirmedTransactionArray,
|
||||
unconfirmedTransactionCount,
|
||||
|
@ -225,10 +230,18 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
|||
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName);
|
||||
}
|
||||
|
||||
result = PQexec(connection, command->data);
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
querySent = SendRemoteCommand(connection, command->data);
|
||||
if (querySent == 0)
|
||||
{
|
||||
WarnRemoteError(connection, result);
|
||||
ReportConnectionError(connection, WARNING);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||
if (!IsResponseOK(result))
|
||||
{
|
||||
ReportResultError(connection, result, WARNING);
|
||||
PQclear(result);
|
||||
|
||||
/* cannot recover this transaction right now */
|
||||
|
@ -236,6 +249,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
|||
}
|
||||
|
||||
PQclear(result);
|
||||
ForgetResults(connection);
|
||||
|
||||
ereport(NOTICE, (errmsg("recovered a prepared transaction on %s:%d",
|
||||
nodeName, nodePort),
|
||||
|
@ -367,9 +381,11 @@ FindMatchingName(char **nameArray, int nameCount, char *needle,
|
|||
* transactions on a remote node that were started by this node.
|
||||
*/
|
||||
static List *
|
||||
PendingWorkerTransactionList(PGconn *connection)
|
||||
PendingWorkerTransactionList(MultiConnection *connection)
|
||||
{
|
||||
StringInfo command = makeStringInfo();
|
||||
bool raiseInterrupts = true;
|
||||
int querySent = 0;
|
||||
PGresult *result = NULL;
|
||||
int rowCount = 0;
|
||||
int rowIndex = 0;
|
||||
|
@ -380,10 +396,16 @@ PendingWorkerTransactionList(PGconn *connection)
|
|||
"WHERE gid LIKE 'citus_%d_%%'",
|
||||
coordinatorId);
|
||||
|
||||
result = PQexec(connection, command->data);
|
||||
if (PQresultStatus(result) != PGRES_TUPLES_OK)
|
||||
querySent = SendRemoteCommand(connection, command->data);
|
||||
if (querySent == 0)
|
||||
{
|
||||
ReraiseRemoteError(connection, result);
|
||||
ReportConnectionError(connection, ERROR);
|
||||
}
|
||||
|
||||
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||
if (!IsResponseOK(result))
|
||||
{
|
||||
ReportResultError(connection, result, ERROR);
|
||||
}
|
||||
|
||||
rowCount = PQntuples(result);
|
||||
|
@ -397,6 +419,7 @@ PendingWorkerTransactionList(PGconn *connection)
|
|||
}
|
||||
|
||||
PQclear(result);
|
||||
ForgetResults(connection);
|
||||
|
||||
return transactionNames;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue