diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index e2b2703e2..9f186146a 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -658,6 +658,7 @@ MasterPartitionMethod(RangeVar *relation) { char partitionMethod = '\0'; PGresult *queryResult = NULL; + bool raiseInterrupts = true; char *relationName = relation->relname; char *schemaName = relation->schemaname; @@ -666,7 +667,11 @@ MasterPartitionMethod(RangeVar *relation) StringInfo partitionMethodCommand = makeStringInfo(); appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName); - queryResult = PQexec(masterConnection->pgConn, partitionMethodCommand->data); + if (!SendRemoteCommand(masterConnection, partitionMethodCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0); @@ -687,6 +692,9 @@ MasterPartitionMethod(RangeVar *relation) PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); + return partitionMethod; } @@ -735,6 +743,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, ListCell *placementCell = NULL; List *connectionList = NULL; int64 shardId = shardConnections->shardId; + bool raiseInterrupts = true; MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "OpenCopyConnections", @@ -784,15 +793,19 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, MarkRemoteTransactionCritical(connection); ClaimConnectionExclusively(connection); RemoteTransactionBeginIfNecessary(connection); + copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, useBinaryCopyFormat); - result = PQexec(connection->pgConn, copyCommand->data); + if (!SendRemoteCommand(connection, copyCommand->data)) + { + ReportConnectionError(connection, ERROR); + } + result = GetRemoteCommandResult(connection, raiseInterrupts); if (PQresultStatus(result) != PGRES_COPY_IN) { ReportResultError(connection, result, ERROR); } - PQclear(result); connectionList = lappend(connectionList, connection); } @@ -926,11 +939,16 @@ RemoteFinalizedShardPlacementList(uint64 shardId) { List *finalizedPlacementList = NIL; PGresult *queryResult = NULL; + bool raiseInterrupts = true; StringInfo shardPlacementsCommand = makeStringInfo(); appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId); - queryResult = PQexec(masterConnection->pgConn, shardPlacementsCommand->data); + if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { int rowCount = PQntuples(queryResult); @@ -939,8 +957,8 @@ RemoteFinalizedShardPlacementList(uint64 shardId) for (rowIndex = 0; rowIndex < rowCount; rowIndex++) { char *placementIdString = PQgetvalue(queryResult, rowIndex, 0); - char *nodeName = PQgetvalue(queryResult, rowIndex, 1); - char *nodePortString = PQgetvalue(queryResult, rowIndex, 2); + char *nodeName = pstrdup(PQgetvalue(queryResult, rowIndex, 1)); + char *nodePortString = pstrdup(PQgetvalue(queryResult, rowIndex, 2)); uint32 nodePort = atoi(nodePortString); 
uint64 placementId = atoll(placementIdString); @@ -959,6 +977,10 @@ RemoteFinalizedShardPlacementList(uint64 shardId) ereport(ERROR, (errmsg("could not get shard placements from the master node"))); } + PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); + return finalizedPlacementList; } @@ -1064,8 +1086,7 @@ SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList) static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection) { - int copyResult = PQputCopyData(connection->pgConn, dataBuffer->data, dataBuffer->len); - if (copyResult != 1) + if (!PutRemoteCopyData(connection, dataBuffer->data, dataBuffer->len)) { ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("failed to COPY to shard %ld on %s:%d", @@ -1089,13 +1110,11 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) foreach(connectionCell, connectionList) { MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); - int copyEndResult = 0; PGresult *result = NULL; + bool raiseInterrupts = true; /* end the COPY input */ - copyEndResult = PQputCopyEnd(connection->pgConn, NULL); - - if (copyEndResult != 1) + if (!PutRemoteCopyEnd(connection, NULL)) { if (stopOnFailure) { @@ -1108,7 +1127,7 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) } /* check whether there were any COPY errors */ - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) { ReportCopyError(connection, result); @@ -1427,11 +1446,16 @@ RemoteCreateEmptyShard(char *relationName) { int64 shardId = 0; PGresult *queryResult = NULL; + bool raiseInterrupts = true; StringInfo createEmptyShardCommand = makeStringInfo(); appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName); - queryResult = PQexec(masterConnection->pgConn, createEmptyShardCommand->data); + if (!SendRemoteCommand(masterConnection, createEmptyShardCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) { char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0); @@ -1445,6 +1469,8 @@ RemoteCreateEmptyShard(char *relationName) } PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); return shardId; } @@ -1475,18 +1501,25 @@ static void RemoteUpdateShardStatistics(uint64 shardId) { PGresult *queryResult = NULL; + bool raiseInterrupts = true; StringInfo updateShardStatisticsCommand = makeStringInfo(); appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY, shardId); - queryResult = PQexec(masterConnection->pgConn, updateShardStatisticsCommand->data); + if (!SendRemoteCommand(masterConnection, updateShardStatisticsCommand->data)) + { + ReportConnectionError(masterConnection, ERROR); + } + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); if (PQresultStatus(queryResult) != PGRES_TUPLES_OK) { ereport(ERROR, (errmsg("could not update shard statistics"))); } PQclear(queryResult); + queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); + Assert(!queryResult); } diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 
aff9c2f58..11a0f1640 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -401,6 +401,40 @@ CloseConnectionByPGconn(PGconn *pqConn)
 }
 
 
