Integrate router executor into transaction management framework.

One less place managing remote transactions. It also makes it fairly
easy to use 2PC for certain modifications (e.g. reference tables). Just
issue a CoordinatedTransactionUse2PC(). If every placement failure
should cause the whole transaction to abort, additionally mark the
relevant transactions as critical.
pull/1020/head
Andres Freund 2016-12-08 19:29:48 -08:00
parent fa5e202403
commit 80b34a5d6b
6 changed files with 71 additions and 182 deletions

View File

@ -41,6 +41,7 @@
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "executor/execdesc.h" #include "executor/execdesc.h"
#include "executor/executor.h" #include "executor/executor.h"
@ -80,13 +81,9 @@ bool AllModificationsCommutative = false;
* of XactShardConnSets, which map a shard identifier to a set of connection * of XactShardConnSets, which map a shard identifier to a set of connection
* hash entries. This list is walked by MarkRemainingInactivePlacements to * hash entries. This list is walked by MarkRemainingInactivePlacements to
* ensure we mark placements as failed if they reject a COMMIT. * ensure we mark placements as failed if they reject a COMMIT.
*
* Beyond that, there's a backend hook to register xact callbacks and a flag to
* track when a user tries to roll back to a savepoint (not allowed).
*/ */
static HTAB *xactParticipantHash = NULL; static HTAB *xactParticipantHash = NULL;
static List *xactShardConnSetList = NIL; static List *xactShardConnSetList = NIL;
static bool subXactAbortAttempted = false;
/* functions needed during start phase */ /* functions needed during start phase */
static void InitTransactionStateForTask(Task *task); static void InitTransactionStateForTask(Task *task);
@ -124,11 +121,7 @@ static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows
static void RecordShardIdParticipant(uint64 affectedShardId, static void RecordShardIdParticipant(uint64 affectedShardId,
NodeConnectionEntry *participantEntry); NodeConnectionEntry *participantEntry);
/* functions needed by callbacks and hooks */ /* to verify the health of shards after a transactional modification command */
static void RouterTransactionCallback(XactEvent event, void *arg);
static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg);
static void ExecuteTransactionEnd(bool commit);
static void MarkRemainingInactivePlacements(void); static void MarkRemainingInactivePlacements(void);
@ -249,6 +242,8 @@ InitTransactionStateForTask(Task *task)
{ {
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
BeginCoordinatedTransaction();
xactParticipantHash = CreateXactParticipantHash(); xactParticipantHash = CreateXactParticipantHash();
foreach(placementCell, task->taskPlacementList) foreach(placementCell, task->taskPlacementList)
@ -257,8 +252,9 @@ InitTransactionStateForTask(Task *task)
NodeConnectionKey participantKey; NodeConnectionKey participantKey;
NodeConnectionEntry *participantEntry = NULL; NodeConnectionEntry *participantEntry = NULL;
bool entryFound = false; bool entryFound = false;
int connectionFlags = SESSION_LIFESPAN;
PGconn *connection = NULL; MultiConnection *connection =
GetNodeConnection(connectionFlags, placement->nodeName, placement->nodePort);
MemSet(&participantKey, 0, sizeof(participantKey)); MemSet(&participantKey, 0, sizeof(participantKey));
strlcpy(participantKey.nodeName, placement->nodeName, strlcpy(participantKey.nodeName, placement->nodeName,
@ -269,21 +265,8 @@ InitTransactionStateForTask(Task *task)
HASH_ENTER, &entryFound); HASH_ENTER, &entryFound);
Assert(!entryFound); Assert(!entryFound);
connection = GetOrEstablishConnection(placement->nodeName, /* issue BEGIN if necessary */
placement->nodePort); RemoteTransactionBeginIfNecessary(connection);
if (connection != NULL)
{
PGresult *result = PQexec(connection, "BEGIN");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
WarnRemoteError(connection, result);
CloseConnectionByPGconn(connection);
connection = NULL;
}
PQclear(result);
}
participantEntry->connection = connection; participantEntry->connection = connection;
} }
@ -1212,7 +1195,7 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
RecordShardIdParticipant(placement->shardId, participantEntry); RecordShardIdParticipant(placement->shardId, participantEntry);
} }
return participantEntry->connection; return participantEntry->connection->pgConn;
} }
else else
{ {
@ -1721,173 +1704,52 @@ RouterExecutorEnd(QueryDesc *queryDesc)
/* /*
* RegisterRouterExecutorXactCallbacks registers this executor's callbacks. * RouterExecutorPreCommitCheck() gets called after remote transactions have
* committed, so it can invalidate failed shards and perform related checks.
*/ */
void void
RegisterRouterExecutorXactCallbacks(void) RouterExecutorPreCommitCheck(void)
{ {
RegisterXactCallback(RouterTransactionCallback, NULL); /* no transactional router modification were issued, nothing to do */
RegisterSubXactCallback(RouterSubtransactionCallback, NULL);
}
/*
* RouterTransactionCallback handles committing or aborting remote transactions
* after the local one has committed or aborted. It only sends COMMIT or ABORT
* commands to still-healthy remotes; the failed ones are marked as inactive if
* after a successful COMMIT (no need to mark on ABORTs).
*/
static void
RouterTransactionCallback(XactEvent event, void *arg)
{
if (XactModificationLevel != XACT_MODIFICATION_DATA)
{
return;
}
switch (event)
{
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_COMMIT:
{
break;
}
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
{
bool commit = false;
ExecuteTransactionEnd(commit);
break;
}
/* no support for prepare with multi-statement transactions */
case XACT_EVENT_PREPARE:
case XACT_EVENT_PRE_PREPARE:
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot prepare a transaction that modified "
"distributed tables")));
break;
}
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
{
bool commit = true;
if (subXactAbortAttempted)
{
subXactAbortAttempted = false;
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
"which modify distributed tables")));
}
ExecuteTransactionEnd(commit);
MarkRemainingInactivePlacements();
/* leave early to avoid resetting transaction state */
return;
}
}
/* reset transaction state */
xactParticipantHash = NULL;
xactShardConnSetList = NIL;
subXactAbortAttempted = false;
}
/*
* RouterSubtransactionCallback silently keeps track of any attempt to ROLLBACK
* TO SAVEPOINT, which is not permitted by this executor. At transaction end,
* the executor checks whether such a rollback was attempted and, if so, errors
* out entirely (with an appropriate message).
*
* This implementation permits savepoints so long as no rollbacks occur.
*/
static void
RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg)
{
if ((xactParticipantHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB))
{
subXactAbortAttempted = true;
}
}
/*
* ExecuteTransactionEnd ends any remote transactions still taking place on
* remote nodes. It uses xactParticipantHash to know which nodes need any
* final COMMIT or ABORT commands. Nodes which fail a final COMMIT will have
* their connection field set to NULL to permit placement invalidation.
*/
static void
ExecuteTransactionEnd(bool commit)
{
const char *sqlCommand = commit ? "COMMIT TRANSACTION" : "ABORT TRANSACTION";
HASH_SEQ_STATUS scan;
NodeConnectionEntry *participant;
bool completed = !commit; /* aborts are assumed completed */
if (xactParticipantHash == NULL) if (xactParticipantHash == NULL)
{ {
return; return;
} }
hash_seq_init(&scan, xactParticipantHash); MarkRemainingInactivePlacements();
while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan)))
{
PGconn *connection = participant->connection;
PGresult *result = NULL;
if (PQstatus(connection) != CONNECTION_OK)
{
continue;
} }
result = PQexec(connection, sqlCommand);
if (PQresultStatus(result) == PGRES_COMMAND_OK)
{
completed = true;
}
else
{
WarnRemoteError(connection, result);
CloseConnectionByPGconn(participant->connection);
participant->connection = NULL; /*
} * Cleanup callback called after a transaction commits or aborts.
*/
PQclear(result); void
} RouterExecutorPostCommit(void)
if (!completed)
{ {
ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); /* reset transaction state */
} xactParticipantHash = NULL;
xactShardConnSetList = NIL;
} }
/* /*
* MarkRemainingInactivePlacements takes care of marking placements of a shard * MarkRemainingInactivePlacements takes care of marking placements of a shard
* inactive after some of the placements rejected the final COMMIT phase of a * inactive after some of the placements rejected the final COMMIT phase of a
* transaction. This step is skipped if all placements reject the COMMIT, since * transaction.
* in that case no modifications to the placement have persisted.
* *
* Failures are detected by checking the connection field of the entries in the * Failures are detected by checking the connection & transaction state for
* connection set for each shard: it is always set to NULL after errors. * each of the entries in the connection set for each shard.
*/ */
static void static void
MarkRemainingInactivePlacements(void) MarkRemainingInactivePlacements(void)
{ {
ListCell *shardConnSetCell = NULL; ListCell *shardConnSetCell = NULL;
int totalSuccesses = 0;
if (xactParticipantHash == NULL)
{
return;
}
foreach(shardConnSetCell, xactShardConnSetList) foreach(shardConnSetCell, xactShardConnSetList)
{ {
@ -1899,11 +1761,16 @@ MarkRemainingInactivePlacements(void)
/* determine how many actual successes there were: subtract failures */ /* determine how many actual successes there were: subtract failures */
foreach(participantCell, participantList) foreach(participantCell, participantList)
{ {
NodeConnectionEntry *participant = NULL; NodeConnectionEntry *participant =
participant = (NodeConnectionEntry *) lfirst(participantCell); (NodeConnectionEntry *) lfirst(participantCell);
MultiConnection *connection = participant->connection;
/* other codes sets connection to NULL after errors */ /*
if (participant->connection == NULL) * Fail if the connection has been set to NULL after an error, or
* if the transaction failed for other reasons (e.g. COMMIT
* failed).
*/
if (connection == NULL || connection->remoteTransaction.transactionFailed)
{ {
successes--; successes--;
} }
@ -1921,7 +1788,8 @@ MarkRemainingInactivePlacements(void)
NodeConnectionEntry *participant = NULL; NodeConnectionEntry *participant = NULL;
participant = (NodeConnectionEntry *) lfirst(participantCell); participant = (NodeConnectionEntry *) lfirst(participantCell);
if (participant->connection == NULL) if (participant->connection == NULL ||
participant->connection->remoteTransaction.transactionFailed)
{ {
uint64 shardId = shardConnSet->shardId; uint64 shardId = shardConnSet->shardId;
NodeConnectionKey *nodeKey = &participant->cacheKey; NodeConnectionKey *nodeKey = &participant->cacheKey;
@ -1934,5 +1802,13 @@ MarkRemainingInactivePlacements(void)
nodeKey->nodeName, nodeKey->nodePort); nodeKey->nodeName, nodeKey->nodePort);
} }
} }
totalSuccesses++;
}
/* If no shards could be modified at all, error out. */
if (totalSuccesses == 0)
{
ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
} }
} }

