diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 883c9284d..44f052ba2 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ 6.2-1 6.2-2 6.2-3 6.2-4 \ - 7.0-1 7.0-2 7.0-3 + 7.0-1 7.0-2 7.0-3 7.0-4 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -145,6 +145,8 @@ $(EXTENSION)--7.0-2.sql: $(EXTENSION)--7.0-1.sql $(EXTENSION)--7.0-1--7.0-2.sql cat $^ > $@ $(EXTENSION)--7.0-3.sql: $(EXTENSION)--7.0-2.sql $(EXTENSION)--7.0-2--7.0-3.sql cat $^ > $@ +$(EXTENSION)--7.0-4.sql: $(EXTENSION)--7.0-3.sql $(EXTENSION)--7.0-3--7.0-4.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.0-3--7.0-4.sql b/src/backend/distributed/citus--7.0-3--7.0-4.sql new file mode 100644 index 000000000..a7a00ad09 --- /dev/null +++ b/src/backend/distributed/citus--7.0-3--7.0-4.sql @@ -0,0 +1,25 @@ +/* citus--7.0-3--7.0-4.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION assign_distributed_transaction_id(initiator_node_identifier int4, transaction_number int8, transaction_stamp timestamptz) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$assign_distributed_transaction_id$$; + COMMENT ON FUNCTION assign_distributed_transaction_id(initiator_node_identifier int4, transaction_number int8, transaction_stamp timestamptz) + IS 'Only intended for internal use, users should not call this. The function sets the distributed transaction id'; + +CREATE OR REPLACE FUNCTION get_current_transaction_id(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz) + RETURNS RECORD + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$get_current_transaction_id$$; + COMMENT ON FUNCTION get_current_transaction_id(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz) + IS 'returns the current backend data including distributed transaction id'; + +CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz) + RETURNS SETOF RECORD + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$get_all_active_transactions$$; + COMMENT ON FUNCTION get_all_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz) + IS 'returns distributed transaction ids of active distributed transactions'; +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index ba31416e5..ac5f945a7 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.0-3' +default_version = '7.0-4' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 1f7f833d9..76efecc3d 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -337,10 +337,11 @@ ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command, /* - * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and - * accepts a MultiConnection instead of a plain PGconn. It makes sure it can + * SendRemoteCommandParams is a PQsendQueryParams wrapper that logs remote commands, + * and accepts a MultiConnection instead of a plain PGconn. It makes sure it can * send commands asynchronously without blocking (at the potential expense of - * an additional memory allocation). + * an additional memory allocation). The command string can only include a single + * command since PQsendQueryParams() supports only that. */ int SendRemoteCommandParams(MultiConnection *connection, const char *command, @@ -372,14 +373,33 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, /* * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and - * accepts a MultiConnection instead of a plain PGconn. It makes sure it can + * accepts a MultiConnection instead of a plain PGconn. It makes sure it can * send commands asynchronously without blocking (at the potential expense of - * an additional memory allocation). + * an additional memory allocation). The command string can include multiple + * commands since PQsendQuery() supports that. */ int SendRemoteCommand(MultiConnection *connection, const char *command) { - return SendRemoteCommandParams(connection, command, 0, NULL, NULL); + PGconn *pgConn = connection->pgConn; + int rc = 0; + + LogRemoteCommand(connection, command); + + /* + * Don't try to send command if connection is entirely gone + * (PQisnonblocking() would crash). + */ + if (!pgConn) + { + return 0; + } + + Assert(PQisnonblocking(pgConn)); + + rc = PQsendQuery(pgConn, command); + + return rc; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c4931d0cb..113f6e416 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -19,6 +19,7 @@ #include "citus_version.h" #include "commands/explain.h" #include "executor/executor.h" +#include "distributed/backend_data.h" #include "distributed/citus_nodefuncs.h" #include "distributed/connection_management.h" #include "distributed/connection_management.h" @@ -173,6 +174,7 @@ _PG_init(void) /* initialize coordinated transaction management */ InitializeTransactionManagement(); + InitializeBackendManagement(); InitializeConnectionManagement(); InitPlacementConnectionManagement(); @@ -197,6 +199,7 @@ void StartupCitusBackend(void) { InitializeMaintenanceDaemonBackend(); + InitializeBackendData(); } diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c new file mode 100644 index 000000000..8732b1a4f --- /dev/null +++ b/src/backend/distributed/transaction/backend_data.c @@ -0,0 +1,489 @@ +/*------------------------------------------------------------------------- + * + * backend_data.c + * + * Infrastructure for managing per backend data that can efficiently + * accessed by all sessions. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" + +#include "funcapi.h" +#include "access/htup_details.h" +#include "catalog/pg_type.h" +#include "datatype/timestamp.h" +#include "distributed/backend_data.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/transaction_identifier.h" +#include "nodes/execnodes.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/spin.h" +#include "storage/s_lock.h" +#include "utils/timestamp.h" + + +/* + * Each backend's data reside in the shared memory + * on the BackendManagementShmemData. + */ +typedef struct BackendManagementShmemData +{ + int trancheId; +#if (PG_VERSION_NUM >= 100000) + NamedLWLockTranche namedLockTranche; +#else + LWLockTranche lockTranche; +#endif + LWLock lock; + + /* + * We prefer to use an atomic integer over sequences for two + * reasons (i) orders of magnitude performance difference + * (ii) allowing read-only replicas to be able to generate ids + */ + pg_atomic_uint64 nextTransactionNumber; + + BackendData backends[FLEXIBLE_ARRAY_MEMBER]; +} BackendManagementShmemData; + + +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; +static BackendManagementShmemData *backendManagementShmemData = NULL; +static BackendData *MyBackendData = NULL; + + +static void BackendManagementShmemInit(void); +static size_t BackendManagementShmemSize(void); + + +PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); +PG_FUNCTION_INFO_V1(get_current_transaction_id); +PG_FUNCTION_INFO_V1(get_all_active_transactions); + + +/* + * assign_distributed_transaction_id updates the shared memory allocated for this backend + * and sets initiatorNodeIdentifier, transactionNumber, timestamp fields with the given + * inputs. Also, the function sets the database id and process id via the information that + * Postgres provides. + * + * This function is only intended for internal use for managing distributed transactions. + * Users should not use this function for any purpose. + */ +Datum +assign_distributed_transaction_id(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + /* MyBackendData should always be avaliable, just out of paranoia */ + if (!MyBackendData) + { + ereport(ERROR, (errmsg("backend is not ready for distributed transactions"))); + } + + SpinLockAcquire(&MyBackendData->mutex); + + /* if an id is already assigned, release the lock and error */ + if (MyBackendData->transactionId.initiatorNodeIdentifier != 0) + { + SpinLockRelease(&MyBackendData->mutex); + + ereport(ERROR, (errmsg("the backend has already been assigned a " + "transaction id"))); + } + + MyBackendData->databaseId = MyDatabaseId; + + MyBackendData->transactionId.initiatorNodeIdentifier = PG_GETARG_INT32(0); + MyBackendData->transactionId.transactionNumber = PG_GETARG_INT64(1); + MyBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2); + + SpinLockRelease(&MyBackendData->mutex); + + PG_RETURN_VOID(); +} + + +/* + * get_current_transaction_id returns a tuple with (databaseId, processId, + * initiatorNodeIdentifier, transactionNumber, timestamp) that exists in the + * shared memory associated with this backend. Note that if the backend + * is not in a transaction, the function returns uninitialized data where + * transactionNumber equals to 0. + */ +Datum +get_current_transaction_id(PG_FUNCTION_ARGS) +{ + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + + const int attributeCount = 5; + Datum values[attributeCount]; + bool isNulls[attributeCount]; + + DistributedTransactionId *distributedTransctionId = NULL; + + CheckCitusVersion(ERROR); + + /* build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE) + { + elog(ERROR, "return type must be a row type"); + } + + /* MyBackendData should always be avaliable, just out of paranoia */ + if (!MyBackendData) + { + ereport(ERROR, (errmsg("backend is not ready for distributed transactions"))); + } + + distributedTransctionId = GetCurrentDistributedTransctionId(); + + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + /* first two fields do not change for this backend, so get directly */ + values[0] = ObjectIdGetDatum(MyDatabaseId); + values[1] = Int32GetDatum(MyProcPid); + + values[2] = Int32GetDatum(distributedTransctionId->initiatorNodeIdentifier); + values[3] = UInt64GetDatum(distributedTransctionId->transactionNumber); + + /* provide a better output */ + if (distributedTransctionId->initiatorNodeIdentifier != 0) + { + values[4] = TimestampTzGetDatum(distributedTransctionId->timestamp); + } + else + { + isNulls[4] = true; + } + + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + PG_RETURN_DATUM(HeapTupleGetDatum(heapTuple)); +} + + +/* + * get_all_active_transactions returns all the avaliable information about all + * the active backends. + */ +Datum +get_all_active_transactions(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = NULL; + MemoryContext perQueryContext = NULL; + MemoryContext oldContext = NULL; + + int backendIndex = 0; + + const int attributeCount = 5; + Datum values[attributeCount]; + bool isNulls[attributeCount]; + + CheckCitusVersion(ERROR); + + /* check to see if caller supports us returning a tuplestore */ + if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context " \ + "that cannot accept a set"))); + } + + if (!(returnSetInfo->allowedModes & SFRM_Materialize)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + } + + /* build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE) + { + elog(ERROR, "return type must be a row type"); + } + + perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory; + + oldContext = MemoryContextSwitchTo(perQueryContext); + + tupleStore = tuplestore_begin_heap(true, false, work_mem); + returnSetInfo->returnMode = SFRM_Materialize; + returnSetInfo->setResult = tupleStore; + returnSetInfo->setDesc = tupleDescriptor; + + MemoryContextSwitchTo(oldContext); + + /* + * We don't want to initialize memory while spinlock is held so we + * prefer to do it here. This initialization is done only for the first + * row. + */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + /* we're reading all the backend data, take a lock to prevent concurrent additions */ + LWLockAcquire(AddinShmemInitLock, LW_SHARED); + + for (backendIndex = 0; backendIndex < MaxBackends; ++backendIndex) + { + BackendData *currentBackend = + &backendManagementShmemData->backends[backendIndex]; + + SpinLockAcquire(¤tBackend->mutex); + + /* we're only interested in active backends */ + if (currentBackend->transactionId.transactionNumber == 0) + { + SpinLockRelease(¤tBackend->mutex); + continue; + } + + values[0] = ObjectIdGetDatum(currentBackend->databaseId); + values[1] = Int32GetDatum(ProcGlobal->allProcs[backendIndex].pid); + values[2] = Int32GetDatum(currentBackend->transactionId.initiatorNodeIdentifier); + values[3] = UInt64GetDatum(currentBackend->transactionId.transactionNumber); + values[4] = TimestampTzGetDatum(currentBackend->transactionId.timestamp); + + SpinLockRelease(¤tBackend->mutex); + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); + + /* + * We don't want to initialize memory while spinlock is held so we + * prefer to do it here. This initialization is done for the rows + * starting from the second one. + */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + } + + LWLockRelease(AddinShmemInitLock); + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupleStore); + + PG_RETURN_VOID(); +} + + +/* + * InitializeBackendManagement requests the necessary shared memory + * from Postgres and sets up the shared memory startup hook. + */ +void +InitializeBackendManagement(void) +{ + /* allocate shared memory */ + RequestAddinShmemSpace(BackendManagementShmemSize()); + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = BackendManagementShmemInit; +} + + +/* + * BackendManagementShmemInit is the callback that is to be called on shared + * memory startup hook. The function sets up the necessary shared memory + * segment for the backend manager. + */ +static void +BackendManagementShmemInit(void) +{ + bool alreadyInitialized = false; + + /* we may update the shmem, acquire lock exclusively */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + backendManagementShmemData = + (BackendManagementShmemData *) ShmemInitStruct( + "Backend Management Shmem", + BackendManagementShmemSize(), + &alreadyInitialized); + + if (!alreadyInitialized) + { + int backendIndex = 0; + char *trancheName = "Backend Management Tranche"; + +#if (PG_VERSION_NUM >= 100000) + NamedLWLockTranche *namedLockTranche = + &backendManagementShmemData->namedLockTranche; + +#else + LWLockTranche *lockTranche = &backendManagementShmemData->lockTranche; +#endif + + /* start by zeroing out all the memory */ + memset(backendManagementShmemData, 0, + BackendManagementShmemSize()); + +#if (PG_VERSION_NUM >= 100000) + namedLockTranche->trancheId = LWLockNewTrancheId(); + + LWLockRegisterTranche(namedLockTranche->trancheId, trancheName); + LWLockInitialize(&backendManagementShmemData->lock, + namedLockTranche->trancheId); +#else + backendManagementShmemData->trancheId = LWLockNewTrancheId(); + + /* we only need a single lock */ + lockTranche->array_base = &backendManagementShmemData->lock; + lockTranche->array_stride = sizeof(LWLock); + lockTranche->name = trancheName; + + LWLockRegisterTranche(backendManagementShmemData->trancheId, lockTranche); + LWLockInitialize(&backendManagementShmemData->lock, + backendManagementShmemData->trancheId); +#endif + + /* start the distributed transaction ids from 1 */ + pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1); + + /* + * We need to init per backend's spinlock before any backend + * starts its execution. + */ + for (backendIndex = 0; backendIndex < MaxBackends; ++backendIndex) + { + SpinLockInit(&backendManagementShmemData->backends[backendIndex].mutex); + } + } + + LWLockRelease(AddinShmemInitLock); + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } +} + + +/* + * BackendManagementShmemSize returns the size that should be allocated + * on the shared memory for backend management. + */ +static size_t +BackendManagementShmemSize(void) +{ + Size size = 0; + + size = add_size(size, sizeof(BackendManagementShmemData)); + size = add_size(size, mul_size(sizeof(BackendData), MaxBackends)); + + return size; +} + + +/* + * InitializeBackendData is called per backend and does the + * required initialization. + */ +void +InitializeBackendData(void) +{ + MyBackendData = &backendManagementShmemData->backends[MyProc->pgprocno]; + + Assert(MyBackendData); + + SpinLockAcquire(&MyBackendData->mutex); + + MyBackendData->databaseId = MyDatabaseId; + MyBackendData->transactionId.initiatorNodeIdentifier = 0; + MyBackendData->transactionId.transactionNumber = 0; + MyBackendData->transactionId.timestamp = 0; + + SpinLockRelease(&MyBackendData->mutex); +} + + +/* + * UnSetDistributedTransactionId simply acquires the mutex and resets the backend's + * distributed transaction data in shared memory to the initial values. + */ +void +UnSetDistributedTransactionId(void) +{ + /* backend does not exist if the extension is not created */ + if (MyBackendData) + { + SpinLockAcquire(&MyBackendData->mutex); + + MyBackendData->databaseId = 0; + MyBackendData->transactionId.initiatorNodeIdentifier = 0; + MyBackendData->transactionId.transactionNumber = 0; + MyBackendData->transactionId.timestamp = 0; + + SpinLockRelease(&MyBackendData->mutex); + } +} + + +/* + * GetCurrentDistributedTransctionId reads the backend's distributed transaction id and + * returns a copy of it. + */ +DistributedTransactionId * +GetCurrentDistributedTransctionId(void) +{ + DistributedTransactionId *currentDistributedTransactionId = + (DistributedTransactionId *) palloc(sizeof(DistributedTransactionId)); + + SpinLockAcquire(&MyBackendData->mutex); + + currentDistributedTransactionId->initiatorNodeIdentifier = + MyBackendData->transactionId.initiatorNodeIdentifier; + currentDistributedTransactionId->transactionNumber = + MyBackendData->transactionId.transactionNumber; + currentDistributedTransactionId->timestamp = + MyBackendData->transactionId.timestamp; + + SpinLockRelease(&MyBackendData->mutex); + + return currentDistributedTransactionId; +} + + +/* + * AssignDistributedTransactionId generates a new distributed transaction id and + * sets it for the current backend. It also sets the databaseId and + * processId fields. + * + * This function should only be called on BeginCoordinatedTransaction(). Any other + * callers is very likely to break the distributed transction management. + */ +void +AssignDistributedTransactionId(void) +{ + pg_atomic_uint64 *transactionNumberSequence = + &backendManagementShmemData->nextTransactionNumber; + + uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1); + int localGroupId = GetLocalGroupId(); + TimestampTz currentTimestamp = GetCurrentTimestamp(); + + SpinLockAcquire(&MyBackendData->mutex); + + MyBackendData->databaseId = MyDatabaseId; + + MyBackendData->transactionId.initiatorNodeIdentifier = localGroupId; + MyBackendData->transactionId.transactionNumber = + nextTransactionNumber; + MyBackendData->transactionId.timestamp = currentTimestamp; + + SpinLockRelease(&MyBackendData->mutex); +} diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 72c064099..d215948ef 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -15,6 +15,7 @@ #include "miscadmin.h" #include "access/xact.h" +#include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/remote_commands.h" @@ -32,12 +33,16 @@ static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool /* * StartRemoteTransactionBeging initiates beginning the remote transaction in - * a non-blocking manner. + * a non-blocking manner. The function sends "BEGIN" followed by + * assign_distributed_transaction_id() to assign the distributed transaction + * id on the remote node. */ void StartRemoteTransactionBegin(struct MultiConnection *connection) { RemoteTransaction *transaction = &connection->remoteTransaction; + StringInfo beginAndSetDistributedTransactionId = makeStringInfo(); + DistributedTransactionId *distributedTransactionId = NULL; Assert(transaction->transactionState == REMOTE_TRANS_INVALID); @@ -51,8 +56,23 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) * side might have been changed, and that would cause problematic * behaviour. */ - if (!SendRemoteCommand(connection, - "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED")) + appendStringInfoString(beginAndSetDistributedTransactionId, + "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;"); + + /* + * Append BEGIN and assign_distributed_transaction_id() statements into a single command + * and send both in one step. The reason is purely performance, we don't want + * seperate roundtrips for these two statements. + */ + distributedTransactionId = GetCurrentDistributedTransctionId(); + appendStringInfo(beginAndSetDistributedTransactionId, + "SELECT assign_distributed_transaction_id(%d, %ld, '%s')", + distributedTransactionId->initiatorNodeIdentifier, + distributedTransactionId->transactionNumber, + timestamptz_to_str(distributedTransactionId->timestamp)); + + + if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data)) { ReportConnectionError(connection, WARNING); MarkRemoteTransactionFailed(connection, true); diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index a1212a43a..e3094840d 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -19,6 +19,7 @@ #include "access/twophase.h" #include "access/xact.h" +#include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" #include "distributed/multi_shard_transaction.h" @@ -73,6 +74,8 @@ BeginCoordinatedTransaction(void) } CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; + + AssignDistributedTransactionId(); } @@ -168,6 +171,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; + + UnSetDistributedTransactionId(); break; } @@ -204,13 +209,19 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; subXactAbortAttempted = false; + UnSetDistributedTransactionId(); break; } case XACT_EVENT_PARALLEL_COMMIT: case XACT_EVENT_PARALLEL_ABORT: + { + break; + } + case XACT_EVENT_PREPARE: { + UnSetDistributedTransactionId(); break; } diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h new file mode 100644 index 000000000..6d0e52adc --- /dev/null +++ b/src/include/distributed/backend_data.h @@ -0,0 +1,39 @@ +/* + * backend_data.h + * + * Data structure definition for managing backend data and related function + * declarations. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef BACKEND_DATA_H +#define BACKEND_DATA_H + + +#include "datatype/timestamp.h" +#include "distributed/transaction_identifier.h" +#include "nodes/pg_list.h" +#include "storage/s_lock.h" + + +/* + * Each backend's active distributed transaction information is tracked via + * BackendData in shared memory. + */ +typedef struct BackendData +{ + Oid databaseId; + slock_t mutex; + DistributedTransactionId transactionId; +} BackendData; + + +extern void InitializeBackendManagement(void); +extern void InitializeBackendData(void); +extern void UnSetDistributedTransactionId(void); +extern void AssignDistributedTransactionId(void); + +#endif /* BACKEND_DATA_H */ diff --git a/src/include/distributed/transaction_identifier.h b/src/include/distributed/transaction_identifier.h new file mode 100644 index 000000000..9e1de3f9d --- /dev/null +++ b/src/include/distributed/transaction_identifier.h @@ -0,0 +1,39 @@ +/* + * transaction_identifier.h + * + * Data structure for distributed transaction id and related function + * declarations. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef TRANSACTION_IDENTIFIER_H +#define TRANSACTION_IDENTIFIER_H + + +#include "datatype/timestamp.h" + + +/* + * Citus identifies a distributed transaction with a triplet consisting of + * + * - initiatorNodeIdentifier: A unique identifier of the node that initiated + * the distributed transaction + * - transactionNumber: A locally unique identifier assigned for the distributed + * transaction on the node that initiated the distributed transaction + * - timestamp: The current timestamp of distributed transaction initiation + * + */ +typedef struct DistributedTransactionId +{ + int initiatorNodeIdentifier; + uint64 transactionNumber; + TimestampTz timestamp; +} DistributedTransactionId; + + +extern DistributedTransactionId * GetCurrentDistributedTransctionId(void); + +#endif /* TRANSACTION_IDENTIFIER_H */ diff --git a/src/test/regress/expected/isolation_distributed_transaction_id.out b/src/test/regress/expected/isolation_distributed_transaction_id.out new file mode 100644 index 000000000..25fc9cf88 --- /dev/null +++ b/src/test/regress/expected/isolation_distributed_transaction_id.out @@ -0,0 +1,112 @@ +Parsed test spec with 4 sessions + +starting permutation: s1-begin s1-assign-transaction-id s4-get-all-transactions s2-begin s2-assign-transaction-id s4-get-all-transactions s3-begin s3-assign-transaction-id s4-get-all-transactions s1-commit s4-get-all-transactions s2-commit s4-get-all-transactions s3-commit s4-get-all-transactions +step s1-begin: + BEGIN; + +step s1-assign-transaction-id: + SELECT assign_distributed_transaction_id(1, 1, '2015-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step s4-get-all-transactions: + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; + +initiator_node_identifiertransaction_numbertransaction_stamp + +1 1 Wed Dec 31 16:00:00 2014 PST +step s2-begin: + BEGIN; + +step s2-assign-transaction-id: + SELECT assign_distributed_transaction_id(2, 2, '2015-01-02 00:00:00+0'); + +assign_distributed_transaction_id + + +step s4-get-all-transactions: + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; + +initiator_node_identifiertransaction_numbertransaction_stamp + +1 1 Wed Dec 31 16:00:00 2014 PST +2 2 Thu Jan 01 16:00:00 2015 PST +step s3-begin: + BEGIN; + +step s3-assign-transaction-id: + SELECT assign_distributed_transaction_id(3, 3, '2015-01-03 00:00:00+0'); + +assign_distributed_transaction_id + + +step s4-get-all-transactions: + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; + +initiator_node_identifiertransaction_numbertransaction_stamp + +1 1 Wed Dec 31 16:00:00 2014 PST +2 2 Thu Jan 01 16:00:00 2015 PST +3 3 Fri Jan 02 16:00:00 2015 PST +step s1-commit: + COMMIT; + +step s4-get-all-transactions: + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; + +initiator_node_identifiertransaction_numbertransaction_stamp + +2 2 Thu Jan 01 16:00:00 2015 PST +3 3 Fri Jan 02 16:00:00 2015 PST +step s2-commit: + COMMIT; + +step s4-get-all-transactions: + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; + +initiator_node_identifiertransaction_numbertransaction_stamp + +3 3 Fri Jan 02 16:00:00 2015 PST +step s3-commit: + COMMIT; + +step s4-get-all-transactions: + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; + +initiator_node_identifiertransaction_numbertransaction_stamp + + +starting permutation: s1-create-table s1-begin s1-insert s1-get-current-transaction-id s2-get-first-worker-active-transactions +step s1-create-table: + -- some tests also use distributed table + CREATE TABLE distributed_transaction_id_table(some_value int, other_value int); + SET citus.shard_count TO 4; + SELECT create_distributed_table('distributed_transaction_id_table', 'some_value'); + +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-insert: + INSERT INTO distributed_transaction_id_table VALUES (1, 1); + +step s1-get-current-transaction-id: + SELECT row(initiator_node_identifier, transaction_number) FROM get_current_transaction_id(); + +row + +(0,186) +step s2-get-first-worker-active-transactions: + SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number) + FROM + get_all_active_transactions(); + ') + WHERE nodeport = 57637; +; + +nodename nodeport success result + +localhost 57637 t (0,186) diff --git a/src/test/regress/expected/multi_distributed_transaction_id.out b/src/test/regress/expected/multi_distributed_transaction_id.out new file mode 100644 index 000000000..01328c153 --- /dev/null +++ b/src/test/regress/expected/multi_distributed_transaction_id.out @@ -0,0 +1,127 @@ +-- +-- MULTI_DISTRIBUTED_TRANSACTION_ID +-- +-- Unit tests for distributed transaction id functionality +-- +-- get the current transaction id, which should be uninitialized +-- note that we skip printing the databaseId, which might change +-- per run +-- set timezone to a specific value to prevent +-- different values on different servers +SET TIME ZONE 'PST8PDT'; +-- should return uninitialized values if not in a transaction +SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp +---------------------------+--------------------+------------------- + 0 | 0 | +(1 row) + +BEGIN; + + -- we should still see the uninitialized values + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp | ?column? +---------------------------+--------------------+-------------------+---------- + 0 | 0 | | t +(1 row) + + -- now assign a value + SELECT assign_distributed_transaction_id(50, 50, '2016-01-01 00:00:00+0'); + assign_distributed_transaction_id +----------------------------------- + +(1 row) + + -- see the assigned value + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp | ?column? +---------------------------+--------------------+------------------------------+---------- + 50 | 50 | Thu Dec 31 16:00:00 2015 PST | t +(1 row) + + -- a backend cannot be assigned another tx id if already assigned + SELECT assign_distributed_transaction_id(51, 51, '2017-01-01 00:00:00+0'); +ERROR: the backend has already been assigned a transaction id +ROLLBACK; +-- since the transaction finished, we should see the uninitialized values +SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp | ?column? +---------------------------+--------------------+-------------------+---------- + 0 | 0 | | t +(1 row) + +-- also see that ROLLBACK (i.e., failures in the transaction) clears the shared memory +BEGIN; + + -- we should still see the uninitialized values + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp | ?column? +---------------------------+--------------------+-------------------+---------- + 0 | 0 | | t +(1 row) + + -- now assign a value + SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0'); + assign_distributed_transaction_id +----------------------------------- + +(1 row) + + SELECT 5 / 0; +ERROR: division by zero +COMMIT; +-- since the transaction errored, we should see the uninitialized values again + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp | ?column? +---------------------------+--------------------+-------------------+---------- + 0 | 0 | | t +(1 row) + +-- we should also see that a new connection means an uninitialized transaction id +BEGIN; + + SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0'); + assign_distributed_transaction_id +----------------------------------- + +(1 row) + + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp | ?column? +---------------------------+--------------------+------------------------------+---------- + 52 | 52 | Wed Dec 31 16:00:00 2014 PST | t +(1 row) + + \c - - - :master_port + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp | ?column? +---------------------------+--------------------+-------------------+---------- + 0 | 0 | | t +(1 row) + +-- now show that PREPARE resets the distributed transaction id +BEGIN; + SELECT assign_distributed_transaction_id(120, 120, '2015-01-01 00:00:00+0'); + assign_distributed_transaction_id +----------------------------------- + +(1 row) + + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp | ?column? +---------------------------+--------------------+------------------------------+---------- + 120 | 120 | Wed Dec 31 16:00:00 2014 PST | t +(1 row) + + PREPARE TRANSACTION 'dist_xact_id_test'; +-- after the prepare we should see that transaction id is cleared +SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + initiator_node_identifier | transaction_number | transaction_stamp | ?column? +---------------------------+--------------------+-------------------+---------- + 0 | 0 | | t +(1 row) + +-- cleanup the transaction +ROLLBACK PREPARED 'dist_xact_id_test'; +-- set back to the original zone +SET TIME ZONE DEFAULT; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 02daaf7bc..038c3b992 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -113,6 +113,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-4'; ALTER EXTENSION citus UPDATE TO '7.0-1'; ALTER EXTENSION citus UPDATE TO '7.0-2'; ALTER EXTENSION citus UPDATE TO '7.0-3'; +ALTER EXTENSION citus UPDATE TO '7.0-4'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 143dc6e86..24860fe48 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -8,5 +8,5 @@ test: isolation_cluster_management test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation test: isolation_concurrent_dml isolation_data_migration test: isolation_drop_shards isolation_copy_placement_vs_modification - test: isolation_insert_vs_vacuum +test: isolation_distributed_transaction_id diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 935e6cefd..1d7b9b3cf 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -44,7 +44,7 @@ test: multi_insert_select # ---------- # Miscellaneous tests to check our query planning behavior # ---------- -test: multi_deparse_shard_query +test: multi_deparse_shard_query multi_distributed_transaction_id test: multi_basic_queries multi_complex_expressions test: multi_explain test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics diff --git a/src/test/regress/specs/isolation_distributed_transaction_id.spec b/src/test/regress/specs/isolation_distributed_transaction_id.spec new file mode 100644 index 000000000..8d04b17f9 --- /dev/null +++ b/src/test/regress/specs/isolation_distributed_transaction_id.spec @@ -0,0 +1,107 @@ +# Tests around distributed transaction id generation + +setup +{ + SET TIME ZONE 'PST8PDT'; +} + +teardown +{ + SET TIME ZONE DEFAULT; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-assign-transaction-id" +{ + SELECT assign_distributed_transaction_id(1, 1, '2015-01-01 00:00:00+0'); +} + +step "s1-commit" +{ + COMMIT; +} + +step "s1-create-table" +{ + -- some tests also use distributed table + CREATE TABLE distributed_transaction_id_table(some_value int, other_value int); + SET citus.shard_count TO 4; + SELECT create_distributed_table('distributed_transaction_id_table', 'some_value'); +} + +step "s1-insert" +{ + INSERT INTO distributed_transaction_id_table VALUES (1, 1); +} + +step "s1-get-current-transaction-id" +{ + SELECT row(initiator_node_identifier, transaction_number) FROM get_current_transaction_id(); +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-assign-transaction-id" +{ + SELECT assign_distributed_transaction_id(2, 2, '2015-01-02 00:00:00+0'); +} + +step "s2-commit" +{ + COMMIT; +} + +# print only the necessary parts to prevent concurrent runs to print different values +step "s2-get-first-worker-active-transactions" +{ + SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number) + FROM + get_all_active_transactions(); + ') + WHERE nodeport = 57637; +; +} + +session "s3" + +step "s3-begin" +{ + BEGIN; +} + +step "s3-assign-transaction-id" +{ + SELECT assign_distributed_transaction_id(3, 3, '2015-01-03 00:00:00+0'); +} + +step "s3-commit" +{ + COMMIT; +} + +session "s4" + +step "s4-get-all-transactions" +{ + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; +} + +# show that we could get all distributed transaction ids from seperate sessions +permutation "s1-begin" "s1-assign-transaction-id" "s4-get-all-transactions" "s2-begin" "s2-assign-transaction-id" "s4-get-all-transactions" "s3-begin" "s3-assign-transaction-id" "s4-get-all-transactions" "s1-commit" "s4-get-all-transactions" "s2-commit" "s4-get-all-transactions" "s3-commit" "s4-get-all-transactions" + + +# now show that distributed transaction id on the coordinator +# is the same with the one on the worker +permutation "s1-create-table" "s1-begin" "s1-insert" "s1-get-current-transaction-id" "s2-get-first-worker-active-transactions" + diff --git a/src/test/regress/sql/multi_distributed_transaction_id.sql b/src/test/regress/sql/multi_distributed_transaction_id.sql new file mode 100644 index 000000000..838549cb7 --- /dev/null +++ b/src/test/regress/sql/multi_distributed_transaction_id.sql @@ -0,0 +1,81 @@ +-- +-- MULTI_DISTRIBUTED_TRANSACTION_ID +-- +-- Unit tests for distributed transaction id functionality +-- + +-- get the current transaction id, which should be uninitialized +-- note that we skip printing the databaseId, which might change +-- per run + +-- set timezone to a specific value to prevent +-- different values on different servers +SET TIME ZONE 'PST8PDT'; + +-- should return uninitialized values if not in a transaction +SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id(); + +BEGIN; + + -- we should still see the uninitialized values + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + + -- now assign a value + SELECT assign_distributed_transaction_id(50, 50, '2016-01-01 00:00:00+0'); + + -- see the assigned value + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + + -- a backend cannot be assigned another tx id if already assigned + SELECT assign_distributed_transaction_id(51, 51, '2017-01-01 00:00:00+0'); + +ROLLBACK; + +-- since the transaction finished, we should see the uninitialized values +SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + + +-- also see that ROLLBACK (i.e., failures in the transaction) clears the shared memory +BEGIN; + + -- we should still see the uninitialized values + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + + -- now assign a value + SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0'); + + SELECT 5 / 0; +COMMIT; + +-- since the transaction errored, we should see the uninitialized values again + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + + +-- we should also see that a new connection means an uninitialized transaction id +BEGIN; + + SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0'); + + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + + \c - - - :master_port + + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + +-- now show that PREPARE resets the distributed transaction id + +BEGIN; + SELECT assign_distributed_transaction_id(120, 120, '2015-01-01 00:00:00+0'); + + SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + + PREPARE TRANSACTION 'dist_xact_id_test'; + +-- after the prepare we should see that transaction id is cleared +SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id(); + +-- cleanup the transaction +ROLLBACK PREPARED 'dist_xact_id_test'; + +-- set back to the original zone +SET TIME ZONE DEFAULT; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 58809f36c..5895adb0d 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -113,6 +113,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-4'; ALTER EXTENSION citus UPDATE TO '7.0-1'; ALTER EXTENSION citus UPDATE TO '7.0-2'; ALTER EXTENSION citus UPDATE TO '7.0-3'; +ALTER EXTENSION citus UPDATE TO '7.0-4'; -- show running version SHOW citus.version;