+/*
+ * ShutdownConnection cancels the currently running statement if necessary,
+ * and then closes the underlying libpq connection. The MultiConnection
+ * itself is left intact.
+ *
+ * NB: Cancelling a statement requires network IO, and currently is not
+ * interruptible. Unfortunately libpq does not provide a non-blocking
+ * implementation of PQcancel(), so we don't have much choice for now.
+ */
+void
+ShutdownConnection(MultiConnection *connection)
+{
+    /*
+     * Only cancel the statement if there's currently one running, and the
+     * connection is in an OK state.
+     */
+    if (PQstatus(connection->pgConn) == CONNECTION_OK &&
+        PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE)
+    {
+        char errorMessage[256] = { 0 };
+        PGcancel *cancel = PQgetCancel(connection->pgConn);
+
+        if (!PQcancel(cancel, errorMessage, sizeof(errorMessage)))
+        {
+            ereport(WARNING, (errmsg("could not cancel connection: %s",
+                                     errorMessage)));
+        }
+        PQfreeCancel(cancel);
+    }
+    PQfinish(connection->pgConn);
+    connection->pgConn = NULL;
+}
+
+
 /*
  * FinishConnectionListEstablishment is a wrapper around FinishConnectionEstablishment.
  * The function iterates over the multiConnectionList and finishes the connection
@@ -640,6 +674,13 @@ StartConnectionEstablishment(ConnectionHashKey *key)
     connection->pgConn = PQconnectStartParams(keywords, values, false);
     connection->connectionStart = GetCurrentTimestamp();
 
+    /*
+     * To avoid issues with interrupts not getting caught, all our connections
+     * are managed in a non-blocking manner. remote_commands.c provides
+     * wrappers emulating blocking behaviour.
+     */
+    PQsetnonblocking(connection->pgConn, true);
+
     return connection;
 }
 
@@ -677,8 +718,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
             PQstatus(connection->pgConn) != CONNECTION_OK ||
             PQtransactionStatus(connection->pgConn) != PQTRANS_IDLE)
         {
-            PQfinish(connection->pgConn);
-            connection->pgConn = NULL;
+            ShutdownConnection(connection);
 
             /* unlink from list */
             dlist_delete(iter.cur);
diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c
index ecfc54517..1f7f833d9 100644
--- a/src/backend/distributed/connection/remote_commands.c
+++ b/src/backend/distributed/connection/remote_commands.c
@@ -23,6 +23,7 @@
 /* GUC, determining whether statements sent to remote nodes are logged */
 bool LogRemoteCommands = false;
 
+static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
 
 /* simple helpers */
 
@@ -47,8 +48,8 @@ IsResponseOK(PGresult *result)
 /*
  * ForgetResults clears a connection from pending activity.
  *
- * XXX: In the future it might be a good idea to use use PQcancel() if results
- * would require network IO.
+ * Note that this might require network IO. If that's not acceptable, use
+ * NonblockingForgetResults().
  */
 void
 ForgetResults(MultiConnection *connection)
@@ -74,6 +75,74 @@ ForgetResults(MultiConnection *connection)
 }
 
 
