citus/src/backend/distributed/transaction/transaction_management.c

145 lines
3.8 KiB
C

/*-------------------------------------------------------------------------
*
* transaction_management.c
*
* Transaction management for Citus. Most of the work is delegated to other
* subsystems, this files, and especially CoordinatedTransactionCallback,
* coordinates the work between them.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/xact.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/transaction_management.h"
#include "utils/hsearch.h"
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
/* GUC, the commit protocol to use for commands affecting more than one connection */
int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
/* state needed to keep track of operations used during a transaction */
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
static bool subXactAbortAttempted = false;
/* transaction management functions */
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg);
void
InitializeTransactionManagement(void)
{
/* hook into transaction machinery */
RegisterXactCallback(CoordinatedTransactionCallback, NULL);
RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL);
}
/*
* Transaction management callback, handling coordinated transaction, and
* transaction independent connection management.
*
* NB: There should only ever be a single transaction callback in citus, the
* ordering between the callbacks and thee actions within those callbacks
* otherwise becomes too undeterministic / hard to reason about.
*/
static void
CoordinatedTransactionCallback(XactEvent event, void *arg)
{
switch (event)
{
case XACT_EVENT_COMMIT:
{
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
AfterXactConnectionHandling(true);
}
Assert(!subXactAbortAttempted);
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE;
}
break;
case XACT_EVENT_ABORT:
{
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
AfterXactConnectionHandling(false);
}
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE;
subXactAbortAttempted = false;
}
break;
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_PREPARE:
{ }
break;
case XACT_EVENT_PRE_COMMIT:
{
if (subXactAbortAttempted)
{
subXactAbortAttempted = false;
if (XactModificationLevel != XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
"which modify distributed tables")));
}
}
}
break;
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_PREPARE:
{
if (CurrentCoordinatedTransactionState > COORD_TRANS_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use 2PC in transactions involving "
"multiple servers")));
}
}
break;
}
}
/*
* Subtransaction callback - currently only used to remember whether a
* savepoint has been rolled back, as we don't support that.
*/
static void
CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg)
{
if (event == SUBXACT_EVENT_ABORT_SUB)
{
subXactAbortAttempted = true;
}
}