This implements a new UDF citus_get_cluster_clock() that returns a monotonically
increasing logical clock. The clock is guaranteed to never go back in value after
restarts, and makes a best attempt to keep the value close to UNIX epoch time in
milliseconds.

Also introduces a new GUC "citus.enable_cluster_clock"; when true, every
distributed transaction is stamped with the logical causal clock and persisted
in a catalog pg_dist_commit_transaction.
Teja Mupparti 2022-08-17 13:53:00 -07:00 committed by Teja Mupparti
parent 9249fd5c5d
commit 01103ce05d
35 changed files with 2345 additions and 8 deletions


@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$(
DATA_built = $(generated_sql_files)
# directories with source files
SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker
SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
# enterprise modules
SUBDIRS += replication


@ -0,0 +1,244 @@
# Cluster Clock
### Requirement:
Many distributed applications need to track the changes in the same order as they are applied on the database. The changes can be to databases or objects within them, either within a single node or across the sharded cluster.
### Definitions
**Total ordering** - Every pair of change events can be placed in some order.
**Causal ordering** - Only events that are causally related (an event A caused an event B) can be ordered, i.e., it's only a partial order. Sometimes events happen independently, with no possible causal relationship; such events are treated as concurrent.
**Sequential consistency** - All writes must be seen in the same order by all processes.
**Causal consistency** - Causally related writes must be seen in the same order.
Transactions on a single-node system naturally provide total and sequential ordering guarantees for client read and write operations, as all operations are routed to the same node, but there are challenges for a multi-node distributed system such as Citus.
One possible way to totally order all the changes in the system is to timestamp all the events with a global physical clock or a centralized logical clock, so that observing the events in increasing order of the timestamp gives the total ordering of events. For both performance and cost reasons, such solutions are impractical. In the absence of total ordering, a slightly weaker ordering is the **causal order**.
Causal order is defined as a model that preserves a partial order of events in a distributed system:
1. If an event A causes another event B, every other process in the system observes event A before observing event B.
2. Causal order is transitive: if A causes B, and B causes C, then A causes C.
3. Non-causally ordered events are treated as concurrent.
Causal consistency is a weak form of consistency that preserves the order of causally related operations. The causal consistency model can be refined into four session guarantees.
1. Read Your Writes: If a process performs a write, the same process later observes the result of its write.
2. Monotonic Reads: The set of writes observed (read) by a process is guaranteed to be monotonically increasing.
3. Writes Follow Reads: If some process performs a read followed by a write, and another process observes the result of the write, then it can also observe the read.
4. Monotonic Writes: If some process performs a write, followed sometime later by another write, other processes will observe them in the same order.
### Hybrid Logical Clock (HLC)
HLC provides a way to capture causality relationships, as logical clocks do. It can also be used for backup/recovery, as the logical clock value is maintained close to the wall clock time. HLC consists of two components:
LC - Logical clock
C - Counter
Clock components - Unsigned 64 bit <LC, C>

| Epoch Milliseconds (LC) | Logical Counter (C) |
|--|--|
| 42 bits | 22 bits |
2^42 milliseconds - 4398046511104 milliseconds, which is ~139 years.
2^22 ticks - maximum of four million operations per millisecond.
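For intuition, here is a small SQL sketch of that 42/22 bit split (the numbers are arbitrary, and this only illustrates how the two components would map onto a single unsigned 64-bit value as `(LC << 22) | C`; it is not how the type is stored on disk):

```sql
SELECT (1700000000000::bigint << 22) | 7                       AS packed,
       ((1700000000000::bigint << 22) | 7) >> 22               AS logical_part, -- 1700000000000
       ((1700000000000::bigint << 22) | 7) & ((1 << 22) - 1)   AS counter_part; -- 7
```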
### UDFs
A new UDF `citus_get_node_clock()` returns a monotonically increasing logical clock. The clock is guaranteed to never go back in value after restarts, and makes a best attempt to keep the value close to UNIX epoch time in milliseconds.
A new UDF `citus_get_transaction_clock()`, when called by the user, returns the logical causal clock timestamp of the current transaction. Internally, this is the maximum clock among all transaction nodes, and all nodes move to the new clock.
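For illustration, a hypothetical session (it assumes a Citus cluster with these UDFs installed; note that `citus_get_transaction_clock()` must be called inside a transaction block):

```sql
-- Node-local clock tick; callable on any node, at any time.
SELECT citus_get_node_clock();

-- Causal transaction clock; requires an explicit transaction block.
BEGIN;
SELECT citus_get_transaction_clock();
COMMIT;
```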
### GUC
A new GUC parameter, **citus.enable_cluster_clock**, controls the feature. If clocks go bad for any reason, this serves as a safety valve to avoid the need to change the application and (re)deploy it.
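Based on the implementation, when the GUC is off, `citus_get_transaction_clock()` emits a WARNING instead of negotiating a clock; a sketch:

```sql
SET citus.enable_cluster_clock TO off;
BEGIN;
SELECT citus_get_transaction_clock();  -- WARNING: GUC enable_cluster_clock is off
COMMIT;
SET citus.enable_cluster_clock TO on;
```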
### Sequence
In Unix, though rare, there is a possibility of the clock drifting backwards (or forwards) after a restart. In such rare scenarios, we might end up with a logical clock value less than the previously used value, which violates the fundamental requirement of a monotonically increasing clock. To avoid such disasters, every logical clock tick is persisted using a sequence (non-transactionally). After a restart, the persisted sequence value is read and the clock starts from that value, which ensures that the system starts the clock from where it left off.
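The persistence mechanism is ordinary sequence machinery, sketched below (for illustration only; the extension drives `pg_dist_clock_logical_seq` internally through setval()/nextval(), and UPDATE on the sequence is revoked from public):

```sql
-- Persist the latest logical clock value (non-transactional, survives restarts).
SELECT setval('pg_catalog.pg_dist_clock_logical_seq', 1700000000000);

-- After a restart, the clock resumes from (one past) the persisted value.
SELECT nextval('pg_catalog.pg_dist_clock_logical_seq');  -- 1700000000001
```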
### Pseudo code

```
WC  - Current Wall Clock in milliseconds
HLC - Current Hybrid Logical Clock in shared memory
MAX_COUNTER - Four million

/* Tick the clock by 1 */
IncrementClusterClock()
{
    /* It's the counter that always ticks; once it reaches
       the maximum, reset the counter to 0 and increment
       the logical clock. */
    if (HLC.C == MAX_COUNTER)
    {
        HLC.LC++;
        HLC.C = 0;
        return;
    }
    HLC.C++;
}

/* Tick for each event, must increase monotonically */
GetNextNodeClockValue()
{
    IncrementClusterClock(HLC);

    /* From the incremented clock and the current wall clock,
       pick whichever is highest */
    NextClock = MAX(HLC, WC);

    /* Save the NextClock value in both the shared memory
       and the sequence */
    HLC = NextClock;
    SETVAL(pg_dist_clock_logical_seq, HLC);
}

/* Returns true if clock1 is after clock2 */
IsEventAfter(HLC1, HLC2)
{
    IF (HLC1.LC != HLC2.LC)
        return (HLC1.LC > HLC2.LC);
    ELSE
        return (HLC1.C > HLC2.C);
}

/* Simply returns the highest node clock value among all nodes */
GetHighestClockInTransaction()
{
    For each node
    {
        NodeClock[N] = GetNextNodeClockValue();
    }

    /* Return the highest clock value of all the nodes */
    return MAX(NodeClock[N]);
}

/* Adjust the local shared memory clock to the received
   value (RHLC) from the remote node */
AdjustClock(RHLC)
{
    /* The local clock is ahead or equal, do nothing */
    IF (HLC >= RHLC)
    {
        return;
    }

    /* Save the remote clock value in both the shared
       memory and the sequence */
    HLC = RHLC;
    SETVAL(pg_dist_clock_logical_seq, HLC);
}

/* All the nodes will adjust their clocks to the highest
   of the newly negotiated clock */
AdjustClocksToTransactionHighest(HLC)
{
    For each node
    {
        SendCommand("AdjustClock(HLC)");
    }
}

/* When the citus_get_transaction_clock() UDF is invoked */
PrepareAndSetTransactionClock()
{
    /* Pick the highest logical clock value among all
       transaction-nodes */
    txnClock = GetHighestClockInTransaction()

    /* Adjust all the nodes with the new clock value */
    AdjustClocksToTransactionHighest(txnClock)

    return txnClock;
}

/* Initialize the clock value to the highest clock
   persisted in the sequence */
InitClockAtBoot()
{
    /* Start with the current wall clock */
    HLC = WC;

    IF (SEQUENCE == 1)
        /* The clock never ticked on this node, keep the
           wall clock. */
        return;

    /* Get the most recent clock value ever used from disk */
    persistedClock = NEXT_VAL(pg_dist_clock_logical_seq...)

    /* Start the clock with the persisted value */
    AdjustClock(persistedClock);
}
```
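The `IsEventAfter()` comparison above is exposed through the `citus_is_clock_after()` UDF, for example:

```sql
SELECT citus_is_clock_after('(5,10)'::cluster_clock, '(5,2)'::cluster_clock);  -- true: counter breaks the tie
SELECT citus_is_clock_after('(4,999)'::cluster_clock, '(5,0)'::cluster_clock); -- false: logical value takes precedence
```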
#### Usage
**Step 1**
In the application, track every change of a transaction along with the unique transaction ID by calling the UDF `get_current_transaction_id()`:
```sql
INSERT INTO track_table
    (TransactionId, operation, row_key, ...)
VALUES
    (get_current_transaction_id(), <insert/update/delete>, <>, ...);
```
**Step 2**
As the transaction is about to end, and before the COMMIT, capture the causal clock timestamp along with the transaction ID in a table
```sql
INSERT INTO transaction_commit_clock
    (TransactionId, CommitClock, timestamp)
SELECT
    get_current_transaction_id(),
    citus_get_transaction_clock(),
    now();
```
**Step 3**
How do we get all the events in causal order?
```sql
SELECT tt.row_key, tt.operation
FROM track_table tt, transaction_commit_clock cc
WHERE tt.TransactionId = cc.TransactionId
ORDER BY cc.CommitClock;
```
Events for an object:
```sql
SELECT tt.row_key, tt.operation
FROM track_table tt, transaction_commit_clock cc
WHERE tt.TransactionId = cc.TransactionId
  AND row_key = $1
ORDER BY cc.CommitClock;
```
Events in the last one hour:
```sql
SELECT tt.row_key, tt.operation
FROM track_table tt, transaction_commit_clock cc
WHERE cc.timestamp >= now() - interval '1 hour'
  AND tt.TransactionId = cc.TransactionId;
```
**Note**: Citus uses 2PC; if any node goes down after the PREPARE and before the COMMIT, we might have changes partially committed. Citus tracks such transactions in **pg_dist_transaction**, and they will eventually be committed when the node becomes healthy, but when we track change-data from committed transactions in **transaction_commit_clock**, we will miss the changes from the bad node.
To address this issue, the proposal is to have a new UDF (#TBD) that freezes
the clock, ensures that all the 2PCs are fully complete
(i.e., **pg_dist_transaction** should be empty), and returns the highest
clock used. All transactions in `transaction_commit_clock` with a
timestamp below this returned clock are visible to the application. The
exact nuances, such as the frequency of calling such a UDF, are still TBD.
The caveat is that if the node and the 2PC take long to fully recover, the
visibility of the committed transactions might stall.
### Catalog pruning
The data in **transaction_commit_clock** is ephemeral, i.e., rows eventually have to be deleted automatically. Users can install a pg_cron job to prune the catalog regularly:
```sql
DELETE FROM transaction_commit_clock
WHERE timestamp < now() - interval '7 days';
```
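For example, a hypothetical pg_cron job (it assumes the pg_cron extension is installed; the job name and schedule are placeholders):

```sql
SELECT cron.schedule('prune-commit-clock', '0 3 * * *', $$
    DELETE FROM transaction_commit_clock
    WHERE timestamp < now() - interval '7 days'
$$);
```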
### Limitations of Citus
Using this transaction commit clock ordering to build a secondary that is a mirror copy of the original may not be feasible at this time, for the following reasons.
Given that there is no well-defined order between concurrent distributed transactions in Citus, we cannot retroactively apply a transaction order that leads to an exact replica of the primary unless we preserve the original object-level ordering as it happened on the individual nodes.
For instance, if a multi-shard insert (transaction A) happens concurrently with a multi-shard update (transaction B) and the WHERE clause of the update matches inserted rows in multiple shards, we could have a scenario in which only a subset of the inserted rows gets updated. Effectively, transaction A might happen before transaction B on node 1, while transaction B happens before transaction A on node 2. While unfortunate, we cannot simply claim that changes made by transaction A happened first based on commit timestamps, because that would lead us to reorder changes to the same object ID on node 2, which might lead to a different outcome when replayed.
In such a scenario, even if we use the causal commit clock to order changes, it is essential that the order of modifications to an object matches the original order. Otherwise, you could have scenarios like the above, where an insert happens before an update in the primary cluster, but the update happens before the insert on the replica. Replaying the changes would then lead to a different database.
In the absence of coherent transaction-ordering semantics in a distributed cluster, the best we can do is ensure that changes to the same object are in the correct order and ensure exactly-once delivery (correct pagination).


@ -0,0 +1,616 @@
/*-------------------------------------------------------------------------
* causal_clock.c
*
* Core function definitions to implement hybrid logical clock.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include <sys/time.h>
#include "postgres.h"
#include "miscadmin.h"
#include "fmgr.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/numeric.h"
#include "utils/typcache.h"
#include "nodes/pg_list.h"
#include "catalog/namespace.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "executor/spi.h"
#include "postmaster/postmaster.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "storage/s_lock.h"
#include "distributed/causal_clock.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/placement_connection.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/citus_safe_lib.h"
#define SAVE_AND_PERSIST(c) \
do { \
LogicalClockShmem->clusterClockValue = *(c); \
DirectFunctionCall2(setval_oid, \
ObjectIdGetDatum(DistClockLogicalSequenceId()), \
Int64GetDatum((c)->logical)); \
} while (0)
PG_FUNCTION_INFO_V1(citus_get_node_clock);
PG_FUNCTION_INFO_V1(citus_internal_adjust_local_clock_to_remote);
PG_FUNCTION_INFO_V1(citus_is_clock_after);
PG_FUNCTION_INFO_V1(citus_get_transaction_clock);
/*
* Current state of the logical clock
*/
typedef enum ClockState
{
CLOCKSTATE_INITIALIZED,
CLOCKSTATE_UNINITIALIZED,
} ClockState;
/*
* Holds the cluster clock variables in shared memory.
*/
typedef struct LogicalClockShmemData
{
NamedLWLockTranche namedLockTranche;
LWLock clockLock;
/* Current logical clock value of this node */
ClusterClock clusterClockValue;
/* Tracks initialization at boot */
ClockState clockInitialized;
} LogicalClockShmemData;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static LogicalClockShmemData *LogicalClockShmem = NULL;
static void AdjustLocalClock(ClusterClock *remoteClock);
static void GetNextNodeClockValue(ClusterClock *nextClusterClockValue);
static ClusterClock * GetHighestClockInTransaction(List *nodeConnectionList);
static void AdjustClocksToTransactionHighest(List *nodeConnectionList,
ClusterClock *transactionClockValue);
static void InitClockAtFirstUse(void);
static void IncrementClusterClock(ClusterClock *clusterClock);
static ClusterClock * LargerClock(ClusterClock *clock1, ClusterClock *clock2);
static ClusterClock * PrepareAndSetTransactionClock(void);
bool EnableClusterClock = true;
/*
* GetEpochTimeAsClock returns the epoch value milliseconds used as logical
* value in ClusterClock.
*/
ClusterClock *
GetEpochTimeAsClock(void)
{
struct timeval tp = { 0 };
gettimeofday(&tp, NULL);
uint64 result = (uint64) (tp.tv_sec) * 1000;
result = result + (uint64) (tp.tv_usec) / 1000;
ClusterClock *epochClock = (ClusterClock *) palloc(sizeof(ClusterClock));
epochClock->logical = result;
epochClock->counter = 0;
return epochClock;
}
/*
* LogicalClockShmemSize returns the size that should be allocated
* in the shared memory for logical clock management.
*/
size_t
LogicalClockShmemSize(void)
{
Size size = 0;
size = add_size(size, sizeof(LogicalClockShmemData));
return size;
}
/*
* InitializeClusterClockMem reserves shared-memory space needed to
* store LogicalClockShmemData, and sets the hook for initialization
* of the same.
*/
void
InitializeClusterClockMem(void)
{
/* On PG 15 and above, we use shmem_request_hook_type */
#if PG_VERSION_NUM < PG_VERSION_15
/* allocate shared memory for pre PG-15 versions */
if (!IsUnderPostmaster)
{
RequestAddinShmemSpace(LogicalClockShmemSize());
}
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = LogicalClockShmemInit;
}
/*
* LogicalClockShmemInit Allocates and initializes shared memory for
* cluster clock related variables.
*/
void
LogicalClockShmemInit(void)
{
bool alreadyInitialized = false;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
LogicalClockShmem = (LogicalClockShmemData *)
ShmemInitStruct("Logical Clock Shmem",
LogicalClockShmemSize(),
&alreadyInitialized);
if (!alreadyInitialized)
{
/* A zero value indicates that the clock is not adjusted yet */
memset(&LogicalClockShmem->clusterClockValue, 0, sizeof(ClusterClock));
LogicalClockShmem->namedLockTranche.trancheName = "Cluster Clock Setup Tranche";
LogicalClockShmem->namedLockTranche.trancheId = LWLockNewTrancheId();
LWLockRegisterTranche(LogicalClockShmem->namedLockTranche.trancheId,
LogicalClockShmem->namedLockTranche.trancheName);
LWLockInitialize(&LogicalClockShmem->clockLock,
LogicalClockShmem->namedLockTranche.trancheId);
LogicalClockShmem->clockInitialized = CLOCKSTATE_UNINITIALIZED;
}
LWLockRelease(AddinShmemInitLock);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
/*
* IncrementClusterClock increments the ClusterClock by 1.
*/
static void
IncrementClusterClock(ClusterClock *clusterClock)
{
/*
* It's the counter that always ticks; once it reaches the maximum, reset
* the counter to 0 and increment the logical clock.
*/
if (clusterClock->counter == MAX_COUNTER)
{
clusterClock->logical++;
clusterClock->counter = 0;
return;
}
clusterClock->counter++;
}
/*
* LargerClock compares two ClusterClock(s) and returns a pointer to the larger one.
* Note: If one of the clocks is NULL, the non-NULL clock is returned; if they are
* equal, clock2 is returned.
*/
static ClusterClock *
LargerClock(ClusterClock *clock1, ClusterClock *clock2)
{
/* Check if one of the parameters is NULL */
if (!clock1 || !clock2)
{
Assert(clock1 || clock2);
return (!clock1 ? clock2 : clock1);
}
if (cluster_clock_cmp_internal(clock1, clock2) > 0)
{
return clock1;
}
else
{
return clock2;
}
}
/*
* GetNextNodeClockValue implements the internal guts of the UDF citus_get_node_clock()
*/
static void
GetNextNodeClockValue(ClusterClock *nextClusterClockValue)
{
static bool isClockInitChecked = false; /* serves as a local cache */
ClusterClock *epochValue = GetEpochTimeAsClock();
/* If this backend already checked for initialization, skip it */
if (!isClockInitChecked)
{
InitClockAtFirstUse();
/* We reach here only if CLOCKSTATE_INITIALIZED, all other cases error out. */
isClockInitChecked = true;
}
LWLockAcquire(&LogicalClockShmem->clockLock, LW_EXCLUSIVE);
Assert(LogicalClockShmem->clockInitialized == CLOCKSTATE_INITIALIZED);
/* Tick the clock */
IncrementClusterClock(&LogicalClockShmem->clusterClockValue);
/* Pick the larger of the two, wallclock and logical clock. */
ClusterClock *clockValue = LargerClock(&LogicalClockShmem->clusterClockValue,
epochValue);
/*
* Save the returned value in both the shared memory and sequences.
*/
SAVE_AND_PERSIST(clockValue);
LWLockRelease(&LogicalClockShmem->clockLock);
/* Return the clock */
*nextClusterClockValue = *clockValue;
}
/*
* AdjustLocalClock Adjusts the local shared memory clock to the
* received value from the remote node.
*/
void
AdjustLocalClock(ClusterClock *remoteClock)
{
LWLockAcquire(&LogicalClockShmem->clockLock, LW_EXCLUSIVE);
ClusterClock *localClock = &LogicalClockShmem->clusterClockValue;
/* local clock is ahead or equal, do nothing */
if (cluster_clock_cmp_internal(localClock, remoteClock) >= 0)
{
LWLockRelease(&LogicalClockShmem->clockLock);
return;
}
SAVE_AND_PERSIST(remoteClock);
LWLockRelease(&LogicalClockShmem->clockLock);
ereport(DEBUG1, (errmsg("adjusted to remote clock: "
"<logical(%lu) counter(%u)>",
remoteClock->logical,
remoteClock->counter)));
}
/*
* GetHighestClockInTransaction takes the connection list of participating nodes in the
* current transaction and polls the logical clock value of all the nodes. Returns the
* highest logical clock value of all the nodes in the current distributed transaction,
* which may be used as commit order for individual objects in the transaction.
*/
static ClusterClock *
GetHighestClockInTransaction(List *nodeConnectionList)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, nodeConnectionList)
{
int querySent =
SendRemoteCommand(connection, "SELECT citus_get_node_clock();");
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
/* Check for the current node */
ClusterClock *globalClockValue = (ClusterClock *) palloc(sizeof(ClusterClock));
GetNextNodeClockValue(globalClockValue);
ereport(DEBUG1, (errmsg("node(%u) transaction clock %lu:%u",
PostPortNumber, globalClockValue->logical,
globalClockValue->counter)));
/* fetch the results and pick the highest clock value of all the nodes */
foreach_ptr(connection, nodeConnectionList)
{
bool raiseInterrupts = true;
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
ereport(ERROR, (errmsg("connection to %s:%d failed when "
"fetching logical clock value",
connection->hostname, connection->port)));
}
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
ClusterClock *nodeClockValue = ParseClusterClockPGresult(result, 0, 0);
ereport(DEBUG1, (errmsg("node(%u) transaction clock %lu:%u",
connection->port, nodeClockValue->logical,
nodeClockValue->counter)));
globalClockValue = LargerClock(globalClockValue, nodeClockValue);
PQclear(result);
ForgetResults(connection);
}
ereport(DEBUG1, (errmsg("final global transaction clock %lu:%u",
globalClockValue->logical,
globalClockValue->counter)));
return globalClockValue;
}
/*
* AdjustClocksToTransactionHighest Sets the clock value of all the nodes, participated
* in the PREPARE of the transaction, to the highest clock value of all the nodes.
*/
static void
AdjustClocksToTransactionHighest(List *nodeConnectionList,
ClusterClock *transactionClockValue)
{
StringInfo queryToSend = makeStringInfo();
/* Set the clock value on participating worker nodes */
appendStringInfo(queryToSend,
"SELECT pg_catalog.citus_internal_adjust_local_clock_to_remote"
"('(%lu, %u)'::pg_catalog.cluster_clock);",
transactionClockValue->logical, transactionClockValue->counter);
ExecuteRemoteCommandInConnectionList(nodeConnectionList, queryToSend->data);
AdjustLocalClock(transactionClockValue);
}
/*
* PrepareAndSetTransactionClock polls all the transaction-nodes for their respective clocks,
* picks the highest clock and returns it via UDF citus_get_transaction_clock. All the nodes
* will now move to this newly negotiated clock.
*/
static ClusterClock *
PrepareAndSetTransactionClock(void)
{
if (!EnableClusterClock)
{
/* citus.enable_cluster_clock is false */
ereport(WARNING, (errmsg("GUC enable_cluster_clock is off")));
return NULL;
}
RequireTransactionBlock(true, "citus_get_transaction_clock");
dlist_iter iter;
List *transactionNodeList = NIL;
List *nodeList = NIL;
/* Prepare the connection list */
dlist_foreach(iter, &InProgressTransactions)
{
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
iter.cur);
WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port);
/* Skip the node if it's already in the list */
if (list_member_int(nodeList, workerNode->groupId))
{
continue;
}
RemoteTransaction *transaction = &connection->remoteTransaction;
Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED);
/* Skip a transaction that failed */
if (transaction->transactionFailed)
{
continue;
}
nodeList = lappend_int(nodeList, workerNode->groupId);
transactionNodeList = lappend(transactionNodeList, connection);
}
/* Pick the highest logical clock value among all transaction-nodes */
ClusterClock *transactionClockValue =
GetHighestClockInTransaction(transactionNodeList);
/* Adjust all the nodes with the new clock value */
AdjustClocksToTransactionHighest(transactionNodeList, transactionClockValue);
return transactionClockValue;
}
/*
* InitClockAtFirstUse initializes the shared memory clock value to the highest clock
* persisted, which protects against any clock drift.
*/
static void
InitClockAtFirstUse(void)
{
LWLockAcquire(&LogicalClockShmem->clockLock, LW_EXCLUSIVE);
/* Avoid repeated and parallel initialization */
if (LogicalClockShmem->clockInitialized == CLOCKSTATE_INITIALIZED)
{
LWLockRelease(&LogicalClockShmem->clockLock);
return;
}
if (DistClockLogicalSequenceId() == InvalidOid)
{
ereport(ERROR, (errmsg("Clock related sequence is missing")));
}
/* Start with the wall clock value */
ClusterClock *epochValue = GetEpochTimeAsClock();
LogicalClockShmem->clusterClockValue = *epochValue;
/* Retrieve the highest clock value persisted in the sequence */
ClusterClock persistedMaxClock = { 0 };
/*
* We will get one more than the persisted value, but that's harmless and
* also very _crucial_ in the scenarios below:
*
* 1) As sequences are not transactional, this will protect us from crashes
* after the logical increment and before the counter increment.
*
* 2) If the clock drifts backwards, we should always start one tick above
* the previous value. Though we are not persisting the counter, the
* logical value supersedes the counter, so a simple increment of the
* logical value protects us.
*
* Note: The first (and every 32nd) call to nextval() consumes 32 values in the
* WAL. This is an optimization Postgres does so it only has to write a WAL
* entry every 32 invocations. Normally this is harmless; however, if the
* database gets into a crash loop, the sequence could outrun the wall clock
* if the database crashes more often than once every 32 milliseconds.
*
*/
persistedMaxClock.logical =
DirectFunctionCall1(nextval_oid, ObjectIdGetDatum(DistClockLogicalSequenceId()));
/*
* Sequence 1 indicates no prior clock timestamps on this server, retain
* the wall clock i.e. no adjustment needed.
*/
if (persistedMaxClock.logical != 1)
{
ereport(DEBUG1, (errmsg("adjusting the clock with persisted value: "
"<logical(%lu) and counter(%u)>",
persistedMaxClock.logical,
persistedMaxClock.counter)));
/*
* Adjust the local clock according to the most recent
* clock stamp value persisted in the catalog.
*/
if (cluster_clock_cmp_internal(&persistedMaxClock, epochValue) > 0)
{
SAVE_AND_PERSIST(&persistedMaxClock);
ereport(NOTICE, (errmsg("clock drifted backwards, adjusted to: "
"<logical(%lu) counter(%u)>",
persistedMaxClock.logical,
persistedMaxClock.counter)));
}
}
LogicalClockShmem->clockInitialized = CLOCKSTATE_INITIALIZED;
LWLockRelease(&LogicalClockShmem->clockLock);
}
/*
* citus_get_node_clock() is a UDF that returns a monotonically increasing
* logical clock. The clock is guaranteed to never go back in value after restarts,
* and makes a best attempt to keep the value close to UNIX epoch time in milliseconds.
*/
Datum
citus_get_node_clock(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ClusterClock *nodeClockValue = (ClusterClock *) palloc(sizeof(ClusterClock));
GetNextNodeClockValue(nodeClockValue);
PG_RETURN_POINTER(nodeClockValue);
}
/*
* citus_internal_adjust_local_clock_to_remote is an internal UDF to adjust
* the local clock to the highest in the cluster.
*/
Datum
citus_internal_adjust_local_clock_to_remote(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ClusterClock *remoteClock = (ClusterClock *) PG_GETARG_POINTER(0);
AdjustLocalClock(remoteClock);
PG_RETURN_VOID();
}
/*
* citus_is_clock_after is a UDF that accepts logical clock timestamps of
* two causally related events and returns true if argument1 happened
* after argument2.
*/
Datum
citus_is_clock_after(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
/* Fetch both the arguments */
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
ereport(DEBUG1, (errmsg(
"clock1 @ LC:%lu, C:%u, "
"clock2 @ LC:%lu, C:%u",
clock1->logical, clock1->counter,
clock2->logical, clock2->counter)));
bool result = (cluster_clock_cmp_internal(clock1, clock2) > 0);
PG_RETURN_BOOL(result);
}
/*
* citus_get_transaction_clock() is a UDF that returns a transaction timestamp
* logical clock. The clock returned is the maximum of all transaction-nodes, and
* all the nodes adjust to this new clock value.
*/
Datum
citus_get_transaction_clock(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ClusterClock *clusterClockValue = PrepareAndSetTransactionClock();
PG_RETURN_POINTER(clusterClockValue);
}


@ -424,6 +424,43 @@ ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command)
}
/*
* ExecuteRemoteCommandInConnectionList executes a remote command, on all connections
* given in the list, that is critical to the transaction. If the command fails then
* the transaction aborts.
*/
void
ExecuteRemoteCommandInConnectionList(List *nodeConnectionList, const char *command)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, nodeConnectionList)
{
int querySent = SendRemoteCommand(connection, command);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
/* Process the result */
foreach_ptr(connection, nodeConnectionList)
{
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
ForgetResults(connection);
}
}
/*
* ExecuteOptionalRemoteCommand executes a remote command. If the command fails a WARNING
* is emitted but execution continues.


@ -212,6 +212,7 @@ typedef struct MetadataCacheData
Oid jsonbExtractPathFuncId;
Oid jsonbExtractPathTextFuncId;
Oid CitusDependentObjectFuncId;
Oid distClockLogicalSequenceId;
bool databaseNameValid;
char databaseName[NAMEDATALEN];
} MetadataCacheData;
@ -2598,6 +2599,16 @@ DistBackgroundTaskTaskIdSequenceId(void)
}
Oid
DistClockLogicalSequenceId(void)
{
CachedRelationLookup("pg_dist_clock_logical_seq",
&MetadataCache.distClockLogicalSequenceId);
return MetadataCache.distClockLogicalSequenceId;
}
Oid
DistBackgroundTaskDependRelationId(void)
{


@ -33,6 +33,7 @@
#include "executor/executor.h"
#include "distributed/backend_data.h"
#include "distributed/background_jobs.h"
#include "distributed/causal_clock.h"
#include "distributed/citus_depended_object.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_safe_lib.h"
@ -459,6 +460,7 @@ _PG_init(void)
InitializeCitusQueryStats();
InitializeSharedConnectionStats();
InitializeLocallyReservedSharedConnections();
InitializeClusterClockMem();
/* initialize shard split shared memory handle management */
InitializeShardSplitSMHandleManagement();
@ -544,6 +546,7 @@ citus_shmem_request(void)
RequestAddinShmemSpace(SharedConnectionStatsShmemSize());
RequestAddinShmemSpace(MaintenanceDaemonShmemSize());
RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
RequestAddinShmemSpace(LogicalClockShmemSize());
RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
}
@ -1119,6 +1122,19 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_cluster_clock",
gettext_noop("When users explicitly call UDF citus_get_transaction_clock() "
"and the flag is true, it returns the maximum "
"clock among all nodes. All nodes move to the "
"new clock. If clocks go bad for any reason, "
"this serves as a safety valve."),
NULL,
&EnableClusterClock,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_cost_based_connection_establishment",
gettext_noop("When enabled the connection establishment times "


@ -1,2 +1,11 @@
-- citus--11.1-1--11.2-1
-- bump version to 11.2-1
#include "udfs/get_rebalance_progress/11.2-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/11.2-1.sql"
#include "datatypes/citus_cluster_clock/11.2-1.sql"
#include "udfs/citus_get_node_clock/11.2-1.sql"
#include "udfs/citus_get_transaction_clock/11.2-1.sql"
#include "udfs/citus_is_clock_after/11.2-1.sql"
#include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql"


@ -0,0 +1,133 @@
--
-- cluster_clock base type is a combination of
-- uint64 cluster clock logical timestamp at the commit
-- uint32 cluster clock counter (ticks within the logical clock)
--
CREATE TYPE citus.cluster_clock;
CREATE FUNCTION pg_catalog.cluster_clock_in(cstring)
RETURNS citus.cluster_clock
AS 'MODULE_PATHNAME',$$cluster_clock_in$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_out(citus.cluster_clock)
RETURNS cstring
AS 'MODULE_PATHNAME',$$cluster_clock_out$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_recv(internal)
RETURNS citus.cluster_clock
AS 'MODULE_PATHNAME',$$cluster_clock_recv$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_send(citus.cluster_clock)
RETURNS bytea
AS 'MODULE_PATHNAME',$$cluster_clock_send$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_logical(citus.cluster_clock)
RETURNS bigint
AS 'MODULE_PATHNAME',$$cluster_clock_logical$$
LANGUAGE C IMMUTABLE STRICT;
CREATE TYPE citus.cluster_clock (
internallength = 12, -- specifies the size of the memory block required to hold the type uint64 + uint32
input = cluster_clock_in,
output = cluster_clock_out,
receive = cluster_clock_recv,
send = cluster_clock_send
);
ALTER TYPE citus.cluster_clock SET SCHEMA pg_catalog;
COMMENT ON TYPE cluster_clock IS 'combination of (logical, counter): 42 bits + 22 bits';
--
-- Define the required operators
--
CREATE FUNCTION pg_catalog.cluster_clock_lt(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_lt$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_le(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_le$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_eq(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_eq$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_ne(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_ne$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_ge(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_ge$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_gt(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_gt$$
LANGUAGE C IMMUTABLE STRICT;
CREATE OPERATOR < (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_lt,
commutator = > , negator = >= ,
restrict = scalarltsel, join = scalarltjoinsel
);
CREATE OPERATOR <= (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_le,
commutator = >= , negator = > ,
restrict = scalarlesel, join = scalarlejoinsel
);
CREATE OPERATOR = (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_eq,
commutator = = ,
negator = <> ,
restrict = eqsel, join = eqjoinsel
);
CREATE OPERATOR <> (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_ne,
commutator = <> ,
negator = = ,
restrict = neqsel, join = neqjoinsel
);
CREATE OPERATOR >= (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_ge,
commutator = <= , negator = < ,
restrict = scalargesel, join = scalargejoinsel
);
CREATE OPERATOR > (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_gt,
commutator = < , negator = <= ,
restrict = scalargtsel, join = scalargtjoinsel
);
-- Create the support function too
CREATE FUNCTION pg_catalog.cluster_clock_cmp(cluster_clock, cluster_clock) RETURNS int4
AS 'MODULE_PATHNAME',$$cluster_clock_cmp$$
LANGUAGE C IMMUTABLE STRICT;
-- Define the operator class to be used by an index for type cluster_clock.
CREATE OPERATOR CLASS pg_catalog.cluster_clock_ops
DEFAULT FOR TYPE cluster_clock USING btree AS
OPERATOR 1 < ,
OPERATOR 2 <= ,
OPERATOR 3 = ,
OPERATOR 4 >= ,
OPERATOR 5 > ,
FUNCTION 1 cluster_clock_cmp(cluster_clock, cluster_clock);
--
-- Create a sequence for the logical field of the type cluster_clock, to
-- be used as its persistent storage.
--
CREATE SEQUENCE citus.pg_dist_clock_logical_seq START 1;
ALTER SEQUENCE citus.pg_dist_clock_logical_seq SET SCHEMA pg_catalog;
REVOKE UPDATE ON SEQUENCE pg_catalog.pg_dist_clock_logical_seq FROM public;


@ -1,2 +1,12 @@
-- citus--11.2-1--11.1-1
#include "../udfs/get_rebalance_progress/11.1-1.sql"
#include "../udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
DROP FUNCTION pg_catalog.citus_get_node_clock();
DROP FUNCTION pg_catalog.citus_get_transaction_clock();
DROP FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(cluster_clock);
DROP FUNCTION pg_catalog.citus_is_clock_after(cluster_clock, cluster_clock);
DROP FUNCTION pg_catalog.cluster_clock_logical(cluster_clock);
DROP SEQUENCE pg_catalog.pg_dist_clock_logical_seq;
DROP OPERATOR CLASS pg_catalog.cluster_clock_ops USING btree CASCADE;
DROP OPERATOR FAMILY pg_catalog.cluster_clock_ops USING btree CASCADE;
DROP TYPE pg_catalog.cluster_clock CASCADE;


@ -0,0 +1,158 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
DECLARE
table_name regclass;
command text;
trigger_name text;
BEGIN
IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN
EXECUTE $cmd$
-- disable propagation to prevent EnsureCoordinator errors
-- the aggregate created here does not depend on Citus extension (yet)
-- since we add the dependency with the next command
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray);
COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
ELSE
EXECUTE $cmd$
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
COMMENT ON AGGREGATE array_cat_agg(anyarray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
END IF;
--
-- Citus creates the array_cat_agg but because of a compatibility
-- issue between pg13-pg14, we drop and create it during upgrade.
-- And as Citus creates it, there needs to be a dependency to the
-- Citus extension, so we create that dependency here.
-- We are not using:
-- ALTER EXENSION citus DROP/CREATE AGGREGATE array_cat_agg
-- because we don't have an easy way to check if the aggregate
-- exists with anyarray type or anycompatiblearray type.
INSERT INTO pg_depend
SELECT
'pg_proc'::regclass::oid as classid,
(SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'e' as deptype;
--
-- restore citus catalog tables
--
INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition;
INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard;
INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement;
INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata;
INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node;
INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group;
INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
INSERT INTO pg_catalog.pg_dist_cleanup SELECT * FROM public.pg_dist_cleanup;
-- enterprise catalog tables
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
name,
default_strategy,
shard_cost_function::regprocedure::regproc,
node_capacity_function::regprocedure::regproc,
shard_allowed_on_node_function::regprocedure::regproc,
default_threshold,
minimum_threshold,
improvement_threshold
FROM public.pg_dist_rebalance_strategy;
--
-- drop backup tables
--
DROP TABLE public.pg_dist_authinfo;
DROP TABLE public.pg_dist_colocation;
DROP TABLE public.pg_dist_local_group;
DROP TABLE public.pg_dist_node;
DROP TABLE public.pg_dist_node_metadata;
DROP TABLE public.pg_dist_partition;
DROP TABLE public.pg_dist_placement;
DROP TABLE public.pg_dist_poolinfo;
DROP TABLE public.pg_dist_shard;
DROP TABLE public.pg_dist_transaction;
DROP TABLE public.pg_dist_rebalance_strategy;
DROP TABLE public.pg_dist_cleanup;
--
-- reset sequences
--
PERFORM setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false);
PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false);
PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false);
PERFORM setval('pg_catalog.pg_dist_operationid_seq', (SELECT MAX(operation_id)+1 AS max_operation_id FROM pg_dist_cleanup), false);
PERFORM setval('pg_catalog.pg_dist_cleanup_recordid_seq', (SELECT MAX(record_id)+1 AS max_record_id FROM pg_dist_cleanup), false);
PERFORM setval('pg_catalog.pg_dist_clock_logical_seq', (SELECT last_val FROM public.pg_dist_clock_logical_seq), false);
DROP TABLE public.pg_dist_clock_logical_seq;
--
-- register triggers
--
FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition JOIN pg_class ON (logicalrelid = oid) WHERE relkind <> 'f'
LOOP
trigger_name := 'truncate_trigger_' || table_name::oid;
command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()';
EXECUTE command;
command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name);
EXECUTE command;
END LOOP;
--
-- set dependencies
--
INSERT INTO pg_depend
SELECT
'pg_class'::regclass::oid as classid,
p.logicalrelid::regclass::oid as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'n' as deptype
FROM pg_catalog.pg_dist_partition p;
-- set dependencies for columnar table access method
PERFORM columnar_internal.columnar_ensure_am_depends_catalog();
-- restore pg_dist_object from the stable identifiers
TRUNCATE pg_catalog.pg_dist_object;
INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid)
SELECT
address.classid,
address.objid,
address.objsubid,
naming.distribution_argument_index,
naming.colocationid
FROM
public.pg_dist_object naming,
pg_catalog.pg_get_object_address(naming.type, naming.object_names, naming.object_args) address;
DROP TABLE public.pg_dist_object;
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade()
IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade';


@ -103,6 +103,10 @@ BEGIN
PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false);
PERFORM setval('pg_catalog.pg_dist_operationid_seq', (SELECT MAX(operation_id)+1 AS max_operation_id FROM pg_dist_cleanup), false);
PERFORM setval('pg_catalog.pg_dist_cleanup_recordid_seq', (SELECT MAX(record_id)+1 AS max_record_id FROM pg_dist_cleanup), false);
PERFORM setval('pg_catalog.pg_dist_clock_logical_seq', (SELECT last_val FROM public.pg_dist_clock_logical_seq), false);
DROP TABLE public.pg_dist_clock_logical_seq;
--
-- register triggers


@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_get_node_clock()
RETURNS pg_catalog.cluster_clock
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_get_node_clock$$;
COMMENT ON FUNCTION pg_catalog.citus_get_node_clock()
IS 'Returns a monotonically increasing timestamp, with the logical clock value as close to the epoch value (in milliseconds) as possible, and a counter for ticks (maximum of 4 million) within the logical clock';


@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_get_node_clock()
RETURNS pg_catalog.cluster_clock
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_get_node_clock$$;
COMMENT ON FUNCTION pg_catalog.citus_get_node_clock()
IS 'Returns a monotonically increasing timestamp, with the logical clock value as close to the epoch value (in milliseconds) as possible, and a counter for ticks (maximum of 4 million) within the logical clock';


@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_get_transaction_clock()
RETURNS pg_catalog.cluster_clock
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_get_transaction_clock$$;
COMMENT ON FUNCTION pg_catalog.citus_get_transaction_clock()
IS 'Returns a transaction timestamp logical clock';


@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_get_transaction_clock()
RETURNS pg_catalog.cluster_clock
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_get_transaction_clock$$;
COMMENT ON FUNCTION pg_catalog.citus_get_transaction_clock()
IS 'Returns a transaction timestamp logical clock';


@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
RETURNS void
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_internal_adjust_local_clock_to_remote$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
IS 'Internal UDF used to adjust the local clock to the maximum of nodes in the cluster';
REVOKE ALL ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock) FROM PUBLIC;


@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
RETURNS void
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_internal_adjust_local_clock_to_remote$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
IS 'Internal UDF used to adjust the local clock to the maximum of nodes in the cluster';
REVOKE ALL ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock) FROM PUBLIC;


@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_is_clock_after(clock_one pg_catalog.cluster_clock, clock_two pg_catalog.cluster_clock)
RETURNS BOOL
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_is_clock_after$$;
COMMENT ON FUNCTION pg_catalog.citus_is_clock_after(pg_catalog.cluster_clock, pg_catalog.cluster_clock)
IS 'Accepts logical clock timestamps of two causally related events and returns true if argument1 happened after argument2';


@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_is_clock_after(clock_one pg_catalog.cluster_clock, clock_two pg_catalog.cluster_clock)
RETURNS BOOL
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_is_clock_after$$;
COMMENT ON FUNCTION pg_catalog.citus_is_clock_after(pg_catalog.cluster_clock, pg_catalog.cluster_clock)
IS 'Accepts logical clock timestamps of two causally related events and returns true if argument1 happened after argument2';


@ -0,0 +1,79 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
BEGIN
DELETE FROM pg_depend WHERE
objid IN (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') AND
refobjid IN (select oid from pg_extension where extname = 'citus');
--
-- We are dropping the aggregates because postgres 14 changed
-- array_cat type from anyarray to anycompatiblearray. When
-- upgrading to pg14, specifically when running pg_restore on
-- array_cat_agg we would get an error. So we drop the aggregate
-- and create the right one on citus_finish_pg_upgrade.
DROP AGGREGATE IF EXISTS array_cat_agg(anyarray);
DROP AGGREGATE IF EXISTS array_cat_agg(anycompatiblearray);
--
-- Drop existing backup tables
--
DROP TABLE IF EXISTS public.pg_dist_partition;
DROP TABLE IF EXISTS public.pg_dist_shard;
DROP TABLE IF EXISTS public.pg_dist_placement;
DROP TABLE IF EXISTS public.pg_dist_node_metadata;
DROP TABLE IF EXISTS public.pg_dist_node;
DROP TABLE IF EXISTS public.pg_dist_local_group;
DROP TABLE IF EXISTS public.pg_dist_transaction;
DROP TABLE IF EXISTS public.pg_dist_colocation;
DROP TABLE IF EXISTS public.pg_dist_authinfo;
DROP TABLE IF EXISTS public.pg_dist_poolinfo;
DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy;
DROP TABLE IF EXISTS public.pg_dist_object;
DROP TABLE IF EXISTS public.pg_dist_cleanup;
DROP TABLE IF EXISTS public.pg_dist_clock_logical_seq;
--
-- backup citus catalog tables
--
CREATE TABLE public.pg_dist_partition AS SELECT * FROM pg_catalog.pg_dist_partition;
CREATE TABLE public.pg_dist_shard AS SELECT * FROM pg_catalog.pg_dist_shard;
CREATE TABLE public.pg_dist_placement AS SELECT * FROM pg_catalog.pg_dist_placement;
CREATE TABLE public.pg_dist_node_metadata AS SELECT * FROM pg_catalog.pg_dist_node_metadata;
CREATE TABLE public.pg_dist_node AS SELECT * FROM pg_catalog.pg_dist_node;
CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group;
CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction;
CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation;
CREATE TABLE public.pg_dist_cleanup AS SELECT * FROM pg_catalog.pg_dist_cleanup;
-- enterprise catalog tables
CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
-- sequences
CREATE TABLE public.pg_dist_clock_logical_seq AS SELECT last_val FROM pg_catalog.pg_dist_clock_logical_seq;
CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT
name,
default_strategy,
shard_cost_function::regprocedure::text,
node_capacity_function::regprocedure::text,
shard_allowed_on_node_function::regprocedure::text,
default_threshold,
minimum_threshold,
improvement_threshold
FROM pg_catalog.pg_dist_rebalance_strategy;
-- store upgrade stable identifiers on pg_dist_object catalog
CREATE TABLE public.pg_dist_object AS SELECT
address.type,
address.object_names,
address.object_args,
objects.distribution_argument_index,
objects.colocationid
FROM pg_catalog.pg_dist_object objects,
pg_catalog.pg_identify_object_as_address(objects.classid, objects.objid, objects.objsubid) address;
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_prepare_pg_upgrade()
IS 'perform tasks to copy citus settings to a location that could later be restored after pg_upgrade is done';


@ -33,6 +33,7 @@ BEGIN
DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy;
DROP TABLE IF EXISTS public.pg_dist_object;
DROP TABLE IF EXISTS public.pg_dist_cleanup;
DROP TABLE IF EXISTS public.pg_dist_clock_logical_seq;
--
-- backup citus catalog tables
@ -49,6 +50,8 @@ BEGIN
-- enterprise catalog tables
CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
-- sequences
CREATE TABLE public.pg_dist_clock_logical_seq AS SELECT last_val FROM pg_catalog.pg_dist_clock_logical_seq;
CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT
name,
default_strategy,


@ -0,0 +1,329 @@
/*-------------------------------------------------------------------------
*
* type_utils.c
*
* Utility functions related to types.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "libpq-fe.h"
#include "catalog/pg_type.h"
#include "nodes/pg_list.h"
#include "utils/syscache.h"
#include "libpq/pqformat.h"
#include "distributed/causal_clock.h"
#define NUM_CLUSTER_CLOCK_ARGS 2
#define LDELIM '('
#define RDELIM ')'
#define DELIM ','
static ClusterClock * cluster_clock_in_internal(char *clockString);
PG_FUNCTION_INFO_V1(cluster_clock_in);
PG_FUNCTION_INFO_V1(cluster_clock_out);
PG_FUNCTION_INFO_V1(cluster_clock_recv);
PG_FUNCTION_INFO_V1(cluster_clock_send);
/*
* cluster_clock_in_internal is a generic routine that parses the cluster_clock string
* format, (logical, counter) i.e. (%lu, %u), into the internal ClusterClock struct.
*/
static ClusterClock *
cluster_clock_in_internal(char *clockString)
{
char *clockFields[NUM_CLUSTER_CLOCK_ARGS];
int numClockField = 0;
for (char *currentChar = clockString;
(*currentChar) && (numClockField < NUM_CLUSTER_CLOCK_ARGS) && (*currentChar !=
RDELIM);
currentChar++)
{
if (*currentChar == DELIM || (*currentChar == LDELIM && !numClockField))
{
clockFields[numClockField++] = currentChar + 1;
}
}
if (numClockField < NUM_CLUSTER_CLOCK_ARGS)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"cluster_clock",
clockString)));
}
char *endingChar = NULL;
errno = 0;
int64 logical = strtoul(clockFields[0], &endingChar, 10);
if (errno || (*endingChar != DELIM) || (logical > MAX_LOGICAL) || logical < 0)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"cluster_clock",
clockString)));
}
int64 counter = strtol(clockFields[1], &endingChar, 10);
if (errno || (*endingChar != RDELIM) || (counter > MAX_COUNTER) || counter < 0)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"cluster_clock",
clockString)));
}
ClusterClock *clusterClock = (ClusterClock *) palloc(sizeof(ClusterClock));
clusterClock->logical = logical;
clusterClock->counter = counter;
return clusterClock;
}
/*
* cluster_clock_in converts the cstring input format to the ClusterClock type.
*/
Datum
cluster_clock_in(PG_FUNCTION_ARGS)
{
char *clockString = PG_GETARG_CSTRING(0);
PG_RETURN_POINTER(cluster_clock_in_internal(clockString));
}
/*
* cluster_clock_out converts the internal ClusterClock format to cstring output.
*/
Datum
cluster_clock_out(PG_FUNCTION_ARGS)
{
ClusterClock *clusterClock = (ClusterClock *) PG_GETARG_POINTER(0);
if (clusterClock == NULL)
{
PG_RETURN_CSTRING("");
}
char *clockString = psprintf("(%lu,%u)", clusterClock->logical,
clusterClock->counter);
PG_RETURN_CSTRING(clockString);
}
/*
* cluster_clock_recv converts external binary format to ClusterClock.
*/
Datum
cluster_clock_recv(PG_FUNCTION_ARGS)
{
StringInfo clockBuffer = (StringInfo) PG_GETARG_POINTER(0);
ClusterClock *clusterClock = (ClusterClock *) palloc(sizeof(ClusterClock));
clusterClock->logical = pq_getmsgint64(clockBuffer);
clusterClock->counter = pq_getmsgint(clockBuffer, sizeof(int));
PG_RETURN_POINTER(clusterClock);
}
/*
* cluster_clock_send converts ClusterClock to binary format.
*/
Datum
cluster_clock_send(PG_FUNCTION_ARGS)
{
ClusterClock *clusterClock = (ClusterClock *) PG_GETARG_POINTER(0);
StringInfoData clockBuffer;
pq_begintypsend(&clockBuffer);
pq_sendint64(&clockBuffer, clusterClock->logical);
pq_sendint32(&clockBuffer, clusterClock->counter);
PG_RETURN_BYTEA_P(pq_endtypsend(&clockBuffer));
}
/*****************************************************************************
* PUBLIC ROUTINES *
*****************************************************************************/
PG_FUNCTION_INFO_V1(cluster_clock_lt);
PG_FUNCTION_INFO_V1(cluster_clock_le);
PG_FUNCTION_INFO_V1(cluster_clock_eq);
PG_FUNCTION_INFO_V1(cluster_clock_ne);
PG_FUNCTION_INFO_V1(cluster_clock_gt);
PG_FUNCTION_INFO_V1(cluster_clock_ge);
PG_FUNCTION_INFO_V1(cluster_clock_cmp);
PG_FUNCTION_INFO_V1(cluster_clock_logical);
/*
* cluster_clock_cmp_internal is the generic compare routine, and must be used for all
* operators, including btree indexes, when comparing the cluster_clock data type.
* Return values are
* 1 -- clock1 is > clock2
* 0 -- clock1 is = clock2
* -1 -- clock1 is < clock2
*/
int
cluster_clock_cmp_internal(ClusterClock *clusterClock1, ClusterClock *clusterClock2)
{
Assert(clusterClock1 && clusterClock2);
int retcode = 0;
/* Logical value takes precedence when comparing two clocks */
if (clusterClock1->logical != clusterClock2->logical)
{
retcode = (clusterClock1->logical > clusterClock2->logical) ? 1 : -1;
return retcode;
}
/* Logical values are equal, let's compare ticks */
if (clusterClock1->counter != clusterClock2->counter)
{
retcode = (clusterClock1->counter > clusterClock2->counter) ? 1 : -1;
return retcode;
}
/* Ticks are equal too, return zero */
return retcode;
}
/*
* cluster_clock_lt returns true if clock1 is less than clock2.
*/
Datum
cluster_clock_lt(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) < 0);
}
/*
* cluster_clock_le returns true if clock1 is less than or equal to clock2.
*/
Datum
cluster_clock_le(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) <= 0);
}
/*
* cluster_clock_eq returns true if clock1 is equal to clock2.
*/
Datum
cluster_clock_eq(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) == 0);
}
/*
* cluster_clock_ne returns true if clock1 is not equal to clock2.
*/
Datum
cluster_clock_ne(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) != 0);
}
/*
* cluster_clock_gt returns true if clock1 is greater than clock2.
*/
Datum
cluster_clock_gt(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) > 0);
}
/*
* cluster_clock_ge returns true if clock1 is greater than or equal to clock2
*/
Datum
cluster_clock_ge(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) >= 0);
}
/*
* cluster_clock_cmp returns 1 if clock1 is greater than clock2, returns -1 if
* clock1 is less than clock2, and zero if they are equal.
*/
Datum
cluster_clock_cmp(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_INT32(cluster_clock_cmp_internal(clock1, clock2));
}
/*
* cluster_clock_logical returns the logical part of a <logical, counter> clock,
* which is essentially the epoch value in milliseconds.
*/
Datum
cluster_clock_logical(PG_FUNCTION_ARGS)
{
ClusterClock *clusterClock = (ClusterClock *) PG_GETARG_POINTER(0);
PG_RETURN_INT64(clusterClock->logical);
}
/*
* ParseClusterClockPGresult parses a remote cluster_clock result and returns the
* value, or NULL if the result is NULL.
*/
ClusterClock *
ParseClusterClockPGresult(PGresult *result, int rowIndex, int colIndex)
{
if (PQgetisnull(result, rowIndex, colIndex))
{
return NULL;
}
char *resultString = PQgetvalue(result, rowIndex, colIndex);
return cluster_clock_in_internal(resultString);
}


@ -0,0 +1,44 @@
/*-------------------------------------------------------------------------
* causal_clock.h
*
* Data structure definitions for managing hybrid logical clock and
* related function declarations.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CAUSAL_CLOCK_H
#define CAUSAL_CLOCK_H
#include "distributed/type_utils.h"
/*
* Clock components - Unsigned 64 bit <LC, C>
* Logical clock (LC): 42 bits
* Counter (C): 22 bits
*
* 2^42 milliseconds = 4398046511104 milliseconds, which is ~139 years.
* 2^22 ticks = 4194304, a maximum of ~4.2 million operations per millisecond.
*
*/
#define LOGICAL_BITS 42
#define COUNTER_BITS 22
#define LOGICAL_MASK ((1U << COUNTER_BITS) - 1)
#define MAX_LOGICAL ((1LU << LOGICAL_BITS) - 1)
#define MAX_COUNTER LOGICAL_MASK
#define GET_LOGICAL(x) ((x) >> COUNTER_BITS)
#define GET_COUNTER(x) ((x) & LOGICAL_MASK)
extern bool EnableClusterClock;
extern void LogicalClockShmemInit(void);
extern size_t LogicalClockShmemSize(void);
extern void InitializeClusterClockMem(void);
extern ClusterClock * GetEpochTimeAsClock(void);
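/*
 * Illustrative sketch, not part of this header: packing a <logical, counter>
 * pair into a single unsigned 64-bit value with the macros above. The helper
 * name and CLUSTER_CLOCK_EXAMPLE guard are made up for illustration;
 * GET_LOGICAL() and GET_COUNTER() recover the two parts of the packed value.
 */
#ifdef CLUSTER_CLOCK_EXAMPLE
static inline uint64
PackClusterClockValue(uint64 logical, uint32 counter)
{
	/* logical occupies the high 42 bits, counter the low 22 bits */
	return ((logical & MAX_LOGICAL) << COUNTER_BITS) | (counter & MAX_COUNTER);
}
#endif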
#endif /* CAUSAL_CLOCK_H */


@ -262,6 +262,7 @@ extern Oid DistCleanupPrimaryKeyIndexId(void);
/* sequence oids */
extern Oid DistBackgroundJobJobIdSequenceId(void);
extern Oid DistBackgroundTaskTaskIdSequenceId(void);
extern Oid DistClockLogicalSequenceId(void);
/* type oids */
extern Oid LookupTypeOid(char *schemaNameSting, char *typeNameString);


@ -46,6 +46,8 @@ extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection,
List *commandList);
extern void ExecuteCriticalRemoteCommand(MultiConnection *connection,
const char *command);
extern void ExecuteRemoteCommandInConnectionList(List *nodeConnectionList,
const char *command);
extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,
const char *command,
PGresult **result);


@ -0,0 +1,25 @@
/*-------------------------------------------------------------------------
*
* type_utils.h
* Utility functions related to types.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef TYPE_UTILS_H
#define TYPE_UTILS_H
typedef struct ClusterClock
{
uint64 logical; /* cluster clock logical timestamp at the commit */
uint32 counter; /* cluster clock counter value at the commit */
} ClusterClock;
extern ClusterClock * ParseClusterClockPGresult(PGresult *result,
int rowIndex, int colIndex);
extern int cluster_clock_cmp_internal(ClusterClock *clusterClock1,
ClusterClock *clusterClock2);
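/*
 * Illustrative sketch, not part of this header: a caller that ran
 * "SELECT citus_get_node_clock()" over libpq could parse the first result
 * cell as below. The function name and CLUSTER_CLOCK_EXAMPLE guard are made
 * up for illustration.
 */
#ifdef CLUSTER_CLOCK_EXAMPLE
static void
NodeClockResultExample(PGresult *result)
{
	ClusterClock *clock = ParseClusterClockPGresult(result, 0, 0);

	if (clock != NULL)
	{
		/* clock->logical carries the wall-clock epoch part in milliseconds */
	}
}
#endif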
#endif /* TYPE_UTILS_H */


@ -288,3 +288,13 @@ s/^(WARNING|ERROR)(: "[a-z\ ]+ .*" has dependency on unsupported object) "schem
s/^ERROR: A rebalance is already running as job [0-9]+$/ERROR: A rebalance is already running as job xxx/g
s/^NOTICE: Scheduled ([0-9]+) moves as job [0-9]+$/NOTICE: Scheduled \1 moves as job xxx/g
s/^HINT: (.*) job_id = [0-9]+ (.*)$/HINT: \1 job_id = xxx \2/g
# In clock tests, normalize epoch value(s) and the DEBUG messages printed
s/^(DEBUG: |LOG: )(coordinator|final global|Set) transaction clock [0-9]+.*$/\1\2 transaction clock xxxxxx/g
# Look for >= 13 digit logical value
s/^ (\(localhost,)([0-9]+)(,t,"\([1-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]+,[0-9]+\)")/\1 xxx,t,"(xxxxxxxxxxxxx,x)"/g
s/^ \([1-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]+,[0-9]+\)/ (xxxxxxxxxxxxx,x)/g
s/^(DEBUG: |LOG: )(node\([0-9]+\)) transaction clock [0-9]+.*$/\1node xxxx transaction clock xxxxxx/g
s/^(NOTICE: )(clock).*LC:[0-9]+,.*C:[0-9]+,.*$/\1\2 xxxxxx/g
/^(DEBUG: )(adjusted to remote clock: <logical)\([0-9]+\) counter\([0-9]+\)>$/d
/^DEBUG: persisting transaction.*counter.*$/d
/^DEBUG: both logical clock values are equal\([0-9]+\), pick remote.*$/d


@ -0,0 +1,339 @@
CREATE SCHEMA clock;
SET search_path TO clock;
SHOW citus.enable_cluster_clock;
citus.enable_cluster_clock
---------------------------------------------------------------------
on
(1 row)
SET citus.enable_cluster_clock to ON;
SHOW citus.enable_cluster_clock;
citus.enable_cluster_clock
---------------------------------------------------------------------
on
(1 row)
CREATE TABLE clock_test (id int, nonid int);
SELECT create_distributed_table('clock_test', 'id', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
--
-- Compare <logical, counter> pairs
--
-- Returns true
SELECT citus_is_clock_after('(5,1)', '(3,6)');
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
-- Returns false
SELECT citus_is_clock_after('(2,9)', '(3,0)');
citus_is_clock_after
---------------------------------------------------------------------
f
(1 row)
-- Returns true
SELECT citus_is_clock_after('(5,6)', '(5,1)');
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
-- Returns false
SELECT citus_is_clock_after('(5,6)', '(5,6)');
citus_is_clock_after
---------------------------------------------------------------------
f
(1 row)
--
-- Check the clock is *monotonically increasing*
--
SELECT citus_get_node_clock() \gset t1
SELECT citus_get_node_clock() \gset t2
SELECT citus_get_node_clock() \gset t3
-- Both should return true
SELECT citus_is_clock_after(:'t2citus_get_node_clock', :'t1citus_get_node_clock');
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
SELECT citus_is_clock_after(:'t3citus_get_node_clock', :'t2citus_get_node_clock');
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
-- Returns false
SELECT citus_is_clock_after(:'t1citus_get_node_clock', :'t3citus_get_node_clock');
citus_is_clock_after
---------------------------------------------------------------------
f
(1 row)
CREATE TABLE cluster_clock_type(cc cluster_clock);
INSERT INTO cluster_clock_type values('(0, 100)');
INSERT INTO cluster_clock_type values('(100, 0)');
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 2)');
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 100)');
INSERT INTO cluster_clock_type values('(200, 20)');
INSERT INTO cluster_clock_type values('(200, 3)');
INSERT INTO cluster_clock_type values('(200, 400)');
INSERT INTO cluster_clock_type values('(500, 600)');
INSERT INTO cluster_clock_type values('(500, 0)');
SELECT cc FROM cluster_clock_type ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,100)
(100,200)
(200,3)
(200,20)
(200,400)
(500,0)
(500,600)
(11 rows)
SELECT cc FROM cluster_clock_type where cc = '(200, 400)';
cc
---------------------------------------------------------------------
(200,400)
(1 row)
SELECT cc FROM cluster_clock_type where cc <> '(500, 600)';
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,200)
(100,100)
(200,20)
(200,3)
(200,400)
(500,0)
(10 rows)
SELECT cc FROM cluster_clock_type where cc != '(500, 600)';
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,200)
(100,100)
(200,20)
(200,3)
(200,400)
(500,0)
(10 rows)
SELECT cc FROM cluster_clock_type where cc < '(200, 20)' ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,100)
(100,200)
(200,3)
(7 rows)
SELECT cc FROM cluster_clock_type where cc <= '(200, 20)' ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,100)
(100,200)
(200,3)
(200,20)
(8 rows)
SELECT cc FROM cluster_clock_type where cc > '(200, 20)' ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(200,400)
(500,0)
(500,600)
(3 rows)
SELECT cc FROM cluster_clock_type where cc >= '(200, 20)' ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(200,20)
(200,400)
(500,0)
(500,600)
(4 rows)
CREATE INDEX cc_idx on cluster_clock_type(cc);
-- Multiply rows to check index usage
INSERT INTO cluster_clock_type SELECT a.cc FROM cluster_clock_type a, cluster_clock_type b;
INSERT INTO cluster_clock_type SELECT a.cc FROM cluster_clock_type a, cluster_clock_type b;
EXPLAIN SELECT cc FROM cluster_clock_type ORDER BY 1 ASC LIMIT 1;
QUERY PLAN
---------------------------------------------------------------------
Limit (cost=0.28..0.92 rows=1 width=12)
-> Index Only Scan using cc_idx on cluster_clock_type (cost=0.28..667.95 rows=1045 width=12)
(2 rows)
SELECT cc FROM cluster_clock_type ORDER BY 1 ASC LIMIT 1;
cc
---------------------------------------------------------------------
(0,100)
(1 row)
EXPLAIN SELECT cc FROM cluster_clock_type where cc = '(200, 20)' LIMIT 5;
QUERY PLAN
---------------------------------------------------------------------
Limit (cost=4.32..20.94 rows=5 width=12)
-> Bitmap Heap Scan on cluster_clock_type (cost=4.32..20.94 rows=5 width=12)
Recheck Cond: (cc = '(200,20)'::cluster_clock)
-> Bitmap Index Scan on cc_idx (cost=0.00..4.31 rows=5 width=0)
Index Cond: (cc = '(200,20)'::cluster_clock)
(5 rows)
SELECT cc FROM cluster_clock_type where cc = '(200, 20)' LIMIT 5;
cc
---------------------------------------------------------------------
(200,20)
(200,20)
(200,20)
(200,20)
(200,20)
(5 rows)
-- Max limits
INSERT INTO cluster_clock_type values('(4398046511103, 0)');
INSERT INTO cluster_clock_type values('(0, 4194303)');
INSERT INTO cluster_clock_type values('(4398046511103, 4194303)');
-- Bad input
INSERT INTO cluster_clock_type values('(-1, 100)');
ERROR: invalid input syntax for type cluster_clock: "(-1, 100)"
INSERT INTO cluster_clock_type values('(100, -1)');
ERROR: invalid input syntax for type cluster_clock: "(100, -1)"
INSERT INTO cluster_clock_type values('(4398046511104, 100)'); -- too big to fit into 42 bits
ERROR: invalid input syntax for type cluster_clock: "(4398046511104, 100)"
INSERT INTO cluster_clock_type values('(0, 4194304)'); -- too big to fit into 22 bits
ERROR: invalid input syntax for type cluster_clock: "(0, 4194304)"
DROP TABLE cluster_clock_type;
CREATE TABLE cluster_clock_type(cc cluster_clock UNIQUE);
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 1)');
ERROR: duplicate key value violates unique constraint "cluster_clock_type_cc_key"
DETAIL: Key (cc)=((100,1)) already exists.
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 200)');
ERROR: duplicate key value violates unique constraint "cluster_clock_type_cc_key"
DETAIL: Key (cc)=((100,200)) already exists.
INSERT INTO cluster_clock_type values('(100, 100)');
INSERT INTO cluster_clock_type values('(100, 100)');
ERROR: duplicate key value violates unique constraint "cluster_clock_type_cc_key"
DETAIL: Key (cc)=((100,100)) already exists.
--
-- Check the value returned by citus_get_node_clock is close to epoch in ms
--
SELECT (extract(epoch from now()) * 1000)::bigint AS epoch,
citus_get_node_clock() AS latest_clock \gset
-- Returns true
SELECT ABS(:epoch - cluster_clock_logical(:'latest_clock')) < 25;
?column?
---------------------------------------------------------------------
t
(1 row)
-- This should fail as there is no explicit transaction
SELECT citus_get_transaction_clock();
ERROR: citus_get_transaction_clock can only be used in transaction blocks
BEGIN;
SELECT citus_get_transaction_clock();
citus_get_transaction_clock
---------------------------------------------------------------------
(xxxxxxxxxxxxx,x)
(1 row)
END;
-- Transaction that accesses multiple nodes
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
-- Capture the transaction timestamp
SELECT citus_get_transaction_clock() as txnclock \gset
DEBUG: node xxxx transaction clock xxxxxx
DEBUG: node xxxx transaction clock xxxxxx
DEBUG: node xxxx transaction clock xxxxxx
DEBUG: final global transaction clock xxxxxx
COMMIT;
--
-- Check to see if the clock is persisted in the sequence.
--
SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) limit 1 \gset
SELECT cluster_clock_logical(:'txnclock') as txnlog \gset
SELECT :logseq = :txnlog;
?column?
---------------------------------------------------------------------
t
(1 row)
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
-- Capture the transaction timestamp
SELECT citus_get_transaction_clock() as txnclock \gset
DEBUG: node xxxx transaction clock xxxxxx
DEBUG: node xxxx transaction clock xxxxxx
DEBUG: node xxxx transaction clock xxxxxx
DEBUG: final global transaction clock xxxxxx
ROLLBACK;
SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) limit 1 \gset
SELECT cluster_clock_logical(:'txnclock') as txnlog \gset
SELECT :logseq = :txnlog;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT run_command_on_workers($$SELECT citus_get_node_clock()$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost, xxx,t,"(xxxxxxxxxxxxx,x)")
(localhost, xxx,t,"(xxxxxxxxxxxxx,x)")
(2 rows)
SET citus.enable_cluster_clock to OFF;
BEGIN;
SELECT citus_get_transaction_clock();
WARNING: GUC enable_cluster_clock is off
citus_get_transaction_clock
---------------------------------------------------------------------
(1 row)
END;
RESET client_min_messages;
RESET citus.enable_cluster_clock;
DROP SCHEMA clock CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table clock_test
drop cascades to table cluster_clock_type


@ -1199,11 +1199,37 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.2-1
ALTER EXTENSION citus UPDATE TO '11.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) |
| function citus_get_node_clock() cluster_clock
| function citus_get_transaction_clock() cluster_clock
| function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
| function citus_is_clock_after(cluster_clock,cluster_clock) boolean
| function cluster_clock_cmp(cluster_clock,cluster_clock) integer
| function cluster_clock_eq(cluster_clock,cluster_clock) boolean
| function cluster_clock_ge(cluster_clock,cluster_clock) boolean
| function cluster_clock_gt(cluster_clock,cluster_clock) boolean
| function cluster_clock_in(cstring) cluster_clock
| function cluster_clock_le(cluster_clock,cluster_clock) boolean
| function cluster_clock_logical(cluster_clock) bigint
| function cluster_clock_lt(cluster_clock,cluster_clock) boolean
| function cluster_clock_ne(cluster_clock,cluster_clock) boolean
| function cluster_clock_out(cluster_clock) cstring
| function cluster_clock_recv(internal) cluster_clock
| function cluster_clock_send(cluster_clock) bytea
| function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text, source_lsn pg_lsn, target_lsn pg_lsn, status text)
(2 rows)
| operator <(cluster_clock,cluster_clock)
| operator <=(cluster_clock,cluster_clock)
| operator <>(cluster_clock,cluster_clock)
| operator =(cluster_clock,cluster_clock)
| operator >(cluster_clock,cluster_clock)
| operator >=(cluster_clock,cluster_clock)
| operator class cluster_clock_ops for access method btree
| operator family cluster_clock_ops for access method btree
| sequence pg_dist_clock_logical_seq
| type cluster_clock
(28 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version


@ -19,7 +19,8 @@ WHERE
oid
---------------------------------------------------------------------
pg_dist_authinfo
(1 row)
pg_dist_clock_logical_seq
(2 rows)
RESET role;
DROP USER no_access;


@ -63,6 +63,12 @@ SELECT nextval('pg_dist_background_task_task_id_seq') > COALESCE(MAX(task_id), 0
t
(1 row)
SELECT last_value > 0 FROM pg_dist_clock_logical_seq;
?column?
---------------------------------------------------------------------
t
(1 row)
-- If this query gives output it means we've added a new sequence that should
-- possibly be restored after upgrades.
SELECT sequence_name FROM information_schema.sequences
@ -77,7 +83,8 @@ SELECT sequence_name FROM information_schema.sequences
'pg_dist_operationid_seq',
'pg_dist_cleanup_recordid_seq',
'pg_dist_background_job_job_id_seq',
'pg_dist_background_task_task_id_seq'
'pg_dist_background_task_task_id_seq',
'pg_dist_clock_logical_seq'
);
sequence_name
---------------------------------------------------------------------


@ -53,6 +53,8 @@ ORDER BY 1;
function citus_finish_citus_upgrade()
function citus_finish_pg_upgrade()
function citus_get_active_worker_nodes()
function citus_get_node_clock()
function citus_get_transaction_clock()
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_trigger_func()
@ -65,6 +67,7 @@ ORDER BY 1;
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
function citus_internal_adjust_local_clock_to_remote(cluster_clock)
function citus_internal_delete_colocation_metadata(integer)
function citus_internal_delete_partition_metadata(regclass)
function citus_internal_delete_shard_metadata(bigint)
@ -72,6 +75,7 @@ ORDER BY 1;
function citus_internal_local_blocked_processes()
function citus_internal_update_placement_metadata(bigint,integer,integer)
function citus_internal_update_relation_colocation(oid,integer)
function citus_is_clock_after(cluster_clock,cluster_clock)
function citus_is_coordinator()
function citus_isolation_test_session_is_blocked(integer,integer[])
function citus_job_cancel(bigint)
@ -122,6 +126,18 @@ ORDER BY 1;
function citus_update_table_statistics(regclass)
function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
function citus_version()
function cluster_clock_cmp(cluster_clock,cluster_clock)
function cluster_clock_eq(cluster_clock,cluster_clock)
function cluster_clock_ge(cluster_clock,cluster_clock)
function cluster_clock_gt(cluster_clock,cluster_clock)
function cluster_clock_in(cstring)
function cluster_clock_le(cluster_clock,cluster_clock)
function cluster_clock_logical(cluster_clock)
function cluster_clock_lt(cluster_clock,cluster_clock)
function cluster_clock_ne(cluster_clock,cluster_clock)
function cluster_clock_out(cluster_clock)
function cluster_clock_recv(internal)
function cluster_clock_send(cluster_clock)
function column_name_to_column(regclass,text)
function column_to_column_name(regclass,text)
function coord_combine_agg(oid,cstring,anyelement)
@ -239,11 +255,20 @@ ORDER BY 1;
function worker_split_copy(bigint,text,split_copy_info[])
function worker_split_shard_release_dsm()
function worker_split_shard_replication_setup(split_shard_info[])
operator <(cluster_clock,cluster_clock)
operator <=(cluster_clock,cluster_clock)
operator <>(cluster_clock,cluster_clock)
operator =(cluster_clock,cluster_clock)
operator >(cluster_clock,cluster_clock)
operator >=(cluster_clock,cluster_clock)
operator class cluster_clock_ops for access method btree
operator family cluster_clock_ops for access method btree
schema citus
schema citus_internal
sequence pg_dist_background_job_job_id_seq
sequence pg_dist_background_task_task_id_seq
sequence pg_dist_cleanup_recordid_seq
sequence pg_dist_clock_logical_seq
sequence pg_dist_colocationid_seq
sequence pg_dist_groupid_seq
sequence pg_dist_node_nodeid_seq
@ -271,6 +296,7 @@ ORDER BY 1;
type citus_copy_format
type citus_job_status
type citus_task_status
type cluster_clock
type noderole
type replication_slot_info
type split_copy_info
@ -286,5 +312,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(278 rows)
(304 rows)


@ -80,7 +80,7 @@ test: multi_reference_table multi_select_for_update relation_access_tracking pg1
test: custom_aggregate_support aggregate_support tdigest_aggregate_support
test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join anonymous_columns
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join anonymous_columns clock
test: ch_bench_subquery_repartition
test: multi_agg_type_conversion multi_count_type_conversion recursive_relation_planning_restriction_pushdown
test: multi_partition_pruning single_hash_repartition_join unsupported_lateral_subqueries


@ -0,0 +1,149 @@
CREATE SCHEMA clock;
SET search_path TO clock;
SHOW citus.enable_cluster_clock;
SET citus.enable_cluster_clock to ON;
SHOW citus.enable_cluster_clock;
CREATE TABLE clock_test (id int, nonid int);
SELECT create_distributed_table('clock_test', 'id', colocate_with := 'none');
--
-- Compare <logical, counter> pairs
--
-- Returns true
SELECT citus_is_clock_after('(5,1)', '(3,6)');
-- Returns false
SELECT citus_is_clock_after('(2,9)', '(3,0)');
-- Returns true
SELECT citus_is_clock_after('(5,6)', '(5,1)');
-- Returns false
SELECT citus_is_clock_after('(5,6)', '(5,6)');
--
-- Check the clock is *monotonically increasing*
--
SELECT citus_get_node_clock() \gset t1
SELECT citus_get_node_clock() \gset t2
SELECT citus_get_node_clock() \gset t3
-- Both should return true
SELECT citus_is_clock_after(:'t2citus_get_node_clock', :'t1citus_get_node_clock');
SELECT citus_is_clock_after(:'t3citus_get_node_clock', :'t2citus_get_node_clock');
-- Returns false
SELECT citus_is_clock_after(:'t1citus_get_node_clock', :'t3citus_get_node_clock');
CREATE TABLE cluster_clock_type(cc cluster_clock);
INSERT INTO cluster_clock_type values('(0, 100)');
INSERT INTO cluster_clock_type values('(100, 0)');
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 2)');
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 100)');
INSERT INTO cluster_clock_type values('(200, 20)');
INSERT INTO cluster_clock_type values('(200, 3)');
INSERT INTO cluster_clock_type values('(200, 400)');
INSERT INTO cluster_clock_type values('(500, 600)');
INSERT INTO cluster_clock_type values('(500, 0)');
SELECT cc FROM cluster_clock_type ORDER BY 1 ASC;
SELECT cc FROM cluster_clock_type where cc = '(200, 400)';
SELECT cc FROM cluster_clock_type where cc <> '(500, 600)';
SELECT cc FROM cluster_clock_type where cc != '(500, 600)';
SELECT cc FROM cluster_clock_type where cc < '(200, 20)' ORDER BY 1 ASC;
SELECT cc FROM cluster_clock_type where cc <= '(200, 20)' ORDER BY 1 ASC;
SELECT cc FROM cluster_clock_type where cc > '(200, 20)' ORDER BY 1 ASC;
SELECT cc FROM cluster_clock_type where cc >= '(200, 20)' ORDER BY 1 ASC;
CREATE INDEX cc_idx on cluster_clock_type(cc);
-- Multiply rows to check index usage
INSERT INTO cluster_clock_type SELECT a.cc FROM cluster_clock_type a, cluster_clock_type b;
INSERT INTO cluster_clock_type SELECT a.cc FROM cluster_clock_type a, cluster_clock_type b;
EXPLAIN SELECT cc FROM cluster_clock_type ORDER BY 1 ASC LIMIT 1;
SELECT cc FROM cluster_clock_type ORDER BY 1 ASC LIMIT 1;
EXPLAIN SELECT cc FROM cluster_clock_type where cc = '(200, 20)' LIMIT 5;
SELECT cc FROM cluster_clock_type where cc = '(200, 20)' LIMIT 5;
-- Max limits
INSERT INTO cluster_clock_type values('(4398046511103, 0)');
INSERT INTO cluster_clock_type values('(0, 4194303)');
INSERT INTO cluster_clock_type values('(4398046511103, 4194303)');
-- Bad input
INSERT INTO cluster_clock_type values('(-1, 100)');
INSERT INTO cluster_clock_type values('(100, -1)');
INSERT INTO cluster_clock_type values('(4398046511104, 100)'); -- too big to fit into 42 bits
INSERT INTO cluster_clock_type values('(0, 4194304)'); -- too big to fit into 22 bits
DROP TABLE cluster_clock_type;
CREATE TABLE cluster_clock_type(cc cluster_clock UNIQUE);
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 100)');
INSERT INTO cluster_clock_type values('(100, 100)');
--
-- Check the value returned by citus_get_node_clock is close to epoch in ms
--
SELECT (extract(epoch from now()) * 1000)::bigint AS epoch,
citus_get_node_clock() AS latest_clock \gset
-- Returns true
SELECT ABS(:epoch - cluster_clock_logical(:'latest_clock')) < 25;
-- This should fail as there is no explicit transaction
SELECT citus_get_transaction_clock();
BEGIN;
SELECT citus_get_transaction_clock();
END;
-- Transaction that accesses multiple nodes
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
-- Capture the transaction timestamp
SELECT citus_get_transaction_clock() as txnclock \gset
COMMIT;
--
-- Check to see if the clock is persisted in the sequence.
--
SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) limit 1 \gset
SELECT cluster_clock_logical(:'txnclock') as txnlog \gset
SELECT :logseq = :txnlog;
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
-- Capture the transaction timestamp
SELECT citus_get_transaction_clock() as txnclock \gset
ROLLBACK;
SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) limit 1 \gset
SELECT cluster_clock_logical(:'txnclock') as txnlog \gset
SELECT :logseq = :txnlog;
SELECT run_command_on_workers($$SELECT citus_get_node_clock()$$);
SET citus.enable_cluster_clock to OFF;
BEGIN;
SELECT citus_get_transaction_clock();
END;
RESET client_min_messages;
RESET citus.enable_cluster_clock;
DROP SCHEMA clock CASCADE;


@ -12,6 +12,7 @@ SELECT nextval('pg_dist_operationid_seq') = MAX(operation_id)+1 FROM pg_dist_cle
SELECT nextval('pg_dist_cleanup_recordid_seq') = MAX(record_id)+1 FROM pg_dist_cleanup;
SELECT nextval('pg_dist_background_job_job_id_seq') > COALESCE(MAX(job_id), 0) FROM pg_dist_background_job;
SELECT nextval('pg_dist_background_task_task_id_seq') > COALESCE(MAX(task_id), 0) FROM pg_dist_background_task;
SELECT last_value > 0 FROM pg_dist_clock_logical_seq;
-- If this query gives output it means we've added a new sequence that should
-- possibly be restored after upgrades.
@ -27,7 +28,8 @@ SELECT sequence_name FROM information_schema.sequences
'pg_dist_operationid_seq',
'pg_dist_cleanup_recordid_seq',
'pg_dist_background_job_job_id_seq',
'pg_dist_background_task_task_id_seq'
'pg_dist_background_task_task_id_seq',
'pg_dist_clock_logical_seq'
);
SELECT logicalrelid FROM pg_dist_partition