+/*
+ * NonblockingForgetResults clears a connection from pending activity if doing
+ * so does not require network IO. Returns true if successful, false
+ * otherwise.
+ */ +bool +NonblockingForgetResults(MultiConnection *connection) +{ + PGconn *pgConn = connection->pgConn; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + Assert(PQisnonblocking(pgConn)); + + while (true) + { + PGresult *result = NULL; + + /* just in case there's a lot of results */ + CHECK_FOR_INTERRUPTS(); + + /* + * If busy, there might still be results already received and buffered + * by the OS. As connection is in non-blocking mode, we can check for + * that without blocking. + */ + if (PQisBusy(pgConn)) + { + if (PQflush(pgConn) == -1) + { + /* write failed */ + return false; + } + if (PQconsumeInput(pgConn) == 0) + { + /* some low-level failure */ + return false; + } + } + + /* clearing would require blocking IO, return */ + if (PQisBusy(pgConn)) + { + return false; + } + + result = PQgetResult(pgConn); + if (PQresultStatus(result) == PGRES_COPY_IN) + { + /* in copy, can't reliably recover without blocking */ + return false; + } + + if (result == NULL) + { + return true; + } + + PQclear(result); + } + + pg_unreachable(); +} + + /* * SqlStateMatchesCategory returns true if the given sql state (which may be * NULL if unknown) is in the given error category. Note that we use @@ -279,7 +348,6 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, const char *const *parameterValues) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int rc = 0; LogRemoteCommand(connection, command); @@ -293,23 +361,11 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, return 0; } - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, true); - } + Assert(PQisnonblocking(pgConn)); rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes, parameterValues, NULL, NULL, 0); - /* reset nonblocking connection to its original state */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, false); - } - return rc; } @@ -367,7 +423,7 @@ ReadFirstColumnAsText(PGresult *queryResult) * an error. * * If raiseInterrupts is false and an interrupt arrives that'd otherwise raise - * an error, GetRemotecommandResult returns NULL, and the transaction is + * an error, GetRemoteCommandResult returns NULL, and the transaction is * marked as having failed. While that's not a perfect way to signal failure, * callers will usually treat that as an error, and it's easy to use. * @@ -379,11 +435,7 @@ PGresult * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) { PGconn *pgConn = connection->pgConn; - int socket = 0; - int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET; - bool wasNonblocking = false; PGresult *result = NULL; - bool failed = false; /* * Short circuit tests around the more expensive parts of this @@ -395,106 +447,159 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) return PQgetResult(connection->pgConn); } - socket = PQsocket(pgConn); - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - if (!wasNonblocking) + if (!FinishConnectionIO(connection, raiseInterrupts)) { - PQsetnonblocking(pgConn, true); + return NULL; } + /* no IO should be necessary to get result */ + Assert(!PQisBusy(pgConn)); + + result = PQgetResult(connection->pgConn); + + return result; +} + + +/* + * PutRemoteCopyData is a wrapper around PQputCopyData() that handles + * interrupts. + * + * Returns false if PQputCopyData() failed, true otherwise. 
+ */ +bool +PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) +{ + PGconn *pgConn = connection->pgConn; + int copyState = 0; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + Assert(PQisnonblocking(pgConn)); + + copyState = PQputCopyData(pgConn, buffer, nbytes); + + if (copyState == 1) + { + /* successful */ + return true; + } + else if (copyState == -1) + { + return false; + } + else + { + bool allowInterrupts = true; + return FinishConnectionIO(connection, allowInterrupts); + } +} + + +/* + * PutRemoteCopyEnd is a wrapper around PQputCopyEnd() that handles + * interrupts. + * + * Returns false if PQputCopyEnd() failed, true otherwise. + */ +bool +PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) +{ + PGconn *pgConn = connection->pgConn; + int copyState = 0; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + Assert(PQisnonblocking(pgConn)); + + copyState = PQputCopyEnd(pgConn, errormsg); + + if (copyState == 1) + { + /* successful */ + return true; + } + else if (copyState == -1) + { + return false; + } + else + { + bool allowInterrupts = true; + return FinishConnectionIO(connection, allowInterrupts); + } +} + + +/* + * FinishConnectionIO performs pending IO for the connection, while accepting + * interrupts. + * + * See GetRemoteCommandResult() for documentation of interrupt handling + * behaviour. + * + * Returns true if IO was successfully completed, false otherwise. + */ +static bool +FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts) +{ + PGconn *pgConn = connection->pgConn; + int socket = PQsocket(pgConn); + + Assert(pgConn); + Assert(PQisnonblocking(pgConn)); + if (raiseInterrupts) { CHECK_FOR_INTERRUPTS(); } - /* make sure command has been sent out */ - while (!failed) + /* perform the necessary IO */ + while (true) { + int sendStatus = 0; int rc = 0; + int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET; - ResetLatch(MyLatch); - - /* try to send all the data */ - rc = PQflush(pgConn); - - /* stop writing if all data has been sent, or there was none to send */ - if (rc == 0) - { - break; - } + /* try to send all pending data */ + sendStatus = PQflush(pgConn); /* if sending failed, there's nothing more we can do */ - if (rc == -1) + if (sendStatus == -1) { - failed = true; - break; + return false; } - - /* this means we have to wait for data to go out */ - Assert(rc == 1); - -#if (PG_VERSION_NUM >= 100000) - rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0, - PG_WAIT_EXTENSION); -#else - rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0); -#endif - - if (rc & WL_POSTMASTER_DEATH) + else if (sendStatus == 1) { - ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + waitFlags |= WL_SOCKET_WRITEABLE; } - if (rc & WL_LATCH_SET) - { - /* if allowed raise errors */ - if (raiseInterrupts) - { - CHECK_FOR_INTERRUPTS(); - } - - /* - * If raising errors allowed, or called within in a section with - * interrupts held, return NULL instead, and mark the transaction - * as failed. 
- */ - if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) - { - connection->remoteTransaction.transactionFailed = true; - failed = true; - break; - } - } - } - - /* wait for the result of the command to come in */ - while (!failed) - { - int rc = 0; - - ResetLatch(MyLatch); - /* if reading fails, there's not much we can do */ if (PQconsumeInput(pgConn) == 0) { - failed = true; - break; + return false; + } + if (PQisBusy(pgConn)) + { + waitFlags |= WL_SOCKET_READABLE; } - /* check if all the necessary data is now available */ - if (!PQisBusy(pgConn)) + if ((waitFlags & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0) { - result = PQgetResult(connection->pgConn); - break; + /* no IO necessary anymore, we're done */ + return true; } #if (PG_VERSION_NUM >= 100000) - rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0, - PG_WAIT_EXTENSION); + rc = WaitLatchOrSocket(MyLatch, waitFlags, socket, 0, PG_WAIT_EXTENSION); #else - rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0); + rc = WaitLatchOrSocket(MyLatch, waitFlags, socket, 0); #endif if (rc & WL_POSTMASTER_DEATH) @@ -504,6 +609,8 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) if (rc & WL_LATCH_SET) { + ResetLatch(MyLatch); + /* if allowed raise errors */ if (raiseInterrupts) { @@ -512,22 +619,16 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) /* * If raising errors allowed, or called within in a section with - * interrupts held, return NULL instead, and mark the transaction - * as failed. + * interrupts held, return instead, and mark the transaction as + * failed. */ if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) { connection->remoteTransaction.transactionFailed = true; - failed = true; break; } } } - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, false); - } - - return result; + return false; } diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index e7252ef62..827387489 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -44,7 +44,6 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT]; /* Local functions forward declarations */ -static void ClearRemainingResults(MultiConnection *connection); static bool ClientConnectionReady(MultiConnection *connection, PostgresPollingStatusType pollingStatus); @@ -401,6 +400,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, PGresult *result = NULL; ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -413,7 +413,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, return false; } - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_TUPLES_OK) { @@ -430,7 +430,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount, } /* clear extra result objects */ - ClearRemainingResults(connection); + ForgetResults(connection); return true; } @@ -454,6 +454,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType 
resultStatus = PGRES_COMMAND_OK; BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -471,7 +472,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, return CLIENT_BATCH_QUERY_FAILED; } - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); if (result == NULL) { return CLIENT_BATCH_QUERY_DONE; @@ -538,6 +539,7 @@ MultiClientQueryStatus(int32 connectionId) ConnStatusType connStatusType = CONNECTION_OK; ExecStatusType resultStatus = PGRES_COMMAND_OK; QueryStatus queryStatus = CLIENT_INVALID_QUERY; + bool raiseInterrupts = true; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; @@ -555,7 +557,7 @@ MultiClientQueryStatus(int32 connectionId) * isn't ready yet (the caller didn't wait for the connection to be ready), * we will block on this call. */ - result = PQgetResult(connection->pgConn); + result = GetRemoteCommandResult(connection, raiseInterrupts); resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -598,7 +600,7 @@ MultiClientQueryStatus(int32 connectionId) */ if (!copyResults) { - ClearRemainingResults(connection); + ForgetResults(connection); } return queryStatus; @@ -665,7 +667,8 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) else if (receiveLength == -1) { /* received copy done message */ - PGresult *result = PQgetResult(connection->pgConn); + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); ExecStatusType resultStatus = PQresultStatus(result); if (resultStatus == PGRES_COMMAND_OK) @@ -692,7 +695,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor) /* if copy out completed, make sure we drain all results from libpq */ if (receiveLength < 0) { - ClearRemainingResults(connection); + ForgetResults(connection); } return copyStatus; @@ -853,23 +856,6 @@ MultiClientWait(WaitInfo *waitInfo) } -/* - * ClearRemainingResults reads result objects from the connection until we get - * null, and clears these results. This is the last step in completing an async - * query. - */ -static void -ClearRemainingResults(MultiConnection *connection) -{ - PGresult *result = PQgetResult(connection->pgConn); - while (result != NULL) - { - PQclear(result); - result = PQgetResult(connection->pgConn); - } -} - - /* * ClientConnectionReady checks if the given connection is ready for non-blocking * reads or writes. 
This function is loosely based on pqSocketCheck() at fe-misc.c diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index 946ec16cf..62679a83f 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -21,6 +21,7 @@ #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_server_executor.h" +#include "distributed/remote_commands.h" #include "distributed/worker_protocol.h" #include "lib/stringinfo.h" #include "utils/builtins.h" @@ -39,11 +40,11 @@ static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, bool *statusArray, StringInfo *resultStringArray, int commmandCount); -static bool GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, +static bool GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, StringInfo queryResultString); -static bool EvaluateQueryResult(PGconn *connection, PGresult *queryResult, StringInfo - queryResultString); -static void StoreErrorMessage(PGconn *connection, StringInfo queryResultString); +static bool EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, + StringInfo queryResultString); +static void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString); static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray, StringInfo *commandStringArray, @@ -223,7 +224,8 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor int commmandCount) { int commandIndex = 0; - PGconn **connectionArray = palloc0(commmandCount * sizeof(PGconn *)); + MultiConnection **connectionArray = + palloc0(commmandCount * sizeof(MultiConnection *)); int finishedCount = 0; /* establish connections */ @@ -232,14 +234,13 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor char *nodeName = nodeNameArray[commandIndex]->data; int nodePort = nodePortArray[commandIndex]; int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *multiConnection = + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - PGconn *connection = multiConnection->pgConn; StringInfo queryResultString = resultStringArray[commandIndex]; statusArray[commandIndex] = true; - if (PQstatus(multiConnection->pgConn) != CONNECTION_OK) + if (PQstatus(connection->pgConn) != CONNECTION_OK) { appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, (int) nodePort); @@ -256,7 +257,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) { int querySent = 0; - PGconn *connection = connectionArray[commandIndex]; + MultiConnection *connection = connectionArray[commandIndex]; char *queryString = commandStringArray[commandIndex]->data; StringInfo queryResultString = resultStringArray[commandIndex]; @@ -269,13 +270,17 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor continue; } - querySent = PQsendQuery(connection, queryString); + /* + * NB: this intentionally uses PQsendQuery rather than + * SendRemoteCommand as multiple commands are allowed. 
+ */ + querySent = PQsendQuery(connection->pgConn, queryString); if (querySent == 0) { StoreErrorMessage(connection, queryResultString); statusArray[commandIndex] = false; - CloseConnectionByPGconn(connection); + CloseConnection(connection); connectionArray[commandIndex] = NULL; finishedCount++; } @@ -286,7 +291,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor { for (commandIndex = 0; commandIndex < commmandCount; commandIndex++) { - PGconn *connection = connectionArray[commandIndex]; + MultiConnection *connection = connectionArray[commandIndex]; StringInfo queryResultString = resultStringArray[commandIndex]; bool success = false; bool queryFinished = false; @@ -304,7 +309,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor finishedCount++; statusArray[commandIndex] = success; connectionArray[commandIndex] = NULL; - CloseConnectionByPGconn(connection); + CloseConnection(connection); } } @@ -328,11 +333,11 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor * reported upon completion of the query. */ static bool -GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, +GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, StringInfo queryResultString) { bool finished = true; - ConnStatusType connectionStatus = PQstatus(connection); + ConnStatusType connectionStatus = PQstatus(connection->pgConn); int consumeInput = 0; PGresult *queryResult = NULL; bool success = false; @@ -346,7 +351,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, return finished; } - consumeInput = PQconsumeInput(connection); + consumeInput = PQconsumeInput(connection->pgConn); if (consumeInput == 0) { appendStringInfo(queryResultString, "query result unavailable"); @@ -354,14 +359,14 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, } /* check later if busy */ - if (PQisBusy(connection) != 0) + if (PQisBusy(connection->pgConn) != 0) { finished = false; return finished; } /* query result is available at this point */ - queryResult = PQgetResult(connection); + queryResult = PQgetResult(connection->pgConn); success = EvaluateQueryResult(connection, queryResult, queryResultString); PQclear(queryResult); @@ -379,7 +384,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus, * error otherwise. */ static bool -EvaluateQueryResult(PGconn *connection, PGresult *queryResult, +EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, StringInfo queryResultString) { bool success = false; @@ -434,9 +439,9 @@ EvaluateQueryResult(PGconn *connection, PGresult *queryResult, * otherwise it would return a default error message. 
 */
static void
-StoreErrorMessage(PGconn *connection, StringInfo queryResultString)
+StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString)
 {
-    char *errorMessage = PQerrorMessage(connection);
+    char *errorMessage = PQerrorMessage(connection->pgConn);
     if (errorMessage != NULL)
     {
         char *firstNewlineIndex = strchr(errorMessage, '\n');
@@ -499,26 +504,27 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
                             StringInfo queryResultString)
 {
     int connectionFlags = FORCE_NEW_CONNECTION;
-    MultiConnection *multiConnection =
+    MultiConnection *connection =
         GetNodeConnection(connectionFlags, nodeName, nodePort);
-    PGconn *nodeConnection = multiConnection->pgConn;
     bool success = false;
     PGresult *queryResult = NULL;
+    bool raiseInterrupts = true;
 
-    if (PQstatus(multiConnection->pgConn) != CONNECTION_OK)
+    if (PQstatus(connection->pgConn) != CONNECTION_OK)
     {
         appendStringInfo(queryResultString, "failed to connect to %s:%d",
                          nodeName, (int) nodePort);
         return false;
     }
 
-    queryResult = PQexec(nodeConnection, queryString);
-    success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString);
+    SendRemoteCommand(connection, queryString);
+    queryResult = GetRemoteCommandResult(connection, raiseInterrupts);
+    success = EvaluateQueryResult(connection, queryResult, queryResultString);
 
     PQclear(queryResult);
 
     /* close the connection */
-    CloseConnection(multiConnection);
+    CloseConnection(connection);
 
     return success;
 }
diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c
index 01bfe0eea..72c064099 100644
--- a/src/backend/distributed/transaction/remote_transaction.c
+++ b/src/backend/distributed/transaction/remote_transaction.c
@@ -276,16 +276,21 @@ StartRemoteTransactionAbort(MultiConnection *connection)
     Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
 
     /*
-     * Clear previous results, so we have a better chance to send
-     * ROLLBACK [PREPARED];
+     * Clear previous results, so we have a better chance to send ROLLBACK
+     * [PREPARED]. If we've previously sent a PREPARE TRANSACTION, we always
+     * want to wait for that result, as that shouldn't take long and a
+     * prepared transaction reserves resources. But if another query is still
+     * running, we don't want to wait, because it may be a long-running
+     * statement; instead we force it to be killed.
      */
-    ForgetResults(connection);
-
     if (transaction->transactionState == REMOTE_TRANS_PREPARING ||
         transaction->transactionState == REMOTE_TRANS_PREPARED)
     {
         StringInfoData command;
 
+        /* await PREPARE TRANSACTION result; closing the connection would leave it dangling */
+        ForgetResults(connection);
+
         initStringInfo(&command);
         appendStringInfo(&command, "ROLLBACK PREPARED '%s'",
                          transaction->preparedName);
@@ -304,6 +309,14 @@
     }
     else
     {
+        if (!NonblockingForgetResults(connection))
+        {
+            ShutdownConnection(connection);
+
+            /* FinishRemoteTransactionAbort will emit warning */
+            return;
+        }
+
         if (!SendRemoteCommand(connection, "ROLLBACK"))
         {
             /* no point in reporting a likely redundant message */
@@ -336,16 +349,16 @@ FinishRemoteTransactionAbort(MultiConnection *connection)
         ReportResultError(connection, result, WARNING);
         MarkRemoteTransactionFailed(connection, dontRaiseErrors);
 
-        if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING)
+        if (transaction->transactionState == REMOTE_TRANS_2PC_ABORTING)
         {
-            ereport(WARNING,
-                    (errmsg("failed to abort 2PC transaction \"%s\" on %s:%d",
-                            transaction->preparedName, connection->hostname,
-                            connection->port)));
+            WarnAboutLeakedPreparedTransaction(connection, isNotCommit);
         }
         else
         {
-            WarnAboutLeakedPreparedTransaction(connection, isNotCommit);
+            ereport(WARNING,
+                    (errmsg("failed to abort 1PC transaction \"%s\" on %s:%d",
+                            transaction->preparedName, connection->hostname,
+                            connection->port)));
         }
     }
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index 11b023630..21d8a7bbe 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -134,6 +134,7 @@ extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn);
 extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
 extern void CloseConnection(MultiConnection *connection);
 extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
