Introduce distributed transaction ids

This commit adds distributed transaction id infrastructure in
the scope of distributed deadlock detection.

In general, the distributed transaction id consists of a tuple
in the form of: `(databaseId, initiatorNodeIdentifier, transactionId,
timestamp)`.

Briefly, we add a shared memory block on each node, which holds some
information per backend (i.e., an array `BackendData backends[MaxBackends]`).
Later, on each coordinated transaction, Citus sends
`SELECT assign_distributed_transaction_id()` right after `BEGIN`.
For that backend on the worker, the distributed transaction id is set to
the values assigned via the function call.

The aim of the above is to correlate the transactions on the coordinator
to the transactions on the worker nodes.
pull/1489/head
Onder Kalaci 2017-07-06 16:10:25 +03:00
parent 0f4fa854cc
commit 3369f3486f
18 changed files with 1090 additions and 13 deletions

View File

@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
6.2-1 6.2-2 6.2-3 6.2-4 \
7.0-1 7.0-2 7.0-3
7.0-1 7.0-2 7.0-3 7.0-4
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -145,6 +145,8 @@ $(EXTENSION)--7.0-2.sql: $(EXTENSION)--7.0-1.sql $(EXTENSION)--7.0-1--7.0-2.sql
cat $^ > $@
$(EXTENSION)--7.0-3.sql: $(EXTENSION)--7.0-2.sql $(EXTENSION)--7.0-2--7.0-3.sql
cat $^ > $@
$(EXTENSION)--7.0-4.sql: $(EXTENSION)--7.0-3.sql $(EXTENSION)--7.0-3--7.0-4.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,25 @@
/* citus--7.0-3--7.0-4.sql */
SET search_path = 'pg_catalog';
CREATE FUNCTION assign_distributed_transaction_id(initiator_node_identifier int4, transaction_number int8, transaction_stamp timestamptz)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$assign_distributed_transaction_id$$;
COMMENT ON FUNCTION assign_distributed_transaction_id(initiator_node_identifier int4, transaction_number int8, transaction_stamp timestamptz)
IS 'Only intended for internal use, users should not call this. The function sets the distributed transaction id';
CREATE OR REPLACE FUNCTION get_current_transaction_id(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
RETURNS RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$get_current_transaction_id$$;
COMMENT ON FUNCTION get_current_transaction_id(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
IS 'returns the current backend data including distributed transaction id';
CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$get_all_active_transactions$$;
COMMENT ON FUNCTION get_all_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
IS 'returns distributed transaction ids of active distributed transactions';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.0-3'
default_version = '7.0-4'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -337,10 +337,11 @@ ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
/*
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
* SendRemoteCommandParams is a PQsendQueryParams wrapper that logs remote commands,
* and accepts a MultiConnection instead of a plain PGconn. It makes sure it can
* send commands asynchronously without blocking (at the potential expense of
* an additional memory allocation).
* an additional memory allocation). The command string can only include a single
* command since PQsendQueryParams() supports only that.
*/
int
SendRemoteCommandParams(MultiConnection *connection, const char *command,
@ -372,14 +373,33 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command,
/*
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
* send commands asynchronously without blocking (at the potential expense of
* an additional memory allocation).
* an additional memory allocation). The command string can include multiple
* commands since PQsendQuery() supports that.
*/
int
SendRemoteCommand(MultiConnection *connection, const char *command)
{
return SendRemoteCommandParams(connection, command, 0, NULL, NULL);
PGconn *pgConn = connection->pgConn;
int rc = 0;
LogRemoteCommand(connection, command);
/*
* Don't try to send command if connection is entirely gone
* (PQisnonblocking() would crash).
*/
if (!pgConn)
{
return 0;
}
Assert(PQisnonblocking(pgConn));
rc = PQsendQuery(pgConn, command);
return rc;
}

View File

@ -19,6 +19,7 @@
#include "citus_version.h"
#include "commands/explain.h"
#include "executor/executor.h"
#include "distributed/backend_data.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/connection_management.h"
#include "distributed/connection_management.h"
@ -173,6 +174,7 @@ _PG_init(void)
/* initialize coordinated transaction management */
InitializeTransactionManagement();
InitializeBackendManagement();
InitializeConnectionManagement();
InitPlacementConnectionManagement();
@ -197,6 +199,7 @@ void
StartupCitusBackend(void)
{
InitializeMaintenanceDaemonBackend();
InitializeBackendData();
}

View File

@ -0,0 +1,489 @@
/*-------------------------------------------------------------------------
*
* backend_data.c
*
* Infrastructure for managing per backend data that can efficiently
* accessed by all sessions.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "datatype/timestamp.h"
#include "distributed/backend_data.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/transaction_identifier.h"
#include "nodes/execnodes.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"
#include "storage/s_lock.h"
#include "utils/timestamp.h"
/*
* Each backend's data reside in the shared memory
* on the BackendManagementShmemData.
*/
typedef struct BackendManagementShmemData
{
int trancheId;
#if (PG_VERSION_NUM >= 100000)
NamedLWLockTranche namedLockTranche;
#else
LWLockTranche lockTranche;
#endif
LWLock lock;
/*
* We prefer to use an atomic integer over sequences for two
* reasons (i) orders of magnitude performance difference
* (ii) allowing read-only replicas to be able to generate ids
*/
pg_atomic_uint64 nextTransactionNumber;
BackendData backends[FLEXIBLE_ARRAY_MEMBER];
} BackendManagementShmemData;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static BackendManagementShmemData *backendManagementShmemData = NULL;
static BackendData *MyBackendData = NULL;
static void BackendManagementShmemInit(void);
static size_t BackendManagementShmemSize(void);
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
PG_FUNCTION_INFO_V1(get_current_transaction_id);
PG_FUNCTION_INFO_V1(get_all_active_transactions);
/*
* assign_distributed_transaction_id updates the shared memory allocated for this backend
* and sets initiatorNodeIdentifier, transactionNumber, timestamp fields with the given
* inputs. Also, the function sets the database id and process id via the information that
* Postgres provides.
*
* This function is only intended for internal use for managing distributed transactions.
* Users should not use this function for any purpose.
*/
Datum
assign_distributed_transaction_id(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
/* MyBackendData should always be avaliable, just out of paranoia */
if (!MyBackendData)
{
ereport(ERROR, (errmsg("backend is not ready for distributed transactions")));
}
SpinLockAcquire(&MyBackendData->mutex);
/* if an id is already assigned, release the lock and error */
if (MyBackendData->transactionId.initiatorNodeIdentifier != 0)
{
SpinLockRelease(&MyBackendData->mutex);
ereport(ERROR, (errmsg("the backend has already been assigned a "
"transaction id")));
}
MyBackendData->databaseId = MyDatabaseId;
MyBackendData->transactionId.initiatorNodeIdentifier = PG_GETARG_INT32(0);
MyBackendData->transactionId.transactionNumber = PG_GETARG_INT64(1);
MyBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2);
SpinLockRelease(&MyBackendData->mutex);
PG_RETURN_VOID();
}
/*
* get_current_transaction_id returns a tuple with (databaseId, processId,
* initiatorNodeIdentifier, transactionNumber, timestamp) that exists in the
* shared memory associated with this backend. Note that if the backend
* is not in a transaction, the function returns uninitialized data where
* transactionNumber equals to 0.
*/
Datum
get_current_transaction_id(PG_FUNCTION_ARGS)
{
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
const int attributeCount = 5;
Datum values[attributeCount];
bool isNulls[attributeCount];
DistributedTransactionId *distributedTransctionId = NULL;
CheckCitusVersion(ERROR);
/* build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
/* MyBackendData should always be avaliable, just out of paranoia */
if (!MyBackendData)
{
ereport(ERROR, (errmsg("backend is not ready for distributed transactions")));
}
distributedTransctionId = GetCurrentDistributedTransctionId();
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
/* first two fields do not change for this backend, so get directly */
values[0] = ObjectIdGetDatum(MyDatabaseId);
values[1] = Int32GetDatum(MyProcPid);
values[2] = Int32GetDatum(distributedTransctionId->initiatorNodeIdentifier);
values[3] = UInt64GetDatum(distributedTransctionId->transactionNumber);
/* provide a better output */
if (distributedTransctionId->initiatorNodeIdentifier != 0)
{
values[4] = TimestampTzGetDatum(distributedTransctionId->timestamp);
}
else
{
isNulls[4] = true;
}
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
PG_RETURN_DATUM(HeapTupleGetDatum(heapTuple));
}
/*
* get_all_active_transactions returns all the avaliable information about all
* the active backends.
*/
Datum
get_all_active_transactions(PG_FUNCTION_ARGS)
{
ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL;
MemoryContext perQueryContext = NULL;
MemoryContext oldContext = NULL;
int backendIndex = 0;
const int attributeCount = 5;
Datum values[attributeCount];
bool isNulls[attributeCount];
CheckCitusVersion(ERROR);
/* check to see if caller supports us returning a tuplestore */
if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context " \
"that cannot accept a set")));
}
if (!(returnSetInfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
/* build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory;
oldContext = MemoryContextSwitchTo(perQueryContext);
tupleStore = tuplestore_begin_heap(true, false, work_mem);
returnSetInfo->returnMode = SFRM_Materialize;
returnSetInfo->setResult = tupleStore;
returnSetInfo->setDesc = tupleDescriptor;
MemoryContextSwitchTo(oldContext);
/*
* We don't want to initialize memory while spinlock is held so we
* prefer to do it here. This initialization is done only for the first
* row.
*/
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
/* we're reading all the backend data, take a lock to prevent concurrent additions */
LWLockAcquire(AddinShmemInitLock, LW_SHARED);
for (backendIndex = 0; backendIndex < MaxBackends; ++backendIndex)
{
BackendData *currentBackend =
&backendManagementShmemData->backends[backendIndex];
SpinLockAcquire(&currentBackend->mutex);
/* we're only interested in active backends */
if (currentBackend->transactionId.transactionNumber == 0)
{
SpinLockRelease(&currentBackend->mutex);
continue;
}
values[0] = ObjectIdGetDatum(currentBackend->databaseId);
values[1] = Int32GetDatum(ProcGlobal->allProcs[backendIndex].pid);
values[2] = Int32GetDatum(currentBackend->transactionId.initiatorNodeIdentifier);
values[3] = UInt64GetDatum(currentBackend->transactionId.transactionNumber);
values[4] = TimestampTzGetDatum(currentBackend->transactionId.timestamp);
SpinLockRelease(&currentBackend->mutex);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
/*
* We don't want to initialize memory while spinlock is held so we
* prefer to do it here. This initialization is done for the rows
* starting from the second one.
*/
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
}
LWLockRelease(AddinShmemInitLock);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID();
}
/*
* InitializeBackendManagement requests the necessary shared memory
* from Postgres and sets up the shared memory startup hook.
*/
void
InitializeBackendManagement(void)
{
/* allocate shared memory */
RequestAddinShmemSpace(BackendManagementShmemSize());
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = BackendManagementShmemInit;
}
/*
* BackendManagementShmemInit is the callback that is to be called on shared
* memory startup hook. The function sets up the necessary shared memory
* segment for the backend manager.
*/
static void
BackendManagementShmemInit(void)
{
bool alreadyInitialized = false;
/* we may update the shmem, acquire lock exclusively */
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
backendManagementShmemData =
(BackendManagementShmemData *) ShmemInitStruct(
"Backend Management Shmem",
BackendManagementShmemSize(),
&alreadyInitialized);
if (!alreadyInitialized)
{
int backendIndex = 0;
char *trancheName = "Backend Management Tranche";
#if (PG_VERSION_NUM >= 100000)
NamedLWLockTranche *namedLockTranche =
&backendManagementShmemData->namedLockTranche;
#else
LWLockTranche *lockTranche = &backendManagementShmemData->lockTranche;
#endif
/* start by zeroing out all the memory */
memset(backendManagementShmemData, 0,
BackendManagementShmemSize());
#if (PG_VERSION_NUM >= 100000)
namedLockTranche->trancheId = LWLockNewTrancheId();
LWLockRegisterTranche(namedLockTranche->trancheId, trancheName);
LWLockInitialize(&backendManagementShmemData->lock,
namedLockTranche->trancheId);
#else
backendManagementShmemData->trancheId = LWLockNewTrancheId();
/* we only need a single lock */
lockTranche->array_base = &backendManagementShmemData->lock;
lockTranche->array_stride = sizeof(LWLock);
lockTranche->name = trancheName;
LWLockRegisterTranche(backendManagementShmemData->trancheId, lockTranche);
LWLockInitialize(&backendManagementShmemData->lock,
backendManagementShmemData->trancheId);
#endif
/* start the distributed transaction ids from 1 */
pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);
/*
* We need to init per backend's spinlock before any backend
* starts its execution.
*/
for (backendIndex = 0; backendIndex < MaxBackends; ++backendIndex)
{
SpinLockInit(&backendManagementShmemData->backends[backendIndex].mutex);
}
}
LWLockRelease(AddinShmemInitLock);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
/*
* BackendManagementShmemSize returns the size that should be allocated
* on the shared memory for backend management.
*/
static size_t
BackendManagementShmemSize(void)
{
Size size = 0;
size = add_size(size, sizeof(BackendManagementShmemData));
size = add_size(size, mul_size(sizeof(BackendData), MaxBackends));
return size;
}
/*
* InitializeBackendData is called per backend and does the
* required initialization.
*/
void
InitializeBackendData(void)
{
MyBackendData = &backendManagementShmemData->backends[MyProc->pgprocno];
Assert(MyBackendData);
SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->databaseId = MyDatabaseId;
MyBackendData->transactionId.initiatorNodeIdentifier = 0;
MyBackendData->transactionId.transactionNumber = 0;
MyBackendData->transactionId.timestamp = 0;
SpinLockRelease(&MyBackendData->mutex);
}
/*
* UnSetDistributedTransactionId simply acquires the mutex and resets the backend's
* distributed transaction data in shared memory to the initial values.
*/
void
UnSetDistributedTransactionId(void)
{
/* backend does not exist if the extension is not created */
if (MyBackendData)
{
SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->databaseId = 0;
MyBackendData->transactionId.initiatorNodeIdentifier = 0;
MyBackendData->transactionId.transactionNumber = 0;
MyBackendData->transactionId.timestamp = 0;
SpinLockRelease(&MyBackendData->mutex);
}
}
/*
* GetCurrentDistributedTransctionId reads the backend's distributed transaction id and
* returns a copy of it.
*/
DistributedTransactionId *
GetCurrentDistributedTransctionId(void)
{
DistributedTransactionId *currentDistributedTransactionId =
(DistributedTransactionId *) palloc(sizeof(DistributedTransactionId));
SpinLockAcquire(&MyBackendData->mutex);
currentDistributedTransactionId->initiatorNodeIdentifier =
MyBackendData->transactionId.initiatorNodeIdentifier;
currentDistributedTransactionId->transactionNumber =
MyBackendData->transactionId.transactionNumber;
currentDistributedTransactionId->timestamp =
MyBackendData->transactionId.timestamp;
SpinLockRelease(&MyBackendData->mutex);
return currentDistributedTransactionId;
}
/*
* AssignDistributedTransactionId generates a new distributed transaction id and
* sets it for the current backend. It also sets the databaseId and
* processId fields.
*
* This function should only be called on BeginCoordinatedTransaction(). Any other
* callers is very likely to break the distributed transction management.
*/
void
AssignDistributedTransactionId(void)
{
pg_atomic_uint64 *transactionNumberSequence =
&backendManagementShmemData->nextTransactionNumber;
uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1);
int localGroupId = GetLocalGroupId();
TimestampTz currentTimestamp = GetCurrentTimestamp();
SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->databaseId = MyDatabaseId;
MyBackendData->transactionId.initiatorNodeIdentifier = localGroupId;
MyBackendData->transactionId.transactionNumber =
nextTransactionNumber;
MyBackendData->transactionId.timestamp = currentTimestamp;
SpinLockRelease(&MyBackendData->mutex);
}

View File

@ -15,6 +15,7 @@
#include "miscadmin.h"
#include "access/xact.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
@ -32,12 +33,16 @@ static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool
/*
* StartRemoteTransactionBeging initiates beginning the remote transaction in
* a non-blocking manner.
* a non-blocking manner. The function sends "BEGIN" followed by
* assign_distributed_transaction_id() to assign the distributed transaction
* id on the remote node.
*/
void
StartRemoteTransactionBegin(struct MultiConnection *connection)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
DistributedTransactionId *distributedTransactionId = NULL;
Assert(transaction->transactionState == REMOTE_TRANS_INVALID);
@ -51,8 +56,23 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
* side might have been changed, and that would cause problematic
* behaviour.
*/
if (!SendRemoteCommand(connection,
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"))
appendStringInfoString(beginAndSetDistributedTransactionId,
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
/*
* Append BEGIN and assign_distributed_transaction_id() statements into a single command
* and send both in one step. The reason is purely performance, we don't want
* seperate roundtrips for these two statements.
*/
distributedTransactionId = GetCurrentDistributedTransctionId();
appendStringInfo(beginAndSetDistributedTransactionId,
"SELECT assign_distributed_transaction_id(%d, %ld, '%s')",
distributedTransactionId->initiatorNodeIdentifier,
distributedTransactionId->transactionNumber,
timestamptz_to_str(distributedTransactionId->timestamp));
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
{
ReportConnectionError(connection, WARNING);
MarkRemoteTransactionFailed(connection, true);

View File

@ -19,6 +19,7 @@
#include "access/twophase.h"
#include "access/xact.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/multi_shard_transaction.h"
@ -73,6 +74,8 @@ BeginCoordinatedTransaction(void)
}
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
AssignDistributedTransactionId();
}
@ -168,6 +171,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions);
CoordinatedTransactionUses2PC = false;
UnSetDistributedTransactionId();
break;
}
@ -204,13 +209,19 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
dlist_init(&InProgressTransactions);
CoordinatedTransactionUses2PC = false;
subXactAbortAttempted = false;
UnSetDistributedTransactionId();
break;
}
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_PARALLEL_ABORT:
{
break;
}
case XACT_EVENT_PREPARE:
{
UnSetDistributedTransactionId();
break;
}

View File

@ -0,0 +1,39 @@
/*
* backend_data.h
*
* Data structure definition for managing backend data and related function
* declarations.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef BACKEND_DATA_H
#define BACKEND_DATA_H
#include "datatype/timestamp.h"
#include "distributed/transaction_identifier.h"
#include "nodes/pg_list.h"
#include "storage/s_lock.h"
/*
* Each backend's active distributed transaction information is tracked via
* BackendData in shared memory.
*/
typedef struct BackendData
{
Oid databaseId;
slock_t mutex;
DistributedTransactionId transactionId;
} BackendData;
extern void InitializeBackendManagement(void);
extern void InitializeBackendData(void);
extern void UnSetDistributedTransactionId(void);
extern void AssignDistributedTransactionId(void);
#endif /* BACKEND_DATA_H */

View File

@ -0,0 +1,39 @@
/*
* transaction_identifier.h
*
* Data structure for distributed transaction id and related function
* declarations.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef TRANSACTION_IDENTIFIER_H
#define TRANSACTION_IDENTIFIER_H
#include "datatype/timestamp.h"
/*
* Citus identifies a distributed transaction with a triplet consisting of
*
* - initiatorNodeIdentifier: A unique identifier of the node that initiated
* the distributed transaction
* - transactionNumber: A locally unique identifier assigned for the distributed
* transaction on the node that initiated the distributed transaction
* - timestamp: The current timestamp of distributed transaction initiation
*
*/
typedef struct DistributedTransactionId
{
int initiatorNodeIdentifier;
uint64 transactionNumber;
TimestampTz timestamp;
} DistributedTransactionId;
extern DistributedTransactionId * GetCurrentDistributedTransctionId(void);
#endif /* TRANSACTION_IDENTIFIER_H */

View File

@ -0,0 +1,112 @@
Parsed test spec with 4 sessions
starting permutation: s1-begin s1-assign-transaction-id s4-get-all-transactions s2-begin s2-assign-transaction-id s4-get-all-transactions s3-begin s3-assign-transaction-id s4-get-all-transactions s1-commit s4-get-all-transactions s2-commit s4-get-all-transactions s3-commit s4-get-all-transactions
step s1-begin:
BEGIN;
step s1-assign-transaction-id:
SELECT assign_distributed_transaction_id(1, 1, '2015-01-01 00:00:00+0');
assign_distributed_transaction_id
step s4-get-all-transactions:
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
initiator_node_identifiertransaction_numbertransaction_stamp
1 1 Wed Dec 31 16:00:00 2014 PST
step s2-begin:
BEGIN;
step s2-assign-transaction-id:
SELECT assign_distributed_transaction_id(2, 2, '2015-01-02 00:00:00+0');
assign_distributed_transaction_id
step s4-get-all-transactions:
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
initiator_node_identifiertransaction_numbertransaction_stamp
1 1 Wed Dec 31 16:00:00 2014 PST
2 2 Thu Jan 01 16:00:00 2015 PST
step s3-begin:
BEGIN;
step s3-assign-transaction-id:
SELECT assign_distributed_transaction_id(3, 3, '2015-01-03 00:00:00+0');
assign_distributed_transaction_id
step s4-get-all-transactions:
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
initiator_node_identifiertransaction_numbertransaction_stamp
1 1 Wed Dec 31 16:00:00 2014 PST
2 2 Thu Jan 01 16:00:00 2015 PST
3 3 Fri Jan 02 16:00:00 2015 PST
step s1-commit:
COMMIT;
step s4-get-all-transactions:
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
initiator_node_identifiertransaction_numbertransaction_stamp
2 2 Thu Jan 01 16:00:00 2015 PST
3 3 Fri Jan 02 16:00:00 2015 PST
step s2-commit:
COMMIT;
step s4-get-all-transactions:
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
initiator_node_identifiertransaction_numbertransaction_stamp
3 3 Fri Jan 02 16:00:00 2015 PST
step s3-commit:
COMMIT;
step s4-get-all-transactions:
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
initiator_node_identifiertransaction_numbertransaction_stamp
starting permutation: s1-create-table s1-begin s1-insert s1-get-current-transaction-id s2-get-first-worker-active-transactions
step s1-create-table:
-- some tests also use distributed table
CREATE TABLE distributed_transaction_id_table(some_value int, other_value int);
SET citus.shard_count TO 4;
SELECT create_distributed_table('distributed_transaction_id_table', 'some_value');
create_distributed_table
step s1-begin:
BEGIN;
step s1-insert:
INSERT INTO distributed_transaction_id_table VALUES (1, 1);
step s1-get-current-transaction-id:
SELECT row(initiator_node_identifier, transaction_number) FROM get_current_transaction_id();
row
(0,186)
step s2-get-first-worker-active-transactions:
SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number)
FROM
get_all_active_transactions();
')
WHERE nodeport = 57637;
;
nodename nodeport success result
localhost 57637 t (0,186)

