From 31231ce1962f9916f31709bc1ca93d39bdcf831f Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 6 Jan 2017 06:26:48 +0100 Subject: [PATCH 1/2] Use GetNodeConnection to establish a connection in transaction recovery --- .../transaction/transaction_recovery.c | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 5bdb15610..1a7140f9b 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -14,6 +14,7 @@ #include "postgres.h" #include "miscadmin.h" +#include "libpq-fe.h" #include #include @@ -23,10 +24,11 @@ #include "access/relscan.h" #include "access/xact.h" #include "catalog/indexing.h" -#include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/pg_dist_transaction.h" +#include "distributed/remote_commands.h" #include "distributed/transaction_recovery.h" #include "distributed/worker_manager.h" #include "lib/stringinfo.h" @@ -49,7 +51,7 @@ static List * NameListDifference(List *nameList, List *subtractList); static int CompareNames(const void *leftPointer, const void *rightPointer); static bool FindMatchingName(char **nameArray, int nameCount, char *needle, int *matchIndex); -static List * PendingWorkerTransactionList(PGconn *connection); +static List * PendingWorkerTransactionList(MultiConnection *connection); static List * UnconfirmedWorkerTransactionsList(int groupId); static void DeleteTransactionRecord(int32 groupId, char *transactionName); @@ -163,8 +165,9 @@ RecoverWorkerTransactions(WorkerNode *workerNode) MemoryContext localContext = NULL; MemoryContext oldContext = NULL; - PGconn *connection = GetOrEstablishConnection(nodeName, nodePort); - if (connection == NULL) + int connectionFlags = 
SESSION_LIFESPAN; + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); + if (connection->pgConn == NULL) { /* cannot recover transactions on this worker right now */ return 0; @@ -207,7 +210,9 @@ RecoverWorkerTransactions(WorkerNode *workerNode) { char *transactionName = (char *) lfirst(pendingTransactionCell); StringInfo command = makeStringInfo(); + int querySent = 0; PGresult *result = NULL; + bool raiseInterrupts = true; bool shouldCommit = FindMatchingName(unconfirmedTransactionArray, unconfirmedTransactionCount, @@ -225,10 +230,18 @@ RecoverWorkerTransactions(WorkerNode *workerNode) appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName); } - result = PQexec(connection, command->data); - if (PQresultStatus(result) != PGRES_COMMAND_OK) + querySent = SendRemoteCommand(connection, command->data); + if (querySent == 0) { - WarnRemoteError(connection, result); + ReportConnectionError(connection, WARNING); + + break; + } + + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); PQclear(result); /* cannot recover this transaction right now */ @@ -236,6 +249,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode) } PQclear(result); + ForgetResults(connection); ereport(NOTICE, (errmsg("recovered a prepared transaction on %s:%d", nodeName, nodePort), @@ -367,9 +381,11 @@ FindMatchingName(char **nameArray, int nameCount, char *needle, * transactions on a remote node that were started by this node. 
*/ static List * -PendingWorkerTransactionList(PGconn *connection) +PendingWorkerTransactionList(MultiConnection *connection) { StringInfo command = makeStringInfo(); + bool raiseInterrupts = true; + int querySent = 0; PGresult *result = NULL; int rowCount = 0; int rowIndex = 0; @@ -380,10 +396,16 @@ PendingWorkerTransactionList(PGconn *connection) "WHERE gid LIKE 'citus_%d_%%'", coordinatorId); - result = PQexec(connection, command->data); - if (PQresultStatus(result) != PGRES_TUPLES_OK) + querySent = SendRemoteCommand(connection, command->data); + if (querySent == 0) { - ReraiseRemoteError(connection, result); + ReportConnectionError(connection, ERROR); + } + + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); } rowCount = PQntuples(result); @@ -397,6 +419,7 @@ PendingWorkerTransactionList(PGconn *connection) } PQclear(result); + ForgetResults(connection); return transactionNames; } From ef326b202abd11a8a375815158877c096a8541c2 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 10 Jan 2017 02:18:01 +0100 Subject: [PATCH 2/2] PQclear in ReportResultError to prevent memory leaks --- .../distributed/connection/remote_commands.c | 81 +++++++++++-------- 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index d17a44d5a..635ee2120 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -125,46 +125,59 @@ ReportConnectionError(MultiConnection *connection, int elevel) void ReportResultError(MultiConnection *connection, PGresult *result, int elevel) { - char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); - char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); - char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL); - char 
*messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT); - char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT); - - char *nodeName = connection->hostname; - int nodePort = connection->port; - int sqlState = ERRCODE_INTERNAL_ERROR; - - if (sqlStateString != NULL) + /* we release PGresult when throwing an error because the caller can't */ + PG_TRY(); { - sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], - sqlStateString[3], sqlStateString[4]); - } + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); + char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL); + char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT); + char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT); - /* - * If the PGresult did not contain a message, the connection may provide a - * suitable top level one. At worst, this is an empty string. - */ - if (messagePrimary == NULL) - { - char *lastNewlineIndex = NULL; + char *nodeName = connection->hostname; + int nodePort = connection->port; + int sqlState = ERRCODE_INTERNAL_ERROR; - messagePrimary = PQerrorMessage(connection->pgConn); - lastNewlineIndex = strrchr(messagePrimary, '\n'); - - /* trim trailing newline, if any */ - if (lastNewlineIndex != NULL) + if (sqlStateString != NULL) { - *lastNewlineIndex = '\0'; + sqlState = MAKE_SQLSTATE(sqlStateString[0], + sqlStateString[1], + sqlStateString[2], + sqlStateString[3], + sqlStateString[4]); } - } - ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary), - messageDetail ? errdetail("%s", messageDetail) : 0, - messageHint ? errhint("%s", messageHint) : 0, - messageContext ? errcontext("%s", messageContext) : 0, - errcontext("while executing command on %s:%d", - nodeName, nodePort))); + /* + * If the PGresult did not contain a message, the connection may provide a + * suitable top level one. 
At worst, this is an empty string. + */ + if (messagePrimary == NULL) + { + char *lastNewlineIndex = NULL; + + messagePrimary = PQerrorMessage(connection->pgConn); + lastNewlineIndex = strrchr(messagePrimary, '\n'); + + /* trim trailing newline, if any */ + if (lastNewlineIndex != NULL) + { + *lastNewlineIndex = '\0'; + } + } + + ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary), + messageDetail ? errdetail("%s", messageDetail) : 0, + messageHint ? errhint("%s", messageHint) : 0, + messageContext ? errcontext("%s", messageContext) : 0, + errcontext("while executing command on %s:%d", + nodeName, nodePort))); + } + PG_CATCH(); + { + PQclear(result); + PG_RE_THROW(); + } + PG_END_TRY(); }