This implements a new UDF citus_get_cluster_clock() that returns a monotonically
increasing logical clock. The clock is guaranteed never to go backwards in value
after restarts, and makes a best attempt to keep the value close to Unix epoch
time in milliseconds.

Also introduces a new GUC "citus.enable_cluster_clock": when true, every
distributed transaction is stamped with the logical causal clock and persisted
in a new catalog, pg_dist_commit_transaction.
Branch: causal-order-clock-native
Teja Mupparti 2022-08-17 13:53:00 -07:00
parent 2e943a64a0
commit 40f5c2bab3
27 changed files with 1497 additions and 19 deletions


@@ -187,6 +187,7 @@ typedef struct MetadataCacheData
	Oid distColocationidIndexId;
	Oid distPlacementGroupidIndexId;
	Oid distTransactionRelationId;
+	Oid distCommitTransactionRelationId;
	Oid distTransactionGroupIndexId;
	Oid citusCatalogNamespaceId;
	Oid copyFormatTypeId;
@@ -2872,6 +2873,17 @@ DistTransactionRelationId(void)
}

+/* return oid of pg_dist_commit_transaction relation */
+Oid
+DistCommitTransactionRelationId(void)
+{
+	CachedRelationLookup("pg_dist_commit_transaction",
+						 &MetadataCache.distCommitTransactionRelationId);
+
+	return MetadataCache.distCommitTransactionRelationId;
+}

/* return oid of pg_dist_transaction_group_index */
Oid
DistTransactionGroupIndexId(void)


@@ -42,6 +42,7 @@
#include "distributed/query_pushdown_planning.h"
#include "distributed/string_utils.h"
#include "distributed/tdigest_extension.h"
+#include "distributed/type_utils.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "nodes/makefuncs.h"
@@ -279,7 +280,6 @@ static Oid CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *a
static Oid WorkerPartialAggOid(void);
static Oid CoordCombineAggOid(void);
static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
-static Oid TypeOid(Oid schemaId, const char *typeName);
static SortGroupClause * CreateSortGroupClause(Var *column);

/* Local functions forward declarations for count(distinct) approximations */
@@ -3667,21 +3667,6 @@ CoordCombineAggOid()
}

-/*
- * TypeOid looks for a type that has the given name and schema, and returns the
- * corresponding type's oid.
- */
-static Oid
-TypeOid(Oid schemaId, const char *typeName)
-{
-	Oid typeOid = GetSysCacheOid2(TYPENAMENSP, Anum_pg_type_oid,
-								  PointerGetDatum(typeName),
-								  ObjectIdGetDatum(schemaId));
-
-	return typeOid;
-}

/*
 * CreateSortGroupClause creates SortGroupClause for a given column Var.
 * The caller should set tleSortGroupRef field and respective


@@ -33,6 +33,7 @@
#include "executor/executor.h"
#include "distributed/backend_data.h"
#include "distributed/background_jobs.h"
+#include "distributed/causal_clock.h"
#include "distributed/citus_depended_object.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_safe_lib.h"
@@ -457,6 +458,7 @@ _PG_init(void)
	InitializeCitusQueryStats();
	InitializeSharedConnectionStats();
	InitializeLocallyReservedSharedConnections();
+	InitializeClusterClockMem();

	/* initialize shard split shared memory handle management */
	InitializeShardSplitSMHandleManagement();
@@ -542,6 +544,7 @@ citus_shmem_request(void)
	RequestAddinShmemSpace(SharedConnectionStatsShmemSize());
	RequestAddinShmemSpace(MaintenanceDaemonShmemSize());
	RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
+	RequestAddinShmemSpace(LogicalClockShmemSize());

	RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
}
@@ -666,6 +669,7 @@ StartupCitusBackend(void)
	InitializeBackendData(INVALID_CITUS_INTERNAL_BACKEND_GPID);
	AssignGlobalPID();
	RegisterConnectionCleanup();
+	InitClockAtBoot();
}
@@ -1128,6 +1132,16 @@ RegisterCitusConfigVariables(void)
		GUC_STANDARD,
		NULL, NULL, NULL);

+	DefineCustomBoolVariable(
+		"citus.enable_cluster_clock",
+		gettext_noop("When true, every distributed transaction is stamped with "
+					 "logical causal clock and persisted in the catalog"),
+		NULL,
+		&EnableClusterClock,
+		false,
+		PGC_USERSET,
+		GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);

	DefineCustomBoolVariable(
		"citus.enable_cost_based_connection_establishment",
		gettext_noop("When enabled the connection establishment times "


@@ -168,3 +168,23 @@ GRANT SELECT ON pg_catalog.pg_dist_background_task_depend TO PUBLIC;
#include "udfs/citus_job_wait/11.1-1.sql"
#include "udfs/citus_job_cancel/11.1-1.sql"

+--
+-- Logical clock
+-- Ticks counter within the logical clock
+--
+CREATE TYPE citus.cluster_clock AS (logical bigint, counter int);
+ALTER TYPE citus.cluster_clock SET SCHEMA pg_catalog;
+
+CREATE TABLE citus.pg_dist_commit_transaction(
+    transaction_id TEXT NOT NULL CONSTRAINT pg_dist_commit_transactionId_unique_constraint UNIQUE,
+    cluster_clock_value cluster_clock NOT NULL,
+    timestamp BIGINT NOT NULL -- Epoch in milliseconds
+);
+ALTER TABLE citus.pg_dist_commit_transaction SET SCHEMA pg_catalog;
+GRANT SELECT ON pg_catalog.pg_dist_commit_transaction TO public;
+
+#include "udfs/citus_get_cluster_clock/11.1-1.sql"
+#include "udfs/citus_is_clock_after/11.1-1.sql"
+#include "udfs/citus_internal_adjust_local_clock_to_remote/11.1-1.sql"


@@ -114,3 +114,9 @@ DROP TABLE pg_catalog.pg_dist_background_job;
DROP TYPE pg_catalog.citus_job_status;

DROP FUNCTION pg_catalog.citus_copy_shard_placement;
#include "../udfs/citus_copy_shard_placement/10.0-1.sql"

+DROP FUNCTION pg_catalog.citus_get_cluster_clock();
+DROP FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(cluster_clock);
+DROP FUNCTION pg_catalog.citus_is_clock_after(cluster_clock, cluster_clock);
+DROP TYPE pg_catalog.cluster_clock CASCADE;
+DROP TABLE pg_catalog.pg_dist_commit_transaction;


@@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_get_cluster_clock()
RETURNS pg_catalog.cluster_clock
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_get_cluster_clock$$;
COMMENT ON FUNCTION pg_catalog.citus_get_cluster_clock()
IS 'Returns a monotonically increasing timestamp, with the logical clock value as close to the epoch value (in milliseconds) as possible, and a counter for ticks (maximum of four million) within the logical clock';


@@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_get_cluster_clock()
RETURNS pg_catalog.cluster_clock
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_get_cluster_clock$$;
COMMENT ON FUNCTION pg_catalog.citus_get_cluster_clock()
IS 'Returns a monotonically increasing timestamp, with the logical clock value as close to the epoch value (in milliseconds) as possible, and a counter for ticks (maximum of four million) within the logical clock';


@@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
RETURNS void
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_internal_adjust_local_clock_to_remote$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
IS 'Internal UDF used to adjust the local clock to the maximum clock value among the nodes in the cluster';
REVOKE ALL ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock) FROM PUBLIC;


@@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
RETURNS void
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_internal_adjust_local_clock_to_remote$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
IS 'Internal UDF used to adjust the local clock to the maximum clock value among the nodes in the cluster';
REVOKE ALL ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock) FROM PUBLIC;


@@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_is_clock_after(clock_one pg_catalog.cluster_clock, clock_two pg_catalog.cluster_clock)
RETURNS BOOL
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_is_clock_after$$;
COMMENT ON FUNCTION pg_catalog.citus_is_clock_after(pg_catalog.cluster_clock, pg_catalog.cluster_clock)
IS 'Accepts logical clock timestamps of two causally related events and returns true if the first argument happened after the second';


@@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_is_clock_after(clock_one pg_catalog.cluster_clock, clock_two pg_catalog.cluster_clock)
RETURNS BOOL
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_is_clock_after$$;
COMMENT ON FUNCTION pg_catalog.citus_is_clock_after(pg_catalog.cluster_clock, pg_catalog.cluster_clock)
IS 'Accepts logical clock timestamps of two causally related events and returns true if the first argument happened after the second';


@@ -26,6 +26,7 @@
#include "datatype/timestamp.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
+#include "distributed/function_utils.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
@@ -168,6 +169,33 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
}

+/*
+ * GetCurrentTransactionIdString returns the string representation of the
+ * output of the UDF get_current_transaction_id.
+ *
+ * Note: This routine calls the UDF get_current_transaction_id directly to
+ * keep the output/format coherent; otherwise, any change in the UDF's
+ * parameters or output could diverge from this routine.
+ */
+char *
+GetCurrentTransactionIdString(void)
+{
+	/*
+	 * Call the get_current_transaction_id UDF to get the current
+	 * distributed transaction id.
+	 */
+	Oid transactionFuncOid = FunctionOid("pg_catalog", "get_current_transaction_id", 0);
+	Datum transactionIdHeapDatum = OidFunctionCall0(transactionFuncOid);
+
+	/* Now, call the datatype output function on the tuple */
+	FmgrInfo *outputFunction = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+	Oid outputFunctionId = FunctionOid("pg_catalog", "record_out", 1);
+	fmgr_info(outputFunctionId, outputFunction);
+
+	return OutputFunctionCall(outputFunction, transactionIdHeapDatum);
+}

/*
 * get_current_transaction_id returns a tuple with (databaseId, processId,
 * initiatorNodeIdentifier, transactionNumber, timestamp) that exists in the


@@ -16,6 +16,7 @@
#include "access/xact.h"
#include "distributed/backend_data.h"
+#include "distributed/causal_clock.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
@@ -848,6 +849,12 @@ CoordinatedRemoteTransactionsPrepare(void)
	}

	CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;

+	/*
+	 * If "citus.enable_cluster_clock" is enabled, timestamp the transaction
+	 * with the cluster clock and persist its id along with the timestamp.
+	 */
+	PrepareAndSetTransactionClock(connectionList);
}


@@ -0,0 +1,208 @@
# Cluster Clock

### Requirement
Many distributed applications need to track changes in the same order as they are applied on the database. The changes can be to databases or to objects within them, either within a single node or across the sharded cluster.

### Definitions
**Total ordering** - Every pair of change events can be placed in some order.

**Causal ordering** - Only events that are causally related (an event A caused an event B) can be ordered; it is only a partial order. Sometimes events happen independently, with no possible causal relationship; such events are treated as concurrent.

**Sequential consistency** - All writes must be seen in the same order by all processes.

**Causal consistency** - Causally related writes must be seen in the same order.

Transactions on a single-node system naturally provide total and sequential ordering guarantees for client read and write operations, as all operations are routed to the same node, but there are challenges for a multi-node distributed system, such as Citus.

One possible way to totally order all the changes in the system is to timestamp all the events with a global physical clock or a centralized logical clock; observing the events in increasing order of the timestamp then gives the total ordering of events. For both performance and cost reasons, such solutions are impractical. In the absence of total ordering, a slightly weaker ordering is the **causal order**.

Causal order is defined as a model that preserves a partial order of events in a distributed system.

1. If an event A causes another event B, every other process in the system observes event A before observing event B.
2. Causal order is transitive: if A causes B, and B causes C, then A causes C.
3. Non-causally ordered events are treated as concurrent.

Causal consistency is a weak form of consistency that preserves the order of causally related operations. The causal consistency model can be refined into four session guarantees.

1. Read Your Writes: If a process performs a write, the same process later observes the result of its write.
2. Monotonic Reads: The set of writes observed (read) by a process is guaranteed to be monotonically increasing.
3. Writes Follow Reads: If some process performs a read followed by a write, and another process observes the result of the write, then it can also observe the read.
4. Monotonic Writes: If some process performs a write, followed sometime later by another write, other processes will observe them in the same order.
### Hybrid Logical Clock (HLC)
HLC provides a way to capture the causality relationship, like logical clocks do. Because the logical clock value is maintained close to wall-clock time, it can also be used for backup/recovery. HLC consists of

LC - Logical clock
C - Counter

Clock components - unsigned 64 bit <LC, C>

| Epoch milliseconds (LC) | Logical counter (C) |
|--|--|
| 42 bits | 22 bits |

2^42 milliseconds = 4398046511104 milliseconds, which is ~139 years.
2^22 ticks = a maximum of four million operations per millisecond.
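
As a quick sanity check of this layout (a sketch, not part of the patch), the packing arithmetic can be reproduced in plain SQL, using the example clock value from the catalog section below:

```sql
-- Pack logical 1658887880235 and counter 9 into one 64-bit value,
-- then unpack both fields again (the 22 low bits hold the counter).
SELECT (1658887880235::bigint << 22) | 9                     AS packed,
       ((1658887880235::bigint << 22) | 9) >> 22             AS logical_back,
       ((1658887880235::bigint << 22) | 9) & ((1 << 22) - 1) AS counter_back;
```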
### New catalog
All committed transactions are persisted with the transaction id and the commit clock time in a new catalog, **pg_dist_commit_transaction**.

The transaction identifier is

    (database_id, process_id, initiator_node_identifier, transaction_number, transaction_stamp)

Assuming the timestamp never jumps back, this id is globally unique across the cluster and across restarts.

| TransactionId | CommitClock (LC, C) | timestamp (epoch) |
|--|--|--|
| (13090,1077913,2,5,"2022-07-26 19:05:09.290597-07") | (1658887880235, 9) | 2022-07-26 19:05:09.290597-07 |
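
For instance, the most recent commits can be inspected directly from the catalog (a sketch; the column names follow the schema added in this commit):

```sql
-- Show the ten most recently stamped transactions; ordering a composite
-- cluster_clock value sorts by (logical, counter).
SELECT transaction_id, cluster_clock_value, timestamp
FROM pg_dist_commit_transaction
ORDER BY cluster_clock_value DESC
LIMIT 10;
```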
### GUC
A new GUC parameter, "**citus.enable_cluster_clock**", enables the cluster-wide commit timestamp for all transactions and persists them in the catalog table.
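
The GUC is off by default and can be enabled per session, as the regression test in this commit does:

```sql
SHOW citus.enable_cluster_clock;      -- off
SET citus.enable_cluster_clock TO on;
SHOW citus.enable_cluster_clock;      -- on
```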
### Pseudo code
WC - Current wall clock in milliseconds
HLC - Current hybrid logical clock in shared memory
MAXTICKS - Four million

```
/* Tick for each event */
GetClock()
{
    IF (HLC.LC < WC)
    {
        HLC.LC = WC;
        HLC.C = 0;
    }
    ELSE IF (HLC.C == MAXTICKS)
    {
        HLC.LC = HLC.LC + 1;
        HLC.C = 0;
    }
    ELSE
    {
        HLC.C = HLC.C + 1;
    }

    return HLC;
}

/* Returns true if clock1 is after clock2 */
IsEventAfter(HLC1, HLC2)
{
    IF (HLC1.LC != HLC2.LC)
        return (HLC1.LC > HLC2.LC);
    ELSE
        return (HLC1.C > HLC2.C);
}

/* Returns the highest cluster clock value of all the nodes */
CurrentClusterClock()
{
    For each node
    {
        NodeClock[N] = GetClock();
    }

    /* Return the highest clock value of all the nodes */
    return max(NodeClock[N]);
}

/* Adjust the local shared memory clock to the
 * value received from the remote node */
ReceiveClock(RHLC)
{
    IF (RHLC.LC < HLC.LC)
        return; /* do nothing */

    IF (RHLC.LC > HLC.LC)
    {
        HLC.LC = RHLC.LC;
        HLC.C = RHLC.C;
        return;
    }

    IF (RHLC.LC == HLC.LC)
        HLC.C = (RHLC.C > HLC.C) ? RHLC.C : HLC.C;
}

/* All the nodes adjust their clocks to the
 * highest of the newly committed 2PC */
AdjustClocks(HLC)
{
    For each node
    {
        SendCommand("SELECT ReceiveClock(%s)", HLC);
    }
}

/* During prepare, once all the nodes acknowledge the
 * commit, persist the current transaction id along with
 * the clock value in the catalog */
PrepareTransaction()
{
    HLC = CurrentClusterClock();
    PersistCatalog(get_current_transaction_id(), HLC);
    AdjustClocks(HLC);
}

/* Initialize the shared memory clock value to the
 * highest clock persisted */
InitClockAtBoot()
{
    HLC.LC = WC;
    HLC.C = 0;

    MAX_LC = "SELECT max(CommitClock.LC) FROM catalog";
    IF (MAX_LC != NULL)
    {
        /* There are prior commits, adjust to that value */
        ReceiveClock(MAX_LC);
    }
}
```
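
The comparison rule in IsEventAfter is exactly what the `citus_is_clock_after` UDF exposes at the SQL level; the regression test in this commit exercises it with literal <logical, counter> pairs:

```sql
SELECT citus_is_clock_after(ROW(5,1), ROW(3,6)); -- true: logical 5 > 3
SELECT citus_is_clock_after(ROW(2,9), ROW(3,0)); -- false: logical 2 < 3
SELECT citus_is_clock_after(ROW(5,6), ROW(5,1)); -- true: same logical, counter 6 > 1
SELECT citus_is_clock_after(ROW(5,6), ROW(5,6)); -- false: identical clocks
```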
#### Usage
**Step 1**
In the application, track individual changes with the current transaction id (see the sketch of a possible audit table below):

```sql
UPDATE track_table
SET TransactionId = get_current_transaction_id(), operation = <>, row_key = <>, ...;
```
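
The steps assume an application-side audit table; a minimal sketch of its shape (the name and columns here are illustrative, not part of the patch) could be:

```sql
CREATE TABLE track_table (
    TransactionId text, -- filled from get_current_transaction_id()
    operation text,     -- e.g. 'INSERT', 'UPDATE' or 'DELETE'
    row_key bigint      -- key of the row that changed
);
```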
**Step 2**
As the transaction commits, if the GUC is enabled, the engine internally calls `citus_get_cluster_clock()` and persists the current transaction id along with the commit cluster clock:

```sql
INSERT INTO pg_dist_commit_transaction (transaction_id, cluster_clock_value, timestamp)
VALUES (current_transactionId, commit_clock, (extract(epoch from now()) * 1000)::bigint);
```

**Step 3**
To get all the events in causal order:

```sql
SELECT tt.row_key, tt.operation
FROM track_table tt, pg_dist_commit_transaction cc
WHERE tt.TransactionId = cc.transaction_id
ORDER BY cc.cluster_clock_value;
```

Events for an object:

```sql
SELECT tt.row_key, tt.operation
FROM track_table tt, pg_dist_commit_transaction cc
WHERE tt.TransactionId = cc.transaction_id
  AND row_key = $1
ORDER BY cc.cluster_clock_value;
```

Events in the last one hour (the `timestamp` column holds epoch milliseconds):

```sql
SELECT tt.row_key, tt.operation
FROM track_table tt, pg_dist_commit_transaction cc
WHERE cc.timestamp >= (extract(epoch from now() - interval '1 hour') * 1000)::bigint
  AND tt.TransactionId = cc.transaction_id;
```
Note: Citus uses 2PC; if any node goes down after the PREPARE and before the COMMIT, changes may be partially committed. Citus tracks such transactions in **pg_dist_transaction**, and they are eventually committed when the node becomes healthy again. When we track data from the committed transactions of **pg_dist_commit_transaction**, we will miss the changes from the failed node.

One way to avoid such an anomaly is to take only the transactions from **pg_dist_commit_transaction** with a clock value less than the minimum clock of the transactions in **pg_dist_transaction**. The caveat is that if the node takes long to recover and the 2PC to fully complete, the visibility of the committed transactions might stall.

### Catalog pruning
The data in **pg_dist_commit_transaction** is ephemeral, i.e., rows eventually have to be deleted. Users can install a pg_cron job to prune the catalog regularly:

```sql
DELETE FROM pg_dist_commit_transaction
WHERE timestamp < (extract(epoch from now() - interval '7 days') * 1000)::bigint;
```
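
A sketch of scheduling that cleanup with pg_cron, assuming the extension is installed (the job name and schedule are illustrative):

```sql
SELECT cron.schedule(
    'prune-pg-dist-commit-transaction', -- hypothetical job name
    '0 3 * * *',                        -- every day at 03:00
    $$DELETE FROM pg_dist_commit_transaction
      WHERE timestamp < (extract(epoch from now() - interval '7 days') * 1000)::bigint$$
);
```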
### Limitations of Citus
Using this transaction commit clock ordering to build a secondary that is a mirror copy of the original may not be feasible at this time, for the following reasons.

Given that there is no well-defined order between concurrent distributed transactions in Citus, we cannot retroactively apply a transaction order that leads to an exact replica of the primary unless we preserve the original object-level ordering as it happened on the individual nodes.

For instance, if a multi-shard insert (transaction A) happens concurrently with a multi-shard update (transaction B) and the WHERE clause of the update matches inserted rows in multiple shards, we could have a scenario in which only a subset of the inserted rows gets updated. Effectively, transaction A might happen before transaction B on node 1, while transaction B happens before transaction A on node 2. While unfortunate, we cannot simply claim that changes made by transaction A happened first based on commit timestamps, because that would lead us to reorder changes to the same object ID on node 2, which might lead to a different outcome when replayed.

In such a scenario, even if we use the causal commit clock to order changes, it is essential that the order of modifications to an object matches the original order. Otherwise, we could have the above situation, where an insert happens before an update in the primary cluster, but the update happens before the insert in the replica; replaying the changes would then lead to a different database.

In the absence of coherent transaction-ordering semantics in a distributed cluster, the best we can do is ensure that changes to the same object are in the correct order, and ensure exactly-once delivery (correct pagination).


@@ -0,0 +1,770 @@
/*-------------------------------------------------------------------------
 * causal_clock.c
 *
 * Core function definitions to implement the hybrid logical clock.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */
#include <sys/time.h>
#include "postgres.h"
#include "miscadmin.h"
#include "fmgr.h"
#include "funcapi.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/numeric.h"
#include "utils/typcache.h"
#include "nodes/pg_list.h"
#include "catalog/namespace.h"
#include "commands/extension.h"
#include "executor/spi.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "storage/s_lock.h"
#include "distributed/causal_clock.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/pg_dist_commit_transaction.h"
#include "distributed/remote_commands.h"
#include "distributed/type_utils.h"
PG_FUNCTION_INFO_V1(citus_get_cluster_clock);
PG_FUNCTION_INFO_V1(citus_internal_adjust_local_clock_to_remote);
PG_FUNCTION_INFO_V1(citus_is_clock_after);
/*
* Current state of the logical clock
*/
typedef enum ClockState
{
CLOCKSTATE_INITIALIZED,
CLOCKSTATE_UNINITIALIZED,
CLOCKSTATE_INIT_INPROGRESS,
} ClockState;
/*
* Holds the cluster clock variables in shared memory.
*/
typedef struct LogicalClockShmemData
{
slock_t clockMutex;
/* Current logical clock value of this node */
uint64 clusterClockValue;
/* Tracks initialization at boot */
ClockState clockInitialized;
} LogicalClockShmemData;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static LogicalClockShmemData *logicalClockShmem = NULL;
static uint64 GetEpochTimeMs(void);
static void AdjustLocalClock(uint64 remoteLogicalClock,
uint32 remoteCounterClock);
static uint64 GetNextClusterClockValue(void);
static uint64 GetHighestClockInTransaction(List *nodeConnectionList);
static void AdjustClocksToTransactionHighest(List *nodeConnectionList,
uint64 transactionClockValue);
static void LogTransactionCommitClock(char *transactionId, uint64 transactionClockValue);
static uint64 * ExecuteQueryAndReturnBigIntCols(char *query, int resultSize,
												int spiok_type);
static bool IsClockAfter(uint64 logicalClock1, uint32 counterClock1,
uint64 logicalClock2, uint32 counterClock2);
bool EnableClusterClock = false;
/*
* GetEpochTimeMs returns the epoch value in milliseconds.
*/
static uint64
GetEpochTimeMs(void)
{
struct timeval tp;
gettimeofday(&tp, NULL);
uint64 result = (uint64) (tp.tv_sec) * 1000;
result = result + (uint64) (tp.tv_usec) / 1000;
return result;
}
/*
* LogicalClockShmemSize returns the size that should be allocated
* in the shared memory for logical clock management.
*/
size_t
LogicalClockShmemSize(void)
{
Size size = 0;
size = add_size(size, sizeof(LogicalClockShmemData));
return size;
}
/*
* InitializeClusterClockMem reserves shared-memory space needed to
* store LogicalClockShmemData, and sets the hook for initialization
* of the same.
*/
void
InitializeClusterClockMem(void)
{
/* On PG 15 and above, we use shmem_request_hook_type */
#if PG_VERSION_NUM < PG_VERSION_15
/* allocate shared memory for pre PG-15 versions */
if (!IsUnderPostmaster)
{
RequestAddinShmemSpace(LogicalClockShmemSize());
}
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = LogicalClockShmemInit;
}
/*
* LogicalClockShmemInit Allocates and initializes shared memory for
* cluster clock related variables.
*/
void
LogicalClockShmemInit(void)
{
bool alreadyInitialized = false;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
logicalClockShmem = (LogicalClockShmemData *)
ShmemInitStruct("Logical Clock Shmem",
LogicalClockShmemSize(),
&alreadyInitialized);
if (!alreadyInitialized)
{
/* A zero value indicates that the clock is not adjusted yet */
logicalClockShmem->clusterClockValue = 0;
SpinLockInit(&logicalClockShmem->clockMutex);
logicalClockShmem->clockInitialized = CLOCKSTATE_UNINITIALIZED;
}
LWLockRelease(AddinShmemInitLock);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
/*
 * GetNextClusterClockValue implements the internal guts of the UDF citus_get_cluster_clock()
*/
uint64
GetNextClusterClockValue(void)
{
uint64 wallClockValue;
uint64 epochValue = GetEpochTimeMs();
SpinLockAcquire(&logicalClockShmem->clockMutex);
/* Check if the clock is adjusted after the boot */
if (logicalClockShmem->clockInitialized == CLOCKSTATE_UNINITIALIZED)
{
SpinLockRelease(&logicalClockShmem->clockMutex);
Assert(logicalClockShmem->clusterClockValue == 0);
ereport(ERROR, (errmsg("backend never adjusted the clock, please retry")));
}
/* Set the epoch in (lc, c) format */
SET_CLOCK(wallClockValue, epochValue, 0);
uint64 nextClusterClockValue = Max(logicalClockShmem->clusterClockValue + 1,
wallClockValue);
logicalClockShmem->clusterClockValue = nextClusterClockValue;
SpinLockRelease(&logicalClockShmem->clockMutex);
return nextClusterClockValue;
}
/*
* IsClockAfter implements the internal guts of the UDF citus_is_clock_after()
*/
static bool
IsClockAfter(uint64 logicalClock1, uint32 counterClock1,
uint64 logicalClock2, uint32 counterClock2)
{
ereport(DEBUG1, (errmsg(
"clock1 @ LC:%lu, C:%u, "
"clock2 @ LC:%lu, C:%u",
logicalClock1, counterClock1,
logicalClock2, counterClock2)));
if (logicalClock1 != logicalClock2)
{
return (logicalClock1 > logicalClock2);
}
else
{
return (counterClock1 > counterClock2);
}
}
/*
* AdjustLocalClock Adjusts the local shared memory clock to the
* received value from the remote node.
*/
void
AdjustLocalClock(uint64 remoteLogicalClock, uint32 remoteCounterClock)
{
uint64 remoteClusterClock;
SET_CLOCK(remoteClusterClock, remoteLogicalClock, remoteCounterClock);
SpinLockAcquire(&logicalClockShmem->clockMutex);
uint64 currentLogicalClock = GET_LOGICAL(logicalClockShmem->clusterClockValue);
uint64 currentCounterClock = GET_COUNTER(logicalClockShmem->clusterClockValue);
if (remoteLogicalClock < currentLogicalClock)
{
/* local clock is ahead, do nothing */
SpinLockRelease(&logicalClockShmem->clockMutex);
return;
}
if (remoteLogicalClock > currentLogicalClock)
{
ereport(DEBUG1, (errmsg("adjusting to remote clock "
"logical(%lu) counter(%u)",
remoteLogicalClock,
remoteCounterClock)));
/* Pick the remote value */
logicalClockShmem->clusterClockValue = remoteClusterClock;
SpinLockRelease(&logicalClockShmem->clockMutex);
return;
}
/*
* Both the logical clock values are equal, pick the larger counter.
*/
if (remoteCounterClock > currentCounterClock)
{
ereport(DEBUG1, (errmsg("both logical clock values are "
"equal(%lu), pick remote's counter (%u) "
"since it's greater",
remoteLogicalClock, remoteCounterClock)));
logicalClockShmem->clusterClockValue = remoteClusterClock;
}
SpinLockRelease(&logicalClockShmem->clockMutex);
}
/*
* GetHighestClockInTransaction takes the connection list of participating nodes in the
* current transaction and polls the logical clock value of all the nodes. Returns the
* highest logical clock value of all the nodes in the current distributed transaction,
* which may be used as commit order for individual objects in the transaction.
*/
static uint64
GetHighestClockInTransaction(List *nodeConnectionList)
{
/* get clock value of the local node */
uint64 globalClockValue = GetNextClusterClockValue();
ereport(DEBUG1, (errmsg("coordinator transaction clock %lu:%u",
GET_LOGICAL(globalClockValue),
(uint32) GET_COUNTER(globalClockValue))));
/* get clock value from each node */
MultiConnection *connection = NULL;
foreach_ptr(connection, nodeConnectionList)
{
int querySent = SendRemoteCommand(connection,
"SELECT logical, counter FROM citus_get_cluster_clock();");
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
/* fetch the results and pick the highest clock value of all the nodes */
foreach_ptr(connection, nodeConnectionList)
{
bool raiseInterrupts = true;
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
ereport(ERROR, (errmsg("connection to %s:%d failed when "
"fetching logical clock value",
connection->hostname, connection->port)));
}
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
int32 rowCount = PQntuples(result);
int32 colCount = PQnfields(result);
/* Although it is not expected, guard against an ill-formed result */
if (colCount != 2 || rowCount != 1)
{
ereport(ERROR,
(errmsg("unexpected result from citus_get_cluster_clock()")));
}
uint64 logical = ParseIntField(result, 0, 0);
uint32 counter = ParseIntField(result, 0, 1);
uint64 nodeClockValue;
SET_CLOCK(nodeClockValue, logical, counter);
ereport(DEBUG1, (errmsg(
"node(%lu:%u) transaction clock %lu:%u",
connection->connectionId, connection->port,
logical, counter)));
if (nodeClockValue > globalClockValue)
{
globalClockValue = nodeClockValue;
}
PQclear(result);
ForgetResults(connection);
}
ereport(DEBUG1,
(errmsg("final global transaction clock %lu:%u",
GET_LOGICAL(globalClockValue),
(uint32) GET_COUNTER(globalClockValue))));
return globalClockValue;
}
/*
* AdjustClocksToTransactionHighest Sets the clock value of all the nodes, participated
* in the PREPARE of the transaction, to the highest clock value of all the nodes.
*/
static void
AdjustClocksToTransactionHighest(List *nodeConnectionList, uint64 transactionClockValue)
{
StringInfo queryToSend = makeStringInfo();
uint64 transactionLogicalClock = GET_LOGICAL(transactionClockValue);
uint32 transactionCounterClock = GET_COUNTER(transactionClockValue);
/* Set the adjusted value locally */
AdjustLocalClock(transactionLogicalClock, transactionCounterClock);
/* Set the clock value on participating worker nodes */
MultiConnection *connection = NULL;
appendStringInfo(queryToSend,
"SELECT pg_catalog.citus_internal_adjust_local_clock_to_remote"
"((%lu, %u)::pg_catalog.cluster_clock);",
transactionLogicalClock, transactionCounterClock);
foreach_ptr(connection, nodeConnectionList)
{
int querySent = SendRemoteCommand(connection, queryToSend->data);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
/* Process the result */
foreach_ptr(connection, nodeConnectionList)
{
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
ForgetResults(connection);
}
}
/*
* During prepare, once all the nodes acknowledge commit, persist the current
* transaction id along with the clock value in the catalog.
*/
void
PrepareAndSetTransactionClock(List *transactionNodeList)
{
if (!EnableClusterClock)
{
/* citus.enable_cluster_clock is false */
return;
}
char *transactionId = GetCurrentTransactionIdString();
/* Pick the highest logical clock value among all transaction-nodes */
uint64 transactionClockValue = GetHighestClockInTransaction(transactionNodeList);
/* Persist the transactionId along with the logical commit-clock timestamp */
LogTransactionCommitClock(transactionId, transactionClockValue);
/* Adjust all the nodes with the new clock value */
AdjustClocksToTransactionHighest(transactionNodeList, transactionClockValue);
}
/*
* LogTransactionCommitClock registers the committed transaction along
* with the commit clock.
*/
static void
LogTransactionCommitClock(char *transactionId, uint64 transactionClockValue)
{
Datum values[Natts_pg_dist_commit_transaction];
bool isNulls[Natts_pg_dist_commit_transaction];
uint64 clockLogical = GET_LOGICAL(transactionClockValue);
uint32 clockCounter = GET_COUNTER(transactionClockValue);
ereport(DEBUG1, (errmsg("persisting transaction %s with "
"clock logical (%lu) and counter(%u)",
transactionId, clockLogical, clockCounter)));
/* form new transaction tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
uint64 transactionTimestamp = GetEpochTimeMs();
/* Fetch the tuple description of "cluster_clock" type */
Oid schemaId = get_namespace_oid("pg_catalog", false);
Oid clusterClockTypeOid = TypeOid(schemaId, "cluster_clock");
TupleDesc clockTupDesc = lookup_rowtype_tupdesc_copy(clusterClockTypeOid, -1);
values[0] = Int64GetDatum(clockLogical);
values[1] = Int32GetDatum(clockCounter);
HeapTuple clockTuple = heap_form_tuple(clockTupDesc, values, isNulls);
values[Anum_pg_dist_commit_transaction_transaction_id - 1] =
CStringGetTextDatum(transactionId);
values[Anum_pg_dist_commit_transaction_cluster_clock - 1] =
HeapTupleGetDatum(clockTuple);
values[Anum_pg_dist_commit_transaction_timestamp - 1] =
Int64GetDatum(transactionTimestamp);
/* open pg_dist_commit_transaction and insert new tuple */
Relation pgDistCommitTransaction =
table_open(DistCommitTransactionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistCommitTransaction);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistCommitTransaction, heapTuple);
CommandCounterIncrement();
/* close relation and invalidate previous cache entry */
table_close(pgDistCommitTransaction, NoLock);
}
/*
* Initialize the shared memory clock value to the highest clock
* persisted. This will protect from any clock drifts.
*/
void
InitClockAtBoot(void)
{
if (creating_extension)
{
/* No catalog present yet */
return;
}
SpinLockAcquire(&logicalClockShmem->clockMutex);
/* Avoid repeated and parallel initialization */
if (logicalClockShmem->clockInitialized == CLOCKSTATE_INITIALIZED ||
logicalClockShmem->clockInitialized == CLOCKSTATE_INIT_INPROGRESS)
{
SpinLockRelease(&logicalClockShmem->clockMutex);
return;
}
/*Assert(logicalClockShmem->clockInitialized == CLOCKSTATE_UNINITIALIZED);*/
/*
* Set the flag before executing a distributed query
* (else it might trigger this routine recursively.)
*/
logicalClockShmem->clockInitialized = CLOCKSTATE_INIT_INPROGRESS;
SpinLockRelease(&logicalClockShmem->clockMutex);
PG_TRY();
{
/* Start with the wall clock value */
uint64 epochValue = GetEpochTimeMs();
SpinLockAcquire(&logicalClockShmem->clockMutex);
SET_CLOCK(logicalClockShmem->clusterClockValue, epochValue, 0);
SpinLockRelease(&logicalClockShmem->clockMutex);
/*
* Version checking is a must for the below code as it accesses the
* new catalog created in the new version.
* Note: If we move this code to the top of the routine it will cause an
* infinite loop with CitusHasBeenLoaded() calling this routine again.
*/
bool savedEnableVersionChecks = EnableVersionChecks;
EnableVersionChecks = true;
if (!CheckCitusVersion(NOTICE))
{
/* Reset the CLOCKSTATE_INIT_INPROGRESS */
SpinLockAcquire(&logicalClockShmem->clockMutex);
logicalClockShmem->clockInitialized = CLOCKSTATE_UNINITIALIZED;
SET_CLOCK(logicalClockShmem->clusterClockValue, 0, 0);
SpinLockRelease(&logicalClockShmem->clockMutex);
EnableVersionChecks = savedEnableVersionChecks;
return;
}
EnableVersionChecks = savedEnableVersionChecks;
/*
* Select the highest clock value persisted in the catalog.
*
* SELECT i, MAX(j) FROM <tab>
* WHERE i = (SELECT MAX(i) FROM <tab>)
* GROUP BY i
*/
uint32 numCols = 2;
uint64 *results = ExecuteQueryAndReturnBigIntCols(
"SELECT (cluster_clock_value).logical, (cluster_clock_value).counter FROM "
"pg_catalog.pg_dist_commit_transaction ORDER BY 1 DESC, "
"2 DESC LIMIT 1;", numCols, SPI_OK_SELECT);
if (results != NULL)
{
uint64 logicalMaxClock = results[0];
uint32 counterMaxClock = results[1];
ereport(DEBUG1, (errmsg("adjusted the clock with the persisted value: "
						"logical(%lu) and counter(%u)",
logicalMaxClock, counterMaxClock)));
/*
* Adjust the local clock according to the most recent
* clock stamp value persisted in the catalog.
*/
AdjustLocalClock(logicalMaxClock, counterMaxClock);
}
/*
* NULL results indicate no prior commit timestamps on this node, start
* from the wall clock.
*/
}
PG_CATCH();
{
SpinLockAcquire(&logicalClockShmem->clockMutex);
logicalClockShmem->clockInitialized = CLOCKSTATE_UNINITIALIZED;
SET_CLOCK(logicalClockShmem->clusterClockValue, 0, 0);
SpinLockRelease(&logicalClockShmem->clockMutex);
PG_RE_THROW();
}
PG_END_TRY();
SpinLockAcquire(&logicalClockShmem->clockMutex);
logicalClockShmem->clockInitialized = CLOCKSTATE_INITIALIZED;
SpinLockRelease(&logicalClockShmem->clockMutex);
}
/*
 * ExecuteQueryAndReturnBigIntCols connects to SPI, executes the query and checks
* if the SPI returned the correct type. Returns an array of int64 results
* in the caller's memory context.
*/
static uint64 *
ExecuteQueryAndReturnBigIntCols(char *query, int resultSize, int spiok_type)
{
/* Allocate in caller's context */
uint64 *results = (uint64 *) palloc(resultSize * sizeof(uint64));
int spiResult = SPI_connect();
if (spiResult != SPI_OK_CONNECT)
{
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
spiResult = SPI_execute(query, false, 0);
if (spiResult != spiok_type)
{
ereport(ERROR, (errmsg("could not run SPI query")));
}
if (SPI_processed > 1)
{
ereport(ERROR, (errmsg("query(%s) unexpectedly returned "
"more than one row", query)));
}
if (SPI_processed != 1)
{
/* No rows found, it's up to the caller to handle it */
SPI_finish();
return NULL;
}
for (int i = 0; i < resultSize; i++)
{
bool isNull = false;
results[i] = DatumGetInt64(
SPI_getbinval(SPI_tuptable->vals[0], /* First row */
SPI_tuptable->tupdesc,
i + 1, /* 'i+1' column */
&isNull));
}
spiResult = SPI_finish();
if (spiResult != SPI_OK_FINISH)
{
ereport(ERROR, (errmsg("could not finish SPI connection")));
}
return results;
}
/*
 * citus_get_cluster_clock() is a UDF that returns a monotonically increasing
 * logical clock. The clock is guaranteed never to go backwards in value after
 * restarts, and makes a best attempt to keep the value close to Unix epoch
 * time in milliseconds.
*/
Datum
citus_get_cluster_clock(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL;
Datum values[2];
bool isNull[2];
/* build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
uint64 clusterClockValue = GetNextClusterClockValue();
memset(values, 0, sizeof(values));
memset(isNull, false, sizeof(isNull));
values[0] = Int64GetDatum(GET_LOGICAL(clusterClockValue)); /* LC */
values[1] = Int32GetDatum(GET_COUNTER(clusterClockValue)); /* C */
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNull);
PG_RETURN_DATUM(HeapTupleGetDatum(heapTuple));
}
/*
* citus_internal_adjust_local_clock_to_remote is an internal UDF to adjust
* the local clock to the highest in the cluster.
*/
Datum
citus_internal_adjust_local_clock_to_remote(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
HeapTupleHeader record = PG_GETARG_HEAPTUPLEHEADER(0);
bool isNull[2];
memset(isNull, false, sizeof(isNull));
/* Argument is of complex type (logical, counter) */
uint64 logicalClock =
DatumGetInt64(GetAttributeByName(record, "logical", &isNull[0]));
uint32 counterClock =
DatumGetUInt32(GetAttributeByName(record, "counter", &isNull[1]));
if (isNull[0] || isNull[1])
{
ereport(ERROR, (errmsg("parameters can't be NULL")));
}
AdjustLocalClock(logicalClock, counterClock);
PG_RETURN_VOID();
}
/*
 * citus_is_clock_after is a UDF that accepts logical clock timestamps of
 * two causally related events and returns true if the first argument
 * happened after the second.
*/
Datum
citus_is_clock_after(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
/* Fetch both the arguments */
HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
bool isNull[2];
memset(isNull, false, sizeof(isNull));
/* Argument is of complex type (logical, counter) */
uint64 logical1 = DatumGetInt64(GetAttributeByName(record1, "logical", &isNull[0]));
uint32 counter1 = DatumGetUInt32(GetAttributeByName(record1, "counter", &isNull[1]));
if (isNull[0] || isNull[1])
{
ereport(ERROR, (errmsg("parameters can't be NULL")));
}
/* Argument is of complex type (logical, counter) */
uint64 logical2 = DatumGetInt64(GetAttributeByName(record2, "logical", &isNull[0]));
uint32 counter2 = DatumGetUInt32(GetAttributeByName(record2, "counter", &isNull[1]));
if (isNull[0] || isNull[1])
{
ereport(ERROR, (errmsg("parameters can't be NULL")));
}
bool result = IsClockAfter(logical1, counter1, logical2, counter2);
PG_RETURN_BOOL(result);
}


@@ -0,0 +1,37 @@
/*-------------------------------------------------------------------------
*
* type_utils.c
*
* Utility functions related to types.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/type_utils.h"
#include "catalog/pg_type.h"
#include "utils/syscache.h"
/*
* TypeOid looks for a type that has the given name and schema, and returns the
* corresponding type's oid.
*/
Oid
TypeOid(Oid schemaId, const char *typeName)
{
Oid typeOid = GetSysCacheOid2(TYPENAMENSP, Anum_pg_type_oid,
PointerGetDatum(typeName),
ObjectIdGetDatum(schemaId));
if (!OidIsValid(typeOid))
{
ereport(ERROR, (errmsg("Type (%s) not found", typeName)));
}
return typeOid;
}


@@ -80,6 +80,8 @@ extern bool IsRebalancerInternalBackend(void);
extern bool IsCitusRunCommandBackend(void);
extern bool IsExternalClientBackend(void);
extern void ResetCitusBackendType(void);
+extern char * GetCurrentTransactionIdString(void);
+extern Datum get_current_transaction_id(PG_FUNCTION_ARGS);

#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999


@@ -0,0 +1,43 @@
/*-------------------------------------------------------------------------
 *
 * causal_clock.h
 *
 * Data structure definitions for managing the hybrid logical clock and
 * related function declarations.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */
#ifndef CAUSAL_CLOCK_H
#define CAUSAL_CLOCK_H
extern size_t LogicalClockShmemSize(void);
/*
* Clock components - Unsigned 64 bit <LC, C>
* Logical clock (LC): 42 bits
* Counter (C): 22 bits
*
* 2^42 milliseconds - 4398046511104 milliseconds, which is ~139 years.
* 2^22 ticks - maximum of four million operations per millisecond.
*
*/
#define COUNTER_BITS 22
#define LOGICAL_MASK ((1U << COUNTER_BITS) - 1)
#define MAX_COUNTER LOGICAL_MASK
#define GET_LOGICAL(x) ((x) >> COUNTER_BITS)
#define GET_COUNTER(x) ((x) & LOGICAL_MASK)
/* concatenate logical and counter to form a 64 bit clock value */
#define SET_CLOCK(var, lc, c) var = (((lc) << COUNTER_BITS) | (c))
extern bool EnableClusterClock;
extern void LogicalClockShmemInit(void);
extern void InitializeClusterClockMem(void);
extern void PrepareAndSetTransactionClock(List *transactionNodeList);
extern void InitClockAtBoot(void);
#endif /* CAUSAL_CLOCK_H */


@@ -254,6 +254,7 @@ extern Oid DistPlacementShardidIndexId(void);
extern Oid DistPlacementPlacementidIndexId(void);
extern Oid DistColocationIndexId(void);
extern Oid DistTransactionRelationId(void);
+extern Oid DistCommitTransactionRelationId(void);
extern Oid DistTransactionGroupIndexId(void);
extern Oid DistPlacementGroupidIndexId(void);
extern Oid DistObjectPrimaryKeyIndexId(void);


@@ -0,0 +1,50 @@
/*-------------------------------------------------------------------------
*
* pg_dist_commit_transaction.h
* definition of the "commit-transaction" relation (pg_dist_commit_transaction).
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_COMMIT_TRANSACTION_H
#define PG_DIST_COMMIT_TRANSACTION_H
typedef struct cluster_clock
{
uint64 clockLogical; /* cluster clock logical timestamp at the commit */
uint32 clockCounter; /* cluster clock counter value at the commit */
} cluster_clock;
/* ----------------
* pg_dist_commit_transaction definition.
* ----------------
*/
typedef struct FormData_pg_dist_commit_transaction
{
text transaction_id; /* id of the current transaction committed */
cluster_clock cluster_clock_value; /* logical clock timestamp */
uint64 timestamp; /* epoch timestamp in milliseconds */
} FormData_pg_dist_commit_transaction;
/* ----------------
 * Form_pg_dist_commit_transaction corresponds to a pointer to a tuple with
 * the format of the pg_dist_commit_transaction relation.
 * ----------------
 */
typedef FormData_pg_dist_commit_transaction *Form_pg_dist_commit_transaction;
/* ----------------
* compiler constants for pg_dist_commit_transaction
* ----------------
*/
#define Natts_pg_dist_commit_transaction 3
#define Anum_pg_dist_commit_transaction_transaction_id 1
#define Anum_pg_dist_commit_transaction_cluster_clock 2
#define Anum_pg_dist_commit_transaction_timestamp 3
#endif /* PG_DIST_COMMIT_TRANSACTION_H */


@@ -0,0 +1,15 @@
/*-------------------------------------------------------------------------
*
* type_utils.h
* Utility functions related to types.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef TYPE_UTILS_H
#define TYPE_UTILS_H
extern Oid TypeOid(Oid schemaId, const char *typeName);
#endif /* TYPE_UTILS_H */


@@ -283,3 +283,10 @@ s/^(DETAIL: "[a-z\ ]+ )pg_temp_[0-9]+(\..*" will be created only locally)$/\1pg
# will be replaced with
# WARNING: "function func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
s/^(WARNING|ERROR)(: "[a-z\ ]+ .*" has dependency on unsupported object) "schema pg_temp_[0-9]+"$/\1\2 "schema pg_temp_xxx"/g

+# In clock tests, normalize epoch value(s) and the DEBUG messages printed
+s/^(DEBUG: |LOG: )(coordinator|node\([0-9]+:[0-9]+\)|final global|Set) transaction clock [0-9]+.*$/\1\2 transaction clock xxxxxx/g
+s/^(NOTICE: )(clock).*LC:[0-9]+,.*C:[0-9]+,.*$/\1\2 xxxxxx/g
+/^(DEBUG: )(adjusting to remote clock logical)\([0-9]+\) counter\([0-9]+\)$/d
+/^DEBUG: persisting transaction.*counter.*$/d
+/^DEBUG: both logical clock values are equal\([0-9]+\), pick remote.*$/d


@@ -0,0 +1,141 @@
CREATE SCHEMA clock;
SET search_path TO clock;
SHOW citus.enable_cluster_clock;
citus.enable_cluster_clock
---------------------------------------------------------------------
off
(1 row)
SET citus.enable_cluster_clock to ON;
SHOW citus.enable_cluster_clock;
citus.enable_cluster_clock
---------------------------------------------------------------------
on
(1 row)
CREATE TABLE clock_test (id int, nonid int);
SELECT create_distributed_table('clock_test', 'id', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
--
-- Compare <logical, counter> pairs
--
-- Returns true
SELECT citus_is_clock_after(ROW(5,1), ROW(3,6));
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
-- Returns false
SELECT citus_is_clock_after(ROW(2,9), ROW(3,0));
citus_is_clock_after
---------------------------------------------------------------------
f
(1 row)
-- Returns true
SELECT citus_is_clock_after(ROW(5,6), ROW(5,1));
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
-- Returns false
SELECT citus_is_clock_after(ROW(5,6), ROW(5,6));
citus_is_clock_after
---------------------------------------------------------------------
f
(1 row)
--
-- Check the clock is *monotonically increasing*
--
SELECT citus_get_cluster_clock() \gset t1
SELECT citus_get_cluster_clock() \gset t2
SELECT citus_get_cluster_clock() \gset t3
-- Both should return true
SELECT citus_is_clock_after(:t2citus_get_cluster_clock, :t1citus_get_cluster_clock);
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
SELECT citus_is_clock_after(:'t3citus_get_cluster_clock', :'t2citus_get_cluster_clock');
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
-- Returns false
SELECT citus_is_clock_after(:'t1citus_get_cluster_clock', :'t3citus_get_cluster_clock');
citus_is_clock_after
---------------------------------------------------------------------
f
(1 row)
--
-- Check the value returned by citus_get_cluster_clock is close to Epoch in ms
--
SELECT (extract(epoch from now()) * 1000)::bigint AS epoch \gset
SELECT logical FROM citus_get_cluster_clock() \gset
-- Returns false
SELECT (:logical - :epoch) > 100 as epoch_drift_in_ms;
epoch_drift_in_ms
---------------------------------------------------------------------
f
(1 row)
-- Transaction that accesses multiple nodes
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
COMMIT;
DEBUG: coordinator transaction clock xxxxxx
DEBUG: node(1:57637) transaction clock xxxxxx
DEBUG: node(2:57638) transaction clock xxxxxx
DEBUG: node(3:57637) transaction clock xxxxxx
DEBUG: node(4:57638) transaction clock xxxxxx
DEBUG: node(5:57637) transaction clock xxxxxx
DEBUG: node(6:57638) transaction clock xxxxxx
DEBUG: node(7:57637) transaction clock xxxxxx
DEBUG: node(8:57638) transaction clock xxxxxx
DEBUG: final global transaction clock xxxxxx
--
-- Check to see if the transaction is indeed persisted in the catalog
--
SELECT count(*)
FROM pg_dist_commit_transaction commit_clock
WHERE transaction_id = :'tidget_current_transaction_id';
count
---------------------------------------------------------------------
1
(1 row)
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
ROLLBACK;
RESET client_min_messages;
--
-- Check that the transaction is not persisted
--
SELECT count(*)
FROM pg_dist_commit_transaction commit_clock
WHERE transaction_id = :'tidget_current_transaction_id';
count
---------------------------------------------------------------------
0
(1 row)
RESET client_min_messages;
RESET citus.enable_cluster_clock;
DROP SCHEMA clock CASCADE;
NOTICE: drop cascades to table clock_test


@@ -1139,7 +1139,10 @@ SELECT * FROM multi_extension.print_extension_changes();
table columnar.stripe |
| function citus_cleanup_orphaned_resources()
| function citus_copy_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) void
+| function citus_get_cluster_clock() cluster_clock
+| function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
| function citus_internal_delete_partition_metadata(regclass) void
+| function citus_is_clock_after(cluster_clock,cluster_clock) boolean
| function citus_job_cancel(bigint) void
| function citus_job_wait(bigint,citus_job_status) void
| function citus_locks() SETOF record
@@ -1159,13 +1162,15 @@ SELECT * FROM multi_extension.print_extension_changes();
| table pg_dist_background_task
| table pg_dist_background_task_depend
| table pg_dist_cleanup
+| table pg_dist_commit_transaction
| type citus_job_status
| type citus_task_status
+| type cluster_clock
| type replication_slot_info
| type split_copy_info
| type split_shard_info
| view citus_locks
-(52 rows)
+(57 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version


@@ -53,6 +53,7 @@ ORDER BY 1;
function citus_finish_citus_upgrade()
function citus_finish_pg_upgrade()
function citus_get_active_worker_nodes()
+function citus_get_cluster_clock()
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_trigger_func()
@@ -65,6 +66,7 @@ ORDER BY 1;
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
+function citus_internal_adjust_local_clock_to_remote(cluster_clock)
function citus_internal_delete_colocation_metadata(integer)
function citus_internal_delete_partition_metadata(regclass)
function citus_internal_delete_shard_metadata(bigint)
@@ -72,6 +74,7 @@ ORDER BY 1;
function citus_internal_local_blocked_processes()
function citus_internal_update_placement_metadata(bigint,integer,integer)
function citus_internal_update_relation_colocation(oid,integer)
+function citus_is_clock_after(cluster_clock,cluster_clock)
function citus_is_coordinator()
function citus_isolation_test_session_is_blocked(integer,integer[])
function citus_job_cancel(bigint)
@@ -253,6 +256,7 @@ ORDER BY 1;
table pg_dist_background_task_depend
table pg_dist_cleanup
table pg_dist_colocation
+table pg_dist_commit_transaction
table pg_dist_local_group
table pg_dist_node
table pg_dist_node_metadata
@@ -268,6 +272,7 @@ ORDER BY 1;
type citus_copy_format
type citus_job_status
type citus_task_status
+type cluster_clock
type noderole
type replication_slot_info
type split_copy_info
@@ -283,5 +288,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
-(275 rows)
+(280 rows)


@@ -80,7 +80,7 @@ test: multi_reference_table multi_select_for_update relation_access_tracking pg1
test: custom_aggregate_support aggregate_support tdigest_aggregate_support
test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
-test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join anonymous_columns
+test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join anonymous_columns clock
test: ch_bench_subquery_repartition
test: multi_agg_type_conversion multi_count_type_conversion recursive_relation_planning_restriction_pushdown
test: multi_partition_pruning single_hash_repartition_join unsupported_lateral_subqueries


@@ -0,0 +1,78 @@
CREATE SCHEMA clock;
SET search_path TO clock;
SHOW citus.enable_cluster_clock;
SET citus.enable_cluster_clock to ON;
SHOW citus.enable_cluster_clock;
CREATE TABLE clock_test (id int, nonid int);
SELECT create_distributed_table('clock_test', 'id', colocate_with := 'none');
--
-- Compare <logical, counter> pairs
--
-- Returns true
SELECT citus_is_clock_after(ROW(5,1), ROW(3,6));
-- Returns false
SELECT citus_is_clock_after(ROW(2,9), ROW(3,0));
-- Returns true
SELECT citus_is_clock_after(ROW(5,6), ROW(5,1));
-- Returns false
SELECT citus_is_clock_after(ROW(5,6), ROW(5,6));
--
-- Check the clock is *monotonically increasing*
--
SELECT citus_get_cluster_clock() \gset t1
SELECT citus_get_cluster_clock() \gset t2
SELECT citus_get_cluster_clock() \gset t3
-- Both should return true
SELECT citus_is_clock_after(:t2citus_get_cluster_clock, :t1citus_get_cluster_clock);
SELECT citus_is_clock_after(:'t3citus_get_cluster_clock', :'t2citus_get_cluster_clock');
-- Returns false
SELECT citus_is_clock_after(:'t1citus_get_cluster_clock', :'t3citus_get_cluster_clock');
--
-- Check the value returned by citus_get_cluster_clock is close to Epoch in ms
--
SELECT (extract(epoch from now()) * 1000)::bigint AS epoch \gset
SELECT logical FROM citus_get_cluster_clock() \gset
-- Returns false
SELECT (:logical - :epoch) > 100 as epoch_drift_in_ms;
-- Transaction that accesses multiple nodes
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
COMMIT;
--
-- Check to see if the transaction is indeed persisted in the catalog
--
SELECT count(*)
FROM pg_dist_commit_transaction commit_clock
WHERE transaction_id = :'tidget_current_transaction_id';
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
ROLLBACK;
RESET client_min_messages;
--
-- Check that the transaction is not persisted
--
SELECT count(*)
FROM pg_dist_commit_transaction commit_clock
WHERE transaction_id = :'tidget_current_transaction_id';
RESET client_min_messages;
RESET citus.enable_cluster_clock;
DROP SCHEMA clock CASCADE;