WIP: Introduction of very basic distributed transaction identifier concept.

This will be used, fat first, for distributed deadlock
detection. There it's important to be able to correlate transactions
on workers with transactions on the coordinator.  For that distributed
transactions (ones that actually issue BEGIN on workers etc), get
assigned a distributed transaction ID.  That ID currently consists out
of (nodeId, transactionNumber, timestamp).

TODO:
* think about 2PC
* locking
* query functionality
* ...
pull/1447/merge^2
Andres Freund 2017-06-11 18:30:48 -07:00
parent 907274a58a
commit 570e1159c1
4 changed files with 199 additions and 4 deletions

View File

@ -197,6 +197,7 @@ void
StartupCitusBackend(void)
{
InitializeMaintenanceDaemonBackend();
InitializeTransactionManagementBackend();
}

View File

@ -70,6 +70,7 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection)
RemoteTransaction *transaction = &connection->remoteTransaction;
PGresult *result = NULL;
const bool raiseErrors = true;
char *query = NULL;
Assert(transaction->transactionState == REMOTE_TRANS_STARTING);
@ -91,6 +92,33 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection)
{
Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS);
}
{
const char *tzstr = timestamptz_to_str(
MyTmgmtBackendData->transactionId.timestamp);
query = psprintf("SELECT assign_distributed_transaction_id("
UINT64_FORMAT ","UINT64_FORMAT ", '%s')",
MyTmgmtBackendData->transactionId.nodeId,
MyTmgmtBackendData->transactionId.transactionId,
tzstr);
}
/* FIXME: this is a bad idea performancewise */
if (!SendRemoteCommand(connection, query))
{
ReportConnectionError(connection, WARNING);
MarkRemoteTransactionFailed(connection, true);
}
result = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, WARNING);
MarkRemoteTransactionFailed(connection, raiseErrors);
}
PQclear(result);
ForgetResults(connection);
}
@ -263,7 +291,7 @@ RemoteTransactionCommit(MultiConnection *connection)
/*
* StartRemoteTransactionAbort initiates abortin the transaction in a
* StartRemoteTransactionAbort initiates aborting the transaction in a
* non-blocking manner.
*/
void

View File

@ -21,11 +21,15 @@
#include "access/xact.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "utils/hsearch.h"
#include "utils/guc.h"
#include "port/atomics.h"
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
@ -40,9 +44,6 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
/* list of connections that are part of the current coordinated transaction */
dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);
static bool subXactAbortAttempted = false;
/*
* Should this coordinated transaction use 2PC? Set by
* CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
@ -50,6 +51,10 @@ static bool subXactAbortAttempted = false;
*/
bool CoordinatedTransactionUses2PC = false;
static bool subXactAbortAttempted = false;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* transaction management functions */
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
@ -58,6 +63,39 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
/* remaining functions */
static void AdjustMaxPreparedTransactions(void);
TmgmtShmemControlData *TmgmtShmemControl = NULL;
TmgmtBackendData *MyTmgmtBackendData = NULL;
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
Datum
assign_distributed_transaction_id(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Assert(MyTmgmtBackendData);
/* FIXME: spinlock? */
MyTmgmtBackendData->transactionId.nodeId = PG_GETARG_INT64(0);
MyTmgmtBackendData->transactionId.transactionId = PG_GETARG_INT64(1);
MyTmgmtBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2);
PG_RETURN_VOID();
}
static void
UnsetDistributedTransactionId(void)
{
if (MyTmgmtBackendData)
{
/* FIXME: spinlock? */
MyTmgmtBackendData->transactionId.nodeId = 0;
MyTmgmtBackendData->transactionId.transactionId = 0;
MyTmgmtBackendData->transactionId.timestamp = 0;
}
}
/*
* BeginCoordinatedTransaction begins a coordinated transaction. No
@ -73,6 +111,20 @@ BeginCoordinatedTransaction(void)
}
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
Assert(MyTmgmtBackendData);
/* FIXME: Spinlock? Consistency is nice ;) */
/* FIXME: determine proper local node id for MX etc */
MyTmgmtBackendData->transactionId.nodeId = 0;
MyTmgmtBackendData->transactionId.transactionId =
pg_atomic_fetch_add_u64(&TmgmtShmemControl->nextTransactionId, 1);
MyTmgmtBackendData->transactionId.timestamp = GetCurrentTimestamp();
elog(DEBUG3, "assigning xact: (%lu, %lu, %lu)",
MyTmgmtBackendData->transactionId.nodeId,
MyTmgmtBackendData->transactionId.transactionId,
MyTmgmtBackendData->transactionId.timestamp);
}
@ -117,6 +169,59 @@ CoordinatedTransactionUse2PC(void)
}
static size_t
TmgmtShmemSize(void)
{
Size size = 0;
size = add_size(size, sizeof(TmgmtShmemControlData));
size = add_size(size, mul_size(sizeof(TmgmtBackendData), MaxBackends));
return size;
}
static void
TmgmtShmemInit(void)
{
bool alreadyInitialized = false;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
TmgmtShmemControl =
(TmgmtShmemControlData *) ShmemInitStruct("Transaction Management",
TmgmtShmemSize(),
&alreadyInitialized);
if (!alreadyInitialized)
{
/* initialize lwlock */
LWLockTranche *tranche = &TmgmtShmemControl->lockTranche;
/* start by zeroing out all the memory */
memset(TmgmtShmemControl, 0, TmgmtShmemSize());
TmgmtShmemControl->numSessions = MaxBackends;
/* initialize lock */
TmgmtShmemControl->trancheId = LWLockNewTrancheId();
tranche->array_base = &TmgmtShmemControl->lock;
tranche->array_stride = sizeof(LWLock);
tranche->name = "Distributed Transaction Management";
LWLockRegisterTranche(TmgmtShmemControl->trancheId, tranche);
LWLockInitialize(&TmgmtShmemControl->lock,
TmgmtShmemControl->trancheId);
pg_atomic_init_u64(&TmgmtShmemControl->nextTransactionId, 7);
}
LWLockRelease(AddinShmemInitLock);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
void
InitializeTransactionManagement(void)
{
@ -125,6 +230,26 @@ InitializeTransactionManagement(void)
RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL);
AdjustMaxPreparedTransactions();
/* allocate shared memory */
RequestAddinShmemSpace(TmgmtShmemSize());
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = TmgmtShmemInit;
}
void
InitializeTransactionManagementBackend(void)
{
/* Fill this backend's lock information */
LWLockAcquire(&TmgmtShmemControl->lock, LW_EXCLUSIVE);
MyTmgmtBackendData = &TmgmtShmemControl->sessions[MyProc->pgprocno];
MyTmgmtBackendData->databaseId = MyDatabaseId;
/* FIXME: get id usable for MX, where multiple nodes can start distributed transactions */
MyTmgmtBackendData->transactionId.nodeId = 0;
LWLockRelease(&TmgmtShmemControl->lock);
}
@ -168,6 +293,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions);
CoordinatedTransactionUses2PC = false;
UnsetDistributedTransactionId();
}
break;
@ -204,6 +331,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
dlist_init(&InProgressTransactions);
CoordinatedTransactionUses2PC = false;
subXactAbortAttempted = false;
UnsetDistributedTransactionId();
}
break;

View File

@ -11,6 +11,9 @@
#include "lib/ilist.h"
#include "datatype/timestamp.h"
#include "storage/lwlock.h"
/* describes what kind of modifications have occurred in the current transaction */
typedef enum
{
@ -52,6 +55,39 @@ typedef enum
COMMIT_PROTOCOL_2PC = 2
} CommitProtocolType;
/* FIXME: move to different header? */
typedef struct TmgmtTransactionId
{
uint64 nodeId;
uint64 transactionId;
TimestampTz timestamp;
} TmgmtTransactionId;
typedef struct TmgmtBackendData
{
Oid databaseId;
TmgmtTransactionId transactionId;
} TmgmtBackendData;
typedef struct TmgmtShmemControlData
{
int trancheId;
LWLockTranche lockTranche;
LWLock lock;
int numSessions;
pg_atomic_uint64 nextTransactionId;
TmgmtBackendData sessions[FLEXIBLE_ARRAY_MEMBER];
} TmgmtShmemControlData;
extern TmgmtBackendData *MyTmgmtBackendData;
extern TmgmtShmemControlData *TmgmtShmemControl;
/* config variable managed via guc.c */
extern int MultiShardCommitProtocol;
@ -76,6 +112,7 @@ extern void CoordinatedTransactionUse2PC(void);
/* initialization function(s) */
extern void InitializeTransactionManagement(void);
extern void InitializeTransactionManagementBackend(void);
#endif /* TRANSACTION_MANAGMENT_H */