diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c4931d0cb..3f3e37488 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -197,6 +197,7 @@ void StartupCitusBackend(void) { InitializeMaintenanceDaemonBackend(); + InitializeTransactionManagementBackend(); } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 01bfe0eea..306c19424 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -70,6 +70,7 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection) RemoteTransaction *transaction = &connection->remoteTransaction; PGresult *result = NULL; const bool raiseErrors = true; + char *query = NULL; Assert(transaction->transactionState == REMOTE_TRANS_STARTING); @@ -91,6 +92,33 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection) { Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS); } + + { + const char *tzstr = timestamptz_to_str( + MyTmgmtBackendData->transactionId.timestamp); + + query = psprintf("SELECT assign_distributed_transaction_id(" + UINT64_FORMAT ","UINT64_FORMAT ", '%s')", + MyTmgmtBackendData->transactionId.nodeId, + MyTmgmtBackendData->transactionId.transactionId, + tzstr); + } + + /* FIXME: this is a bad idea performancewise */ + if (!SendRemoteCommand(connection, query)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, true); + } + result = GetRemoteCommandResult(connection, raiseErrors); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } + + PQclear(result); + ForgetResults(connection); } @@ -263,7 +291,7 @@ RemoteTransactionCommit(MultiConnection *connection) /* - * StartRemoteTransactionAbort initiates abortin the transaction in a + * StartRemoteTransactionAbort initiates aborting the transaction in a * non-blocking manner. */ void diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index a0d14d9b0..e6faedffd 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,11 +21,15 @@ #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" +#include "storage/ipc.h" +#include "storage/proc.h" #include "utils/hsearch.h" #include "utils/guc.h" +#include "port/atomics.h" CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; @@ -40,9 +44,6 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; /* list of connections that are part of the current coordinated transaction */ dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions); - -static bool subXactAbortAttempted = false; - /* * Should this coordinated transaction use 2PC? Set by * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and @@ -50,6 +51,10 @@ static bool subXactAbortAttempted = false; */ bool CoordinatedTransactionUses2PC = false; + +static bool subXactAbortAttempted = false; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, @@ -58,6 +63,39 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction /* remaining functions */ static void AdjustMaxPreparedTransactions(void); +TmgmtShmemControlData *TmgmtShmemControl = NULL; +TmgmtBackendData *MyTmgmtBackendData = NULL; + +PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); + +Datum +assign_distributed_transaction_id(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Assert(MyTmgmtBackendData); + + /* FIXME: spinlock? */ + MyTmgmtBackendData->transactionId.nodeId = PG_GETARG_INT64(0); + MyTmgmtBackendData->transactionId.transactionId = PG_GETARG_INT64(1); + MyTmgmtBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2); + + PG_RETURN_VOID(); +} + + +static void +UnsetDistributedTransactionId(void) +{ + if (MyTmgmtBackendData) + { + /* FIXME: spinlock? */ + MyTmgmtBackendData->transactionId.nodeId = 0; + MyTmgmtBackendData->transactionId.transactionId = 0; + MyTmgmtBackendData->transactionId.timestamp = 0; + } +} + /* * BeginCoordinatedTransaction begins a coordinated transaction. No @@ -73,6 +111,20 @@ BeginCoordinatedTransaction(void) } CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; + + Assert(MyTmgmtBackendData); + + /* FIXME: Spinlock? Consistency is nice ;) */ + /* FIXME: determine proper local node id for MX etc */ + MyTmgmtBackendData->transactionId.nodeId = 0; + MyTmgmtBackendData->transactionId.transactionId = + pg_atomic_fetch_add_u64(&TmgmtShmemControl->nextTransactionId, 1); + MyTmgmtBackendData->transactionId.timestamp = GetCurrentTimestamp(); + + elog(DEBUG3, "assigning xact: (%lu, %lu, %lu)", + MyTmgmtBackendData->transactionId.nodeId, + MyTmgmtBackendData->transactionId.transactionId, + MyTmgmtBackendData->transactionId.timestamp); } @@ -117,6 +169,59 @@ CoordinatedTransactionUse2PC(void) } +static size_t +TmgmtShmemSize(void) +{ + Size size = 0; + + size = add_size(size, sizeof(TmgmtShmemControlData)); + size = add_size(size, mul_size(sizeof(TmgmtBackendData), MaxBackends)); + + return size; +} + + +static void +TmgmtShmemInit(void) +{ + bool alreadyInitialized = false; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + TmgmtShmemControl = + (TmgmtShmemControlData *) ShmemInitStruct("Transaction Management", + TmgmtShmemSize(), + &alreadyInitialized); + if (!alreadyInitialized) + { + /* initialize lwlock */ + LWLockTranche *tranche = &TmgmtShmemControl->lockTranche; + + /* start by zeroing out all the memory */ + memset(TmgmtShmemControl, 0, TmgmtShmemSize()); + + TmgmtShmemControl->numSessions = MaxBackends; + + /* initialize lock */ + TmgmtShmemControl->trancheId = LWLockNewTrancheId(); + tranche->array_base = &TmgmtShmemControl->lock; + tranche->array_stride = sizeof(LWLock); + tranche->name = "Distributed Transaction Management"; + LWLockRegisterTranche(TmgmtShmemControl->trancheId, tranche); + LWLockInitialize(&TmgmtShmemControl->lock, + TmgmtShmemControl->trancheId); + + pg_atomic_init_u64(&TmgmtShmemControl->nextTransactionId, 7); + } + LWLockRelease(AddinShmemInitLock); + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } +} + + void InitializeTransactionManagement(void) { @@ -125,6 +230,26 @@ InitializeTransactionManagement(void) RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL); AdjustMaxPreparedTransactions(); + + /* allocate shared memory */ + RequestAddinShmemSpace(TmgmtShmemSize()); + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = TmgmtShmemInit; +} + + +void +InitializeTransactionManagementBackend(void) +{ + /* Fill this backend's lock information */ + LWLockAcquire(&TmgmtShmemControl->lock, LW_EXCLUSIVE); + MyTmgmtBackendData = &TmgmtShmemControl->sessions[MyProc->pgprocno]; + MyTmgmtBackendData->databaseId = MyDatabaseId; + + /* FIXME: get id usable for MX, where multiple nodes can start distributed transactions */ + MyTmgmtBackendData->transactionId.nodeId = 0; + LWLockRelease(&TmgmtShmemControl->lock); } @@ -168,6 +293,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; + + UnsetDistributedTransactionId(); } break; @@ -204,6 +331,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; subXactAbortAttempted = false; + + UnsetDistributedTransactionId(); } break; diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index dd0c691ed..3f9d79427 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -11,6 +11,9 @@ #include "lib/ilist.h" +#include "datatype/timestamp.h" +#include "storage/lwlock.h" + /* describes what kind of modifications have occurred in the current transaction */ typedef enum { @@ -52,6 +55,39 @@ typedef enum COMMIT_PROTOCOL_2PC = 2 } CommitProtocolType; + +/* FIXME: move to different header? */ +typedef struct TmgmtTransactionId +{ + uint64 nodeId; + uint64 transactionId; + TimestampTz timestamp; +} TmgmtTransactionId; + +typedef struct TmgmtBackendData +{ + Oid databaseId; + TmgmtTransactionId transactionId; +} TmgmtBackendData; + + +typedef struct TmgmtShmemControlData +{ + int trancheId; + LWLockTranche lockTranche; + LWLock lock; + + int numSessions; + + pg_atomic_uint64 nextTransactionId; + + TmgmtBackendData sessions[FLEXIBLE_ARRAY_MEMBER]; +} TmgmtShmemControlData; + +extern TmgmtBackendData *MyTmgmtBackendData; +extern TmgmtShmemControlData *TmgmtShmemControl; + + /* config variable managed via guc.c */ extern int MultiShardCommitProtocol; @@ -76,6 +112,7 @@ extern void CoordinatedTransactionUse2PC(void); /* initialization function(s) */ extern void InitializeTransactionManagement(void); +extern void InitializeTransactionManagementBackend(void); #endif /* TRANSACTION_MANAGMENT_H */