Use GetNodeConnection to establish a connection in transaction recovery

pull/1109/head
Marco Slot 2017-01-06 06:26:48 +01:00
parent 4f6c2cac67
commit 31231ce196
1 changed files with 34 additions and 11 deletions

View File

@ -14,6 +14,7 @@
#include "postgres.h"
#include "miscadmin.h"
#include "libpq-fe.h"
#include <sys/stat.h>
#include <unistd.h>
@ -23,10 +24,11 @@
#include "access/relscan.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/pg_dist_transaction.h"
#include "distributed/remote_commands.h"
#include "distributed/transaction_recovery.h"
#include "distributed/worker_manager.h"
#include "lib/stringinfo.h"
@ -49,7 +51,7 @@ static List * NameListDifference(List *nameList, List *subtractList);
static int CompareNames(const void *leftPointer, const void *rightPointer);
static bool FindMatchingName(char **nameArray, int nameCount, char *needle,
int *matchIndex);
static List * PendingWorkerTransactionList(PGconn *connection);
static List * PendingWorkerTransactionList(MultiConnection *connection);
static List * UnconfirmedWorkerTransactionsList(int groupId);
static void DeleteTransactionRecord(int32 groupId, char *transactionName);
@ -163,8 +165,9 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
MemoryContext localContext = NULL;
MemoryContext oldContext = NULL;
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
if (connection == NULL)
int connectionFlags = SESSION_LIFESPAN;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
if (connection->pgConn == NULL)
{
/* cannot recover transactions on this worker right now */
return 0;
@ -207,7 +210,9 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
{
char *transactionName = (char *) lfirst(pendingTransactionCell);
StringInfo command = makeStringInfo();
int querySent = 0;
PGresult *result = NULL;
bool raiseInterrupts = true;
bool shouldCommit = FindMatchingName(unconfirmedTransactionArray,
unconfirmedTransactionCount,
@ -225,10 +230,18 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName);
}
result = PQexec(connection, command->data);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
querySent = SendRemoteCommand(connection, command->data);
if (querySent == 0)
{
WarnRemoteError(connection, result);
ReportConnectionError(connection, WARNING);
break;
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, WARNING);
PQclear(result);
/* cannot recover this transaction right now */
@ -236,6 +249,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
}
PQclear(result);
ForgetResults(connection);
ereport(NOTICE, (errmsg("recovered a prepared transaction on %s:%d",
nodeName, nodePort),
@ -367,9 +381,11 @@ FindMatchingName(char **nameArray, int nameCount, char *needle,
* transactions on a remote node that were started by this node.
*/
static List *
PendingWorkerTransactionList(PGconn *connection)
PendingWorkerTransactionList(MultiConnection *connection)
{
StringInfo command = makeStringInfo();
bool raiseInterrupts = true;
int querySent = 0;
PGresult *result = NULL;
int rowCount = 0;
int rowIndex = 0;
@ -380,10 +396,16 @@ PendingWorkerTransactionList(PGconn *connection)
"WHERE gid LIKE 'citus_%d_%%'",
coordinatorId);
result = PQexec(connection, command->data);
if (PQresultStatus(result) != PGRES_TUPLES_OK)
querySent = SendRemoteCommand(connection, command->data);
if (querySent == 0)
{
ReraiseRemoteError(connection, result);
ReportConnectionError(connection, ERROR);
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
rowCount = PQntuples(result);
@ -397,6 +419,7 @@ PendingWorkerTransactionList(PGconn *connection)
}
PQclear(result);
ForgetResults(connection);
return transactionNames;
}