View File

@ -0,0 +1,127 @@
--
-- MULTI_DISTRIBUTED_TRANSACTION_ID
--
-- Unit tests for distributed transaction id functionality
--
-- get the current transaction id, which should be uninitialized
-- note that we skip printing the databaseId, which might change
-- per run
-- set timezone to a specific value to prevent
-- different values on different servers
SET TIME ZONE 'PST8PDT';
-- should return uninitialized values if not in a transaction
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp
---------------------------+--------------------+-------------------
0 | 0 |
(1 row)
BEGIN;
-- we should still see the uninitialized values
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
---------------------------+--------------------+-------------------+----------
0 | 0 | | t
(1 row)
-- now assign a value
SELECT assign_distributed_transaction_id(50, 50, '2016-01-01 00:00:00+0');
assign_distributed_transaction_id
-----------------------------------
(1 row)
-- see the assigned value
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
---------------------------+--------------------+------------------------------+----------
50 | 50 | Thu Dec 31 16:00:00 2015 PST | t
(1 row)
-- a backend cannot be assigned another tx id if already assigned
SELECT assign_distributed_transaction_id(51, 51, '2017-01-01 00:00:00+0');
ERROR: the backend has already been assigned a transaction id
ROLLBACK;
-- since the transaction finished, we should see the uninitialized values
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
---------------------------+--------------------+-------------------+----------
0 | 0 | | t
(1 row)
-- also see that ROLLBACK (i.e., failures in the transaction) clears the shared memory
BEGIN;
-- we should still see the uninitialized values
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
---------------------------+--------------------+-------------------+----------
0 | 0 | | t
(1 row)
-- now assign a value
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
assign_distributed_transaction_id
-----------------------------------
(1 row)
SELECT 5 / 0;
ERROR: division by zero
COMMIT;
-- since the transaction errored, we should see the uninitialized values again
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
---------------------------+--------------------+-------------------+----------
0 | 0 | | t
(1 row)
-- we should also see that a new connection means an uninitialized transaction id
BEGIN;
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
assign_distributed_transaction_id
-----------------------------------
(1 row)
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
---------------------------+--------------------+------------------------------+----------
52 | 52 | Wed Dec 31 16:00:00 2014 PST | t
(1 row)
\c - - - :master_port
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
---------------------------+--------------------+-------------------+----------
0 | 0 | | t
(1 row)
-- now show that PREPARE resets the distributed transaction id
BEGIN;
SELECT assign_distributed_transaction_id(120, 120, '2015-01-01 00:00:00+0');
assign_distributed_transaction_id
-----------------------------------
(1 row)
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
---------------------------+--------------------+------------------------------+----------
120 | 120 | Wed Dec 31 16:00:00 2014 PST | t
(1 row)
PREPARE TRANSACTION 'dist_xact_id_test';
-- after the prepare we should see that transaction id is cleared
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
---------------------------+--------------------+-------------------+----------
0 | 0 | | t
(1 row)
-- cleanup the transaction
ROLLBACK PREPARED 'dist_xact_id_test';
-- set back to the original zone
SET TIME ZONE DEFAULT;

