diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 4b62afc3b..21cd6a211 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1349,7 +1349,8 @@ LocalGroupIdUpdateCommand(int32 groupId) { StringInfo updateCommand = makeStringInfo(); - appendStringInfo(updateCommand, "UPDATE pg_dist_local_group SET groupid = %d", + appendStringInfo(updateCommand, + "UPDATE pg_dist_local_group SET groupid = %d, logical_clock_value = citus_get_cluster_clock()", groupId); return updateCommand->data; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 00b541968..0ed269b75 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -539,6 +539,7 @@ StartupCitusBackend(void) InitializeBackendData(); RegisterConnectionCleanup(); AssignGlobalPID(); + RegisterAndAdjustClockValue(); } @@ -916,6 +917,17 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_global_clock", + gettext_noop("Enables using citus_get_cluster_clock() for the cluster-wide " + "timestamp for transactions"), + NULL, + &EnableGlobalClock, + false, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_local_execution", gettext_noop("Enables queries on shards that are local to the current node " diff --git a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql index 374350d56..53702ca0f 100644 --- a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql @@ -6,3 +6,11 @@ DROP FUNCTION pg_catalog.worker_hash_partition_table(bigint, integer, text, text DROP FUNCTION pg_catalog.worker_merge_files_into_table(bigint, integer, text[], text[]); DROP FUNCTION pg_catalog.worker_range_partition_table(bigint, integer, text, text, oid, anyarray); DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint); + +ALTER TABLE pg_catalog.pg_dist_local_group ADD COLUMN logical_clock_value BIGINT NOT NULL DEFAULT 0; +UPDATE pg_catalog.pg_dist_local_group SET logical_clock_value = extract(epoch from now()) * 1000; + +#include "udfs/get_cluster_clock/11.1-1.sql" +#include "udfs/set_transaction_id_clock_value/11.1-1.sql" +#include "udfs/get_all_active_transactions/11.1-1.sql" +#include "udfs/get_global_active_transactions/11.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-2.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-2.sql index d03733bc7..1ac0e4408 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-2.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-2.sql @@ -45,3 +45,30 @@ CREATE FUNCTION pg_catalog.worker_repartition_cleanup(bigint) LANGUAGE c STRICT AS 'MODULE_PATHNAME', $function$worker_repartition_cleanup$function$ + +ALTER TABLE pg_catalog.pg_dist_local_group DROP COLUMN logical_clock_value; +DROP FUNCTION pg_catalog.get_cluster_clock(); +DROP FUNCTION citus_internal.set_transaction_id_clock_value(); + +DROP FUNCTION IF EXISTS pg_catalog.get_all_active_transactions(); +CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$get_all_active_transactions$$; + +COMMENT ON FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT datname text, OUT process_id int, OUT initiator_node_identifier int4, + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8) +IS 'returns transaction information for all Citus initiated transactions'; + +DROP FUNCTION IF EXISTS pg_catalog.get_global_active_transactions(); +CREATE OR REPLACE FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8) + RETURNS SETOF RECORD + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$get_global_active_transactions$$; +COMMENT ON FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8) + IS 'returns transaction information for all Citus initiated transactions from each node of the cluster'; diff --git a/src/backend/distributed/sql/udfs/get_all_active_transactions/11.1-1.sql b/src/backend/distributed/sql/udfs/get_all_active_transactions/11.1-1.sql new file mode 100644 index 000000000..9aa5aed5c --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_all_active_transactions/11.1-1.sql @@ -0,0 +1,11 @@ +DROP FUNCTION IF EXISTS pg_catalog.get_all_active_transactions(); +CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8, OUT transactionClock int8) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$get_all_active_transactions$$; +COMMENT ON FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT datname text, OUT process_id int, OUT initiator_node_identifier int4, + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8, OUT transactionClock int8) +IS 'returns transaction information for all Citus initiated transactions'; diff --git a/src/backend/distributed/sql/udfs/get_all_active_transactions/latest.sql b/src/backend/distributed/sql/udfs/get_all_active_transactions/latest.sql index 636abb0dd..9aa5aed5c 100644 --- a/src/backend/distributed/sql/udfs/get_all_active_transactions/latest.sql +++ b/src/backend/distributed/sql/udfs/get_all_active_transactions/latest.sql @@ -1,12 +1,11 @@ DROP FUNCTION IF EXISTS pg_catalog.get_all_active_transactions(); CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, - OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, - OUT global_pid int8) + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8, OUT transactionClock int8) RETURNS SETOF RECORD LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$get_all_active_transactions$$; - COMMENT ON FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT datname text, OUT process_id int, OUT initiator_node_identifier int4, - OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, - OUT global_pid int8) + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8, OUT transactionClock int8) IS 'returns transaction information for all Citus initiated transactions'; diff --git a/src/backend/distributed/sql/udfs/get_cluster_clock/11.1-1.sql b/src/backend/distributed/sql/udfs/get_cluster_clock/11.1-1.sql new file mode 100644 index 000000000..4fb5109e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_cluster_clock/11.1-1.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_get_cluster_clock() + RETURNS BIGINT + LANGUAGE C STABLE PARALLEL SAFE STRICT + AS 'MODULE_PATHNAME', $$citus_get_cluster_clock$$; +COMMENT ON FUNCTION pg_catalog.citus_get_cluster_clock() + IS 'returns monotonically increasing logical clock value as close to epoch value (in milli seconds) possible, with the guarantee that it will never go back from its current value even after restart and crashes'; diff --git a/src/backend/distributed/sql/udfs/get_cluster_clock/latest.sql b/src/backend/distributed/sql/udfs/get_cluster_clock/latest.sql new file mode 100644 index 000000000..4fb5109e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_cluster_clock/latest.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_get_cluster_clock() + RETURNS BIGINT + LANGUAGE C STABLE PARALLEL SAFE STRICT + AS 'MODULE_PATHNAME', $$citus_get_cluster_clock$$; +COMMENT ON FUNCTION pg_catalog.citus_get_cluster_clock() + IS 'returns monotonically increasing logical clock value as close to epoch value (in milli seconds) possible, with the guarantee that it will never go back from its current value even after restart and crashes'; diff --git a/src/backend/distributed/sql/udfs/get_global_active_transactions/11.1-1.sql b/src/backend/distributed/sql/udfs/get_global_active_transactions/11.1-1.sql new file mode 100644 index 000000000..9f2fd4ea6 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_global_active_transactions/11.1-1.sql @@ -0,0 +1,9 @@ +DROP FUNCTION IF EXISTS pg_catalog.get_global_active_transactions(); +CREATE OR REPLACE FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8, OUT transactionClock int8) + RETURNS SETOF RECORD + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$get_global_active_transactions$$; +COMMENT ON FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8, OUT transactionClock int8) + IS 'returns transaction information for all Citus initiated transactions from each node of the cluster'; diff --git a/src/backend/distributed/sql/udfs/get_global_active_transactions/latest.sql b/src/backend/distributed/sql/udfs/get_global_active_transactions/latest.sql index c0831b521..9f2fd4ea6 100644 --- a/src/backend/distributed/sql/udfs/get_global_active_transactions/latest.sql +++ b/src/backend/distributed/sql/udfs/get_global_active_transactions/latest.sql @@ -1,9 +1,9 @@ DROP FUNCTION IF EXISTS pg_catalog.get_global_active_transactions(); CREATE OR REPLACE FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8) + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8, OUT transactionClock int8) RETURNS SETOF RECORD LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$get_global_active_transactions$$; COMMENT ON FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, - OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8) + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8, OUT transactionClock int8) IS 'returns transaction information for all Citus initiated transactions from each node of the cluster'; diff --git a/src/backend/distributed/sql/udfs/set_transaction_id_clock_value/11.1-1.sql b/src/backend/distributed/sql/udfs/set_transaction_id_clock_value/11.1-1.sql new file mode 100644 index 000000000..fdcc5bb2e --- /dev/null +++ b/src/backend/distributed/sql/udfs/set_transaction_id_clock_value/11.1-1.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION citus_internal.set_transaction_id_clock_value(bigint) + RETURNS VOID + LANGUAGE C STABLE PARALLEL SAFE STRICT + AS 'MODULE_PATHNAME', $$set_transaction_id_clock_value$$; +COMMENT ON FUNCTION citus_internal.set_transaction_id_clock_value(bigint) + IS 'Internal UDF to set the transaction clock value of the remote node(s) distributed transaction id'; diff --git a/src/backend/distributed/sql/udfs/set_transaction_id_clock_value/latest.sql b/src/backend/distributed/sql/udfs/set_transaction_id_clock_value/latest.sql new file mode 100644 index 000000000..fdcc5bb2e --- /dev/null +++ b/src/backend/distributed/sql/udfs/set_transaction_id_clock_value/latest.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION citus_internal.set_transaction_id_clock_value(bigint) + RETURNS VOID + LANGUAGE C STABLE PARALLEL SAFE STRICT + AS 'MODULE_PATHNAME', $$set_transaction_id_clock_value$$; +COMMENT ON FUNCTION citus_internal.set_transaction_id_clock_value(bigint) + IS 'Internal UDF to set the transaction clock value of the remote node(s) distributed transaction id'; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 925071c35..9011a614f 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -26,6 +26,7 @@ #include "datatype/timestamp.h" #include "distributed/backend_data.h" #include "distributed/connection_management.h" +#include "distributed/cluster_clock.h" #include "distributed/listutils.h" #include "distributed/lock_graph.h" #include "distributed/metadata_cache.h" @@ -47,7 +48,7 @@ #define GET_ACTIVE_TRANSACTION_QUERY "SELECT * FROM get_all_active_transactions();" -#define ACTIVE_TRANSACTION_COLUMN_COUNT 7 +#define ACTIVE_TRANSACTION_COLUMN_COUNT 8 #define GLOBAL_PID_NODE_ID_MULTIPLIER 10000000000 /* @@ -75,6 +76,10 @@ typedef struct BackendManagementShmemData */ pg_atomic_uint32 externalClientBackendCounter; + /* Logical clock value of this cluster */ + slock_t clockMutex; + uint64 clusterClockValue; + BackendData backends[FLEXIBLE_ARRAY_MEMBER]; } BackendManagementShmemData; @@ -317,6 +322,7 @@ get_global_active_transactions(PG_FUNCTION_ARGS) values[4] = ParseIntField(result, rowIndex, 4); values[5] = ParseTimestampTzField(result, rowIndex, 5); values[6] = ParseIntField(result, rowIndex, 6); + values[7] = ParseIntField(result, rowIndex, 7); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } @@ -409,6 +415,8 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto transactionNumber = currentBackend->transactionId.transactionNumber; TimestampTz transactionIdTimestamp = currentBackend->transactionId.timestamp; + uint64 transactionClockValue = + currentBackend->transactionId.transactionClockValue; SpinLockRelease(¤tBackend->mutex); @@ -437,6 +445,7 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto values[4] = UInt64GetDatum(transactionNumber); values[5] = TimestampTzGetDatum(transactionIdTimestamp); values[6] = UInt64GetDatum(currentBackend->globalPID); + values[7] = UInt64GetDatum(transactionClockValue); } else { @@ -447,6 +456,7 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto isNulls[4] = true; isNulls[5] = true; values[6] = UInt64GetDatum(currentBackend->globalPID); + values[7] = UInt64GetDatum(transactionClockValue); } tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); @@ -553,6 +563,10 @@ BackendManagementShmemInit(void) /* there are no active backends yet, so start with zero */ pg_atomic_init_u32(&backendManagementShmemData->externalClientBackendCounter, 0); + /* A zero value indicates that the clock is not adjusted yet */ + backendManagementShmemData->clusterClockValue = 0; + SpinLockInit(&backendManagementShmemData->clockMutex); + /* * We need to init per backend's spinlock before any backend * starts its execution. Note that we initialize TotalProcs (e.g., not @@ -1278,3 +1292,183 @@ DecrementExternalClientBackendCounter(void) { pg_atomic_sub_fetch_u32(&backendManagementShmemData->externalClientBackendCounter, 1); } + + +/* + * GetTransactionIdClockValue returns the clock value of the current + * distributed transaction. The caller must make sure a distributed + * transaction is in progress. + */ +uint64 +GetTransactionIdClockValue(void) +{ + Assert(MyBackendData != NULL); + + return MyBackendData->transactionId.transactionClockValue; +} + + +/* + * SetTransactionIdClockValue is used to set the transaction clock value + * in the current distributed transaction id. + */ +void +SetTransactionIdClockValue(uint64 transactionClockValue) +{ + if (!MyBackendData) + { + return; + } + + SpinLockAcquire(&MyBackendData->mutex); + MyBackendData->transactionId.transactionClockValue = transactionClockValue; + SpinLockRelease(&MyBackendData->mutex); + + ereport(DEBUG1, (errmsg("Set transaction clock %lu on node(%d)", + transactionClockValue, GetLocalNodeId()))); +} + + +/* + * GetCurrentClusterClockValue returns the current logical clock value. + */ +uint64 +GetCurrentClusterClockValue(void) +{ + SpinLockAcquire(&backendManagementShmemData->clockMutex); + uint64 epochValue = backendManagementShmemData->clusterClockValue; + SpinLockRelease(&backendManagementShmemData->clockMutex); + + return epochValue; +} + + +/* + * GetNextClusterClock implements the internal guts of the UDF citus_get_cluster_clock() + */ +uint64 +GetNextClusterClock(void) +{ + uint64 epochValue = GetEpochTimeMs(); + + SpinLockAcquire(&backendManagementShmemData->clockMutex); + + /* Check if the clock is adjusted after the boot */ + if (backendManagementShmemData->clusterClockValue == 0) + { + SpinLockRelease(&backendManagementShmemData->clockMutex); + ereport(ERROR, (errmsg("backend never adjusted the clock, please retry"))); + } + + uint64 nextClusterClockValue = backendManagementShmemData->clusterClockValue + 1; + + if (epochValue > nextClusterClockValue) + { + nextClusterClockValue = epochValue; + } + + backendManagementShmemData->clusterClockValue = nextClusterClockValue; + + SpinLockRelease(&backendManagementShmemData->clockMutex); + + return nextClusterClockValue; +} + + +/* + * AdjustLocalClockToGlobal sets the node's local logical cluster clock value + * to the distributed transaction clock value, which is the maximum clock value + * of all the participating nodes. + */ +void +AdjustLocalClockToGlobal(void) +{ + if (!EnableGlobalClock || !MyBackendData) + { + return; + } + + uint64 transactionClockValue = MyBackendData->transactionId.transactionClockValue; + + /* It's ok to check unprotected as the real assignment is done under protection */ + if (transactionClockValue <= backendManagementShmemData->clusterClockValue) + { + /* + * Clock moved and greater than the rest of the nodes, there is + * no need to adjust the clock. + */ + return; + } + + SpinLockAcquire(&backendManagementShmemData->clockMutex); + + backendManagementShmemData->clusterClockValue = + MyBackendData->transactionId.transactionClockValue; + + SpinLockRelease(&backendManagementShmemData->clockMutex); +} + + +/* + * RegisterAndAdjustClockValue registers a shared-memory exit callback, which + * will persist the current clock value from shared memory to catalog. Calls + * routine that initializes and adjusts the clock if needed. + */ +void +RegisterAndAdjustClockValue(void) +{ + static bool registeredSaveClock = false; + + /* Avoid repeated initialization */ + if (registeredSaveClock == true) + { + return; + } + + before_shmem_exit(PersistLocalClockValue, (Datum) 0); + AdjustAndInitializeClusterClock(); + registeredSaveClock = true; +} + + +/* + * AdjustAndInitializeClusterClock compares the current epoch value with + * the persisted clock value to see if the clock drifted backwards, and + * adjusts the logical clock accordingly. + * Ideally, this routine should be called only _once_ at the time of boot. + */ +void +AdjustAndInitializeClusterClock(void) +{ + uint64 epochPersistedValue; + + if (!GetLocalClockValue(&epochPersistedValue)) + { + /* couldn't get the value from catalog, might happened + * during pg upgrade */ + return; + } + + /* Ensure clock never drifts back */ + SpinLockAcquire(&backendManagementShmemData->clockMutex); + + /* Check if the clock was already set after the boot */ + if (backendManagementShmemData->clusterClockValue > 0) + { + /* Already initialized */ + SpinLockRelease(&backendManagementShmemData->clockMutex); + return; + } + + Assert(backendManagementShmemData->clusterClockValue == 0); + backendManagementShmemData->clusterClockValue = epochPersistedValue; + + SpinLockRelease(&backendManagementShmemData->clockMutex); + + uint64 epochValue = GetEpochTimeMs(); + if (epochPersistedValue > epochValue) + { + ereport(LOG, (errmsg("Drift in the epoch clock, adjusted " + "to persisted old value:(%ld)", epochPersistedValue))); + } +} diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 2859ec4c9..0cc73009d 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -17,6 +17,7 @@ #include "access/xact.h" #include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" +#include "distributed/cluster_clock.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ -31,7 +32,7 @@ #include "utils/hsearch.h" -#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u" +#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u_"UINT64_FORMAT static char * AssignDistributedTransactionIdCommand(void); @@ -50,6 +51,7 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection static void Assign2PCIdentifier(MultiConnection *connection); +bool EnableGlobalClock = false; /* * StartRemoteTransactionBegin initiates beginning the remote transaction in @@ -793,14 +795,15 @@ CoordinatedRemoteTransactionsPrepare(void) { dlist_iter iter; List *connectionList = NIL; + MultiConnection *connection = NULL; /* issue PREPARE TRANSACTION; to all relevant remote nodes */ /* asynchronously send PREPARE */ dlist_foreach(iter, &InProgressTransactions) { - MultiConnection *connection = dlist_container(MultiConnection, transactionNode, - iter.cur); + connection = dlist_container(MultiConnection, transactionNode, + iter.cur); RemoteTransaction *transaction = &connection->remoteTransaction; Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED); @@ -818,19 +821,32 @@ CoordinatedRemoteTransactionsPrepare(void) */ if (ConnectionModifiedPlacement(connection)) { - StartRemoteTransactionPrepare(connection); connectionList = lappend(connectionList, connection); } } + /* + * If enabled, get the transaction clock value from all nodes and use the + * highest value. + */ + if (EnableGlobalClock) + { + SetTransactionClusterClock(connectionList); + } + + foreach_ptr(connection, connectionList) + { + StartRemoteTransactionPrepare(connection); + } + bool raiseInterrupts = true; WaitForAllConnections(connectionList, raiseInterrupts); /* Wait for result */ dlist_foreach(iter, &InProgressTransactions) { - MultiConnection *connection = dlist_container(MultiConnection, transactionNode, - iter.cur); + connection = dlist_container(MultiConnection, transactionNode, + iter.cur); RemoteTransaction *transaction = &connection->remoteTransaction; if (transaction->transactionState != REMOTE_TRANS_PREPARING) @@ -1321,7 +1337,7 @@ CheckRemoteTransactionsHealth(void) * * citus____ * - * (at most 5+1+10+1+10+1+20+1+10 = 59 characters, while limit is 64) + * (at most 5+1+10+1+10+1+20+1+10+1+20 = 80 characters, while limit is 128) * * The source group is used to distinguish 2PCs started by different * coordinators. A coordinator will only attempt to recover its own 2PCs. @@ -1335,6 +1351,9 @@ CheckRemoteTransactionsHealth(void) * The connection number is used to distinguish connections made to a node * within the same transaction. * + * The transactionClockValue can be used to track all the changes made + * within the same transaction. + * */ static void Assign2PCIdentifier(MultiConnection *connection) @@ -1345,10 +1364,14 @@ Assign2PCIdentifier(MultiConnection *connection) /* transaction identifier that is unique across processes */ uint64 transactionNumber = CurrentDistributedTransactionNumber(); + /* Cluster clock value */ + uint64 transactionClockValue = GetTransactionIdClockValue(); + /* print all numbers as unsigned to guarantee no minus symbols appear in the name */ - SafeSnprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, + SafeSnprintf(connection->remoteTransaction.preparedName, + PREPARED_TRANSACTION_NAME_LEN, PREPARED_TRANSACTION_NAME_FORMAT, GetLocalGroupId(), MyProcPid, - transactionNumber, connectionNumber++); + transactionNumber, connectionNumber++, transactionClockValue); } @@ -1362,7 +1385,8 @@ bool ParsePreparedTransactionName(char *preparedTransactionName, int32 *groupId, int *procId, uint64 *transactionNumber, - uint32 *connectionNumber) + uint32 *connectionNumber, + uint64 *transactionClockValue) { char *currentCharPointer = preparedTransactionName; @@ -1431,5 +1455,20 @@ ParsePreparedTransactionName(char *preparedTransactionName, return false; } + currentCharPointer = strchr(currentCharPointer, '_'); + if (currentCharPointer == NULL) + { + return false; + } + + /* step ahead of the current '_' character */ + ++currentCharPointer; + + *transactionClockValue = pg_strtouint64(currentCharPointer, NULL, 10); + if ((errno != 0) || (*transactionClockValue == ULLONG_MAX && errno == ERANGE)) + { + return false; + } + return true; } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 0337411cb..1e55e1ee0 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,6 +21,7 @@ #include "access/xact.h" #include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" +#include "distributed/cluster_clock.h" #include "distributed/connection_management.h" #include "distributed/distributed_planner.h" #include "distributed/function_call_delegation.h" @@ -275,6 +276,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) { + /* Coordinator: Adjust local clock value to the global cluster value */ + AdjustLocalClockToGlobal(); + /* handles both already prepared and open transactions */ CoordinatedRemoteTransactionsCommit(); } @@ -285,6 +289,11 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetPlacementConnectionManagement(); AfterXactConnectionHandling(true); } + else + { + /* Worker(s): Adjust local clock value to the global cluster value */ + AdjustLocalClockToGlobal(); + } /* * Changes to catalog tables are now visible to the metadata sync diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 87809c7b5..ec440ff34 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -450,11 +450,13 @@ IsTransactionInProgress(HTAB *activeTransactionNumberSet, char *preparedTransact uint32 connectionNumber = 0; uint64 transactionNumber = 0; bool isTransactionInProgress = false; + uint64 transactionClockValue = 0; bool isValidName = ParsePreparedTransactionName(preparedTransactionName, &groupId, &procId, &transactionNumber, - &connectionNumber); + &connectionNumber, + &transactionClockValue); if (isValidName) { hash_search(activeTransactionNumberSet, &transactionNumber, HASH_FIND, diff --git a/src/backend/distributed/utils/cluster_clock.c b/src/backend/distributed/utils/cluster_clock.c new file mode 100644 index 000000000..cbe249986 --- /dev/null +++ b/src/backend/distributed/utils/cluster_clock.c @@ -0,0 +1,333 @@ +/* + * cluster_clock.c + * + * Core funtion defintions to implement cluster clock. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include +#include "access/genam.h" +#include "commands/extension.h" +#include "storage/spin.h" + +#include "distributed/cluster_clock.h" +#include "distributed/listutils.h" +#include "distributed/lock_graph.h" +#include "distributed/maintenanced.h" +#include "distributed/metadata_cache.h" +#include "distributed/pg_dist_local_group.h" +#include "distributed/remote_commands.h" + +PG_FUNCTION_INFO_V1(citus_get_cluster_clock); +PG_FUNCTION_INFO_V1(set_transaction_id_clock_value); + +static void UpdateClockCatalog(void); + + +/* + * GetEpochTimeMs returns the epoch value in milliseconds. + */ +uint64 +GetEpochTimeMs(void) +{ + struct timeval tp; + + gettimeofday(&tp, NULL); + + uint64 result = (uint64) (tp.tv_sec) * 1000; + result = result + (uint64) (tp.tv_usec) / 1000; + return result; +} + + +/* + * GetLocalClockValue returns logical_clock_value value stored in + * the Citus catalog pg_dist_local_group in given "savedValue" + * arg. + * + * Returns true on success, i.e.: finds a record in pg_dist_local_group, + * and false otherwise. + */ +bool +GetLocalClockValue(uint64 *savedValue) +{ + ScanKey scanKey = NULL; + int scanKeyCount = 0; + bool success; + + Relation pgDistLocalGroupId = table_open(DistLocalGroupIdRelationId(), + AccessShareLock); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistLocalGroupId, + InvalidOid, false, + NULL, scanKeyCount, scanKey); + + TupleDesc tupleDescriptor = RelationGetDescr(pgDistLocalGroupId); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + + if (HeapTupleIsValid(heapTuple)) + { + Datum datumArray[Natts_pg_dist_local_group]; + bool isNullArray[Natts_pg_dist_local_group]; + + heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); + + if (isNullArray[Anum_pg_dist_local_logical_clock_value - 1]) + { + ereport(ERROR, (errmsg("unexpected: got null clock value " + "stored in pg_dist_local_group"))); + } + + *savedValue = DatumGetInt64( + datumArray[Anum_pg_dist_local_logical_clock_value - 1]); + success = true; + } + else + { + /* + * Upgrade is happening. When upgrading postgres, pg_dist_local_group is + * temporarily empty before citus_finish_pg_upgrade() finishes execution. + */ + success = false; + } + + systable_endscan(scanDescriptor); + table_close(pgDistLocalGroupId, AccessShareLock); + + return success; +} + + +/* + * PersistLocalClockValue gets invoked in two places, one in Citus maintenance + * daemon, and when a ever a backend exits, but that still leaves a time + * window where the clock value used may not be current in the catalog. + * TBD: May be store it during Checkpoint too? + */ +void +PersistLocalClockValue(int code, Datum argUnused) +{ + /* It's a no-op if catalogs are not fully baked yet */ + if (IsInitProcessingMode() || creating_extension || + !EnableVersionChecks || !IsMaintainanceDaemonProcess()) + { + return; + } + + StartTransactionCommand(); + + if (LockCitusExtension() && CheckCitusVersion(LOG) && CitusHasBeenLoaded()) + { + UpdateClockCatalog(); + } + + CommitTransactionCommand(); +} + + +/* + * UpdateClockCatalog persists the most recent logical clock value + * into the catalog pg_dist_local_group. + */ +static void +UpdateClockCatalog(void) +{ + ScanKey scanKey = NULL; + int scanKeyCount = 0; + Datum values[Natts_pg_dist_local_group] = { 0 }; + bool isNull[Natts_pg_dist_local_group] = { false }; + bool replace[Natts_pg_dist_local_group] = { false }; + static uint64 savedClockValue = 0; + + uint64 clockValue = GetCurrentClusterClockValue(); + + if (clockValue == savedClockValue) + { + /* clock didn't move, nothing to save */ + return; + } + + /* clock should only move forward */ + Assert(clockValue > savedClockValue); + + Relation pgDistLocalGroupId = table_open(DistLocalGroupIdRelationId(), + RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistLocalGroupId); + SysScanDesc scanDescriptor = systable_beginscan(pgDistLocalGroupId, + InvalidOid, + false, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(LOG, (errmsg("could not find valid entry in pg_dist_local"))); + systable_endscan(scanDescriptor); + table_close(pgDistLocalGroupId, NoLock); + return; + } + + values[Anum_pg_dist_local_logical_clock_value - 1] = Int64GetDatum(clockValue); + isNull[Anum_pg_dist_local_logical_clock_value - 1] = false; + replace[Anum_pg_dist_local_logical_clock_value - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isNull, replace); + + CatalogTupleUpdate(pgDistLocalGroupId, &heapTuple->t_self, heapTuple); + + CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId()); + systable_endscan(scanDescriptor); + table_close(pgDistLocalGroupId, RowExclusiveLock); + + /* cache the saved value */ + savedClockValue = clockValue; +} + + +/* + * SetTransactionClusterClock() takes the connection list of participating nodes in + * the current transaction, and polls the logical clock value of all the nodes. It + * sets the maximum logical clock value of all the nodes in the distributed transaction + * id, which may be used as commit order for individual objects. + */ +void +SetTransactionClusterClock(List *connectionList) +{ + /* get clock value of the local node */ + Datum value = GetNextClusterClock(); + uint64 globalClockValue = DatumGetUInt64(value); + + ereport(DEBUG1, (errmsg("Coordinator transaction clock %lu", + globalClockValue))); + + /* get clock value from each node */ + MultiConnection *connection = NULL; + StringInfo queryToSend = makeStringInfo(); + appendStringInfo(queryToSend, "SELECT citus_get_cluster_clock();"); + + foreach_ptr(connection, connectionList) + { + int querySent = SendRemoteCommand(connection, queryToSend->data); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + } + + /* fetch the results and pick the maximum clock value of all the nodes */ + foreach_ptr(connection, connectionList) + { + bool raiseInterrupts = true; + + if (PQstatus(connection->pgConn) != CONNECTION_OK) + { + continue; + } + + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ereport(ERROR, + (errmsg("Internal error, connection failure"))); + } + + int64 rowCount = PQntuples(result); + int64 colCount = PQnfields(result); + + /* Although it is not expected */ + if (colCount != 1 || rowCount != 1) + { + ereport(ERROR, + (errmsg("unexpected result from citus_get_cluster_clock()"))); + } + + value = ParseIntField(result, 0, 0); + uint64 nodeClockValue = DatumGetUInt64(value); + ereport(DEBUG1, (errmsg("Node(%lu) transaction clock %lu", + connection->connectionId, nodeClockValue))); + + if (nodeClockValue > globalClockValue) + { + globalClockValue = nodeClockValue; + } + + PQclear(result); + ForgetResults(connection); + } + + ereport(DEBUG1, + (errmsg("Final global transaction clock %lu", globalClockValue))); + + /* Set the adjusted value locally */ + SetTransactionIdClockValue(globalClockValue); + + /* Set the clock value on participating worker nodes */ + resetStringInfo(queryToSend); + appendStringInfo(queryToSend, + "SELECT citus_internal.set_transaction_id_clock_value(%lu);", + globalClockValue); + + foreach_ptr(connection, connectionList) + { + int querySent = SendRemoteCommand(connection, queryToSend->data); + + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + } + + /* Process the result */ + foreach_ptr(connection, connectionList) + { + bool raiseInterrupts = true; + + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + + if (!IsResponseOK(result)) + { + ereport(ERROR, + (errmsg("Internal error, connection failure"))); + } + + PQclear(result); + ForgetResults(connection); + } +} + + +/* + * citus_get_cluster_clock() is an UDF that returns a monotonically increasing + * logical clock. Clock guarantees to never go back in value after restarts, and + * makes best attempt to keep the value close to unix epoch time in milliseconds. + */ +Datum +citus_get_cluster_clock(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + uint64 nextClusterClockValue = GetNextClusterClock(); + + PG_RETURN_UINT64(nextClusterClockValue); +} + + +/* + * set_transaction_id_clock_value() is an internal UDF to set the transaction + * clock value of the remote nodes' distributed transaction id. + */ +Datum +set_transaction_id_clock_value(PG_FUNCTION_ARGS) +{ + uint64 transactionClockValue = PG_GETARG_INT64(0); + + SetTransactionIdClockValue(transactionClockValue); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/utils/cluster_clock.md b/src/backend/distributed/utils/cluster_clock.md new file mode 100644 index 000000000..5fb3b2bb4 --- /dev/null +++ b/src/backend/distributed/utils/cluster_clock.md @@ -0,0 +1,70 @@ +# Requirement +Many distributed applications need to track the changes in the same order as they are applied on the database. The changes can be to databases or objects within them, either in a single node or across the sharded cluster.
+**Note**: Not to be confused with serialization or any other external imposed order, the changes happen the usual way in the system. + +## Definitions +**Total ordering** - Every pair of change events can be placed in some order.
+**Causal ordering** - Only events that are causally related (an event A caused an event B) can be ordered i.e., it's only a partial order - sometimes events happen independently with no possible causal relationship, such events are treated to concurrent.
+**Sequential consistency** - All writes must be seen in the same order by all processes.
+**Causal consistency** - Causally related writes must be seen in the same order.
+ +## Introduction +Transactions in a single node system naturally provide a total and sequential ordering guarantees for client read and write operations as all operations are routed to the same node, but there are challenges for a multi node distributed system, such as, Citus. + +One possible way to totally order all the changes in the system is to timestamp all the events with a global physical clock or a centralized logical clock. Thus, observing the events in the increasing order of the timestamp will give the total ordering of events. For both performance and cost reasons such solutions are impractical. In the absence of total ordering, a little weaker ordering is the **causal order** + +Causal order is defined as a model that preserves a partial order of events in a distributed system. If an event + 1. A causes another event B, every other process in the system observes the event A before observing event B. + 2. Causal order is transitive: if A causes B, and B causes C, then A causes C. + 3. Non causally ordered events are treated as concurrent. + +Causal consistency is a weak form of consistency that preserves the order of causally related operations. The causal consistency model can be refined into four session guarantees + 1. Read Your Writes: If a process performs a write, the same process + later observes the result of its write. + 2. Monotonic Reads: The set of writes observed (read) by a process is + guaranteed to be monotonically increasing. + 3. Writes Follow Reads: If some process performs a read followed by a write, and another process observes the result of the write, then it can also observe the read. + 4. Monotonic Writes: If some process performs a write, followed sometime later by another write, other processes will observe them in the same order. + +## UDF +***get_cluster_clock()*** is a new UDF that **helps** applications/clients in causally ordering events in the distributed system. + + 1. An application can call get_cluster_clock(), which returns monotonically increasing logical clock value, as close to epoch value (in milli seconds) as possible, with the guarantee that it will never go back from its current value even after a restart (not hard crash). The returned value can be used for ordering the events, which are related. + + 2. A new GUC value citus.enable_global_clock, when enabled, stamps the cluster wide logical clock value for each distributed transaction. This clock timestamp can be used by applications for causal ordering of changes to the objects in the transaction(s). + + Sample data of cluster wide clock timestamp for a distributed transaction that accesses shards from 3 nodes. The query output is run right after the commit of the transaction. + + SELECT initiator_node_identifier, transactionclock as commit_clock, global_pid, transaction_stamp +from **get_all_active_transactions()** +where global_pid = citus_backend_gpid() +order by global_pid; + + + initiator_node_identifier | commit_clock | global_pid | transaction_stamp + ---------------------------+---------------+-------------+------------------------ + 1 | 1656390052898 | 10000175398 | 1999-12-31 16:00:00-08 + + As you can see the transaction has the timestamp of **1656390052898** at the originator and share the same timestamp at all the participating nodes (below output). All the nodes adjust the clock value to the new value, so as to maintain the increasing order of the clock across the cluster. + + SELECT initiator_node_identifier, transactionclock as commit_clock, global_pid, transaction_stamp +from **get_global_active_transactions()** +where global_pid = citus_backend_gpid() +order by global_pid; + + initiator_node_identifier | commit_clock | global_pid | transaction_stamp + ---------------------------+---------------+-------------+------------------------ + 1 | 1656390052898 | 10000175398 | 1999-12-31 16:00:00-08 + 1 | 1656390052898 | 10000175398 | 1999-12-31 16:00:00-08 + 1 | 1656390052898 | 10000175398 | 1999-12-31 16:00:00-08 + +### Next steps + 1. The current model doesn't provide protection against clock drifts in the system. On some Posix systems, clock can go backwards after restart of the VM, in which case the increasing order is not guaranteed. This can be alleviated by initializing the clock using new udf ***adjust_cluster_clock***(..) to a user-specified value. Before starting any workload, clock must be set to the most recent clock timestamp seen by the application before restart. This is not an issue for a planned server restart as the shutdown callback persists the most recent value, and on restart, the server adjusts the cluster clock even if there is a drift in the epoch clock value. + 2. Provide a way to get the cluster clock timestamp for any distributed transactions. Currently distributed transactions are uniquely identified by a combination of
+ + databaseId + initiatorNodeIdentifier + transactionNumber + timestamp + + Persist cluster clock timestamp with the corresponding unique distributed transaction id in a catalog, which can be either queried directly or a new UDF ***get_transaction_clock***(....) that returns the cluster clock timestamp for the unique distributed transaction id passed as input. diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 36d4b0b6e..db790bcc5 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -33,6 +33,7 @@ #include "libpq/pqsignal.h" #include "catalog/namespace.h" #include "distributed/citus_safe_lib.h" +#include "distributed/cluster_clock.h" #include "distributed/distributed_deadlock_detection.h" #include "distributed/maintenanced.h" #include "distributed/coordinator_protocol.h" @@ -622,6 +623,13 @@ CitusMaintenanceDaemonMain(Datum main_arg) timeout = Min(timeout, deadlockTimeout); } + /* Periodically persist the logical clock value */ + if (!RecoveryInProgress()) + { + InvalidateMetadataSystemCache(); + PersistLocalClockValue(0, (Datum) 0); + } + if (!RecoveryInProgress() && DeferShardDeleteInterval > 0 && TimestampDifferenceExceeds(lastShardCleanTime, GetCurrentTimestamp(), DeferShardDeleteInterval)) @@ -968,3 +976,14 @@ MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData) return metadataSyncTriggered; } + + +/* + * IsMaintainanceDaemonProcess returns true if the process is + * maintenance daemon, false for all other backends. + */ +bool +IsMaintainanceDaemonProcess(void) +{ + return IsMaintenanceDaemon; +} diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index ccb4da535..bff291b47 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -73,6 +73,13 @@ extern void DecrementExternalClientBackendCounter(void); extern bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, StringInfo queryResultString, bool reportResultError); +extern uint64 GetTransactionIdClockValue(void); +extern void SetTransactionIdClockValue(uint64 transactionClockValue); +extern uint64 GetCurrentClusterClockValue(void); +extern uint64 GetNextClusterClock(void); +extern void AdjustLocalClockToGlobal(void); +extern void RegisterAndAdjustClockValue(void); +extern void AdjustAndInitializeClusterClock(void); #define INVALID_CITUS_INTERNAL_BACKEND_GPID 0 #define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999 diff --git a/src/include/distributed/cluster_clock.h b/src/include/distributed/cluster_clock.h new file mode 100644 index 000000000..a6d150526 --- /dev/null +++ b/src/include/distributed/cluster_clock.h @@ -0,0 +1,20 @@ +/* + * cluster_clock.h + * + * Data structure definitions for managing cluster clock and + * related function declarations. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CLUSTER_CLOCK_H +#define CLUSTER_CLOCK_H + +extern uint64 GetEpochTimeMs(void); +extern bool GetLocalClockValue(uint64 *savedValue); +extern void PersistLocalClockValue(int code, Datum argUnused); +extern void SetTransactionClusterClock(List *connectionList); + +#endif /* CLUSTER_CLOCK_H */ diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index c5002021d..696bc4799 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -28,5 +28,6 @@ extern void InitializeMaintenanceDaemonBackend(void); extern bool LockCitusExtension(void); extern void CitusMaintenanceDaemonMain(Datum main_arg); +extern bool IsMaintainanceDaemonProcess(void); #endif /* MAINTENANCED_H */ diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 0d9f125d8..49d481ce5 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -294,4 +294,5 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId); extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); + #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/pg_dist_local_group.h b/src/include/distributed/pg_dist_local_group.h index bed11e214..9bb13b1ad 100644 --- a/src/include/distributed/pg_dist_local_group.h +++ b/src/include/distributed/pg_dist_local_group.h @@ -18,6 +18,7 @@ typedef struct FormData_pg_dist_local_group { int groupid; + uint64 logical_clock_value; } FormData_pg_dist_local_group; /* ---------------- @@ -31,7 +32,8 @@ typedef FormData_pg_dist_local_group *Form_pg_dist_local_group; * compiler constants for pg_dist_local_group * ---------------- */ -#define Natts_pg_dist_local_group 1 +#define Natts_pg_dist_local_group 2 #define Anum_pg_dist_local_groupid 1 +#define Anum_pg_dist_local_logical_clock_value 2 #endif /* PG_DIST_LOCAL_GROUP_H */ diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index f827bd9ec..893855291 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -15,6 +15,11 @@ #include "nodes/pg_list.h" #include "lib/ilist.h" +/* + * Not to exceed the community-code global transaction + * identifier (GID) size (200) + */ +#define PREPARED_TRANSACTION_NAME_LEN 128 /* forward declare, to avoid recursive includes */ struct MultiConnection; @@ -82,17 +87,20 @@ typedef struct RemoteTransaction bool transactionRecovering; /* 2PC transaction name currently associated with connection */ - char preparedName[NAMEDATALEN]; + char preparedName[PREPARED_TRANSACTION_NAME_LEN]; /* set when BEGIN is sent over the connection */ bool beginSent; } RemoteTransaction; +extern bool EnableGlobalClock; + /* utility functions for dealing with remote transactions */ extern bool ParsePreparedTransactionName(char *preparedTransactionName, int32 *groupId, int *procId, uint64 *transactionNumber, - uint32 *connectionNumber); + uint32 *connectionNumber, + uint64 *transactionClockValue); /* change an individual remote transaction's state */ extern void StartRemoteTransactionBegin(struct MultiConnection *connection); diff --git a/src/include/distributed/transaction_identifier.h b/src/include/distributed/transaction_identifier.h index d206d1b85..2322b0bd9 100644 --- a/src/include/distributed/transaction_identifier.h +++ b/src/include/distributed/transaction_identifier.h @@ -28,6 +28,7 @@ * - transactionNumber: A locally unique identifier assigned for the distributed * transaction on the node that initiated the distributed transaction * - timestamp: The current timestamp of distributed transaction initiation + * - transactionClockValue: The cluster clock value of the transaction commit time * */ typedef struct DistributedTransactionId @@ -36,6 +37,7 @@ typedef struct DistributedTransactionId bool transactionOriginator; uint64 transactionNumber; TimestampTz timestamp; + uint64 transactionClockValue; } DistributedTransactionId; diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 329d63722..d3e8017d8 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -122,8 +122,8 @@ s/(ERROR.*)pgsql_job_cache\/([0-9]+_[0-9]+_[0-9]+)\/(.*).data/\1pgsql_job_cache\ # assign_distributed_transaction id params s/(NOTICE.*)assign_distributed_transaction_id\([0-9]+, [0-9]+, '.*'\)/\1assign_distributed_transaction_id\(xx, xx, 'xxxxxxx'\)/g -s/(NOTICE.*)PREPARE TRANSACTION 'citus_[0-9]+_[0-9]+_[0-9]+_[0-9]+'/\1PREPARE TRANSACTION 'citus_xx_xx_xx_xx'/g -s/(NOTICE.*)COMMIT PREPARED 'citus_[0-9]+_[0-9]+_[0-9]+_[0-9]+'/\1COMMIT PREPARED 'citus_xx_xx_xx_xx'/g +s/(NOTICE.*)PREPARE TRANSACTION 'citus_[0-9]+_[0-9]+_[0-9]+_[0-9]+_[0-9]+'/\1PREPARE TRANSACTION 'citus_xx_xx_xx_xx'/g +s/(NOTICE.*)COMMIT PREPARED 'citus_[0-9]+_[0-9]+_[0-9]+_[0-9]+_[0-9]+'/\1COMMIT PREPARED 'citus_xx_xx_xx_xx'/g # toast tables s/pg_toast_[0-9]+/pg_toast_xxxxx/g @@ -283,3 +283,6 @@ s/^(DETAIL: "[a-z\ ]+ )pg_temp_[0-9]+(\..*" will be created only locally)$/\1pg # will be replaced with # WARNING: "function func(bigint)" has dependency on unsupported object "schema pg_temp_xxx" s/^(WARNING|ERROR)(: "[a-z\ ]+ .*" has dependency on unsupported object) "schema pg_temp_[0-9]+"$/\1\2 "schema pg_temp_xxx"/g + +# In clock tests, normalize epoch value printed +s/^(DEBUG: |LOG: )(Coordinator|Node\([0-9]+\)|Final global|Set) transaction clock [0-9]+.*$/\1\2 transaction clock xxxxxx/g diff --git a/src/test/regress/expected/clock.out b/src/test/regress/expected/clock.out new file mode 100644 index 000000000..d9912f619 --- /dev/null +++ b/src/test/regress/expected/clock.out @@ -0,0 +1,118 @@ +CREATE SCHEMA clock; +SET search_path TO clock; +SET citus.next_shard_id TO 920000; +CREATE VIEW localtxns AS +SELECT initiator_node_identifier, transactionclock AS commit_clock, transaction_number, global_pid, transaction_stamp +FROM get_all_active_transactions() +WHERE global_pid = citus_backend_gpid() +ORDER BY global_pid; +-- +-- Note: Test might become flaky as it relies on Citus Deamon actions, where +-- the scheduling of the daemon is not guaranteed. +-- +-- Check the clock is *increasing* +-- +SELECT * FROM citus_get_cluster_clock() \gset t1 +SELECT * FROM citus_get_cluster_clock() \gset t2 +SELECT * FROM citus_get_cluster_clock() \gset t3 +SELECT :t2citus_get_cluster_clock > :t1citus_get_cluster_clock AS t2_greater_t1; + t2_greater_t1 +--------------------------------------------------------------------- + t +(1 row) + +SELECT :t3citus_get_cluster_clock > :t2citus_get_cluster_clock AS t3_greater_t2; + t3_greater_t2 +--------------------------------------------------------------------- + t +(1 row) + +-- +-- Check if the latest clock value is persisted in the catalog +-- +SELECT logical_clock_value FROM pg_dist_local_group \gset t1 +/* get the current clock value */ +SELECT * FROM citus_get_cluster_clock() \gset t2 +/* wait for the daemon to save the clock */ +SELECT pg_sleep(3); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +/* fetch the value from the catalog */ +SELECT logical_clock_value FROM pg_dist_local_group \gset t3 +SELECT :t3logical_clock_value > :t1logical_clock_value AS t3_greater_t1; + t3_greater_t1 +--------------------------------------------------------------------- + t +(1 row) + +/* the value returned by citus_get_cluster_clock() must be persisted in the catalog */ +SELECT :t2citus_get_cluster_clock = :t3logical_clock_value AS t2_saved_in_t3; + t2_saved_in_t3 +--------------------------------------------------------------------- + t +(1 row) + +-- +-- Check the value returned by citus_get_cluster_clock is close to Epoch in ms +-- +SELECT (extract(epoch from now()) * 1000)::bigint AS epoch \gset +SELECT citus_get_cluster_clock() \gset +SELECT (:citus_get_cluster_clock - :epoch) < 100; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE dist_table (id int, nonid int); +SELECT create_distributed_table('dist_table', 'id', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.enable_global_clock; -- Default should be false + citus.enable_global_clock +--------------------------------------------------------------------- + off +(1 row) + +BEGIN; +INSERT INTO dist_table SELECT generate_series(1, 10000, 1), 0; +SET client_min_messages TO DEBUG1; +COMMIT; +-- Turn on the global clock, commit should pick the greatest clock value +SET citus.enable_global_clock to ON; +BEGIN; +INSERT INTO dist_table SELECT generate_series(1, 10000, 1), 0; +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +SET client_min_messages TO DEBUG1; +SELECT * FROM localtxns \gset t1 +COMMIT; +DEBUG: Coordinator transaction clock xxxxxx +DEBUG: Node(1) transaction clock xxxxxx +DEBUG: Node(2) transaction clock xxxxxx +DEBUG: Node(9) transaction clock xxxxxx +DEBUG: Node(10) transaction clock xxxxxx +DEBUG: Node(11) transaction clock xxxxxx +DEBUG: Node(12) transaction clock xxxxxx +DEBUG: Node(13) transaction clock xxxxxx +DEBUG: Node(14) transaction clock xxxxxx +DEBUG: Final global transaction clock xxxxxx +DEBUG: Set transaction clock xxxxxx +SELECT * FROM localtxns \gset t2 +SELECT :t2commit_clock > :t1commit_clock; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +RESET client_min_messages; +RESET citus.enable_global_clock; +DROP SCHEMA clock CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to view localtxns +drop cascades to table dist_table diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out index 9c44269a3..7233cc9da 100644 --- a/src/test/regress/expected/failure_connection_establishment.out +++ b/src/test/regress/expected/failure_connection_establishment.out @@ -226,7 +226,7 @@ SELECT count(*) FROM single_replicatated WHERE key = 100; RESET client_min_messages; -- verify get_global_active_transactions works when a timeout happens on a connection SELECT * FROM get_global_active_transactions() WHERE transaction_number != 0; - datid | process_id | initiator_node_identifier | worker_query | transaction_number | transaction_stamp | global_pid + datid | process_id | initiator_node_identifier | worker_query | transaction_number | transaction_stamp | global_pid | transactionclock --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 3c3c5b2ae..8200a4727 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1044,7 +1044,9 @@ SELECT * FROM multi_extension.print_extension_changes(); function worker_merge_files_into_table(bigint,integer,text[],text[]) void | function worker_range_partition_table(bigint,integer,text,text,oid,anyarray) void | function worker_repartition_cleanup(bigint) void | -(8 rows) + | function citus_get_cluster_clock() bigint + | function citus_internal.set_transaction_id_clock_value(bigint) void +(10 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_metadata_attributes.out b/src/test/regress/expected/multi_metadata_attributes.out index 3ce512c2b..dd7c5ce22 100644 --- a/src/test/regress/expected/multi_metadata_attributes.out +++ b/src/test/regress/expected/multi_metadata_attributes.out @@ -9,7 +9,8 @@ FROM pg_attribute WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass, 'pg_dist_partition'::regclass, - 'pg_dist_object'::regclass) + 'pg_dist_object'::regclass, + 'pg_dist_local_group'::regclass) ORDER BY attrelid, attname; attrelid | attname | atthasmissing | attmissingval --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 749254292..dc6a594d8 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -87,7 +87,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' - UPDATE pg_dist_local_group SET groupid = 1 + UPDATE pg_dist_local_group SET groupid = 1, logical_clock_value = citus_get_cluster_clock() WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; (29 rows) @@ -161,7 +161,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' - UPDATE pg_dist_local_group SET groupid = 1 + UPDATE pg_dist_local_group SET groupid = 1, logical_clock_value = citus_get_cluster_clock() WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; @@ -210,7 +210,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' - UPDATE pg_dist_local_group SET groupid = 1 + UPDATE pg_dist_local_group SET groupid = 1, logical_clock_value = citus_get_cluster_clock() WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; @@ -261,7 +261,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' - UPDATE pg_dist_local_group SET groupid = 1 + UPDATE pg_dist_local_group SET groupid = 1, logical_clock_value = citus_get_cluster_clock() WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; @@ -318,7 +318,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' - UPDATE pg_dist_local_group SET groupid = 1 + UPDATE pg_dist_local_group SET groupid = 1, logical_clock_value = citus_get_cluster_clock() WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; @@ -368,7 +368,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' - UPDATE pg_dist_local_group SET groupid = 1 + UPDATE pg_dist_local_group SET groupid = 1, logical_clock_value = citus_get_cluster_clock() WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; @@ -439,7 +439,7 @@ SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND node -- Check that the metadata has been copied to the worker \c - - - :worker_1_port -SELECT * FROM pg_dist_local_group; +SELECT groupid FROM pg_dist_local_group; groupid --------------------------------------------------------------------- 1 @@ -578,7 +578,7 @@ SELECT 1 FROM citus_activate_node('localhost', :worker_1_port); (1 row) \c - - - :worker_1_port -SELECT * FROM pg_dist_local_group; +SELECT groupid FROM pg_dist_local_group; groupid --------------------------------------------------------------------- 1 @@ -1923,7 +1923,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' - UPDATE pg_dist_local_group SET groupid = 1 + UPDATE pg_dist_local_group SET groupid = 1, logical_clock_value = citus_get_cluster_clock() WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10002, 7, 1, 'integer'::regtype, NULL, NULL), (10003, 1, -1, 0, NULL, NULL), (10004, 3, 1, 'integer'::regtype, NULL, NULL), (10005, 4, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_test_schema_1', 'mx_table_1']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_test_schema_2', 'mx_table_2']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_ref']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'dist_table_1']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_sequence_0']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_sequence_1']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 5, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 5, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 5, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 5, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 20cec7578..b63d836d3 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -33,7 +33,7 @@ SELECT count(*) FROM pg_dist_transaction; -- If the groupid of the worker changes this query will produce a -- different result and the prepared statement names should be adapted -- accordingly. -SELECT * FROM pg_dist_local_group; +SELECT groupid FROM pg_dist_local_group; groupid --------------------------------------------------------------------- 14 diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index ea7fe01a4..51713af7d 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -62,6 +62,7 @@ ORDER BY 1; function citus_finalize_upgrade_to_citus11(boolean) function citus_finish_pg_upgrade() function citus_get_active_worker_nodes() + function citus_get_cluster_clock() function citus_internal.columnar_ensure_am_depends_catalog() function citus_internal.downgrade_columnar_storage(regclass) function citus_internal.find_groupid_for_node(text,integer) @@ -71,6 +72,7 @@ ORDER BY 1; function citus_internal.refresh_isolation_tester_prepared_statement() function citus_internal.replace_isolation_tester_func() function citus_internal.restore_isolation_tester_func() + function citus_internal.set_transaction_id_clock_value(bigint) function citus_internal.upgrade_columnar_storage(regclass) function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) @@ -275,5 +277,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(259 rows) +(261 rows) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 6a2785553..c6980811c 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -79,7 +79,7 @@ test: multi_average_expression multi_working_columns multi_having_pushdown havin test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join anonymous_columns test: ch_bench_subquery_repartition -test: multi_agg_type_conversion multi_count_type_conversion recursive_relation_planning_restriction_pushdown +test: multi_agg_type_conversion multi_count_type_conversion recursive_relation_planning_restriction_pushdown clock test: multi_partition_pruning single_hash_repartition_join unsupported_lateral_subqueries test: multi_join_pruning multi_hash_pruning intermediate_result_pruning test: multi_null_minmax_value_pruning cursors @@ -96,7 +96,6 @@ test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes - # --------- # test that no tests leaked intermediate results. This should always be last # --------- diff --git a/src/test/regress/sql/clock.sql b/src/test/regress/sql/clock.sql new file mode 100644 index 000000000..cd0592162 --- /dev/null +++ b/src/test/regress/sql/clock.sql @@ -0,0 +1,67 @@ +CREATE SCHEMA clock; +SET search_path TO clock; +SET citus.next_shard_id TO 920000; + +CREATE VIEW localtxns AS +SELECT initiator_node_identifier, transactionclock AS commit_clock, transaction_number, global_pid, transaction_stamp +FROM get_all_active_transactions() +WHERE global_pid = citus_backend_gpid() +ORDER BY global_pid; + +-- +-- Note: Test might become flaky as it relies on Citus Deamon actions, where +-- the scheduling of the daemon is not guaranteed. +-- +-- Check the clock is *increasing* +-- +SELECT * FROM citus_get_cluster_clock() \gset t1 +SELECT * FROM citus_get_cluster_clock() \gset t2 +SELECT * FROM citus_get_cluster_clock() \gset t3 +SELECT :t2citus_get_cluster_clock > :t1citus_get_cluster_clock AS t2_greater_t1; +SELECT :t3citus_get_cluster_clock > :t2citus_get_cluster_clock AS t3_greater_t2; + +-- +-- Check if the latest clock value is persisted in the catalog +-- +SELECT logical_clock_value FROM pg_dist_local_group \gset t1 +/* get the current clock value */ +SELECT * FROM citus_get_cluster_clock() \gset t2 +/* wait for the daemon to save the clock */ +SELECT pg_sleep(3); +/* fetch the value from the catalog */ +SELECT logical_clock_value FROM pg_dist_local_group \gset t3 + +SELECT :t3logical_clock_value > :t1logical_clock_value AS t3_greater_t1; +/* the value returned by citus_get_cluster_clock() must be persisted in the catalog */ +SELECT :t2citus_get_cluster_clock = :t3logical_clock_value AS t2_saved_in_t3; + + +-- +-- Check the value returned by citus_get_cluster_clock is close to Epoch in ms +-- +SELECT (extract(epoch from now()) * 1000)::bigint AS epoch \gset +SELECT citus_get_cluster_clock() \gset +SELECT (:citus_get_cluster_clock - :epoch) < 100; + +CREATE TABLE dist_table (id int, nonid int); +SELECT create_distributed_table('dist_table', 'id', colocate_with := 'none'); + +SHOW citus.enable_global_clock; -- Default should be false +BEGIN; +INSERT INTO dist_table SELECT generate_series(1, 10000, 1), 0; +SET client_min_messages TO DEBUG1; +COMMIT; + +-- Turn on the global clock, commit should pick the greatest clock value +SET citus.enable_global_clock to ON; +BEGIN; +INSERT INTO dist_table SELECT generate_series(1, 10000, 1), 0; +SET client_min_messages TO DEBUG1; +SELECT * FROM localtxns \gset t1 +COMMIT; +SELECT * FROM localtxns \gset t2 +SELECT :t2commit_clock > :t1commit_clock; + +RESET client_min_messages; +RESET citus.enable_global_clock; +DROP SCHEMA clock CASCADE; diff --git a/src/test/regress/sql/multi_metadata_attributes.sql b/src/test/regress/sql/multi_metadata_attributes.sql index 58351310c..b93a33ea5 100644 --- a/src/test/regress/sql/multi_metadata_attributes.sql +++ b/src/test/regress/sql/multi_metadata_attributes.sql @@ -10,5 +10,6 @@ FROM pg_attribute WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass, 'pg_dist_partition'::regclass, - 'pg_dist_object'::regclass) + 'pg_dist_object'::regclass, + 'pg_dist_local_group'::regclass) ORDER BY attrelid, attname; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 0d67bb68b..dc807e468 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -118,7 +118,7 @@ SELECT nodeid, hasmetadata FROM pg_dist_node WHERE nodename='localhost' AND node -- Check that the metadata has been copied to the worker \c - - - :worker_1_port -SELECT * FROM pg_dist_local_group; +SELECT groupid FROM pg_dist_local_group; SELECT * FROM pg_dist_node ORDER BY nodeid; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; SELECT * FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY shardid; @@ -168,7 +168,7 @@ RESET citus.shard_replication_factor; SELECT 1 FROM citus_activate_node('localhost', :worker_1_port); SELECT 1 FROM citus_activate_node('localhost', :worker_1_port); \c - - - :worker_1_port -SELECT * FROM pg_dist_local_group; +SELECT groupid FROM pg_dist_local_group; SELECT * FROM pg_dist_node ORDER BY nodeid; SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid; SELECT * FROM pg_dist_shard WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY shardid; diff --git a/src/test/regress/sql/multi_mx_transaction_recovery.sql b/src/test/regress/sql/multi_mx_transaction_recovery.sql index 2a6b4991b..f54dcb0ef 100644 --- a/src/test/regress/sql/multi_mx_transaction_recovery.sql +++ b/src/test/regress/sql/multi_mx_transaction_recovery.sql @@ -19,7 +19,7 @@ SELECT count(*) FROM pg_dist_transaction; -- If the groupid of the worker changes this query will produce a -- different result and the prepared statement names should be adapted -- accordingly. -SELECT * FROM pg_dist_local_group; +SELECT groupid FROM pg_dist_local_group; BEGIN; CREATE TABLE table_should_abort (value int);