+extern void ShutdownConnection(MultiConnection *connection);
 
 /* dealing with a connection */
 extern void FinishConnectionListEstablishment(List *multiConnectionList);
diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h
index 3799d9d97..6cdc28802 100644
--- a/src/include/distributed/remote_commands.h
+++ b/src/include/distributed/remote_commands.h
@@ -26,6 +26,7 @@ extern bool LogRemoteCommands;
 /* simple helpers */
 extern bool IsResponseOK(struct pg_result *result);
 extern void ForgetResults(MultiConnection *connection);
+extern bool NonblockingForgetResults(MultiConnection *connection);
 extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
 
 /* report errors & warnings */
@@ -47,6 +48,9 @@ extern int SendRemoteCommandParams(MultiConnection *connection, const char *comm
 extern List * ReadFirstColumnAsText(struct pg_result *queryResult);
 extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection,
                                                  bool raiseInterrupts);
+extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer,
+                              int nbytes);
+extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);
 
 #endif /* REMOTE_COMMAND_H */
diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile
index 3dded6576..488043dd1 100644
--- a/src/test/regress/Makefile
+++ b/src/test/regress/Makefile
@@ -60,7 +60,7 @@ check-isolation: all tempinstall-main
 
 check-vanilla: all tempinstall-main
