mirror of https://github.com/citusdata/citus.git
Merge pull request #1489 from citusdata/add_distributed_transaction_id
Introduce distributed transaction idspull/1497/head
commit
b873e6fcc8
|
@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
||||||
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
|
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
|
||||||
6.2-1 6.2-2 6.2-3 6.2-4 \
|
6.2-1 6.2-2 6.2-3 6.2-4 \
|
||||||
7.0-1 7.0-2 7.0-3
|
7.0-1 7.0-2 7.0-3 7.0-4
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -145,6 +145,8 @@ $(EXTENSION)--7.0-2.sql: $(EXTENSION)--7.0-1.sql $(EXTENSION)--7.0-1--7.0-2.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--7.0-3.sql: $(EXTENSION)--7.0-2.sql $(EXTENSION)--7.0-2--7.0-3.sql
|
$(EXTENSION)--7.0-3.sql: $(EXTENSION)--7.0-2.sql $(EXTENSION)--7.0-2--7.0-3.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--7.0-4.sql: $(EXTENSION)--7.0-3.sql $(EXTENSION)--7.0-3--7.0-4.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
/* citus--7.0-3--7.0-4.sql */
|
||||||
|
|
||||||
|
SET search_path = 'pg_catalog';
|
||||||
|
|
||||||
|
CREATE FUNCTION assign_distributed_transaction_id(initiator_node_identifier int4, transaction_number int8, transaction_stamp timestamptz)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME',$$assign_distributed_transaction_id$$;
|
||||||
|
COMMENT ON FUNCTION assign_distributed_transaction_id(initiator_node_identifier int4, transaction_number int8, transaction_stamp timestamptz)
|
||||||
|
IS 'Only intended for internal use, users should not call this. The function sets the distributed transaction id';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION get_current_transaction_id(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
|
||||||
|
RETURNS RECORD
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME',$$get_current_transaction_id$$;
|
||||||
|
COMMENT ON FUNCTION get_current_transaction_id(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
|
||||||
|
IS 'returns the current backend data including distributed transaction id';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
|
||||||
|
RETURNS SETOF RECORD
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$get_all_active_transactions$$;
|
||||||
|
COMMENT ON FUNCTION get_all_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
|
||||||
|
IS 'returns distributed transaction ids of active distributed transactions';
|
||||||
|
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '7.0-3'
|
default_version = '7.0-4'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -337,10 +337,11 @@ ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
|
* SendRemoteCommandParams is a PQsendQueryParams wrapper that logs remote commands,
|
||||||
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
|
* and accepts a MultiConnection instead of a plain PGconn. It makes sure it can
|
||||||
* send commands asynchronously without blocking (at the potential expense of
|
* send commands asynchronously without blocking (at the potential expense of
|
||||||
* an additional memory allocation).
|
* an additional memory allocation). The command string can only include a single
|
||||||
|
* command since PQsendQueryParams() supports only that.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
SendRemoteCommandParams(MultiConnection *connection, const char *command,
|
SendRemoteCommandParams(MultiConnection *connection, const char *command,
|
||||||
|
@ -374,12 +375,31 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command,
|
||||||
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
|
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
|
||||||
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
|
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
|
||||||
* send commands asynchronously without blocking (at the potential expense of
|
* send commands asynchronously without blocking (at the potential expense of
|
||||||
* an additional memory allocation).
|
* an additional memory allocation). The command string can include multiple
|
||||||
|
* commands since PQsendQuery() supports that.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
SendRemoteCommand(MultiConnection *connection, const char *command)
|
SendRemoteCommand(MultiConnection *connection, const char *command)
|
||||||
{
|
{
|
||||||
return SendRemoteCommandParams(connection, command, 0, NULL, NULL);
|
PGconn *pgConn = connection->pgConn;
|
||||||
|
int rc = 0;
|
||||||
|
|
||||||
|
LogRemoteCommand(connection, command);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Don't try to send command if connection is entirely gone
|
||||||
|
* (PQisnonblocking() would crash).
|
||||||
|
*/
|
||||||
|
if (!pgConn)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert(PQisnonblocking(pgConn));
|
||||||
|
|
||||||
|
rc = PQsendQuery(pgConn, command);
|
||||||
|
|
||||||
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "citus_version.h"
|
#include "citus_version.h"
|
||||||
#include "commands/explain.h"
|
#include "commands/explain.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/citus_nodefuncs.h"
|
#include "distributed/citus_nodefuncs.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
|
@ -173,6 +174,7 @@ _PG_init(void)
|
||||||
|
|
||||||
/* initialize coordinated transaction management */
|
/* initialize coordinated transaction management */
|
||||||
InitializeTransactionManagement();
|
InitializeTransactionManagement();
|
||||||
|
InitializeBackendManagement();
|
||||||
InitializeConnectionManagement();
|
InitializeConnectionManagement();
|
||||||
InitPlacementConnectionManagement();
|
InitPlacementConnectionManagement();
|
||||||
|
|
||||||
|
@ -197,6 +199,7 @@ void
|
||||||
StartupCitusBackend(void)
|
StartupCitusBackend(void)
|
||||||
{
|
{
|
||||||
InitializeMaintenanceDaemonBackend();
|
InitializeMaintenanceDaemonBackend();
|
||||||
|
InitializeBackendData();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,489 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* backend_data.c
|
||||||
|
*
|
||||||
|
* Infrastructure for managing per backend data that can efficiently
|
||||||
|
* accessed by all sessions.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2017, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "funcapi.h"
|
||||||
|
#include "access/htup_details.h"
|
||||||
|
#include "catalog/pg_type.h"
|
||||||
|
#include "datatype/timestamp.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/transaction_identifier.h"
|
||||||
|
#include "nodes/execnodes.h"
|
||||||
|
#include "storage/ipc.h"
|
||||||
|
#include "storage/lwlock.h"
|
||||||
|
#include "storage/proc.h"
|
||||||
|
#include "storage/spin.h"
|
||||||
|
#include "storage/s_lock.h"
|
||||||
|
#include "utils/timestamp.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Each backend's data reside in the shared memory
|
||||||
|
* on the BackendManagementShmemData.
|
||||||
|
*/
|
||||||
|
typedef struct BackendManagementShmemData
|
||||||
|
{
|
||||||
|
int trancheId;
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
NamedLWLockTranche namedLockTranche;
|
||||||
|
#else
|
||||||
|
LWLockTranche lockTranche;
|
||||||
|
#endif
|
||||||
|
LWLock lock;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We prefer to use an atomic integer over sequences for two
|
||||||
|
* reasons (i) orders of magnitude performance difference
|
||||||
|
* (ii) allowing read-only replicas to be able to generate ids
|
||||||
|
*/
|
||||||
|
pg_atomic_uint64 nextTransactionNumber;
|
||||||
|
|
||||||
|
BackendData backends[FLEXIBLE_ARRAY_MEMBER];
|
||||||
|
} BackendManagementShmemData;
|
||||||
|
|
||||||
|
|
||||||
|
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||||
|
static BackendManagementShmemData *backendManagementShmemData = NULL;
|
||||||
|
static BackendData *MyBackendData = NULL;
|
||||||
|
|
||||||
|
|
||||||
|
static void BackendManagementShmemInit(void);
|
||||||
|
static size_t BackendManagementShmemSize(void);
|
||||||
|
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
|
||||||
|
PG_FUNCTION_INFO_V1(get_current_transaction_id);
|
||||||
|
PG_FUNCTION_INFO_V1(get_all_active_transactions);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* assign_distributed_transaction_id updates the shared memory allocated for this backend
|
||||||
|
* and sets initiatorNodeIdentifier, transactionNumber, timestamp fields with the given
|
||||||
|
* inputs. Also, the function sets the database id and process id via the information that
|
||||||
|
* Postgres provides.
|
||||||
|
*
|
||||||
|
* This function is only intended for internal use for managing distributed transactions.
|
||||||
|
* Users should not use this function for any purpose.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
assign_distributed_transaction_id(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
/* MyBackendData should always be avaliable, just out of paranoia */
|
||||||
|
if (!MyBackendData)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("backend is not ready for distributed transactions")));
|
||||||
|
}
|
||||||
|
|
||||||
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
/* if an id is already assigned, release the lock and error */
|
||||||
|
if (MyBackendData->transactionId.initiatorNodeIdentifier != 0)
|
||||||
|
{
|
||||||
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
ereport(ERROR, (errmsg("the backend has already been assigned a "
|
||||||
|
"transaction id")));
|
||||||
|
}
|
||||||
|
|
||||||
|
MyBackendData->databaseId = MyDatabaseId;
|
||||||
|
|
||||||
|
MyBackendData->transactionId.initiatorNodeIdentifier = PG_GETARG_INT32(0);
|
||||||
|
MyBackendData->transactionId.transactionNumber = PG_GETARG_INT64(1);
|
||||||
|
MyBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2);
|
||||||
|
|
||||||
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* get_current_transaction_id returns a tuple with (databaseId, processId,
|
||||||
|
* initiatorNodeIdentifier, transactionNumber, timestamp) that exists in the
|
||||||
|
* shared memory associated with this backend. Note that if the backend
|
||||||
|
* is not in a transaction, the function returns uninitialized data where
|
||||||
|
* transactionNumber equals to 0.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
get_current_transaction_id(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
|
||||||
|
const int attributeCount = 5;
|
||||||
|
Datum values[attributeCount];
|
||||||
|
bool isNulls[attributeCount];
|
||||||
|
|
||||||
|
DistributedTransactionId *distributedTransctionId = NULL;
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
/* build a tuple descriptor for our result type */
|
||||||
|
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
|
||||||
|
{
|
||||||
|
elog(ERROR, "return type must be a row type");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* MyBackendData should always be avaliable, just out of paranoia */
|
||||||
|
if (!MyBackendData)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("backend is not ready for distributed transactions")));
|
||||||
|
}
|
||||||
|
|
||||||
|
distributedTransctionId = GetCurrentDistributedTransctionId();
|
||||||
|
|
||||||
|
memset(values, 0, sizeof(values));
|
||||||
|
memset(isNulls, false, sizeof(isNulls));
|
||||||
|
|
||||||
|
/* first two fields do not change for this backend, so get directly */
|
||||||
|
values[0] = ObjectIdGetDatum(MyDatabaseId);
|
||||||
|
values[1] = Int32GetDatum(MyProcPid);
|
||||||
|
|
||||||
|
values[2] = Int32GetDatum(distributedTransctionId->initiatorNodeIdentifier);
|
||||||
|
values[3] = UInt64GetDatum(distributedTransctionId->transactionNumber);
|
||||||
|
|
||||||
|
/* provide a better output */
|
||||||
|
if (distributedTransctionId->initiatorNodeIdentifier != 0)
|
||||||
|
{
|
||||||
|
values[4] = TimestampTzGetDatum(distributedTransctionId->timestamp);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
isNulls[4] = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
||||||
|
|
||||||
|
PG_RETURN_DATUM(HeapTupleGetDatum(heapTuple));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* get_all_active_transactions returns all the avaliable information about all
|
||||||
|
* the active backends.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
get_all_active_transactions(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||||
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
Tuplestorestate *tupleStore = NULL;
|
||||||
|
MemoryContext perQueryContext = NULL;
|
||||||
|
MemoryContext oldContext = NULL;
|
||||||
|
|
||||||
|
int backendIndex = 0;
|
||||||
|
|
||||||
|
const int attributeCount = 5;
|
||||||
|
Datum values[attributeCount];
|
||||||
|
bool isNulls[attributeCount];
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
/* check to see if caller supports us returning a tuplestore */
|
||||||
|
if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo))
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("set-valued function called in context " \
|
||||||
|
"that cannot accept a set")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(returnSetInfo->allowedModes & SFRM_Materialize))
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("materialize mode required, but it is not " \
|
||||||
|
"allowed in this context")));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* build a tuple descriptor for our result type */
|
||||||
|
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
|
||||||
|
{
|
||||||
|
elog(ERROR, "return type must be a row type");
|
||||||
|
}
|
||||||
|
|
||||||
|
perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory;
|
||||||
|
|
||||||
|
oldContext = MemoryContextSwitchTo(perQueryContext);
|
||||||
|
|
||||||
|
tupleStore = tuplestore_begin_heap(true, false, work_mem);
|
||||||
|
returnSetInfo->returnMode = SFRM_Materialize;
|
||||||
|
returnSetInfo->setResult = tupleStore;
|
||||||
|
returnSetInfo->setDesc = tupleDescriptor;
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We don't want to initialize memory while spinlock is held so we
|
||||||
|
* prefer to do it here. This initialization is done only for the first
|
||||||
|
* row.
|
||||||
|
*/
|
||||||
|
memset(values, 0, sizeof(values));
|
||||||
|
memset(isNulls, false, sizeof(isNulls));
|
||||||
|
|
||||||
|
/* we're reading all the backend data, take a lock to prevent concurrent additions */
|
||||||
|
LWLockAcquire(AddinShmemInitLock, LW_SHARED);
|
||||||
|
|
||||||
|
for (backendIndex = 0; backendIndex < MaxBackends; ++backendIndex)
|
||||||
|
{
|
||||||
|
BackendData *currentBackend =
|
||||||
|
&backendManagementShmemData->backends[backendIndex];
|
||||||
|
|
||||||
|
SpinLockAcquire(¤tBackend->mutex);
|
||||||
|
|
||||||
|
/* we're only interested in active backends */
|
||||||
|
if (currentBackend->transactionId.transactionNumber == 0)
|
||||||
|
{
|
||||||
|
SpinLockRelease(¤tBackend->mutex);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
values[0] = ObjectIdGetDatum(currentBackend->databaseId);
|
||||||
|
values[1] = Int32GetDatum(ProcGlobal->allProcs[backendIndex].pid);
|
||||||
|
values[2] = Int32GetDatum(currentBackend->transactionId.initiatorNodeIdentifier);
|
||||||
|
values[3] = UInt64GetDatum(currentBackend->transactionId.transactionNumber);
|
||||||
|
values[4] = TimestampTzGetDatum(currentBackend->transactionId.timestamp);
|
||||||
|
|
||||||
|
SpinLockRelease(¤tBackend->mutex);
|
||||||
|
|
||||||
|
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We don't want to initialize memory while spinlock is held so we
|
||||||
|
* prefer to do it here. This initialization is done for the rows
|
||||||
|
* starting from the second one.
|
||||||
|
*/
|
||||||
|
memset(values, 0, sizeof(values));
|
||||||
|
memset(isNulls, false, sizeof(isNulls));
|
||||||
|
}
|
||||||
|
|
||||||
|
LWLockRelease(AddinShmemInitLock);
|
||||||
|
|
||||||
|
/* clean up and return the tuplestore */
|
||||||
|
tuplestore_donestoring(tupleStore);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InitializeBackendManagement requests the necessary shared memory
|
||||||
|
* from Postgres and sets up the shared memory startup hook.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
InitializeBackendManagement(void)
|
||||||
|
{
|
||||||
|
/* allocate shared memory */
|
||||||
|
RequestAddinShmemSpace(BackendManagementShmemSize());
|
||||||
|
|
||||||
|
prev_shmem_startup_hook = shmem_startup_hook;
|
||||||
|
shmem_startup_hook = BackendManagementShmemInit;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BackendManagementShmemInit is the callback that is to be called on shared
|
||||||
|
* memory startup hook. The function sets up the necessary shared memory
|
||||||
|
* segment for the backend manager.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
BackendManagementShmemInit(void)
|
||||||
|
{
|
||||||
|
bool alreadyInitialized = false;
|
||||||
|
|
||||||
|
/* we may update the shmem, acquire lock exclusively */
|
||||||
|
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
backendManagementShmemData =
|
||||||
|
(BackendManagementShmemData *) ShmemInitStruct(
|
||||||
|
"Backend Management Shmem",
|
||||||
|
BackendManagementShmemSize(),
|
||||||
|
&alreadyInitialized);
|
||||||
|
|
||||||
|
if (!alreadyInitialized)
|
||||||
|
{
|
||||||
|
int backendIndex = 0;
|
||||||
|
char *trancheName = "Backend Management Tranche";
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
NamedLWLockTranche *namedLockTranche =
|
||||||
|
&backendManagementShmemData->namedLockTranche;
|
||||||
|
|
||||||
|
#else
|
||||||
|
LWLockTranche *lockTranche = &backendManagementShmemData->lockTranche;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* start by zeroing out all the memory */
|
||||||
|
memset(backendManagementShmemData, 0,
|
||||||
|
BackendManagementShmemSize());
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
namedLockTranche->trancheId = LWLockNewTrancheId();
|
||||||
|
|
||||||
|
LWLockRegisterTranche(namedLockTranche->trancheId, trancheName);
|
||||||
|
LWLockInitialize(&backendManagementShmemData->lock,
|
||||||
|
namedLockTranche->trancheId);
|
||||||
|
#else
|
||||||
|
backendManagementShmemData->trancheId = LWLockNewTrancheId();
|
||||||
|
|
||||||
|
/* we only need a single lock */
|
||||||
|
lockTranche->array_base = &backendManagementShmemData->lock;
|
||||||
|
lockTranche->array_stride = sizeof(LWLock);
|
||||||
|
lockTranche->name = trancheName;
|
||||||
|
|
||||||
|
LWLockRegisterTranche(backendManagementShmemData->trancheId, lockTranche);
|
||||||
|
LWLockInitialize(&backendManagementShmemData->lock,
|
||||||
|
backendManagementShmemData->trancheId);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* start the distributed transaction ids from 1 */
|
||||||
|
pg_atomic_init_u64(&backendManagementShmemData->nextTransactionNumber, 1);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We need to init per backend's spinlock before any backend
|
||||||
|
* starts its execution.
|
||||||
|
*/
|
||||||
|
for (backendIndex = 0; backendIndex < MaxBackends; ++backendIndex)
|
||||||
|
{
|
||||||
|
SpinLockInit(&backendManagementShmemData->backends[backendIndex].mutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LWLockRelease(AddinShmemInitLock);
|
||||||
|
|
||||||
|
if (prev_shmem_startup_hook != NULL)
|
||||||
|
{
|
||||||
|
prev_shmem_startup_hook();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BackendManagementShmemSize returns the size that should be allocated
|
||||||
|
* on the shared memory for backend management.
|
||||||
|
*/
|
||||||
|
static size_t
|
||||||
|
BackendManagementShmemSize(void)
|
||||||
|
{
|
||||||
|
Size size = 0;
|
||||||
|
|
||||||
|
size = add_size(size, sizeof(BackendManagementShmemData));
|
||||||
|
size = add_size(size, mul_size(sizeof(BackendData), MaxBackends));
|
||||||
|
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InitializeBackendData is called per backend and does the
|
||||||
|
* required initialization.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
InitializeBackendData(void)
|
||||||
|
{
|
||||||
|
MyBackendData = &backendManagementShmemData->backends[MyProc->pgprocno];
|
||||||
|
|
||||||
|
Assert(MyBackendData);
|
||||||
|
|
||||||
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
MyBackendData->databaseId = MyDatabaseId;
|
||||||
|
MyBackendData->transactionId.initiatorNodeIdentifier = 0;
|
||||||
|
MyBackendData->transactionId.transactionNumber = 0;
|
||||||
|
MyBackendData->transactionId.timestamp = 0;
|
||||||
|
|
||||||
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UnSetDistributedTransactionId simply acquires the mutex and resets the backend's
|
||||||
|
* distributed transaction data in shared memory to the initial values.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
UnSetDistributedTransactionId(void)
|
||||||
|
{
|
||||||
|
/* backend does not exist if the extension is not created */
|
||||||
|
if (MyBackendData)
|
||||||
|
{
|
||||||
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
MyBackendData->databaseId = 0;
|
||||||
|
MyBackendData->transactionId.initiatorNodeIdentifier = 0;
|
||||||
|
MyBackendData->transactionId.transactionNumber = 0;
|
||||||
|
MyBackendData->transactionId.timestamp = 0;
|
||||||
|
|
||||||
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetCurrentDistributedTransctionId reads the backend's distributed transaction id and
|
||||||
|
* returns a copy of it.
|
||||||
|
*/
|
||||||
|
DistributedTransactionId *
|
||||||
|
GetCurrentDistributedTransctionId(void)
|
||||||
|
{
|
||||||
|
DistributedTransactionId *currentDistributedTransactionId =
|
||||||
|
(DistributedTransactionId *) palloc(sizeof(DistributedTransactionId));
|
||||||
|
|
||||||
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
currentDistributedTransactionId->initiatorNodeIdentifier =
|
||||||
|
MyBackendData->transactionId.initiatorNodeIdentifier;
|
||||||
|
currentDistributedTransactionId->transactionNumber =
|
||||||
|
MyBackendData->transactionId.transactionNumber;
|
||||||
|
currentDistributedTransactionId->timestamp =
|
||||||
|
MyBackendData->transactionId.timestamp;
|
||||||
|
|
||||||
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
return currentDistributedTransactionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AssignDistributedTransactionId generates a new distributed transaction id and
|
||||||
|
* sets it for the current backend. It also sets the databaseId and
|
||||||
|
* processId fields.
|
||||||
|
*
|
||||||
|
* This function should only be called on BeginCoordinatedTransaction(). Any other
|
||||||
|
* callers is very likely to break the distributed transction management.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
AssignDistributedTransactionId(void)
|
||||||
|
{
|
||||||
|
pg_atomic_uint64 *transactionNumberSequence =
|
||||||
|
&backendManagementShmemData->nextTransactionNumber;
|
||||||
|
|
||||||
|
uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1);
|
||||||
|
int localGroupId = GetLocalGroupId();
|
||||||
|
TimestampTz currentTimestamp = GetCurrentTimestamp();
|
||||||
|
|
||||||
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
|
MyBackendData->databaseId = MyDatabaseId;
|
||||||
|
|
||||||
|
MyBackendData->transactionId.initiatorNodeIdentifier = localGroupId;
|
||||||
|
MyBackendData->transactionId.transactionNumber =
|
||||||
|
nextTransactionNumber;
|
||||||
|
MyBackendData->transactionId.timestamp = currentTimestamp;
|
||||||
|
|
||||||
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
}
|
|
@ -15,6 +15,7 @@
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
@ -32,12 +33,16 @@ static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* StartRemoteTransactionBeging initiates beginning the remote transaction in
|
* StartRemoteTransactionBeging initiates beginning the remote transaction in
|
||||||
* a non-blocking manner.
|
* a non-blocking manner. The function sends "BEGIN" followed by
|
||||||
|
* assign_distributed_transaction_id() to assign the distributed transaction
|
||||||
|
* id on the remote node.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
StartRemoteTransactionBegin(struct MultiConnection *connection)
|
StartRemoteTransactionBegin(struct MultiConnection *connection)
|
||||||
{
|
{
|
||||||
RemoteTransaction *transaction = &connection->remoteTransaction;
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
|
||||||
|
DistributedTransactionId *distributedTransactionId = NULL;
|
||||||
|
|
||||||
Assert(transaction->transactionState == REMOTE_TRANS_INVALID);
|
Assert(transaction->transactionState == REMOTE_TRANS_INVALID);
|
||||||
|
|
||||||
|
@ -51,8 +56,23 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
|
||||||
* side might have been changed, and that would cause problematic
|
* side might have been changed, and that would cause problematic
|
||||||
* behaviour.
|
* behaviour.
|
||||||
*/
|
*/
|
||||||
if (!SendRemoteCommand(connection,
|
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||||
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"))
|
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Append BEGIN and assign_distributed_transaction_id() statements into a single command
|
||||||
|
* and send both in one step. The reason is purely performance, we don't want
|
||||||
|
* seperate roundtrips for these two statements.
|
||||||
|
*/
|
||||||
|
distributedTransactionId = GetCurrentDistributedTransctionId();
|
||||||
|
appendStringInfo(beginAndSetDistributedTransactionId,
|
||||||
|
"SELECT assign_distributed_transaction_id(%d, %ld, '%s')",
|
||||||
|
distributedTransactionId->initiatorNodeIdentifier,
|
||||||
|
distributedTransactionId->transactionNumber,
|
||||||
|
timestamptz_to_str(distributedTransactionId->timestamp));
|
||||||
|
|
||||||
|
|
||||||
|
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
|
||||||
{
|
{
|
||||||
ReportConnectionError(connection, WARNING);
|
ReportConnectionError(connection, WARNING);
|
||||||
MarkRemoteTransactionFailed(connection, true);
|
MarkRemoteTransactionFailed(connection, true);
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
#include "access/twophase.h"
|
#include "access/twophase.h"
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/hash_helpers.h"
|
#include "distributed/hash_helpers.h"
|
||||||
#include "distributed/multi_shard_transaction.h"
|
#include "distributed/multi_shard_transaction.h"
|
||||||
|
@ -73,6 +74,8 @@ BeginCoordinatedTransaction(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
|
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
|
||||||
|
|
||||||
|
AssignDistributedTransactionId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -168,6 +171,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
CoordinatedTransactionUses2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
|
|
||||||
|
UnSetDistributedTransactionId();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,13 +209,19 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
CoordinatedTransactionUses2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
subXactAbortAttempted = false;
|
subXactAbortAttempted = false;
|
||||||
|
UnSetDistributedTransactionId();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case XACT_EVENT_PARALLEL_COMMIT:
|
case XACT_EVENT_PARALLEL_COMMIT:
|
||||||
case XACT_EVENT_PARALLEL_ABORT:
|
case XACT_EVENT_PARALLEL_ABORT:
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case XACT_EVENT_PREPARE:
|
case XACT_EVENT_PREPARE:
|
||||||
{
|
{
|
||||||
|
UnSetDistributedTransactionId();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* backend_data.h
|
||||||
|
*
|
||||||
|
* Data structure definition for managing backend data and related function
|
||||||
|
* declarations.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2017, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef BACKEND_DATA_H
|
||||||
|
#define BACKEND_DATA_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "datatype/timestamp.h"
|
||||||
|
#include "distributed/transaction_identifier.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
|
#include "storage/s_lock.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Each backend's active distributed transaction information is tracked via
|
||||||
|
* BackendData in shared memory.
|
||||||
|
*/
|
||||||
|
typedef struct BackendData
|
||||||
|
{
|
||||||
|
Oid databaseId;
|
||||||
|
slock_t mutex;
|
||||||
|
DistributedTransactionId transactionId;
|
||||||
|
} BackendData;
|
||||||
|
|
||||||
|
|
||||||
|
extern void InitializeBackendManagement(void);
|
||||||
|
extern void InitializeBackendData(void);
|
||||||
|
extern void UnSetDistributedTransactionId(void);
|
||||||
|
extern void AssignDistributedTransactionId(void);
|
||||||
|
|
||||||
|
#endif /* BACKEND_DATA_H */
|
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* transaction_identifier.h
|
||||||
|
*
|
||||||
|
* Data structure for distributed transaction id and related function
|
||||||
|
* declarations.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2017, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TRANSACTION_IDENTIFIER_H
|
||||||
|
#define TRANSACTION_IDENTIFIER_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "datatype/timestamp.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Citus identifies a distributed transaction with a triplet consisting of
|
||||||
|
*
|
||||||
|
* - initiatorNodeIdentifier: A unique identifier of the node that initiated
|
||||||
|
* the distributed transaction
|
||||||
|
* - transactionNumber: A locally unique identifier assigned for the distributed
|
||||||
|
* transaction on the node that initiated the distributed transaction
|
||||||
|
* - timestamp: The current timestamp of distributed transaction initiation
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
typedef struct DistributedTransactionId
|
||||||
|
{
|
||||||
|
int initiatorNodeIdentifier;
|
||||||
|
uint64 transactionNumber;
|
||||||
|
TimestampTz timestamp;
|
||||||
|
} DistributedTransactionId;
|
||||||
|
|
||||||
|
|
||||||
|
extern DistributedTransactionId * GetCurrentDistributedTransctionId(void);
|
||||||
|
|
||||||
|
#endif /* TRANSACTION_IDENTIFIER_H */
|
|
@ -0,0 +1,112 @@
|
||||||
|
Parsed test spec with 4 sessions
|
||||||
|
|
||||||
|
starting permutation: s1-begin s1-assign-transaction-id s4-get-all-transactions s2-begin s2-assign-transaction-id s4-get-all-transactions s3-begin s3-assign-transaction-id s4-get-all-transactions s1-commit s4-get-all-transactions s2-commit s4-get-all-transactions s3-commit s4-get-all-transactions
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-assign-transaction-id:
|
||||||
|
SELECT assign_distributed_transaction_id(1, 1, '2015-01-01 00:00:00+0');
|
||||||
|
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
|
||||||
|
|
||||||
|
step s4-get-all-transactions:
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
|
||||||
|
|
||||||
|
initiator_node_identifiertransaction_numbertransaction_stamp
|
||||||
|
|
||||||
|
1 1 Wed Dec 31 16:00:00 2014 PST
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s2-assign-transaction-id:
|
||||||
|
SELECT assign_distributed_transaction_id(2, 2, '2015-01-02 00:00:00+0');
|
||||||
|
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
|
||||||
|
|
||||||
|
step s4-get-all-transactions:
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
|
||||||
|
|
||||||
|
initiator_node_identifiertransaction_numbertransaction_stamp
|
||||||
|
|
||||||
|
1 1 Wed Dec 31 16:00:00 2014 PST
|
||||||
|
2 2 Thu Jan 01 16:00:00 2015 PST
|
||||||
|
step s3-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s3-assign-transaction-id:
|
||||||
|
SELECT assign_distributed_transaction_id(3, 3, '2015-01-03 00:00:00+0');
|
||||||
|
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
|
||||||
|
|
||||||
|
step s4-get-all-transactions:
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
|
||||||
|
|
||||||
|
initiator_node_identifiertransaction_numbertransaction_stamp
|
||||||
|
|
||||||
|
1 1 Wed Dec 31 16:00:00 2014 PST
|
||||||
|
2 2 Thu Jan 01 16:00:00 2015 PST
|
||||||
|
3 3 Fri Jan 02 16:00:00 2015 PST
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s4-get-all-transactions:
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
|
||||||
|
|
||||||
|
initiator_node_identifiertransaction_numbertransaction_stamp
|
||||||
|
|
||||||
|
2 2 Thu Jan 01 16:00:00 2015 PST
|
||||||
|
3 3 Fri Jan 02 16:00:00 2015 PST
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s4-get-all-transactions:
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
|
||||||
|
|
||||||
|
initiator_node_identifiertransaction_numbertransaction_stamp
|
||||||
|
|
||||||
|
3 3 Fri Jan 02 16:00:00 2015 PST
|
||||||
|
step s3-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s4-get-all-transactions:
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
|
||||||
|
|
||||||
|
initiator_node_identifiertransaction_numbertransaction_stamp
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-create-table s1-begin s1-insert s1-get-current-transaction-id s2-get-first-worker-active-transactions
|
||||||
|
step s1-create-table:
|
||||||
|
-- some tests also use distributed table
|
||||||
|
CREATE TABLE distributed_transaction_id_table(some_value int, other_value int);
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SELECT create_distributed_table('distributed_transaction_id_table', 'some_value');
|
||||||
|
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-insert:
|
||||||
|
INSERT INTO distributed_transaction_id_table VALUES (1, 1);
|
||||||
|
|
||||||
|
step s1-get-current-transaction-id:
|
||||||
|
SELECT row(initiator_node_identifier, transaction_number) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
row
|
||||||
|
|
||||||
|
(0,186)
|
||||||
|
step s2-get-first-worker-active-transactions:
|
||||||
|
SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number)
|
||||||
|
FROM
|
||||||
|
get_all_active_transactions();
|
||||||
|
')
|
||||||
|
WHERE nodeport = 57637;
|
||||||
|
;
|
||||||
|
|
||||||
|
nodename nodeport success result
|
||||||
|
|
||||||
|
localhost 57637 t (0,186)
|
|
@ -0,0 +1,127 @@
|
||||||
|
--
|
||||||
|
-- MULTI_DISTRIBUTED_TRANSACTION_ID
|
||||||
|
--
|
||||||
|
-- Unit tests for distributed transaction id functionality
|
||||||
|
--
|
||||||
|
-- get the current transaction id, which should be uninitialized
|
||||||
|
-- note that we skip printing the databaseId, which might change
|
||||||
|
-- per run
|
||||||
|
-- set timezone to a specific value to prevent
|
||||||
|
-- different values on different servers
|
||||||
|
SET TIME ZONE 'PST8PDT';
|
||||||
|
-- should return uninitialized values if not in a transaction
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp
|
||||||
|
---------------------------+--------------------+-------------------
|
||||||
|
0 | 0 |
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
-- we should still see the uninitialized values
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
||||||
|
---------------------------+--------------------+-------------------+----------
|
||||||
|
0 | 0 | | t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- now assign a value
|
||||||
|
SELECT assign_distributed_transaction_id(50, 50, '2016-01-01 00:00:00+0');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- see the assigned value
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
||||||
|
---------------------------+--------------------+------------------------------+----------
|
||||||
|
50 | 50 | Thu Dec 31 16:00:00 2015 PST | t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- a backend cannot be assigned another tx id if already assigned
|
||||||
|
SELECT assign_distributed_transaction_id(51, 51, '2017-01-01 00:00:00+0');
|
||||||
|
ERROR: the backend has already been assigned a transaction id
|
||||||
|
ROLLBACK;
|
||||||
|
-- since the transaction finished, we should see the uninitialized values
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
||||||
|
---------------------------+--------------------+-------------------+----------
|
||||||
|
0 | 0 | | t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- also see that ROLLBACK (i.e., failures in the transaction) clears the shared memory
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
-- we should still see the uninitialized values
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
||||||
|
---------------------------+--------------------+-------------------+----------
|
||||||
|
0 | 0 | | t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- now assign a value
|
||||||
|
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT 5 / 0;
|
||||||
|
ERROR: division by zero
|
||||||
|
COMMIT;
|
||||||
|
-- since the transaction errored, we should see the uninitialized values again
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
||||||
|
---------------------------+--------------------+-------------------+----------
|
||||||
|
0 | 0 | | t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- we should also see that a new connection means an uninitialized transaction id
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
||||||
|
---------------------------+--------------------+------------------------------+----------
|
||||||
|
52 | 52 | Wed Dec 31 16:00:00 2014 PST | t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
||||||
|
---------------------------+--------------------+-------------------+----------
|
||||||
|
0 | 0 | | t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- now show that PREPARE resets the distributed transaction id
|
||||||
|
BEGIN;
|
||||||
|
SELECT assign_distributed_transaction_id(120, 120, '2015-01-01 00:00:00+0');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
||||||
|
---------------------------+--------------------+------------------------------+----------
|
||||||
|
120 | 120 | Wed Dec 31 16:00:00 2014 PST | t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
PREPARE TRANSACTION 'dist_xact_id_test';
|
||||||
|
-- after the prepare we should see that transaction id is cleared
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
||||||
|
---------------------------+--------------------+-------------------+----------
|
||||||
|
0 | 0 | | t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cleanup the transaction
|
||||||
|
ROLLBACK PREPARED 'dist_xact_id_test';
|
||||||
|
-- set back to the original zone
|
||||||
|
SET TIME ZONE DEFAULT;
|
|
@ -113,6 +113,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-4';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.0-1';
|
ALTER EXTENSION citus UPDATE TO '7.0-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.0-2';
|
ALTER EXTENSION citus UPDATE TO '7.0-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.0-3';
|
ALTER EXTENSION citus UPDATE TO '7.0-3';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.0-4';
|
||||||
-- show running version
|
-- show running version
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
citus.version
|
citus.version
|
||||||
|
|
|
@ -8,5 +8,5 @@ test: isolation_cluster_management
|
||||||
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation
|
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation
|
||||||
test: isolation_concurrent_dml isolation_data_migration
|
test: isolation_concurrent_dml isolation_data_migration
|
||||||
test: isolation_drop_shards isolation_copy_placement_vs_modification
|
test: isolation_drop_shards isolation_copy_placement_vs_modification
|
||||||
|
|
||||||
test: isolation_insert_vs_vacuum
|
test: isolation_insert_vs_vacuum
|
||||||
|
test: isolation_distributed_transaction_id
|
||||||
|
|
|
@ -44,7 +44,7 @@ test: multi_insert_select
|
||||||
# ----------
|
# ----------
|
||||||
# Miscellaneous tests to check our query planning behavior
|
# Miscellaneous tests to check our query planning behavior
|
||||||
# ----------
|
# ----------
|
||||||
test: multi_deparse_shard_query
|
test: multi_deparse_shard_query multi_distributed_transaction_id
|
||||||
test: multi_basic_queries multi_complex_expressions
|
test: multi_basic_queries multi_complex_expressions
|
||||||
test: multi_explain
|
test: multi_explain
|
||||||
test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
||||||
|
|
|
@ -0,0 +1,107 @@
|
||||||
|
# Tests around distributed transaction id generation
|
||||||
|
|
||||||
|
setup
|
||||||
|
{
|
||||||
|
SET TIME ZONE 'PST8PDT';
|
||||||
|
}
|
||||||
|
|
||||||
|
teardown
|
||||||
|
{
|
||||||
|
SET TIME ZONE DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s1"
|
||||||
|
|
||||||
|
step "s1-begin"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-assign-transaction-id"
|
||||||
|
{
|
||||||
|
SELECT assign_distributed_transaction_id(1, 1, '2015-01-01 00:00:00+0');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-commit"
|
||||||
|
{
|
||||||
|
COMMIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-create-table"
|
||||||
|
{
|
||||||
|
-- some tests also use distributed table
|
||||||
|
CREATE TABLE distributed_transaction_id_table(some_value int, other_value int);
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SELECT create_distributed_table('distributed_transaction_id_table', 'some_value');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-insert"
|
||||||
|
{
|
||||||
|
INSERT INTO distributed_transaction_id_table VALUES (1, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-get-current-transaction-id"
|
||||||
|
{
|
||||||
|
SELECT row(initiator_node_identifier, transaction_number) FROM get_current_transaction_id();
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s2"
|
||||||
|
|
||||||
|
step "s2-begin"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-assign-transaction-id"
|
||||||
|
{
|
||||||
|
SELECT assign_distributed_transaction_id(2, 2, '2015-01-02 00:00:00+0');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-commit"
|
||||||
|
{
|
||||||
|
COMMIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
# print only the necessary parts to prevent concurrent runs to print different values
|
||||||
|
step "s2-get-first-worker-active-transactions"
|
||||||
|
{
|
||||||
|
SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number)
|
||||||
|
FROM
|
||||||
|
get_all_active_transactions();
|
||||||
|
')
|
||||||
|
WHERE nodeport = 57637;
|
||||||
|
;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s3"
|
||||||
|
|
||||||
|
step "s3-begin"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s3-assign-transaction-id"
|
||||||
|
{
|
||||||
|
SELECT assign_distributed_transaction_id(3, 3, '2015-01-03 00:00:00+0');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s3-commit"
|
||||||
|
{
|
||||||
|
COMMIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s4"
|
||||||
|
|
||||||
|
step "s4-get-all-transactions"
|
||||||
|
{
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3;
|
||||||
|
}
|
||||||
|
|
||||||
|
# show that we could get all distributed transaction ids from seperate sessions
|
||||||
|
permutation "s1-begin" "s1-assign-transaction-id" "s4-get-all-transactions" "s2-begin" "s2-assign-transaction-id" "s4-get-all-transactions" "s3-begin" "s3-assign-transaction-id" "s4-get-all-transactions" "s1-commit" "s4-get-all-transactions" "s2-commit" "s4-get-all-transactions" "s3-commit" "s4-get-all-transactions"
|
||||||
|
|
||||||
|
|
||||||
|
# now show that distributed transaction id on the coordinator
|
||||||
|
# is the same with the one on the worker
|
||||||
|
permutation "s1-create-table" "s1-begin" "s1-insert" "s1-get-current-transaction-id" "s2-get-first-worker-active-transactions"
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
--
|
||||||
|
-- MULTI_DISTRIBUTED_TRANSACTION_ID
|
||||||
|
--
|
||||||
|
-- Unit tests for distributed transaction id functionality
|
||||||
|
--
|
||||||
|
|
||||||
|
-- get the current transaction id, which should be uninitialized
|
||||||
|
-- note that we skip printing the databaseId, which might change
|
||||||
|
-- per run
|
||||||
|
|
||||||
|
-- set timezone to a specific value to prevent
|
||||||
|
-- different values on different servers
|
||||||
|
SET TIME ZONE 'PST8PDT';
|
||||||
|
|
||||||
|
-- should return uninitialized values if not in a transaction
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
-- we should still see the uninitialized values
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
-- now assign a value
|
||||||
|
SELECT assign_distributed_transaction_id(50, 50, '2016-01-01 00:00:00+0');
|
||||||
|
|
||||||
|
-- see the assigned value
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
-- a backend cannot be assigned another tx id if already assigned
|
||||||
|
SELECT assign_distributed_transaction_id(51, 51, '2017-01-01 00:00:00+0');
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- since the transaction finished, we should see the uninitialized values
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
|
||||||
|
-- also see that ROLLBACK (i.e., failures in the transaction) clears the shared memory
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
-- we should still see the uninitialized values
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
-- now assign a value
|
||||||
|
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
|
||||||
|
|
||||||
|
SELECT 5 / 0;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- since the transaction errored, we should see the uninitialized values again
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
|
||||||
|
-- we should also see that a new connection means an uninitialized transaction id
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
|
||||||
|
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
-- now show that PREPARE resets the distributed transaction id
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT assign_distributed_transaction_id(120, 120, '2015-01-01 00:00:00+0');
|
||||||
|
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
PREPARE TRANSACTION 'dist_xact_id_test';
|
||||||
|
|
||||||
|
-- after the prepare we should see that transaction id is cleared
|
||||||
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
||||||
|
|
||||||
|
-- cleanup the transaction
|
||||||
|
ROLLBACK PREPARED 'dist_xact_id_test';
|
||||||
|
|
||||||
|
-- set back to the original zone
|
||||||
|
SET TIME ZONE DEFAULT;
|
|
@ -113,6 +113,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-4';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.0-1';
|
ALTER EXTENSION citus UPDATE TO '7.0-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.0-2';
|
ALTER EXTENSION citus UPDATE TO '7.0-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.0-3';
|
ALTER EXTENSION citus UPDATE TO '7.0-3';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.0-4';
|
||||||
|
|
||||||
-- show running version
|
-- show running version
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
|
|
Loading…
Reference in New Issue