Merge pull request #1473 from citusdata/fix/cancellation

Fix issues around statement cancellation and connection closure.
Andres Freund 2017-07-04 14:58:23 -07:00 committed by GitHub
commit 74a6bac8cc
12 changed files with 577 additions and 186 deletions
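
The recurring change in this commit is the replacement of blocking libpq calls (PQexec, PQputCopyData, PQputCopyEnd, PQgetResult) with Citus' interrupt-aware wrappers, so statement cancellation is honoured while waiting on remote nodes. As a rough sketch of what a converted call site looks like (condensed from the hunks below, not a complete function; connection and command stand for any MultiConnection and query string):

bool raiseInterrupts = true;
PGresult *result = NULL;

if (!SendRemoteCommand(connection, command))
{
	/* connection-level failure, e.g. the socket is already broken */
	ReportConnectionError(connection, ERROR);
}

/* waits via WaitLatchOrSocket(), so cancellation interrupts are processed */
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_TUPLES_OK)
{
	ReportResultError(connection, result, ERROR);
}
PQclear(result);

/* drain the terminating NULL so the connection returns to an idle state */
result = GetRemoteCommandResult(connection, raiseInterrupts);
Assert(!result);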

View File

@ -658,6 +658,7 @@ MasterPartitionMethod(RangeVar *relation)
{
char partitionMethod = '\0';
PGresult *queryResult = NULL;
bool raiseInterrupts = true;
char *relationName = relation->relname;
char *schemaName = relation->schemaname;
@ -666,7 +667,11 @@ MasterPartitionMethod(RangeVar *relation)
StringInfo partitionMethodCommand = makeStringInfo();
appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName);
queryResult = PQexec(masterConnection->pgConn, partitionMethodCommand->data);
if (!SendRemoteCommand(masterConnection, partitionMethodCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0);
@ -687,6 +692,9 @@ MasterPartitionMethod(RangeVar *relation)
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
return partitionMethod;
}
@ -735,6 +743,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
ListCell *placementCell = NULL;
List *connectionList = NULL;
int64 shardId = shardConnections->shardId;
bool raiseInterrupts = true;
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"OpenCopyConnections",
@ -784,15 +793,19 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
MarkRemoteTransactionCritical(connection);
ClaimConnectionExclusively(connection);
RemoteTransactionBeginIfNecessary(connection);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
useBinaryCopyFormat);
result = PQexec(connection->pgConn, copyCommand->data);
if (!SendRemoteCommand(connection, copyCommand->data))
{
ReportConnectionError(connection, ERROR);
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_COPY_IN)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
connectionList = lappend(connectionList, connection);
}
@ -926,11 +939,16 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
{
List *finalizedPlacementList = NIL;
PGresult *queryResult = NULL;
bool raiseInterrupts = true;
StringInfo shardPlacementsCommand = makeStringInfo();
appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId);
queryResult = PQexec(masterConnection->pgConn, shardPlacementsCommand->data);
if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
int rowCount = PQntuples(queryResult);
@ -939,8 +957,8 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
char *placementIdString = PQgetvalue(queryResult, rowIndex, 0);
char *nodeName = PQgetvalue(queryResult, rowIndex, 1);
char *nodePortString = PQgetvalue(queryResult, rowIndex, 2);
char *nodeName = pstrdup(PQgetvalue(queryResult, rowIndex, 1));
char *nodePortString = pstrdup(PQgetvalue(queryResult, rowIndex, 2));
uint32 nodePort = atoi(nodePortString);
uint64 placementId = atoll(placementIdString);
@ -959,6 +977,10 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
ereport(ERROR, (errmsg("could not get shard placements from the master node")));
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
return finalizedPlacementList;
}
@ -1064,8 +1086,7 @@ SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList)
static void
SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection)
{
int copyResult = PQputCopyData(connection->pgConn, dataBuffer->data, dataBuffer->len);
if (copyResult != 1)
if (!PutRemoteCopyData(connection, dataBuffer->data, dataBuffer->len))
{
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to COPY to shard %ld on %s:%d",
@ -1089,13 +1110,11 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
int copyEndResult = 0;
PGresult *result = NULL;
bool raiseInterrupts = true;
/* end the COPY input */
copyEndResult = PQputCopyEnd(connection->pgConn, NULL);
if (copyEndResult != 1)
if (!PutRemoteCopyEnd(connection, NULL))
{
if (stopOnFailure)
{
@ -1108,7 +1127,7 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
}
/* check whether there were any COPY errors */
result = PQgetResult(connection->pgConn);
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure)
{
ReportCopyError(connection, result);
@ -1427,11 +1446,16 @@ RemoteCreateEmptyShard(char *relationName)
{
int64 shardId = 0;
PGresult *queryResult = NULL;
bool raiseInterrupts = true;
StringInfo createEmptyShardCommand = makeStringInfo();
appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName);
queryResult = PQexec(masterConnection->pgConn, createEmptyShardCommand->data);
if (!SendRemoteCommand(masterConnection, createEmptyShardCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0);
@ -1445,6 +1469,8 @@ RemoteCreateEmptyShard(char *relationName)
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
return shardId;
}
@ -1475,18 +1501,25 @@ static void
RemoteUpdateShardStatistics(uint64 shardId)
{
PGresult *queryResult = NULL;
bool raiseInterrupts = true;
StringInfo updateShardStatisticsCommand = makeStringInfo();
appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY,
shardId);
queryResult = PQexec(masterConnection->pgConn, updateShardStatisticsCommand->data);
if (!SendRemoteCommand(masterConnection, updateShardStatisticsCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
{
ereport(ERROR, (errmsg("could not update shard statistics")));
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
}
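
A note on the hunks above: each also appends an extra GetRemoteCommandResult() call followed by Assert(!queryResult). This follows the general libpq rule that, once a command has completed, PQgetResult() must keep being called until it returns NULL before the connection is idle and ready for the next command. A minimal illustration of that rule in plain libpq (conn is an illustrative name, not code from this commit):

PGresult *result = NULL;

/* drain every pending result; the loop ends on the terminating NULL */
while ((result = PQgetResult(conn)) != NULL)
{
	PQclear(result);
}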

View File

@ -401,6 +401,40 @@ CloseConnectionByPGconn(PGconn *pqConn)
}
/*
* ShutdownConnection cancels the currently running statement if necessary,
* and then closes the underlying libpq connection. The MultiConnection
* itself is left intact.
*
* NB: Cancelling a statement requires network IO, and currently is not
* interruptible. Unfortunately libpq does not provide a non-blocking
* implementation of PQcancel(), so we don't have much choice for now.
*/
void
ShutdownConnection(MultiConnection *connection)
{
/*
* Only cancel the statement if there's currently one running, and the
* connection is in an OK state.
*/
if (PQstatus(connection->pgConn) == CONNECTION_OK &&
PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE)
{
char errorMessage[256] = { 0 };
PGcancel *cancel = PQgetCancel(connection->pgConn);
if (!PQcancel(cancel, errorMessage, sizeof(errorMessage)))
{
ereport(WARNING, (errmsg("could not cancel connection: %s",
errorMessage)));
}
PQfreeCancel(cancel);
}
PQfinish(connection->pgConn);
connection->pgConn = NULL;
}
/*
* FinishConnectionListEstablishment is a wrapper around FinishConnectionEstablishment.
* The function iterates over the multiConnectionList and finishes the connection
@ -640,6 +674,13 @@ StartConnectionEstablishment(ConnectionHashKey *key)
connection->pgConn = PQconnectStartParams(keywords, values, false);
connection->connectionStart = GetCurrentTimestamp();
/*
* To avoid issues with interrupts not getting caught, all our connections
* are managed in a non-blocking manner. remote_commands.c provides
* wrappers emulating blocking behaviour.
*/
PQsetnonblocking(connection->pgConn, true);
return connection;
}
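
Since every connection is now non-blocking from the moment it is established, raw libpq send calls on these connections may legitimately report that they could not complete yet, which is why the wrappers in remote_commands.c (next file) loop with WaitLatchOrSocket() until the IO finishes. For illustration only (conn is a placeholder, not code from this commit):

/* on a non-blocking connection, a flush may leave data queued */
int rc = PQflush(conn);

if (rc == 1)
{
	/* not everything was sent: wait for WL_SOCKET_WRITEABLE, then flush again */
}
else if (rc == -1)
{
	/* the send failed and the connection is broken */
}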
@ -677,8 +718,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
PQstatus(connection->pgConn) != CONNECTION_OK ||
PQtransactionStatus(connection->pgConn) != PQTRANS_IDLE)
{
PQfinish(connection->pgConn);
connection->pgConn = NULL;
ShutdownConnection(connection);
/* unlink from list */
dlist_delete(iter.cur);

View File

@ -23,6 +23,7 @@
/* GUC, determining whether statements sent to remote nodes are logged */
bool LogRemoteCommands = false;
static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
/* simple helpers */
@ -47,8 +48,8 @@ IsResponseOK(PGresult *result)
/*
* ForgetResults clears a connection from pending activity.
*
* XXX: In the future it might be a good idea to use PQcancel() if results
* would require network IO.
* Note that this might require network IO. If that's not acceptable, use
* NonblockingForgetResults().
*/
void
ForgetResults(MultiConnection *connection)
@ -74,6 +75,74 @@ ForgetResults(MultiConnection *connection)
}
/*
* NonblockingForgetResults clears a connection from pending activity if doing
* so does not require network IO. Returns true if successful, false
* otherwise.
*/
bool
NonblockingForgetResults(MultiConnection *connection)
{
PGconn *pgConn = connection->pgConn;
if (PQstatus(pgConn) != CONNECTION_OK)
{
return false;
}
Assert(PQisnonblocking(pgConn));
while (true)
{
PGresult *result = NULL;
/* just in case there are a lot of results */
CHECK_FOR_INTERRUPTS();
/*
* If busy, there might still be results already received and buffered
* by the OS. As the connection is in non-blocking mode, we can check for
* that without blocking.
*/
if (PQisBusy(pgConn))
{
if (PQflush(pgConn) == -1)
{
/* write failed */
return false;
}
if (PQconsumeInput(pgConn) == 0)
{
/* some low-level failure */
return false;
}
}
/* clearing would require blocking IO, return */
if (PQisBusy(pgConn))
{
return false;
}
result = PQgetResult(pgConn);
if (PQresultStatus(result) == PGRES_COPY_IN)
{
/* in copy, can't reliably recover without blocking */
return false;
}
if (result == NULL)
{
return true;
}
PQclear(result);
}
pg_unreachable();
}
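
NonblockingForgetResults() is meant as a best-effort fast path for callers that must not block. The transaction-abort hunk later in this commit uses it roughly as follows (a condensed sketch; the real code also handles SendRemoteCommand() failures):

if (!NonblockingForgetResults(connection))
{
	/* clearing would require blocking IO: cancel the statement and close the socket */
	ShutdownConnection(connection);

	/* FinishRemoteTransactionAbort() will emit the warning */
	return;
}

/* the connection is idle again, so ROLLBACK can be sent right away */
SendRemoteCommand(connection, "ROLLBACK");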
/*
* SqlStateMatchesCategory returns true if the given sql state (which may be
* NULL if unknown) is in the given error category. Note that we use
@ -279,7 +348,6 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command,
const char *const *parameterValues)
{
PGconn *pgConn = connection->pgConn;
bool wasNonblocking = false;
int rc = 0;
LogRemoteCommand(connection, command);
@ -293,23 +361,11 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command,
return 0;
}
wasNonblocking = PQisnonblocking(pgConn);
/* make sure not to block anywhere */
if (!wasNonblocking)
{
PQsetnonblocking(pgConn, true);
}
Assert(PQisnonblocking(pgConn));
rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes,
parameterValues, NULL, NULL, 0);
/* reset nonblocking connection to its original state */
if (!wasNonblocking)
{
PQsetnonblocking(pgConn, false);
}
return rc;
}
@ -367,7 +423,7 @@ ReadFirstColumnAsText(PGresult *queryResult)
* an error.
*
* If raiseInterrupts is false and an interrupt arrives that'd otherwise raise
* an error, GetRemotecommandResult returns NULL, and the transaction is
* an error, GetRemoteCommandResult returns NULL, and the transaction is
* marked as having failed. While that's not a perfect way to signal failure,
* callers will usually treat that as an error, and it's easy to use.
*
@ -379,11 +435,7 @@ PGresult *
GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
{
PGconn *pgConn = connection->pgConn;
int socket = 0;
int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET;
bool wasNonblocking = false;
PGresult *result = NULL;
bool failed = false;
/*
* Short circuit tests around the more expensive parts of this
@ -395,106 +447,159 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
return PQgetResult(connection->pgConn);
}
socket = PQsocket(pgConn);
wasNonblocking = PQisnonblocking(pgConn);
/* make sure not to block anywhere */
if (!wasNonblocking)
if (!FinishConnectionIO(connection, raiseInterrupts))
{
PQsetnonblocking(pgConn, true);
return NULL;
}
/* no IO should be necessary to get result */
Assert(!PQisBusy(pgConn));
result = PQgetResult(connection->pgConn);
return result;
}
/*
* PutRemoteCopyData is a wrapper around PQputCopyData() that handles
* interrupts.
*
* Returns false if PQputCopyData() failed, true otherwise.
*/
bool
PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
{
PGconn *pgConn = connection->pgConn;
int copyState = 0;
if (PQstatus(pgConn) != CONNECTION_OK)
{
return false;
}
Assert(PQisnonblocking(pgConn));
copyState = PQputCopyData(pgConn, buffer, nbytes);
if (copyState == 1)
{
/* successful */
return true;
}
else if (copyState == -1)
{
return false;
}
else
{
bool allowInterrupts = true;
return FinishConnectionIO(connection, allowInterrupts);
}
}
/*
* PutRemoteCopyEnd is a wrapper around PQputCopyEnd() that handles
* interrupts.
*
* Returns false if PQputCopyEnd() failed, true otherwise.
*/
bool
PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
{
PGconn *pgConn = connection->pgConn;
int copyState = 0;
if (PQstatus(pgConn) != CONNECTION_OK)
{
return false;
}
Assert(PQisnonblocking(pgConn));
copyState = PQputCopyEnd(pgConn, errormsg);
if (copyState == 1)
{
/* successful */
return true;
}
else if (copyState == -1)
{
return false;
}
else
{
bool allowInterrupts = true;
return FinishConnectionIO(connection, allowInterrupts);
}
}
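
Together with GetRemoteCommandResult(), the two COPY wrappers keep the COPY path interrupt-aware end to end. A condensed sketch of that flow, based on the SendCopyDataToPlacement and EndRemoteCopy hunks earlier in this commit (error handling simplified):

bool raiseInterrupts = true;
PGresult *result = NULL;

/* stream a buffer to the worker; waits writeably if the socket buffer is full */
if (!PutRemoteCopyData(connection, dataBuffer->data, dataBuffer->len))
{
	ereport(ERROR, (errmsg("failed to COPY to shard")));
}

/* end the COPY input and check the command result */
if (!PutRemoteCopyEnd(connection, NULL))
{
	ereport(ERROR, (errmsg("failed to end COPY on worker")));
}

result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
	ReportResultError(connection, result, ERROR);
}
PQclear(result);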
/*
* FinishConnectionIO performs pending IO for the connection, while accepting
* interrupts.
*
* See GetRemoteCommandResult() for documentation of interrupt handling
* behaviour.
*
* Returns true if IO was successfully completed, false otherwise.
*/
static bool
FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
{
PGconn *pgConn = connection->pgConn;
int socket = PQsocket(pgConn);
Assert(pgConn);
Assert(PQisnonblocking(pgConn));
if (raiseInterrupts)
{
CHECK_FOR_INTERRUPTS();
}
/* make sure command has been sent out */
while (!failed)
/* perform the necessary IO */
while (true)
{
int sendStatus = 0;
int rc = 0;
int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET;
ResetLatch(MyLatch);
/* try to send all the data */
rc = PQflush(pgConn);
/* stop writing if all data has been sent, or there was none to send */
if (rc == 0)
{
break;
}
/* try to send all pending data */
sendStatus = PQflush(pgConn);
/* if sending failed, there's nothing more we can do */
if (rc == -1)
if (sendStatus == -1)
{
failed = true;
break;
return false;
}
/* this means we have to wait for data to go out */
Assert(rc == 1);
#if (PG_VERSION_NUM >= 100000)
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0,
PG_WAIT_EXTENSION);
#else
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0);
#endif
if (rc & WL_POSTMASTER_DEATH)
else if (sendStatus == 1)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
waitFlags |= WL_SOCKET_WRITEABLE;
}
if (rc & WL_LATCH_SET)
{
/* if allowed raise errors */
if (raiseInterrupts)
{
CHECK_FOR_INTERRUPTS();
}
/*
* If raising errors is allowed, or called within a section with
* interrupts held, return NULL instead, and mark the transaction
* as failed.
*/
if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
{
connection->remoteTransaction.transactionFailed = true;
failed = true;
break;
}
}
}
/* wait for the result of the command to come in */
while (!failed)
{
int rc = 0;
ResetLatch(MyLatch);
/* if reading fails, there's not much we can do */
if (PQconsumeInput(pgConn) == 0)
{
failed = true;
break;
return false;
}
if (PQisBusy(pgConn))
{
waitFlags |= WL_SOCKET_READABLE;
}
/* check if all the necessary data is now available */
if (!PQisBusy(pgConn))
if ((waitFlags & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0)
{
result = PQgetResult(connection->pgConn);
break;
/* no IO necessary anymore, we're done */
return true;
}
#if (PG_VERSION_NUM >= 100000)
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0,
PG_WAIT_EXTENSION);
rc = WaitLatchOrSocket(MyLatch, waitFlags, socket, 0, PG_WAIT_EXTENSION);
#else
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0);
rc = WaitLatchOrSocket(MyLatch, waitFlags, socket, 0);
#endif
if (rc & WL_POSTMASTER_DEATH)
@ -504,6 +609,8 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/* if allowed raise errors */
if (raiseInterrupts)
{
@ -512,22 +619,16 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
/*
* If raising errors is allowed, or called within a section with
* interrupts held, return NULL instead, and mark the transaction
* as failed.
* interrupts held, return instead, and mark the transaction as
* failed.
*/
if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
{
connection->remoteTransaction.transactionFailed = true;
failed = true;
break;
}
}
}
if (!wasNonblocking)
{
PQsetnonblocking(pgConn, false);
}
return result;
return false;
}

View File

@ -44,7 +44,6 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT];
/* Local functions forward declarations */
static void ClearRemainingResults(MultiConnection *connection);
static bool ClientConnectionReady(MultiConnection *connection,
PostgresPollingStatusType pollingStatus);
@ -401,6 +400,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
PGresult *result = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
ExecStatusType resultStatus = PGRES_COMMAND_OK;
bool raiseInterrupts = true;
Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId];
@ -413,7 +413,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
return false;
}
result = PQgetResult(connection->pgConn);
result = GetRemoteCommandResult(connection, raiseInterrupts);
resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_TUPLES_OK)
{
@ -430,7 +430,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
}
/* clear extra result objects */
ClearRemainingResults(connection);
ForgetResults(connection);
return true;
}
@ -454,6 +454,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
ConnStatusType connStatusType = CONNECTION_OK;
ExecStatusType resultStatus = PGRES_COMMAND_OK;
BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY;
bool raiseInterrupts = true;
Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId];
@ -471,7 +472,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
return CLIENT_BATCH_QUERY_FAILED;
}
result = PQgetResult(connection->pgConn);
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (result == NULL)
{
return CLIENT_BATCH_QUERY_DONE;
@ -538,6 +539,7 @@ MultiClientQueryStatus(int32 connectionId)
ConnStatusType connStatusType = CONNECTION_OK;
ExecStatusType resultStatus = PGRES_COMMAND_OK;
QueryStatus queryStatus = CLIENT_INVALID_QUERY;
bool raiseInterrupts = true;
Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId];
@ -555,7 +557,7 @@ MultiClientQueryStatus(int32 connectionId)
* isn't ready yet (the caller didn't wait for the connection to be ready),
* we will block on this call.
*/
result = PQgetResult(connection->pgConn);
result = GetRemoteCommandResult(connection, raiseInterrupts);
resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_COMMAND_OK)
@ -598,7 +600,7 @@ MultiClientQueryStatus(int32 connectionId)
*/
if (!copyResults)
{
ClearRemainingResults(connection);
ForgetResults(connection);
}
return queryStatus;
@ -665,7 +667,8 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
else if (receiveLength == -1)
{
/* received copy done message */
PGresult *result = PQgetResult(connection->pgConn);
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
ExecStatusType resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_COMMAND_OK)
@ -692,7 +695,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
/* if copy out completed, make sure we drain all results from libpq */
if (receiveLength < 0)
{
ClearRemainingResults(connection);
ForgetResults(connection);
}
return copyStatus;
@ -853,23 +856,6 @@ MultiClientWait(WaitInfo *waitInfo)
}
/*
* ClearRemainingResults reads result objects from the connection until we get
* null, and clears these results. This is the last step in completing an async
* query.
*/
static void
ClearRemainingResults(MultiConnection *connection)
{
PGresult *result = PQgetResult(connection->pgConn);
while (result != NULL)
{
PQclear(result);
result = PQgetResult(connection->pgConn);
}
}
/*
* ClientConnectionReady checks if the given connection is ready for non-blocking
* reads or writes. This function is loosely based on pqSocketCheck() at fe-misc.c

View File

@ -21,6 +21,7 @@
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_server_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_protocol.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"
@ -39,11 +40,11 @@ static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray,
bool *statusArray,
StringInfo *resultStringArray,
int commmandCount);
static bool GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
static bool GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus,
StringInfo queryResultString);
static bool EvaluateQueryResult(PGconn *connection, PGresult *queryResult, StringInfo
queryResultString);
static void StoreErrorMessage(PGconn *connection, StringInfo queryResultString);
static bool EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult,
StringInfo queryResultString);
static void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString);
static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray,
int *nodePortArray,
StringInfo *commandStringArray,
@ -223,7 +224,8 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
int commmandCount)
{
int commandIndex = 0;
PGconn **connectionArray = palloc0(commmandCount * sizeof(PGconn *));
MultiConnection **connectionArray =
palloc0(commmandCount * sizeof(MultiConnection *));
int finishedCount = 0;
/* establish connections */
@ -232,14 +234,13 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
char *nodeName = nodeNameArray[commandIndex]->data;
int nodePort = nodePortArray[commandIndex];
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *multiConnection =
MultiConnection *connection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
PGconn *connection = multiConnection->pgConn;
StringInfo queryResultString = resultStringArray[commandIndex];
statusArray[commandIndex] = true;
if (PQstatus(multiConnection->pgConn) != CONNECTION_OK)
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort);
@ -256,7 +257,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
{
int querySent = 0;
PGconn *connection = connectionArray[commandIndex];
MultiConnection *connection = connectionArray[commandIndex];
char *queryString = commandStringArray[commandIndex]->data;
StringInfo queryResultString = resultStringArray[commandIndex];
@ -269,13 +270,17 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
continue;
}
querySent = PQsendQuery(connection, queryString);
/*
* NB: this intentionally uses PQsendQuery rather than
* SendRemoteCommand as multiple commands are allowed.
*/
querySent = PQsendQuery(connection->pgConn, queryString);
if (querySent == 0)
{
StoreErrorMessage(connection, queryResultString);
statusArray[commandIndex] = false;
CloseConnectionByPGconn(connection);
CloseConnection(connection);
connectionArray[commandIndex] = NULL;
finishedCount++;
}
@ -286,7 +291,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
{
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
{
PGconn *connection = connectionArray[commandIndex];
MultiConnection *connection = connectionArray[commandIndex];
StringInfo queryResultString = resultStringArray[commandIndex];
bool success = false;
bool queryFinished = false;
@ -304,7 +309,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
finishedCount++;
statusArray[commandIndex] = success;
connectionArray[commandIndex] = NULL;
CloseConnectionByPGconn(connection);
CloseConnection(connection);
}
}
@ -328,11 +333,11 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
* reported upon completion of the query.
*/
static bool
GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus,
StringInfo queryResultString)
{
bool finished = true;
ConnStatusType connectionStatus = PQstatus(connection);
ConnStatusType connectionStatus = PQstatus(connection->pgConn);
int consumeInput = 0;
PGresult *queryResult = NULL;
bool success = false;
@ -346,7 +351,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
return finished;
}
consumeInput = PQconsumeInput(connection);
consumeInput = PQconsumeInput(connection->pgConn);
if (consumeInput == 0)
{
appendStringInfo(queryResultString, "query result unavailable");
@ -354,14 +359,14 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
}
/* check later if busy */
if (PQisBusy(connection) != 0)
if (PQisBusy(connection->pgConn) != 0)
{
finished = false;
return finished;
}
/* query result is available at this point */
queryResult = PQgetResult(connection);
queryResult = PQgetResult(connection->pgConn);
success = EvaluateQueryResult(connection, queryResult, queryResultString);
PQclear(queryResult);
@ -379,7 +384,7 @@ GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
* error otherwise.
*/
static bool
EvaluateQueryResult(PGconn *connection, PGresult *queryResult,
EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult,
StringInfo queryResultString)
{
bool success = false;
@ -434,9 +439,9 @@ EvaluateQueryResult(PGconn *connection, PGresult *queryResult,
* otherwise it would return a default error message.
*/
static void
StoreErrorMessage(PGconn *connection, StringInfo queryResultString)
StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString)
{
char *errorMessage = PQerrorMessage(connection);
char *errorMessage = PQerrorMessage(connection->pgConn);
if (errorMessage != NULL)
{
char *firstNewlineIndex = strchr(errorMessage, '\n');
@ -499,26 +504,27 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
StringInfo queryResultString)
{
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *multiConnection =
MultiConnection *connection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
PGconn *nodeConnection = multiConnection->pgConn;
bool success = false;
PGresult *queryResult = NULL;
bool raiseInterrupts = true;
if (PQstatus(multiConnection->pgConn) != CONNECTION_OK)
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort);
return false;
}
queryResult = PQexec(nodeConnection, queryString);
success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString);
SendRemoteCommand(connection, queryString);
queryResult = GetRemoteCommandResult(connection, raiseInterrupts);
success = EvaluateQueryResult(connection, queryResult, queryResultString);
PQclear(queryResult);
/* close the connection */
CloseConnection(multiConnection);
CloseConnection(connection);
return success;
}

View File

@ -276,16 +276,21 @@ StartRemoteTransactionAbort(MultiConnection *connection)
Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
/*
* Clear previous results, so we have a better chance to send
* ROLLBACK [PREPARED];
* Clear previous results, so we have a better chance to send ROLLBACK
* [PREPARED]. If we've previously sent a PREPARE TRANSACTION, we always
* want to wait for that result, since it should not take long and the
* prepared transaction reserves resources on the remote node. But if some
* other query is still running, we do not want to wait, since a
* long-running statement may be in progress; in that case we force it to
* be killed.
*/
ForgetResults(connection);
if (transaction->transactionState == REMOTE_TRANS_PREPARING ||
transaction->transactionState == REMOTE_TRANS_PREPARED)
{
StringInfoData command;
/* await PREPARE TRANSACTION result; closing the connection would leave it dangling */
ForgetResults(connection);
initStringInfo(&command);
appendStringInfo(&command, "ROLLBACK PREPARED '%s'",
transaction->preparedName);
@ -304,6 +309,14 @@ StartRemoteTransactionAbort(MultiConnection *connection)
}
else
{
if (!NonblockingForgetResults(connection))
{
ShutdownConnection(connection);
/* FinishRemoteTransactionAbort will emit warning */
return;
}
if (!SendRemoteCommand(connection, "ROLLBACK"))
{
/* no point in reporting a likely redundant message */
@ -336,16 +349,16 @@ FinishRemoteTransactionAbort(MultiConnection *connection)
ReportResultError(connection, result, WARNING);
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING)
if (transaction->transactionState == REMOTE_TRANS_2PC_ABORTING)
{
ereport(WARNING,
(errmsg("failed to abort 2PC transaction \"%s\" on %s:%d",
transaction->preparedName, connection->hostname,
connection->port)));
WarnAboutLeakedPreparedTransaction(connection, isNotCommit);
}
else
{
WarnAboutLeakedPreparedTransaction(connection, isNotCommit);
ereport(WARNING,
(errmsg("failed to abort 1PC transaction \"%s\" on %s:%d",
transaction->preparedName, connection->hostname,
connection->port)));
}
}

View File

@ -134,6 +134,7 @@ extern MultiConnection * GetConnectionFromPGconn(struct pg_conn *pqConn);
extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
extern void CloseConnection(MultiConnection *connection);
extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
extern void ShutdownConnection(MultiConnection *connection);
/* dealing with a connection */
extern void FinishConnectionListEstablishment(List *multiConnectionList);

View File

@ -26,6 +26,7 @@ extern bool LogRemoteCommands;
/* simple helpers */
extern bool IsResponseOK(struct pg_result *result);
extern void ForgetResults(MultiConnection *connection);
extern bool NonblockingForgetResults(MultiConnection *connection);
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
/* report errors & warnings */
@ -47,6 +48,9 @@ extern int SendRemoteCommandParams(MultiConnection *connection, const char *comm
extern List * ReadFirstColumnAsText(struct pg_result *queryResult);
extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection,
bool raiseInterrupts);
extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer,
int nbytes);
extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);
#endif /* REMOTE_COMMAND_H */

View File

@ -60,7 +60,7 @@ check-isolation: all tempinstall-main
check-vanilla: all tempinstall-main
$(pg_regress_multi_check) --load-extension=citus --vanillatest
check-multi-mx: all tempinstall-main
$(pg_regress_multi_check) --load-extension=citus \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_mx_schedule $(EXTRA_TESTS)

View File

@ -0,0 +1,127 @@
Parsed test spec with 2 sessions
starting permutation: s1-timeout s1-sleep10000 s1-reset s1-drop
step s1-timeout:
SET statement_timeout = '100ms';
step s1-sleep10000:
SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1;
ERROR: canceling statement due to statement timeout
step s1-reset:
RESET ALL;
step s1-drop:
DROP TABLE cancel_table;
starting permutation: s1-timeout s1-sleep10000 s1-reset s2-drop
step s1-timeout:
SET statement_timeout = '100ms';
step s1-sleep10000:
SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1;
ERROR: canceling statement due to statement timeout
step s1-reset:
RESET ALL;
step s2-drop:
DROP TABLE cancel_table;
starting permutation: s1-timeout s1-begin s1-sleep10000 s1-rollback s1-reset s1-drop
step s1-timeout:
SET statement_timeout = '100ms';
step s1-begin:
BEGIN;
step s1-sleep10000:
SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1;
ERROR: canceling statement due to statement timeout
step s1-rollback:
ROLLBACK;
step s1-reset:
RESET ALL;
step s1-drop:
DROP TABLE cancel_table;
starting permutation: s1-timeout s1-begin s1-sleep10000 s1-rollback s1-reset s2-drop
step s1-timeout:
SET statement_timeout = '100ms';
step s1-begin:
BEGIN;
step s1-sleep10000:
SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1;
ERROR: canceling statement due to statement timeout
step s1-rollback:
ROLLBACK;
step s1-reset:
RESET ALL;
step s2-drop:
DROP TABLE cancel_table;
starting permutation: s1-timeout s1-begin s1-update1 s1-sleep10000 s1-rollback s1-reset s1-drop
step s1-timeout:
SET statement_timeout = '100ms';
step s1-begin:
BEGIN;
step s1-update1:
UPDATE cancel_table SET data = '' WHERE test_id = 1;
step s1-sleep10000:
SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1;
ERROR: canceling statement due to statement timeout
step s1-rollback:
ROLLBACK;
step s1-reset:
RESET ALL;
step s1-drop:
DROP TABLE cancel_table;
starting permutation: s1-timeout s1-begin s1-update1 s1-sleep10000 s1-rollback s1-reset s2-drop
step s1-timeout:
SET statement_timeout = '100ms';
step s1-begin:
BEGIN;
step s1-update1:
UPDATE cancel_table SET data = '' WHERE test_id = 1;
step s1-sleep10000:
SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1;
ERROR: canceling statement due to statement timeout
step s1-rollback:
ROLLBACK;
step s1-reset:
RESET ALL;
step s2-drop:
DROP TABLE cancel_table;

View File

@ -1,10 +1,10 @@
test: isolation_add_node_vs_reference_table_operations
# tests that change node metadata should precede
# isolation_cluster_management such that tests
# that come later can be parallelized
test: isolation_cluster_management
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation
test: isolation_concurrent_dml isolation_data_migration
test: isolation_drop_shards isolation_copy_placement_vs_modification

View File

@ -0,0 +1,80 @@
# Tests around cancelling statements. As we can't trigger cancel
# interrupts directly, we use statement_timeout instead, which largely
# behaves the same as proper cancellation.
setup
{
CREATE TABLE cancel_table (test_id integer NOT NULL, data text);
SELECT create_distributed_table('cancel_table', 'test_id', 'hash');
INSERT INTO cancel_table VALUES(1);
}
teardown
{
DROP TABLE IF EXISTS cancel_table;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-commit"
{
COMMIT;
}
step "s1-rollback"
{
ROLLBACK;
}
step "s1-sleep10000"
{
SELECT pg_sleep(10000) FROM cancel_table WHERE test_id = 1;
}
step "s1-timeout"
{
SET statement_timeout = '100ms';
}
step "s1-update1"
{
UPDATE cancel_table SET data = '' WHERE test_id = 1;
}
step "s1-reset"
{
RESET ALL;
}
step "s1-drop"
{
DROP TABLE cancel_table;
}
session "s2"
step "s2-drop"
{
DROP TABLE cancel_table;
}
# check that statement cancellation works for plain selects; drop the table
# afterwards to make sure the sleep on the workers is cancelled (and thus
# does not block the drop via locks)
permutation "s1-timeout" "s1-sleep10000" "s1-reset" "s1-drop"
permutation "s1-timeout" "s1-sleep10000" "s1-reset" "s2-drop"
# check that statement cancellation works for selects in a transaction
permutation "s1-timeout" "s1-begin" "s1-sleep10000" "s1-rollback" "s1-reset" "s1-drop"
permutation "s1-timeout" "s1-begin" "s1-sleep10000" "s1-rollback" "s1-reset" "s2-drop"
# check that statement cancellation works for selects in a transaction that previously wrote
permutation "s1-timeout" "s1-begin" "s1-update1" "s1-sleep10000" "s1-rollback" "s1-reset" "s1-drop"
permutation "s1-timeout" "s1-begin" "s1-update1" "s1-sleep10000" "s1-rollback" "s1-reset" "s2-drop"