$(pg_regress_multi_check) --load-extension=citus --vanillatest - + check-multi-mx: all tempinstall-main $(pg_regress_multi_check) --load-extension=citus \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_mx_schedule $(EXTRA_TESTS) diff --git a/src/test/regress/expected/isolation_cancellation.out b/src/test/regress/expected/isolation_cancellation.out new file mode 100644 index 000000000..fc929c568 --- /dev/null +++ b/src/test/regress/expected/isolation_cancellation.out @@ -0,0 +1,127 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-timeout s1-sleep10000 s1-reset s1-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-reset: + RESET ALL; + +step s1-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-sleep10000 s1-reset s2-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-reset: + RESET ALL; + +step s2-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-begin s1-sleep10000 s1-rollback s1-reset s1-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-begin: + BEGIN; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-rollback: + ROLLBACK; + +step s1-reset: + RESET ALL; + +step s1-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-begin s1-sleep10000 s1-rollback s1-reset s2-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-begin: + BEGIN; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-rollback: + ROLLBACK; + +step s1-reset: + RESET ALL; + +step s2-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-begin s1-update1 s1-sleep10000 s1-rollback s1-reset s1-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-begin: + BEGIN; + +step s1-update1: + UPDATE cancel_table SET data = '' WHERE test_id = 1; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-rollback: + ROLLBACK; + +step s1-reset: + RESET ALL; + +step s1-drop: + + DROP TABLE cancel_table; + + +starting permutation: s1-timeout s1-begin s1-update1 s1-sleep10000 s1-rollback s1-reset s2-drop +step s1-timeout: + SET statement_timeout = '100ms'; + +step s1-begin: + BEGIN; + +step s1-update1: + UPDATE cancel_table SET data = '' WHERE test_id = 1; + +step s1-sleep10000: + SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1; + +ERROR: canceling statement due to statement timeout +step s1-rollback: + ROLLBACK; + +step s1-reset: + RESET ALL; + +step s2-drop: + + DROP TABLE cancel_table; + diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 8f7dd91e5..d4077e44e 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -1,10 +1,10 @@ test: isolation_add_node_vs_reference_table_operations -# tests that change node metadata should precede -# isolation_cluster_management such that tests +# tests that change node metadata should precede +# isolation_cluster_management such that tests # that come later can be 
parallelized
 test: isolation_cluster_management
