From 87c62d598e449a8f35ccb0903465d0f9542f5c06 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 21 Dec 2016 18:44:04 +0100 Subject: [PATCH] Connectionapify SendCommandListToWorkerInSingleTransaction --- .../distributed/connection/remote_commands.c | 29 ++++++ .../transaction/worker_transaction.c | 96 ++++--------------- src/include/distributed/remote_commands.h | 2 + 3 files changed, 49 insertions(+), 78 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 6c5658988..2e1de8452 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -187,6 +187,35 @@ LogRemoteCommand(MultiConnection *connection, const char *command) /* wrappers around libpq functions, with command logging support */ + +/* + * ExecuteCriticalRemoteCommand executes a remote command that is critical + * to the transaction. If the command fails then the transaction aborts. + */ +void +ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command) +{ + int querySent = 0; + PGresult *result = NULL; + bool raiseInterrupts = true; + + querySent = SendRemoteCommand(connection, command); + if (querySent == 0) + { + ReportConnectionError(connection, WARNING); + } + + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + + PQclear(result); + ForgetResults(connection); +} + + /* * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and * accepts a MultiConnection instead of a plain PGconn. It makes sure it can diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index faca72802..775fb4c1d 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -20,9 +20,11 @@ #include "access/xact.h" #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" #include "distributed/resource_lock.h" +#include "distributed/remote_commands.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_transaction.h" #include "distributed/transaction_recovery.h" @@ -37,7 +39,6 @@ static void CompleteWorkerTransactions(XactEvent event, void *arg); static List * OpenWorkerTransactions(void); static TransactionConnection * GetWorkerTransaction(char *nodeName, int32 nodePort); static List * GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet); -static bool IsResponseOK(ExecStatusType resultStatus); /* Global worker connection list */ @@ -147,9 +148,8 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, PGconn *connection = transactionConnection->connection; PGresult *result = PQgetResult(connection); - ExecStatusType resultStatus = PQresultStatus(result); - if (!IsResponseOK(resultStatus)) + if (!IsResponseOK(result)) { ReraiseRemoteError(connection, result); } @@ -172,9 +172,9 @@ void SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char *nodeUser, List *commandList) { - PGconn *workerConnection = NULL; - PGresult *queryResult = NULL; + MultiConnection *workerConnection = NULL; ListCell *commandCell = NULL; + int connectionFlags = FORCE_NEW_CONNECTION; if (XactModificationLevel > XACT_MODIFICATION_NONE) { @@ -183,66 +183,22 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char "command within a transaction"))); } - workerConnection = ConnectToNode(nodeName, nodePort, nodeUser); - if (workerConnection == NULL) + workerConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + + MarkRemoteTransactionCritical(workerConnection); + RemoteTransactionBegin(workerConnection); + + /* iterate over the commands and execute them in the same connection */ + foreach(commandCell, commandList) { - ereport(ERROR, (errmsg("could not open connection to %s:%d as %s", - nodeName, nodePort, nodeUser))); + char *commandString = lfirst(commandCell); + + ExecuteCriticalRemoteCommand(workerConnection, commandString); } - PG_TRY(); - { - /* start the transaction on the worker node */ - queryResult = PQexec(workerConnection, "BEGIN"); - if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(workerConnection, queryResult); - } - - PQclear(queryResult); - - /* iterate over the commands and execute them in the same connection */ - foreach(commandCell, commandList) - { - char *commandString = lfirst(commandCell); - ExecStatusType resultStatus = PGRES_EMPTY_QUERY; - - CHECK_FOR_INTERRUPTS(); - - queryResult = PQexec(workerConnection, commandString); - resultStatus = PQresultStatus(queryResult); - if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || - resultStatus == PGRES_COMMAND_OK)) - { - ReraiseRemoteError(workerConnection, queryResult); - } - - PQclear(queryResult); - } - - /* commit the transaction on the worker node */ - queryResult = PQexec(workerConnection, "COMMIT"); - if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(workerConnection, queryResult); - } - - PQclear(queryResult); - - /* clear NULL result */ - PQgetResult(workerConnection); - - /* we no longer need this connection */ - CloseConnectionByPGconn(workerConnection); - } - PG_CATCH(); - { - /* close the connection */ - CloseConnectionByPGconn(workerConnection); - - PG_RE_THROW(); - } - PG_END_TRY(); + RemoteTransactionCommit(workerConnection); + CloseConnection(workerConnection); } @@ -540,19 +496,3 @@ GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet) return targetConnectionList; } - - -/* - * IsResponseOK checks the resultStatus and returns true if the status is OK. - */ -static bool -IsResponseOK(ExecStatusType resultStatus) -{ - if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || - resultStatus == PGRES_COMMAND_OK) - { - return true; - } - - return false; -} diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 66430ea64..52cf255d2 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -32,6 +32,8 @@ extern void ReportResultError(MultiConnection *connection, struct pg_result *res extern void LogRemoteCommand(MultiConnection *connection, const char *command); /* wrappers around libpq functions, with command logging support */ +extern void ExecuteCriticalRemoteCommand(MultiConnection *connection, + const char *command); extern int SendRemoteCommand(MultiConnection *connection, const char *command); extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts);