From 570e1159c1b82191e0f5b94407942d68ce0b54d0 Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Sun, 11 Jun 2017 18:30:48 -0700
Subject: [PATCH] WIP: Introduction of very basic distributed transaction identifier concept.

This will be used, at first, for distributed deadlock detection. There
it's important to be able to correlate transactions on workers with
transactions on the coordinator.

For that, distributed transactions (ones that actually issue BEGIN on
workers etc.) get assigned a distributed transaction ID. That ID
currently consists of (nodeId, transactionNumber, timestamp).

TODO:
* think about 2PC
* locking
* query functionality
* ...
---
 src/backend/distributed/shared_library_init.c |   1 +
 .../transaction/remote_transaction.c          |  31 +++-
 .../transaction/transaction_management.c      | 160 +++++++++++++++++-
 .../distributed/transaction_management.h      |  40 +++++
 4 files changed, 228 insertions(+), 4 deletions(-)

diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index c4931d0cb..3f3e37488 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -197,6 +197,7 @@ void
 StartupCitusBackend(void)
 {
 	InitializeMaintenanceDaemonBackend();
+	InitializeTransactionManagementBackend();
 }
 
 
diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c
index 01bfe0eea..306c19424 100644
--- a/src/backend/distributed/transaction/remote_transaction.c
+++ b/src/backend/distributed/transaction/remote_transaction.c
@@ -70,6 +70,7 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection)
 	RemoteTransaction *transaction = &connection->remoteTransaction;
 	PGresult *result = NULL;
 	const bool raiseErrors = true;
+	char *query = NULL;
 
 	Assert(transaction->transactionState == REMOTE_TRANS_STARTING);
 
@@ -91,6 +92,34 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection)
 	{
 		Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS);
 	}
+
+	/* tell the remote node which distributed transaction it is part of */
+	{
+		const char *tzstr = timestamptz_to_str(
+			MyTmgmtBackendData->transactionId.timestamp);
+
+		query = psprintf("SELECT assign_distributed_transaction_id("
+						 UINT64_FORMAT ","UINT64_FORMAT ", '%s')",
+						 MyTmgmtBackendData->transactionId.nodeId,
+						 MyTmgmtBackendData->transactionId.transactionId,
+						 tzstr);
+	}
+
+	/* FIXME: this is a bad idea performancewise */
+	if (!SendRemoteCommand(connection, query))
+	{
+		ReportConnectionError(connection, WARNING);
+		MarkRemoteTransactionFailed(connection, raiseErrors);
+	}
+	result = GetRemoteCommandResult(connection, raiseErrors);
+	if (!IsResponseOK(result))
+	{
+		ReportResultError(connection, result, WARNING);
+		MarkRemoteTransactionFailed(connection, raiseErrors);
+	}
+
+	PQclear(result);
+	ForgetResults(connection);
 }
 
 
@@ -263,7 +292,7 @@ RemoteTransactionCommit(MultiConnection *connection)
 
 
 /*
- * StartRemoteTransactionAbort initiates abortin the transaction in a
+ * StartRemoteTransactionAbort initiates aborting the transaction in a
  * non-blocking manner.
  */
 void
diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c
index a0d14d9b0..e6faedffd 100644
--- a/src/backend/distributed/transaction/transaction_management.c
+++ b/src/backend/distributed/transaction/transaction_management.c
@@ -21,11 +21,15 @@
 #include "access/xact.h"
 #include "distributed/connection_management.h"
 #include "distributed/hash_helpers.h"
+#include "distributed/metadata_cache.h"
 #include "distributed/multi_shard_transaction.h"
 #include "distributed/transaction_management.h"
 #include "distributed/placement_connection.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
 #include "utils/hsearch.h"
 #include "utils/guc.h"
+#include "port/atomics.h"
 
 
 CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
@@ -40,9 +44,6 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
 
 /* list of connections that are part of the current coordinated transaction */
 dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);
-
-static bool subXactAbortAttempted = false;
-
 /*
  * Should this coordinated transaction use 2PC? Set by
  * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
@@ -50,6 +51,10 @@ static bool subXactAbortAttempted = false;
  */
 bool CoordinatedTransactionUses2PC = false;
 
+
+static bool subXactAbortAttempted = false;
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+
 /* transaction management functions */
 static void CoordinatedTransactionCallback(XactEvent event, void *arg);
 static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
@@ -58,6 +63,50 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
 /* remaining functions */
 static void AdjustMaxPreparedTransactions(void);
 
+/* shared memory control structure and this backend's entry within it */
+TmgmtShmemControlData *TmgmtShmemControl = NULL;
+TmgmtBackendData *MyTmgmtBackendData = NULL;
+
+PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
+
+/*
+ * assign_distributed_transaction_id stores the passed in distributed
+ * transaction id (nodeId, transactionId, timestamp) in this backend's
+ * shared memory entry. FinishRemoteTransactionBegin() issues this call on
+ * remote connections.
+ */
+Datum
+assign_distributed_transaction_id(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+
+	Assert(MyTmgmtBackendData);
+
+	/* FIXME: spinlock? */
+	MyTmgmtBackendData->transactionId.nodeId = PG_GETARG_INT64(0);
+	MyTmgmtBackendData->transactionId.transactionId = PG_GETARG_INT64(1);
+	MyTmgmtBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2);
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * UnsetDistributedTransactionId zeroes this backend's distributed
+ * transaction id, provided the backend's shared memory entry has been set up.
+ */
+static void
+UnsetDistributedTransactionId(void)
+{
+	if (MyTmgmtBackendData)
+	{
+		/* FIXME: spinlock? */
+		MyTmgmtBackendData->transactionId.nodeId = 0;
+		MyTmgmtBackendData->transactionId.transactionId = 0;
+		MyTmgmtBackendData->transactionId.timestamp = 0;
+	}
+}
+
 
 /*
  * BeginCoordinatedTransaction begins a coordinated transaction. No
@@ -73,6 +122,20 @@ BeginCoordinatedTransaction(void)
 	}
 
 	CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
+
+	Assert(MyTmgmtBackendData);
+
+	/* FIXME: Spinlock? Consistency is nice ;) */
+	/* FIXME: determine proper local node id for MX etc */
+	MyTmgmtBackendData->transactionId.nodeId = 0;
+	MyTmgmtBackendData->transactionId.transactionId =
+		pg_atomic_fetch_add_u64(&TmgmtShmemControl->nextTransactionId, 1);
+	MyTmgmtBackendData->transactionId.timestamp = GetCurrentTimestamp();
+
+	elog(DEBUG3, "assigning xact: (" UINT64_FORMAT ", " UINT64_FORMAT ", " INT64_FORMAT ")",
+		 MyTmgmtBackendData->transactionId.nodeId,
+		 MyTmgmtBackendData->transactionId.transactionId,
+		 MyTmgmtBackendData->transactionId.timestamp);
 }
 
 
@@ -117,6 +180,68 @@ CoordinatedTransactionUse2PC(void)
 }
 
 
+/*
+ * TmgmtShmemSize computes the shared memory needed by transaction management:
+ * the control structure plus one TmgmtBackendData entry per MaxBackends.
+ */
+static Size
+TmgmtShmemSize(void)
+{
+	Size size = 0;
+
+	size = add_size(size, sizeof(TmgmtShmemControlData));
+	size = add_size(size, mul_size(sizeof(TmgmtBackendData), MaxBackends));
+
+	return size;
+}
+
+
+/*
+ * TmgmtShmemInit is installed as shmem_startup_hook. It attaches to the
+ * transaction management shared memory, initializing it on first use, and
+ * then calls the previously installed hook, if any.
+ */
+static void
+TmgmtShmemInit(void)
+{
+	bool alreadyInitialized = false;
+
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+
+	TmgmtShmemControl =
+		(TmgmtShmemControlData *) ShmemInitStruct("Transaction Management",
+												  TmgmtShmemSize(),
+												  &alreadyInitialized);
+	if (!alreadyInitialized)
+	{
+		/* initialize lwlock */
+		LWLockTranche *tranche = &TmgmtShmemControl->lockTranche;
+
+		/* start by zeroing out all the memory */
+		memset(TmgmtShmemControl, 0, TmgmtShmemSize());
+
+		TmgmtShmemControl->numSessions = MaxBackends;
+
+		/* initialize lock */
+		TmgmtShmemControl->trancheId = LWLockNewTrancheId();
+		tranche->array_base = &TmgmtShmemControl->lock;
+		tranche->array_stride = sizeof(LWLock);
+		tranche->name = "Distributed Transaction Management";
+		LWLockRegisterTranche(TmgmtShmemControl->trancheId, tranche);
+		LWLockInitialize(&TmgmtShmemControl->lock,
+						 TmgmtShmemControl->trancheId);
+
+		pg_atomic_init_u64(&TmgmtShmemControl->nextTransactionId, 7);
+	}
+	LWLockRelease(AddinShmemInitLock);
+
+	if (prev_shmem_startup_hook != NULL)
+	{
+		prev_shmem_startup_hook();
+	}
+}
+
+
 void
 InitializeTransactionManagement(void)
 {
@@ -125,6 +250,31 @@ InitializeTransactionManagement(void)
 	RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL);
 
 	AdjustMaxPreparedTransactions();
+
+	/* allocate shared memory */
+	RequestAddinShmemSpace(TmgmtShmemSize());
+
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = TmgmtShmemInit;
+}
+
+
+/*
+ * InitializeTransactionManagementBackend points MyTmgmtBackendData at this
+ * backend's entry in the shared sessions array (indexed by pgprocno), records
+ * the backend's database and resets the entry's nodeId.
+ */
+void
+InitializeTransactionManagementBackend(void)
+{
+	/* set up this backend's shared memory entry */
+	LWLockAcquire(&TmgmtShmemControl->lock, LW_EXCLUSIVE);
+	MyTmgmtBackendData = &TmgmtShmemControl->sessions[MyProc->pgprocno];
+	MyTmgmtBackendData->databaseId = MyDatabaseId;
+
+	/* FIXME: get id usable for MX, where multiple nodes can start distributed transactions */
+	MyTmgmtBackendData->transactionId.nodeId = 0;
+	LWLockRelease(&TmgmtShmemControl->lock);
 }
 
 
@@ -168,6 +318,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			XactModificationLevel = XACT_MODIFICATION_NONE;
 			dlist_init(&InProgressTransactions);
 			CoordinatedTransactionUses2PC = false;
+
+			UnsetDistributedTransactionId();
 		}
 		break;
 
@@ -204,6 +356,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			dlist_init(&InProgressTransactions);
 			CoordinatedTransactionUses2PC = false;
 			subXactAbortAttempted = false;
+
+			UnsetDistributedTransactionId();
 		}
 		break;
 
diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h
index dd0c691ed..3f9d79427 100644
--- a/src/include/distributed/transaction_management.h
+++ b/src/include/distributed/transaction_management.h
@@ -11,6 +11,9 @@
 
 #include "lib/ilist.h"
 
+#include "datatype/timestamp.h"
+#include "storage/lwlock.h"
+
 /* describes what kind of modifications have occurred in the current transaction */
 typedef enum
 {
@@ -52,6 +55,42 @@ typedef enum
 	COMMIT_PROTOCOL_2PC = 2
 } CommitProtocolType;
 
+
+/* FIXME: move to different header? */
+/* identifier of a distributed transaction: (nodeId, transactionId, timestamp) */
+typedef struct TmgmtTransactionId
+{
+	uint64 nodeId;
+	uint64 transactionId;
+	TimestampTz timestamp;
+} TmgmtTransactionId;
+
+/* per-backend entry in shared memory: the backend's database and current id */
+typedef struct TmgmtBackendData
+{
+	Oid databaseId;
+	TmgmtTransactionId transactionId;
+} TmgmtBackendData;
+
+
+/* shared memory control structure, followed by one entry per backend */
+typedef struct TmgmtShmemControlData
+{
+	int trancheId;
+	LWLockTranche lockTranche;
+	LWLock lock;
+
+	int numSessions;
+
+	pg_atomic_uint64 nextTransactionId;
+
+	TmgmtBackendData sessions[FLEXIBLE_ARRAY_MEMBER];
+} TmgmtShmemControlData;
+
+extern TmgmtBackendData *MyTmgmtBackendData;
+extern TmgmtShmemControlData *TmgmtShmemControl;
+
+
 /* config variable managed via guc.c */
 extern int MultiShardCommitProtocol;
 
@@ -76,6 +115,7 @@ extern void CoordinatedTransactionUse2PC(void);
 
 /* initialization function(s) */
 extern void InitializeTransactionManagement(void);
+extern void InitializeTransactionManagementBackend(void);
 
 
 #endif /* TRANSACTION_MANAGMENT_H */