View File

@ -113,6 +113,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-4';
ALTER EXTENSION citus UPDATE TO '7.0-1';
ALTER EXTENSION citus UPDATE TO '7.0-2';
ALTER EXTENSION citus UPDATE TO '7.0-3';
ALTER EXTENSION citus UPDATE TO '7.0-4';
-- show running version
SHOW citus.version;
citus.version

View File

@ -8,5 +8,5 @@ test: isolation_cluster_management
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation
test: isolation_concurrent_dml isolation_data_migration
test: isolation_drop_shards isolation_copy_placement_vs_modification
test: isolation_insert_vs_vacuum
test: isolation_distributed_transaction_id

View File

@ -44,7 +44,7 @@ test: multi_insert_select
# ----------
# Miscellaneous tests to check our query planning behavior
# ----------
test: multi_deparse_shard_query
test: multi_deparse_shard_query multi_distributed_transaction_id
test: multi_basic_queries multi_complex_expressions
test: multi_explain
test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics

View File

@ -0,0 +1,107 @@
# Tests around distributed transaction id generation
setup
{
SET TIME ZONE 'PST8PDT';
}
teardown
{
SET TIME ZONE DEFAULT;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-assign-transaction-id"
{
SELECT assign_distributed_transaction_id(1, 1, '2015-01-01 00:00:00+0');
}
step "s1-commit"
{
COMMIT;
}
step "s1-create-table"
{
-- some tests also use distributed table
CREATE TABLE distributed_transaction_id_table(some_value int, other_value int);
SET citus.shard_count TO 4;
SELECT create_distributed_table('distributed_transaction_id_table', 'some_value');
}
step "s1-insert"
{
INSERT INTO distributed_transaction_id_table VALUES (1, 1);
}
step "s1-get-current-transaction-id"
{
SELECT row(initiator_node_identifier, transaction_number) FROM get_current_transaction_id();
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-assign-transaction-id"
{
SELECT assign_distributed_transaction_id(2, 2, '2015-01-02 00:00:00+0');
}
step "s2-commit"
{
COMMIT;
}
# print only the necessary parts to prevent concurrent runs to print different values
step "s2-get-first-worker-active-transactions"
{
SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number)
FROM
get_all_active_transactions();
')
WHERE nodeport = 57637;
;
}
session "s3"
step "s3-begin"
{
BEGIN;
}
step "s3-assign-transaction-id"
{
SELECT assign_distributed_transaction_id(3, 3, '2015-01-03 00:00:00+0');
}
step "s3-commit"
{
COMMIT;
}
session "s4"
step "s4-get-all-transactions"
{
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
}
# show that we could get all distributed transaction ids from seperate sessions
permutation "s1-begin" "s1-assign-transaction-id" "s4-get-all-transactions" "s2-begin" "s2-assign-transaction-id" "s4-get-all-transactions" "s3-begin" "s3-assign-transaction-id" "s4-get-all-transactions" "s1-commit" "s4-get-all-transactions" "s2-commit" "s4-get-all-transactions" "s3-commit" "s4-get-all-transactions"
# now show that distributed transaction id on the coordinator
# is the same with the one on the worker
permutation "s1-create-table" "s1-begin" "s1-insert" "s1-get-current-transaction-id" "s2-get-first-worker-active-transactions"

View File

@ -0,0 +1,81 @@
--
-- MULTI_DISTRIBUTED_TRANSACTION_ID
--
-- Unit tests for distributed transaction id functionality
--
-- get the current transaction id, which should be uninitialized
-- note that we skip printing the databaseId, which might change
-- per run
-- set timezone to a specific value to prevent
-- different values on different servers
SET TIME ZONE 'PST8PDT';
-- should return uninitialized values if not in a transaction
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id();
BEGIN;
-- we should still see the uninitialized values
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
-- now assign a value
SELECT assign_distributed_transaction_id(50, 50, '2016-01-01 00:00:00+0');
-- see the assigned value
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
-- a backend cannot be assigned another tx id if already assigned
SELECT assign_distributed_transaction_id(51, 51, '2017-01-01 00:00:00+0');
ROLLBACK;
-- since the transaction finished, we should see the uninitialized values
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
-- also see that ROLLBACK (i.e., failures in the transaction) clears the shared memory
BEGIN;
-- we should still see the uninitialized values
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
-- now assign a value
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
SELECT 5 / 0;
COMMIT;
-- since the transaction errored, we should see the uninitialized values again
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
-- we should also see that a new connection means an uninitialized transaction id
BEGIN;
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
\c - - - :master_port
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
-- now show that PREPARE resets the distributed transaction id
BEGIN;
SELECT assign_distributed_transaction_id(120, 120, '2015-01-01 00:00:00+0');
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
PREPARE TRANSACTION 'dist_xact_id_test';
-- after the prepare we should see that transaction id is cleared
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
-- cleanup the transaction
ROLLBACK PREPARED 'dist_xact_id_test';
-- set back to the original zone
SET TIME ZONE DEFAULT;

View File

@ -113,6 +113,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-4';
ALTER EXTENSION citus UPDATE TO '7.0-1';
ALTER EXTENSION citus UPDATE TO '7.0-2';
ALTER EXTENSION citus UPDATE TO '7.0-3';
ALTER EXTENSION citus UPDATE TO '7.0-4';
-- show running version
SHOW citus.version;