-test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement
+test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation
 test: isolation_concurrent_dml isolation_data_migration
-test: isolation_drop_shards isolation_copy_placement_vs_modification
+test: isolation_drop_shards isolation_copy_placement_vs_modification
diff --git a/src/test/regress/specs/isolation_cancellation.spec b/src/test/regress/specs/isolation_cancellation.spec
new file mode 100644
index 000000000..872cb5070
--- /dev/null
+++ b/src/test/regress/specs/isolation_cancellation.spec
@@ -0,0 +1,80 @@
+# Tests around cancelling statements. As we can't trigger cancel
+# interrupts directly, we use statement_timeout instead, which largely
+# behaves the same as proper cancellation.
+
+setup
+{
+	CREATE TABLE cancel_table (test_id integer NOT NULL, data text);
+	SELECT create_distributed_table('cancel_table', 'test_id', 'hash');
+	INSERT INTO cancel_table VALUES(1);
+}
+
+teardown
+{
+	DROP TABLE IF EXISTS cancel_table;
+}
+
+session "s1"
+
+step "s1-begin"
+{
+	BEGIN;
+}
+
+step "s1-commit"
+{
+	COMMIT;
+}
+
+step "s1-rollback"
+{
+	ROLLBACK;
+}
+
+step "s1-sleep10000"
+{
+	SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1;
+}
+
+step "s1-timeout"
+{
+	SET statement_timeout = '100ms';
+}
+
+step "s1-update1"
+{
+	UPDATE cancel_table SET data = '' WHERE test_id = 1;
+}
+
+step "s1-reset"
+{
+	RESET ALL;
+}
+
+step "s1-drop"
+{
+
+	DROP TABLE cancel_table;
+}
+
+session "s2"
+
+step "s2-drop"
+{
+
+	DROP TABLE cancel_table;
+}
+
+# check that statement cancellation works for plain selects; drop the table
+# afterwards to make sure the sleep on the workers was cancelled (so it does
+# not block the drop via locks)
+permutation "s1-timeout" "s1-sleep10000" "s1-reset" "s1-drop"
+permutation "s1-timeout" "s1-sleep10000" "s1-reset" "s2-drop"
+
+# check that statement cancellation works for selects inside a transaction
+permutation "s1-timeout" "s1-begin" "s1-sleep10000" "s1-rollback" "s1-reset" "s1-drop"
+permutation "s1-timeout" "s1-begin" "s1-sleep10000" "s1-rollback" "s1-reset" "s2-drop"
+
+# check that statement cancellation works for selects in a transaction that previously performed a write
+permutation "s1-timeout" "s1-begin" "s1-update1" "s1-sleep10000" "s1-rollback" "s1-reset" "s1-drop"
+permutation "s1-timeout" "s1-begin" "s1-update1" "s1-sleep10000" "s1-rollback" "s1-reset" "s2-drop"