View File

@ -160,9 +160,6 @@ _PG_init(void)
InitializeTransactionManagement(); InitializeTransactionManagement();
InitializeConnectionManagement(); InitializeConnectionManagement();
/* initialize transaction callbacks */
RegisterRouterExecutorXactCallbacks();
/* enable modification of pg_catalog tables during pg_upgrade */ /* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade) if (IsBinaryUpgrade)
{ {

View File

@ -21,6 +21,7 @@
#include "access/xact.h" #include "access/xact.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
@ -147,6 +148,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* callbacks still can perform work if needed. * callbacks still can perform work if needed.
*/ */
ResetShardPlacementTransactionState(); ResetShardPlacementTransactionState();
RouterExecutorPostCommit();
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
{ {
@ -182,6 +184,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* callbacks still can perform work if needed. * callbacks still can perform work if needed.
*/ */
ResetShardPlacementTransactionState(); ResetShardPlacementTransactionState();
RouterExecutorPostCommit();
/* handles both already prepared and open transactions */ /* handles both already prepared and open transactions */
if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
@ -245,6 +248,14 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
CoordinatedRemoteTransactionsCommit(); CoordinatedRemoteTransactionsCommit();
CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
} }
/*
* Call other parts of citus that need to integrate into
* transaction management. Call them *after* committing/preparing
* the remote transactions, to allow marking shards as invalid
* (e.g. if the remote commit failed).
*/
RouterExecutorPreCommitCheck();
} }
break; break;

