From c161c2fbe30b62cfd03dc28370bd1441b7e7b7b0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 01/14] Fix some trailing whitespace. --- src/test/regress/Makefile | 2 +- src/test/regress/isolation_schedule | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index 3dded6576..488043dd1 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -60,7 +60,7 @@ check-isolation: all tempinstall-main check-vanilla: all tempinstall-main $(pg_regress_multi_check) --load-extension=citus --vanillatest - + check-multi-mx: all tempinstall-main $(pg_regress_multi_check) --load-extension=citus \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_mx_schedule $(EXTRA_TESTS) diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 8f7dd91e5..3594eecc2 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -1,10 +1,10 @@ test: isolation_add_node_vs_reference_table_operations -# tests that change node metadata should precede -# isolation_cluster_management such that tests +# tests that change node metadata should precede +# isolation_cluster_management such that tests # that come later can be parallelized test: isolation_cluster_management test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement test: isolation_concurrent_dml isolation_data_migration -test: isolation_drop_shards isolation_copy_placement_vs_modification +test: isolation_drop_shards isolation_copy_placement_vs_modification From 3dedeadb5ee285be393f479276a9039280eddcaa Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 02/14] Fix memory leak in RemoteFinalizedShardPlacementList(). --- src/backend/distributed/commands/multi_copy.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index e2b2703e2..b590a70de 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -939,8 +939,8 @@ RemoteFinalizedShardPlacementList(uint64 shardId) for (rowIndex = 0; rowIndex < rowCount; rowIndex++) { char *placementIdString = PQgetvalue(queryResult, rowIndex, 0); - char *nodeName = PQgetvalue(queryResult, rowIndex, 1); - char *nodePortString = PQgetvalue(queryResult, rowIndex, 2); + char *nodeName = pstrdup(PQgetvalue(queryResult, rowIndex, 1)); + char *nodePortString = pstrdup(PQgetvalue(queryResult, rowIndex, 2)); uint32 nodePort = atoi(nodePortString); uint64 placementId = atoll(placementIdString); @@ -959,6 +959,7 @@ RemoteFinalizedShardPlacementList(uint64 shardId) ereport(ERROR, (errmsg("could not get shard placements from the master node"))); } + PQclear(queryResult); return finalizedPlacementList; } From ec0ed677e314e64266085594c9da92691bf725d8 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 03/14] Fix copy & pasto in WARNING message. 
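
The warning in question is emitted from the branch that aborts plain
(non-prepared) remote transactions — the REMOTE_TRANS_1PC_ABORTING path
in the diff below — so it should describe a 1PC transaction, not 2PC.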
--- src/backend/distributed/transaction/remote_transaction.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 01bfe0eea..17a70ea83 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -339,7 +339,7 @@ FinishRemoteTransactionAbort(MultiConnection *connection) if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING) { ereport(WARNING, - (errmsg("failed to abort 2PC transaction \"%s\" on %s:%d", + (errmsg("failed to abort 1PC transaction \"%s\" on %s:%d", transaction->preparedName, connection->hostname, connection->port))); } From b7f9679ccc6aabab76529b0576557c192a019f2f Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 04/14] Move interrupt handling code from GetRemoteCommandResult to FinishConnectionIO. Nearby commits will add additional interrupt handling functions, this way we don't have to duplicate the code. --- .../distributed/connection/remote_commands.c | 142 ++++++++---------- 1 file changed, 60 insertions(+), 82 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index ecfc54517..ffa4625cf 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -23,6 +23,7 @@ /* GUC, determining whether statements sent to remote nodes are logged */ bool LogRemoteCommands = false; +static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts); /* simple helpers */ @@ -367,7 +368,7 @@ ReadFirstColumnAsText(PGresult *queryResult) * an error. * * If raiseInterrupts is false and an interrupt arrives that'd otherwise raise - * an error, GetRemotecommandResult returns NULL, and the transaction is + * an error, GetRemoteCommandResult returns NULL, and the transaction is * marked as having failed. While that's not a perfect way to signal failure, * callers will usually treat that as an error, and it's easy to use. * @@ -379,11 +380,8 @@ PGresult * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) { PGconn *pgConn = connection->pgConn; - int socket = 0; - int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET; bool wasNonblocking = false; PGresult *result = NULL; - bool failed = false; /* * Short circuit tests around the more expensive parts of this @@ -395,106 +393,90 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) return PQgetResult(connection->pgConn); } - socket = PQsocket(pgConn); wasNonblocking = PQisnonblocking(pgConn); /* make sure not to block anywhere */ - if (!wasNonblocking) + PQsetnonblocking(pgConn, true); + + if (!FinishConnectionIO(connection, raiseInterrupts)) { - PQsetnonblocking(pgConn, true); + return NULL; } + /* no IO should be necessary to get result */ + Assert(!PQisBusy(pgConn)); + + result = PQgetResult(connection->pgConn); + + PQsetnonblocking(pgConn, wasNonblocking); + + return result; +} + + +/* + * FinishConnectionIO performs pending IO for the connection, while accepting + * interrupts. + * + * See GetRemoteCommandResult() for documentation of interrupt handling + * behaviour. + * + * Returns true if IO was successfully completed, false otherwise. 
+ */ +static bool +FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts) +{ + PGconn *pgConn = connection->pgConn; + int socket = PQsocket(pgConn); + + Assert(pgConn); + Assert(PQisnonblocking(pgConn)); + if (raiseInterrupts) { CHECK_FOR_INTERRUPTS(); } - /* make sure command has been sent out */ - while (!failed) + /* perform the necessary IO */ + while (true) { + int sendStatus = 0; int rc = 0; + int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET; - ResetLatch(MyLatch); - - /* try to send all the data */ - rc = PQflush(pgConn); - - /* stop writing if all data has been sent, or there was none to send */ - if (rc == 0) - { - break; - } + /* try to send all pending data */ + sendStatus = PQflush(pgConn); /* if sending failed, there's nothing more we can do */ - if (rc == -1) + if (sendStatus == -1) { - failed = true; - break; + return false; } - - /* this means we have to wait for data to go out */ - Assert(rc == 1); - -#if (PG_VERSION_NUM >= 100000) - rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0, - PG_WAIT_EXTENSION); -#else - rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0); -#endif - - if (rc & WL_POSTMASTER_DEATH) + else if (sendStatus == 1) { - ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + waitFlags |= WL_SOCKET_WRITEABLE; } - if (rc & WL_LATCH_SET) - { - /* if allowed raise errors */ - if (raiseInterrupts) - { - CHECK_FOR_INTERRUPTS(); - } - - /* - * If raising errors allowed, or called within in a section with - * interrupts held, return NULL instead, and mark the transaction - * as failed. - */ - if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) - { - connection->remoteTransaction.transactionFailed = true; - failed = true; - break; - } - } - } - - /* wait for the result of the command to come in */ - while (!failed) - { - int rc = 0; - - ResetLatch(MyLatch); - /* if reading fails, there's not much we can do */ if (PQconsumeInput(pgConn) == 0) { - failed = true; - break; + return false; + } + if (PQisBusy(pgConn)) + { + waitFlags |= WL_SOCKET_READABLE; } - /* check if all the necessary data is now available */ - if (!PQisBusy(pgConn)) + if ((waitFlags & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0) { - result = PQgetResult(connection->pgConn); - break; + /* no IO necessary anymore, we're done */ + return true; } #if (PG_VERSION_NUM >= 100000) - rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0, - PG_WAIT_EXTENSION); + rc = WaitLatchOrSocket(MyLatch, waitFlags, socket, 0, PG_WAIT_EXTENSION); #else - rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0); + rc = WaitLatchOrSocket(MyLatch, waitFlags, socket, 0); #endif if (rc & WL_POSTMASTER_DEATH) @@ -504,6 +486,8 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) if (rc & WL_LATCH_SET) { + ResetLatch(MyLatch); + /* if allowed raise errors */ if (raiseInterrupts) { @@ -512,22 +496,16 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) /* * If raising errors allowed, or called within in a section with - * interrupts held, return NULL instead, and mark the transaction - * as failed. + * interrupts held, return instead, and mark the transaction as + * failed. 
*/ if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) { connection->remoteTransaction.transactionFailed = true; - failed = true; break; } } } - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, false); - } - - return result; + return false; } From c674bc86401b359c302363d864d1c9ef097cd5de Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 05/14] Add interrupt aware PQputCopy{End,Data} wrappers. --- .../distributed/connection/remote_commands.c | 90 +++++++++++++++++++ src/include/distributed/remote_commands.h | 3 + 2 files changed, 93 insertions(+) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index ffa4625cf..8e3dcc669 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -414,6 +414,96 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) } +/* + * PutRemoteCopyData is a wrapper around PQputCopyData() that handles + * interrupts. + * + * Returns false if PQputCopyData() failed, true otherwise. + */ +bool +PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) +{ + PGconn *pgConn = connection->pgConn; + bool wasNonblocking = false; + int copyState = 0; + bool success = false; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + wasNonblocking = PQisnonblocking(pgConn); + + PQsetnonblocking(pgConn, true); + + copyState = PQputCopyData(pgConn, buffer, nbytes); + + if (copyState == 1) + { + /* successful */ + success = true; + } + else if (copyState == -1) + { + success = false; + } + else + { + bool allowInterrupts = true; + success = FinishConnectionIO(connection, allowInterrupts); + } + + PQsetnonblocking(pgConn, wasNonblocking); + return success; +} + + +/* + * PutRemoteCopyEnd is a wrapper around PQputCopyEnd() that handles + * interrupts. + * + * Returns false if PQputCopyEnd() failed, true otherwise. + */ +bool +PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) +{ + PGconn *pgConn = connection->pgConn; + bool wasNonblocking = false; + int copyState = 0; + bool success = false; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + wasNonblocking = PQisnonblocking(pgConn); + + PQsetnonblocking(pgConn, true); + + copyState = PQputCopyEnd(pgConn, errormsg); + + if (copyState == 1) + { + /* successful */ + success = true; + } + else if (copyState == -1) + { + success = false; + } + else + { + bool allowInterrupts = true; + success = FinishConnectionIO(connection, allowInterrupts); + } + + PQsetnonblocking(pgConn, wasNonblocking); + return success; +} + + /* * FinishConnectionIO performs pending IO for the connection, while accepting * interrupts. 
diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 3799d9d97..6d3f2abb3 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -47,6 +47,9 @@ extern int SendRemoteCommandParams(MultiConnection *connection, const char *comm extern List * ReadFirstColumnAsText(struct pg_result *queryResult); extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts); +extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer, + int nbytes); +extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg); #endif /* REMOTE_COMMAND_H */ From ddb06519670b8916356b744da44c178cfba297a0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 06/14] Move citus tools to interrupt aware libpq wrappers. --- .../distributed/master/master_citus_tools.c | 60 ++++++++++--------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index 946ec16cf..62679a83f 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -21,6 +21,7 @@ #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_server_executor.h" +#include "distributed/remote_commands.h" #include "distributed/worker_protocol.h" #include "lib/stringinfo.h" #include "utils/builtins.h" @@ -39,11 +40,11 @@ static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, bool *statusArray, StringInfo *resultStringArray, int commmandCount); -static bool GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, +static bool GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, StringInfo queryResultString); -static bool EvaluateQueryResult(PGconn *connection, PGresult *queryResult, StringInfo - queryResultString); -static void StoreErrorMessage(PGconn *connection, StringInfo queryResultString); +static bool EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, + StringInfo queryResultString); +static void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString); static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray, StringInfo *commandStringArray, @@ -223,7 +224,8 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor int commmandCount) { int commandIndex = 0; - PGconn **connectionArray = palloc0(commmandCount * sizeof(PGconn *)); + MultiConnection **connectionArray = + palloc0(commmandCount * sizeof(MultiConnection *)); int finishedCount = 0; /* establish connections */ @@ -232,14 +234,13 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor char *nodeName = nodeNameArray[commandIndex]->data; int nodePort = nodePortArray[commandIndex]; int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *multiConnection = + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - PGconn *connection = multiConnection->pgConn; StringInfo queryResultString = resultStringArray[commandIndex]; statusArray[commandIndex] = true; - if (PQstatus(multiConnection->pgConn) != CONNECTION_OK) + if (PQstatus(connection->pgConn) != CONNECTION_OK) { appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, (int) 
nodePort); @@ -256,7 +257,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) { int querySent = 0; - PGconn *connection = connectionArray[commandIndex]; + MultiConnection *connection = connectionArray[commandIndex]; char *queryString = commandStringArray[commandIndex]->data; StringInfo queryResultString = resultStringArray[commandIndex]; @@ -269,13 +270,17 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor continue; } - querySent = PQsendQuery(connection, queryString); + /* + * NB: this intentionally uses PQsendQuery rather than + * SendRemoteCommand as multiple commands are allowed. + */ + querySent = PQsendQuery(connection->pgConn, queryString); if (querySent == 0) { StoreErrorMessage(connection, queryResultString); statusArray[commandIndex] = false; - CloseConnectionByPGconn(connection); + CloseConnection(connection); connectionArray[commandIndex] = NULL; finishedCount++; } @@ -286,7 +291,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor { for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) { - PGconn *connection = connectionArray[commandIndex]; + MultiConnection *connection = connectionArray[commandIndex]; StringInfo queryResultString = resultStringArray[commandIndex]; bool success = false; bool queryFinished = false; @@ -304,7 +309,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor finishedCount++; statusArray[commandIndex] = success; connectionArray[commandIndex] = NULL; - CloseConnectionByPGconn(connection); + CloseConnection(connection); } } @@ -328,11 +333,11 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor * reported upon completion of the query. */ static bool -GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, +GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, StringInfo queryResultString) { bool finished = true; - ConnStatusType connectionStatus = PQstatus(connection); + ConnStatusType connectionStatus = PQstatus(connection->pgConn); int consumeInput = 0; PGresult *queryResult = NULL; bool success = false; @@ -346,7 +351,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, return finished; } - consumeInput = PQconsumeInput(connection); + consumeInput = PQconsumeInput(connection->pgConn); if (consumeInput == 0) { appendStringInfo(queryResultString, "query result unavailable"); @@ -354,14 +359,14 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, } /* check later if busy */ - if (PQisBusy(connection) != 0) + if (PQisBusy(connection->pgConn) != 0) { finished = false; return finished; } /* query result is available at this point */ - queryResult = PQgetResult(connection); + queryResult = PQgetResult(connection->pgConn); success = EvaluateQueryResult(connection, queryResult, queryResultString); PQclear(queryResult); @@ -379,7 +384,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, * error otherwise. */ static bool -EvaluateQueryResult(PGconn *connection, PGresult *queryResult, +EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, StringInfo queryResultString) { bool success = false; @@ -434,9 +439,9 @@ EvaluateQueryResult(PGconn *connection, PGresult *queryResult, * otherwise it would return a default error message. 
*/ static void -StoreErrorMessage(PGconn *connection, StringInfo queryResultString) +StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString) { - char *errorMessage = PQerrorMessage(connection); + char *errorMessage = PQerrorMessage(connection->pgConn); if (errorMessage != NULL) { char *firstNewlineIndex = strchr(errorMessage, '\n'); @@ -499,26 +504,27 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, StringInfo queryResultString) { int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *multiConnection = + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - PGconn *nodeConnection = multiConnection->pgConn; bool success = false; PGresult *queryResult = NULL; + bool raiseInterrupts = true; - if (PQstatus(multiConnection->pgConn) != CONNECTION_OK) + if (PQstatus(connection->pgConn) != CONNECTION_OK) { appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, (int) nodePort); return false; } - queryResult = PQexec(nodeConnection, queryString); - success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString); + SendRemoteCommand(connection, queryString); + queryResult = GetRemoteCommandResult(connection, raiseInterrupts); + success = EvaluateQueryResult(connection, queryResult, queryResultString); PQclear(queryResult); /* close the connection */ - CloseConnection(multiConnection); + CloseConnection(connection); return success; } From 21c25abbb1d8dda56cba1f77ef1a3cc41a71d9c4 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 07/14] Move multi_client_executor to interrupt aware libpq wrappers. --- .../executor/multi_client_executor.c | 36 ++++++------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index e7252ef62..827387489 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -44,7 +44,6 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT]; /* Local functions forward declarations */ -static void ClearRemainingResults(MultiConnection *connection); static bool ClientConnectionReady(MultiConnection *connection, PostgresPollingStatusType pollingStatus); @@ -401,6 +400,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -413,7 +413,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, return false; } - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_TUPLES_OK) { @@ -430,7 +430,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, } /* clear extra result objects */ - ClearRemainingResults(connection); + ForgetResults(connection); return true; } @@ -454,6 +454,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY; + bool raiseInterrupts = true; 
Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -471,7 +472,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, return CLIENT_BATCH_QUERY_FAILED; } - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); if (result == NULL) { return CLIENT_BATCH_QUERY_DONE; @@ -538,6 +539,7 @@ MultiClientQueryStatus(int32 connectionId) ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; QueryStatus queryStatus = CLIENT_INVALID_QUERY; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -555,7 +557,7 @@ MultiClientQueryStatus(int32 connectionId) * isn't ready yet (the caller didn't wait for the connection to be ready), * we will block on this call. */ - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -598,7 +600,7 @@ MultiClientQueryStatus(int32 connectionId) */ if (!copyResults) { - ClearRemainingResults(connection); + ForgetResults(connection); } return queryStatus; @@ -665,7 +667,8 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) else if (receiveLength == -1) { /* received copy done message */ - PGresult *result = PQgetResult(connection->pgConn); + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); ExecStatusType resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -692,7 +695,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) /* if copy out completed, make sure we drain all results from libpq */ if (receiveLength < 0) { - ClearRemainingResults(connection); + ForgetResults(connection); } return copyStatus; @@ -853,23 +856,6 @@ MultiClientWait(WaitInfo *waitInfo) } -/* - * ClearRemainingResults reads result objects from the connection until we get - * null, and clears these results. This is the last step in completing an async - * query. - */ -static void -ClearRemainingResults(MultiConnection *connection) -{ - PGresult *result = PQgetResult(connection->pgConn); - while (result != NULL) - { - PQclear(result); - result = PQgetResult(connection->pgConn); - } -} - - /* * ClientConnectionReady checks if the given connection is ready for non-blocking * reads or writes. This function is loosely based on pqSocketCheck() at fe-misc.c From 90a2d13a64bfe289061a15b95e15605e7200c9a7 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 08/14] Move multi_copy.c to interrupt aware libpq wrappers. 
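
Each call site that previously used blocking libpq calls (PQexec(),
PQputCopyData(), PQputCopyEnd(), PQgetResult()) now goes through the
interrupt aware wrappers in remote_commands.c. Roughly, the pattern for
the single-result master queries is (a condensed sketch, not a verbatim
excerpt; variable names follow the diff below):

    bool raiseInterrupts = true;
    PGresult *queryResult = NULL;

    if (!SendRemoteCommand(masterConnection, command->data))
    {
        ReportConnectionError(masterConnection, ERROR);
    }

    queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
    /* ... check PQresultStatus() and extract values ... */
    PQclear(queryResult);

    /* drain the terminating NULL result so the connection stays usable */
    queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
    Assert(!queryResult);

COPY data is sent through PutRemoteCopyData()/PutRemoteCopyEnd() instead
of the raw PQputCopy* calls, so writes are interruptible as well.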
--- src/backend/distributed/commands/multi_copy.c | 58 ++++++++++++++----- 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index b590a70de..9f186146a 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -658,6 +658,7 @@ MasterPartitionMethod(RangeVar *relation) { char partitionMethod = '\0'; PGresult *queryResult = NULL; + bool raiseInterrupts = true; char *relationName = relation->relname; char *schemaName = relation->schemaname; @@ -666,7 +667,11 @@ MasterPartitionMethod(RangeVar *relation) StringInfo partitionMethodCommand = makeStringInfo(); appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName); - queryResult = PQexec(masterConnection->pgConn, partitionMethodCommand->data); + if (!SendRemoteCommand(masterConnection, partitionMethodCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0); @@ -687,6 +692,9 @@ MasterPartitionMethod(RangeVar *relation) PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); + return partitionMethod; } @@ -735,6 +743,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, ListCell *placementCell = NULL; List *connectionList = NULL; int64 shardId = shardConnections->shardId; + bool raiseInterrupts = true; MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "OpenCopyConnections", @@ -784,15 +793,19 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, MarkRemoteTransactionCritical(connection); ClaimConnectionExclusively(connection); RemoteTransactionBeginIfNecessary(connection); + copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, useBinaryCopyFormat); - result = PQexec(connection->pgConn, copyCommand->data); + if (!SendRemoteCommand(connection, copyCommand->data)) + { + ReportConnectionError(connection, ERROR); + } + result = GetRemoteCommandResult(connection, raiseInterrupts); if (PQresultStatus(result) != PGRES_COPY_IN) { ReportResultError(connection, result, ERROR); } - PQclear(result); connectionList = lappend(connectionList, connection); } @@ -926,11 +939,16 @@ RemoteFinalizedShardPlacementList(uint64 shardId) { List *finalizedPlacementList = NIL; PGresult *queryResult = NULL; + bool raiseInterrupts = true; StringInfo shardPlacementsCommand = makeStringInfo(); appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId); - queryResult = PQexec(masterConnection->pgConn, shardPlacementsCommand->data); + if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { int rowCount = PQntuples(queryResult); @@ -960,6 +978,9 @@ RemoteFinalizedShardPlacementList(uint64 shardId) } PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); + return finalizedPlacementList; } @@ -1065,8 +1086,7 @@ SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList) static void 
SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection) { - int copyResult = PQputCopyData(connection->pgConn, dataBuffer->data, dataBuffer->len); - if (copyResult != 1) + if (!PutRemoteCopyData(connection, dataBuffer->data, dataBuffer->len)) { ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("failed to COPY to shard %ld on %s:%d", @@ -1090,13 +1110,11 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) foreach(connectionCell, connectionList) { MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); - int copyEndResult = 0; PGresult *result = NULL; + bool raiseInterrupts = true; /* end the COPY input */ - copyEndResult = PQputCopyEnd(connection->pgConn, NULL); - - if (copyEndResult != 1) + if (!PutRemoteCopyEnd(connection, NULL)) { if (stopOnFailure) { @@ -1109,7 +1127,7 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) } /* check whether there were any COPY errors */ - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) { ReportCopyError(connection, result); @@ -1428,11 +1446,16 @@ RemoteCreateEmptyShard(char *relationName) { int64 shardId = 0; PGresult *queryResult = NULL; + bool raiseInterrupts = true; StringInfo createEmptyShardCommand = makeStringInfo(); appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName); - queryResult = PQexec(masterConnection->pgConn, createEmptyShardCommand->data); + if (!SendRemoteCommand(masterConnection, createEmptyShardCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0); @@ -1446,6 +1469,8 @@ RemoteCreateEmptyShard(char *relationName) } PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); return shardId; } @@ -1476,18 +1501,25 @@ static void RemoteUpdateShardStatistics(uint64 shardId) { PGresult *queryResult = NULL; + bool raiseInterrupts = true; StringInfo updateShardStatisticsCommand = makeStringInfo(); appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY, shardId); - queryResult = PQexec(masterConnection->pgConn, updateShardStatisticsCommand->data); + if (!SendRemoteCommand(masterConnection, updateShardStatisticsCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) != PGRES_TUPLES_OK) { ereport(ERROR, (errmsg("could not update shard statistics"))); } PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); } From 75a7ddea0d0ddddd9c5aa419c50d912a1116a0cb Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 09/14] Always use connections in non-blocking mode. Now that there's no blocking libpq callers left, default to using non-blocking mode in connection_management.c. 
This has two advantages: 1) Blockiness doesn't have to frequently be reset, simplifying code 2) Prevents accidental use of blocking libpq functions, since they'll frequently return 'need IO' --- .../connection/connection_management.c | 7 +++ .../distributed/connection/remote_commands.c | 53 ++++--------------- 2 files changed, 16 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index aff9c2f58..74eb0aece 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -640,6 +640,13 @@ StartConnectionEstablishment(ConnectionHashKey *key) connection->pgConn = PQconnectStartParams(keywords, values, false); connection->connectionStart = GetCurrentTimestamp(); + /* + * To avoid issues with interrupts not getting caught all our connections + * are managed in a non-blocking manner. remote_commands.c provides + * wrappers emulating blocking behaviour. + */ + PQsetnonblocking(connection->pgConn, true); + return connection; } diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 8e3dcc669..186ca8f50 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -280,7 +280,6 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, const char *const *parameterValues) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int rc = 0; LogRemoteCommand(connection, command); @@ -294,23 +293,11 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, return 0; } - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, true); - } + Assert(PQisnonblocking(pgConn)); rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes, parameterValues, NULL, NULL, 0); - /* reset nonblocking connection to its original state */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, false); - } - return rc; } @@ -380,7 +367,6 @@ PGresult * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; PGresult *result = NULL; /* @@ -393,11 +379,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) return PQgetResult(connection->pgConn); } - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - PQsetnonblocking(pgConn, true); - if (!FinishConnectionIO(connection, raiseInterrupts)) { return NULL; @@ -408,8 +389,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) result = PQgetResult(connection->pgConn); - PQsetnonblocking(pgConn, wasNonblocking); - return result; } @@ -424,38 +403,31 @@ bool PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int copyState = 0; - bool success = false; if (PQstatus(pgConn) != CONNECTION_OK) { return false; } - wasNonblocking = PQisnonblocking(pgConn); - - PQsetnonblocking(pgConn, true); + Assert(PQisnonblocking(pgConn)); copyState = PQputCopyData(pgConn, buffer, nbytes); if (copyState == 1) { /* successful */ - success = true; + return true; } else if (copyState == -1) { - success = false; + return false; } else { bool allowInterrupts = true; - success = 
FinishConnectionIO(connection, allowInterrupts); + return FinishConnectionIO(connection, allowInterrupts); } - - PQsetnonblocking(pgConn, wasNonblocking); - return success; } @@ -469,38 +441,31 @@ bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int copyState = 0; - bool success = false; if (PQstatus(pgConn) != CONNECTION_OK) { return false; } - wasNonblocking = PQisnonblocking(pgConn); - - PQsetnonblocking(pgConn, true); + Assert(PQisnonblocking(pgConn)); copyState = PQputCopyEnd(pgConn, errormsg); if (copyState == 1) { /* successful */ - success = true; + return true; } else if (copyState == -1) { - success = false; + return false; } else { bool allowInterrupts = true; - success = FinishConnectionIO(connection, allowInterrupts); + return FinishConnectionIO(connection, allowInterrupts); } - - PQsetnonblocking(pgConn, wasNonblocking); - return success; } From 24153fae5d61784fb31e905854f2beb73585717c Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 10/14] Add ShutdownConnection() which cancels statement before closing connection. That's primarily useful in error cases, where we want to make sure locks etc held by commands running on workers are released promptly. --- .../connection/connection_management.c | 34 +++++++++++++++++++ .../distributed/connection_management.h | 1 + 2 files changed, 35 insertions(+) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 74eb0aece..a056b852e 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -401,6 +401,40 @@ CloseConnectionByPGconn(PGconn *pqConn) } +/* + * ShutdownConnection, if necessary cancels the currently running statement, + * and then closes the underlying libpq connection. The MultiConnection + * itself is left intact. + * + * NB: Cancelling a statement requires network IO, and currently is not + * interruptible. Unfortunately libpq does not provide a non-blocking + * implementation of PQcancel(), so we don't have much choice for now. + */ +void +ShutdownConnection(MultiConnection *connection) +{ + /* + * Only cancel statement if there's currently one running, and the + * connection is in an OK state. + */ + if (PQstatus(connection->pgConn) == CONNECTION_OK && + PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE) + { + char errorMessage[256] = { 0 }; + PGcancel *cancel = PQgetCancel(connection->pgConn); + + if (!PQcancel(cancel, errorMessage, sizeof(errorMessage))) + { + ereport(WARNING, (errmsg("could not cancel connection: %s", + errorMessage))); + } + PQfreeCancel(cancel); + } + PQfinish(connection->pgConn); + connection->pgConn = NULL; +} + + /* * FinishConnectionListEstablishment is a wrapper around FinishConnectionEstablishment. 
* The function iterates over the multiConnectionList and finishes the connection diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 11b023630..21d8a7bbe 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -134,6 +134,7 @@ extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn); extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern void CloseConnection(MultiConnection *connection); extern void CloseConnectionByPGconn(struct pg_conn *pqConn); +extern void ShutdownConnection(MultiConnection *connection); /* dealing with a connection */ extern void FinishConnectionListEstablishment(List *multiConnectionList); From be8677f926d5b881a839c4a8ecda41c47e7aba0c Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 11/14] Add NonblockingForgetResults(). This is very similar to ForgetResults() except that no network IO is performed. Primarily useful in error handling cases. --- .../distributed/connection/remote_commands.c | 72 ++++++++++++++++++- src/include/distributed/remote_commands.h | 1 + 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 186ca8f50..1f7f833d9 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -48,8 +48,8 @@ IsResponseOK(PGresult *result) /* * ForgetResults clears a connection from pending activity. * - * XXX: In the future it might be a good idea to use use PQcancel() if results - * would require network IO. + * Note that this might require network IO. If that's not acceptable, use + * NonblockingForgetResults(). */ void ForgetResults(MultiConnection *connection) @@ -75,6 +75,74 @@ ForgetResults(MultiConnection *connection) } +/* + * NonblockingForgetResults clears a connection from pending activity if doing + * so does not require network IO. Returns true if successful, false + * otherwise. + */ +bool +NonblockingForgetResults(MultiConnection *connection) +{ + PGconn *pgConn = connection->pgConn; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + Assert(PQisnonblocking(pgConn)); + + while (true) + { + PGresult *result = NULL; + + /* just in case there's a lot of results */ + CHECK_FOR_INTERRUPTS(); + + /* + * If busy, there might still be results already received and buffered + * by the OS. As connection is in non-blocking mode, we can check for + * that without blocking. + */ + if (PQisBusy(pgConn)) + { + if (PQflush(pgConn) == -1) + { + /* write failed */ + return false; + } + if (PQconsumeInput(pgConn) == 0) + { + /* some low-level failure */ + return false; + } + } + + /* clearing would require blocking IO, return */ + if (PQisBusy(pgConn)) + { + return false; + } + + result = PQgetResult(pgConn); + if (PQresultStatus(result) == PGRES_COPY_IN) + { + /* in copy, can't reliably recover without blocking */ + return false; + } + + if (result == NULL) + { + return true; + } + + PQclear(result); + } + + pg_unreachable(); +} + + /* * SqlStateMatchesCategory returns true if the given sql state (which may be * NULL if unknown) is in the given error category. 
Note that we use diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 6d3f2abb3..6cdc28802 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -26,6 +26,7 @@ extern bool LogRemoteCommands; /* simple helpers */ extern bool IsResponseOK(struct pg_result *result); extern void ForgetResults(MultiConnection *connection); +extern bool NonblockingForgetResults(MultiConnection *connection); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); /* report errors & warnings */ From 0d791f674097bf01d870547e918ceaae50446f68 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 12/14] Cancel statements when closing connection at transaction end. That's important because the currently running statement on a worker might continue to hold locks and consume resources, even after the connection is closed. Unfortunately postgres will only notice closed connections when reading from / writing to the network. That might only happen much later. --- src/backend/distributed/connection/connection_management.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index a056b852e..11a0f1640 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -718,8 +718,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) PQstatus(connection->pgConn) != CONNECTION_OK || PQtransactionStatus(connection->pgConn) != PQTRANS_IDLE) { - PQfinish(connection->pgConn); - connection->pgConn = NULL; + ShutdownConnection(connection); /* unlink from list */ dlist_delete(iter.cur); From 3461244539153c41c6854c23930161ce72ba6d86 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 13/14] Don't wait for statement completion when aborting coordinated transaction. Previously we used ForgetResults() in StartRemoteTransactionAbort() - that's problematic because there might still be an ongoing statement, and this causes us to wait for its completion. That e.g. happens when a statement running on the coordinator is cancelled. --- .../transaction/remote_transaction.c | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 17a70ea83..72c064099 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -276,16 +276,21 @@ StartRemoteTransactionAbort(MultiConnection *connection) Assert(transaction->transactionState != REMOTE_TRANS_INVALID); /* - * Clear previous results, so we have a better chance to send - * ROLLBACK [PREPARED]; + * Clear previous results, so we have a better chance to send ROLLBACK + * [PREPARED]. If we've previously sent a PREPARE TRANSACTION, we always + * want to wait for that result, as that shouldn't take long and will + * reserve resources. But if there's another query running, we don't want + * to wait, because a longrunning statement may be running, force it to be + * killed in that case. 
*/ - ForgetResults(connection); - if (transaction->transactionState == REMOTE_TRANS_PREPARING || transaction->transactionState == REMOTE_TRANS_PREPARED) { StringInfoData command; + /* await PREPARE TRANSACTION results, closing the connection would leave it dangling */ + ForgetResults(connection); + initStringInfo(&command); appendStringInfo(&command, "ROLLBACK PREPARED '%s'", transaction->preparedName); @@ -304,6 +309,14 @@ StartRemoteTransactionAbort(MultiConnection *connection) } else { + if (!NonblockingForgetResults(connection)) + { + ShutdownConnection(connection); + + /* FinishRemoteTransactionAbort will emit warning */ + return; + } + if (!SendRemoteCommand(connection, "ROLLBACK")) { /* no point in reporting a likely redundant message */ @@ -336,17 +349,17 @@ FinishRemoteTransactionAbort(MultiConnection *connection) ReportResultError(connection, result, WARNING); MarkRemoteTransactionFailed(connection, dontRaiseErrors); - if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING) + if (transaction->transactionState == REMOTE_TRANS_2PC_ABORTING) + { + WarnAboutLeakedPreparedTransaction(connection, isNotCommit); + } + else { ereport(WARNING, (errmsg("failed to abort 1PC transaction \"%s\" on %s:%d", transaction->preparedName, connection->hostname, connection->port))); } - else - { - WarnAboutLeakedPreparedTransaction(connection, isNotCommit); - } } PQclear(result); From d76b093185e911391469831a984f18ba1589f787 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH 14/14] Add tests for statement cancellation. --- .../expected/isolation_cancellation.out | 127 ++++++++++++++++++ src/test/regress/isolation_schedule | 2 +- .../regress/specs/isolation_cancellation.spec | 80 +++++++++++ 3 files changed, 208 insertions(+), 1 deletion(-) create mode 100644 src/test/regress/expected/isolation_cancellation.out create mode 100644 src/test/regress/specs/isolation_cancellation.spec diff --git a/src/test/regress/expected/isolation_cancellation.out b/src/test/regress/expected/isolation_cancellation.out new file mode 100644 index 000000000..fc929c568 --- /dev/null +++ b/src/test/regress/expected/isolation_cancellation.out @@ -0,0 +1,127 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-timeout s1-sleep10000 s1-reset s1-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-reset: + RESET ALL; + +step s1-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-sleep10000 s1-reset s2-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-reset: + RESET ALL; + +step s2-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-begin s1-sleep10000 s1-rollback s1-reset s1-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-begin: + BEGIN; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-rollback: + ROLLBACK; + +step s1-reset: + RESET ALL; + +step s1-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-begin s1-sleep10000 s1-rollback s1-reset s2-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-begin: + BEGIN; + +step s1-sleep10000: + SELECT 
pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-rollback: + ROLLBACK; + +step s1-reset: + RESET ALL; + +step s2-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-begin s1-update1 s1-sleep10000 s1-rollback s1-reset s1-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-begin: + BEGIN; + +step s1-update1: + UPDATE cancel_table SET data = '' WHERE test_id = 1; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-rollback: + ROLLBACK; + +step s1-reset: + RESET ALL; + +step s1-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-begin s1-update1 s1-sleep10000 s1-rollback s1-reset s2-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-begin: + BEGIN; + +step s1-update1: + UPDATE cancel_table SET data = '' WHERE test_id = 1; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-rollback: + ROLLBACK; + +step s1-reset: + RESET ALL; + +step s2-drop: + + DROP TABLE cancel_table; + diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 3594eecc2..d4077e44e 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -5,6 +5,6 @@ test: isolation_add_node_vs_reference_table_operations # that come later can be parallelized test: isolation_cluster_management -test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement +test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation test: isolation_concurrent_dml isolation_data_migration test: isolation_drop_shards isolation_copy_placement_vs_modification diff --git a/src/test/regress/specs/isolation_cancellation.spec b/src/test/regress/specs/isolation_cancellation.spec new file mode 100644 index 000000000..872cb5070 --- /dev/null +++ b/src/test/regress/specs/isolation_cancellation.spec @@ -0,0 +1,80 @@ +# Tests around cancelling statements. As we can't trigger cancel +# interrupts directly, we use statement_timeout instead, which largely +# behaves the same as proper cancellation. 
+ +setup +{ + CREATE TABLE cancel_table (test_id integer NOT NULL, data text); + SELECT create_distributed_table('cancel_table', 'test_id', 'hash'); + INSERT INTO cancel_table VALUES(1); +} + +teardown +{ + DROP TABLE IF EXISTS cancel_table; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-commit" +{ + COMMIT; +} + +step "s1-rollback" +{ + ROLLBACK; +} + +step "s1-sleep10000" +{ + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; +} + +step "s1-timeout" +{ + SET statement_timeout = '100ms'; +} + +step "s1-update1" +{ + UPDATE cancel_table SET data = '' WHERE test_id = 1; +} + +step "s1-reset" +{ + RESET ALL; +} + +step "s1-drop" +{ + + DROP TABLE cancel_table; +} + +session "s2" + +step "s2-drop" +{ + + DROP TABLE cancel_table; +} + +# check that statement cancel works for plain selects, drop table +# afterwards to make sure sleep on workers is cancelled (thereby not +# preventing drop via locks) +permutation "s1-timeout" "s1-sleep10000" "s1-reset" "s1-drop" +permutation "s1-timeout" "s1-sleep10000" "s1-reset" "s2-drop" + +# check that statement cancel works for selects in transaction +permutation "s1-timeout" "s1-begin" "s1-sleep10000" "s1-rollback" "s1-reset" "s1-drop" +permutation "s1-timeout" "s1-begin" "s1-sleep10000" "s1-rollback" "s1-reset" "s2-drop" + +# check that statement cancel works for selects in transaction, that previously wrote +permutation "s1-timeout" "s1-begin" "s1-update1" "s1-sleep10000" "s1-rollback" "s1-reset" "s1-drop" +permutation "s1-timeout" "s1-begin" "s1-update1" "s1-sleep10000" "s1-rollback" "s1-reset" "s2-drop"