View File

@ -35,7 +35,7 @@ typedef struct NodeConnectionKey
typedef struct NodeConnectionEntry typedef struct NodeConnectionEntry
{ {
NodeConnectionKey cacheKey; /* hash entry key */ NodeConnectionKey cacheKey; /* hash entry key */
PGconn *connection; /* connection to remote server, if any */ MultiConnection *connection; /* connection to remote server, if any */
} NodeConnectionEntry; } NodeConnectionEntry;

View File

@ -37,7 +37,8 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc);
extern void RegisterRouterExecutorXactCallbacks(void); extern void RouterExecutorPreCommitCheck(void);
extern void RouterExecutorPostCommit(void);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList); extern int64 ExecuteModifyTasksWithoutResults(List *taskList);

View File

@ -442,6 +442,7 @@ INSERT INTO objects VALUES (2, 'BAD');
INSERT INTO labs VALUES (9, 'Umbrella Corporation'); INSERT INTO labs VALUES (9, 'Umbrella Corporation');
COMMIT; COMMIT;
WARNING: illegal value WARNING: illegal value
WARNING: failed to commit transaction on localhost:57638
-- data should be persisted -- data should be persisted
SELECT * FROM objects WHERE id = 2; SELECT * FROM objects WHERE id = 2;
id | name id | name
@ -490,7 +491,9 @@ INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD'); INSERT INTO labs VALUES (9, 'BAD');
COMMIT; COMMIT;
WARNING: illegal value WARNING: illegal value
WARNING: failed to commit transaction on localhost:57637
WARNING: illegal value WARNING: illegal value
WARNING: failed to commit transaction on localhost:57638
ERROR: could not commit transaction on any active nodes ERROR: could not commit transaction on any active nodes
-- data should NOT be persisted -- data should NOT be persisted
SELECT * FROM objects WHERE id = 1; SELECT * FROM objects WHERE id = 1;
@ -526,6 +529,7 @@ INSERT INTO labs VALUES (8, 'Aperture Science');
INSERT INTO labs VALUES (9, 'BAD'); INSERT INTO labs VALUES (9, 'BAD');
COMMIT; COMMIT;
WARNING: illegal value WARNING: illegal value
WARNING: failed to commit transaction on localhost:57637
\set VERBOSITY default \set VERBOSITY default
-- data to objects should be persisted, but labs should not... -- data to objects should be persisted, but labs should not...
SELECT * FROM objects WHERE id = 1; SELECT * FROM objects WHERE id = 1;