This implements a new UDF citus_get_cluster_clock() that returns a monotonically
increasing logical clock. The clock is guaranteed never to move backwards in value
after restarts, and it makes a best-effort attempt to keep its value close to unix
epoch time in milliseconds.

Also introduces a new GUC "citus.enable_cluster_clock": when true, every
distributed transaction is stamped with a logical causal clock and persisted
in a new catalog, pg_dist_commit_transaction.
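A minimal usage sketch (illustrative only; dist_table is a hypothetical
distributed table and the output values are examples):

    SET citus.enable_cluster_clock TO on;
    BEGIN;
    UPDATE dist_table SET a = 1;
    COMMIT;
    -- the commit is stamped, e.g. with clock (1658887880235,9)
    SELECT * FROM pg_dist_commit_transaction
    ORDER BY cluster_clock_value DESC LIMIT 1;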
causal-order-clock-bakup
Teja Mupparti 2022-08-17 13:53:00 -07:00
parent 580ab012bf
commit 76d3bb2132
30 changed files with 2394 additions and 4 deletions


@ -187,6 +187,7 @@ typedef struct MetadataCacheData
Oid distColocationidIndexId;
Oid distPlacementGroupidIndexId;
Oid distTransactionRelationId;
Oid distCommitTransactionRelationId;
Oid distTransactionGroupIndexId;
Oid citusCatalogNamespaceId;
Oid copyFormatTypeId;
@ -2872,6 +2873,17 @@ DistTransactionRelationId(void)
}
/* return oid of pg_dist_commit_transaction relation */
Oid
DistCommitTransactionRelationId(void)
{
CachedRelationLookup("pg_dist_commit_transaction",
&MetadataCache.distCommitTransactionRelationId);
return MetadataCache.distCommitTransactionRelationId;
}
/* return oid of pg_dist_transaction_group_index */
Oid
DistTransactionGroupIndexId(void)


@ -33,6 +33,7 @@
#include "executor/executor.h"
#include "distributed/backend_data.h"
#include "distributed/background_jobs.h"
#include "distributed/causal_clock.h"
#include "distributed/citus_depended_object.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_safe_lib.h"
@ -457,6 +458,7 @@ _PG_init(void)
InitializeCitusQueryStats();
InitializeSharedConnectionStats();
InitializeLocallyReservedSharedConnections();
InitializeClusterClockMem();
/* initialize shard split shared memory handle management */
InitializeShardSplitSMHandleManagement();
@ -542,6 +544,7 @@ citus_shmem_request(void)
RequestAddinShmemSpace(SharedConnectionStatsShmemSize());
RequestAddinShmemSpace(MaintenanceDaemonShmemSize());
RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
RequestAddinShmemSpace(LogicalClockShmemSize());
RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
}
@ -1128,6 +1131,16 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_cluster_clock",
gettext_noop("When true, every distributed transaction is stamped with "
"logical causal clock and persisted in the catalog"),
NULL,
&EnableClusterClock,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_cost_based_connection_establishment",
gettext_noop("When enabled the connection establishment times "


@ -1,3 +1,144 @@
-- citus--11.1-1--11.2-1
-- bump version to 11.2-1
--
--
-- cluster_clock base type is a combination of
-- uint64 cluster clock logical timestamp at the commit
-- uint32 cluster clock counter(ticks with in the logical clock)
--
CREATE TYPE citus.cluster_clock;
CREATE FUNCTION pg_catalog.cluster_clock_in(cstring)
RETURNS citus.cluster_clock
AS 'MODULE_PATHNAME',$$cluster_clock_in$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_out(citus.cluster_clock)
RETURNS cstring
AS 'MODULE_PATHNAME',$$cluster_clock_out$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_recv(internal)
RETURNS citus.cluster_clock
AS 'MODULE_PATHNAME',$$cluster_clock_recv$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_send(citus.cluster_clock)
RETURNS bytea
AS 'MODULE_PATHNAME',$$cluster_clock_send$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION pg_catalog.cluster_clock_diff_in_ms(citus.cluster_clock)
RETURNS bigint
AS 'MODULE_PATHNAME',$$cluster_clock_diff_in_ms$$
LANGUAGE C IMMUTABLE STRICT;
CREATE TYPE citus.cluster_clock (
internallength = 12, -- specifies the size of the memory block required to hold the type uint64 + uint32
input = cluster_clock_in,
output = cluster_clock_out,
receive = cluster_clock_recv,
send = cluster_clock_send
);
ALTER TYPE citus.cluster_clock SET SCHEMA pg_catalog;
COMMENT ON TYPE cluster_clock IS 'combination of (logical, counter): 42 bits + 22 bits';
--
-- Define the required operators
--
CREATE FUNCTION cluster_clock_lt(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_lt$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION cluster_clock_le(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_le$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION cluster_clock_eq(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_eq$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION cluster_clock_ne(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_ne$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION cluster_clock_ge(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_ge$$
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION cluster_clock_gt(cluster_clock, cluster_clock) RETURNS bool
AS 'MODULE_PATHNAME',$$cluster_clock_gt$$
LANGUAGE C IMMUTABLE STRICT;
CREATE OPERATOR < (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_lt,
commutator = > , negator = >= ,
restrict = scalarltsel, join = scalarltjoinsel
);
CREATE OPERATOR <= (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_le,
commutator = >= , negator = > ,
restrict = scalarlesel, join = scalarlejoinsel
);
CREATE OPERATOR = (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_eq,
commutator = = ,
negator = <> ,
restrict = eqsel, join = eqjoinsel
);
CREATE OPERATOR <> (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_ne,
commutator = <> ,
negator = = ,
restrict = neqsel, join = neqjoinsel
);
CREATE OPERATOR >= (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_ge,
commutator = <= , negator = < ,
restrict = scalargesel, join = scalargejoinsel
);
CREATE OPERATOR > (
leftarg = cluster_clock, rightarg = cluster_clock, procedure = cluster_clock_gt,
commutator = < , negator = <= ,
restrict = scalargtsel, join = scalargtjoinsel
);
-- Create the support function too
CREATE FUNCTION cluster_clock_cmp(cluster_clock, cluster_clock) RETURNS int4
AS 'MODULE_PATHNAME',$$cluster_clock_cmp$$
LANGUAGE C IMMUTABLE STRICT;
-- Define operator class to be used by an index.
CREATE OPERATOR CLASS cluster_clock_ops
DEFAULT FOR TYPE cluster_clock USING btree AS
OPERATOR 1 < ,
OPERATOR 2 <= ,
OPERATOR 3 = ,
OPERATOR 4 >= ,
OPERATOR 5 > ,
FUNCTION 1 cluster_clock_cmp(cluster_clock, cluster_clock);
CREATE TABLE citus.pg_dist_commit_transaction(
transaction_id TEXT NOT NULL CONSTRAINT pg_dist_commit_transactionId_unique_constraint UNIQUE,
cluster_clock_value pg_catalog.cluster_clock NOT NULL,
timestamp BIGINT NOT NULL -- Epoch in milliseconds
);
CREATE INDEX pg_dist_commit_transaction_clock_index
ON citus.pg_dist_commit_transaction(cluster_clock_value DESC);
ALTER TABLE citus.pg_dist_commit_transaction SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.pg_dist_commit_transaction TO public;
#include "udfs/citus_get_cluster_clock/11.2-1.sql"
#include "udfs/citus_is_clock_after/11.2-1.sql"
#include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql"


@ -1 +1,6 @@
-- citus--11.2-1--11.1-1
DROP FUNCTION pg_catalog.citus_get_cluster_clock();
DROP FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(cluster_clock);
DROP FUNCTION pg_catalog.citus_is_clock_after(cluster_clock, cluster_clock);
DROP TYPE pg_catalog.cluster_clock CASCADE;
DROP TABLE pg_catalog.pg_dist_commit_transaction;


@ -0,0 +1,156 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
DECLARE
table_name regclass;
command text;
trigger_name text;
BEGIN
IF substring(current_setting('server_version'), '\d+')::int >= 14 THEN
EXECUTE $cmd$
-- disable propagation to prevent EnsureCoordinator errors
-- the aggregate created here does not depend on Citus extension (yet)
-- since we add the dependency with the next command
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray);
COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
ELSE
EXECUTE $cmd$
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
COMMENT ON AGGREGATE array_cat_agg(anyarray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
END IF;
--
-- Citus creates the array_cat_agg but because of a compatibility
-- issue between pg13-pg14, we drop and create it during upgrade.
-- And as Citus creates it, there needs to be a dependency to the
-- Citus extension, so we create that dependency here.
-- We are not using:
-- ALTER EXTENSION citus DROP/CREATE AGGREGATE array_cat_agg
-- because we don't have an easy way to check if the aggregate
-- exists with anyarray type or anycompatiblearray type.
INSERT INTO pg_depend
SELECT
'pg_proc'::regclass::oid as classid,
(SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'e' as deptype;
--
-- restore citus catalog tables
--
INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition;
INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard;
INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement;
INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata;
INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node;
INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group;
INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
INSERT INTO pg_catalog.pg_dist_cleanup SELECT * FROM public.pg_dist_cleanup;
INSERT INTO pg_catalog.pg_dist_commit_transaction SELECT * FROM public.pg_dist_commit_transaction;
-- enterprise catalog tables
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
name,
default_strategy,
shard_cost_function::regprocedure::regproc,
node_capacity_function::regprocedure::regproc,
shard_allowed_on_node_function::regprocedure::regproc,
default_threshold,
minimum_threshold,
improvement_threshold
FROM public.pg_dist_rebalance_strategy;
--
-- drop backup tables
--
DROP TABLE public.pg_dist_authinfo;
DROP TABLE public.pg_dist_colocation;
DROP TABLE public.pg_dist_local_group;
DROP TABLE public.pg_dist_node;
DROP TABLE public.pg_dist_node_metadata;
DROP TABLE public.pg_dist_partition;
DROP TABLE public.pg_dist_placement;
DROP TABLE public.pg_dist_poolinfo;
DROP TABLE public.pg_dist_shard;
DROP TABLE public.pg_dist_transaction;
DROP TABLE public.pg_dist_rebalance_strategy;
DROP TABLE public.pg_dist_cleanup;
DROP TABLE public.pg_dist_commit_transaction;
--
-- reset sequences
--
PERFORM setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false);
PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false);
PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false);
PERFORM setval('pg_catalog.pg_dist_operationid_seq', (SELECT MAX(operation_id)+1 AS max_operation_id FROM pg_dist_cleanup), false);
PERFORM setval('pg_catalog.pg_dist_cleanup_recordid_seq', (SELECT MAX(record_id)+1 AS max_record_id FROM pg_dist_cleanup), false);
--
-- register triggers
--
FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition JOIN pg_class ON (logicalrelid = oid) WHERE relkind <> 'f'
LOOP
trigger_name := 'truncate_trigger_' || table_name::oid;
command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()';
EXECUTE command;
command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name);
EXECUTE command;
END LOOP;
--
-- set dependencies
--
INSERT INTO pg_depend
SELECT
'pg_class'::regclass::oid as classid,
p.logicalrelid::regclass::oid as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'n' as deptype
FROM pg_catalog.pg_dist_partition p;
-- set dependencies for columnar table access method
PERFORM columnar_internal.columnar_ensure_am_depends_catalog();
-- restore pg_dist_object from the stable identifiers
TRUNCATE pg_catalog.pg_dist_object;
INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid)
SELECT
address.classid,
address.objid,
address.objsubid,
naming.distribution_argument_index,
naming.colocationid
FROM
public.pg_dist_object naming,
pg_catalog.pg_get_object_address(naming.type, naming.object_names, naming.object_args) address;
DROP TABLE public.pg_dist_object;
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade()
IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade';


@ -63,6 +63,7 @@ BEGIN
INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
INSERT INTO pg_catalog.pg_dist_cleanup SELECT * FROM public.pg_dist_cleanup;
INSERT INTO pg_catalog.pg_dist_commit_transaction SELECT * FROM public.pg_dist_commit_transaction;
-- enterprise catalog tables
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
@ -93,6 +94,7 @@ BEGIN
DROP TABLE public.pg_dist_transaction;
DROP TABLE public.pg_dist_rebalance_strategy;
DROP TABLE public.pg_dist_cleanup;
DROP TABLE public.pg_dist_commit_transaction;
--
-- reset sequences
--


@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_get_cluster_clock()
RETURNS pg_catalog.cluster_clock
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_get_cluster_clock$$;
COMMENT ON FUNCTION pg_catalog.citus_get_cluster_clock()
IS 'Returns a monotonically increasing timestamp with a logical clock value as close to the epoch value (in milliseconds) as possible, and a counter for ticks (maximum of 4 million) within the logical clock';
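For reference, a hedged example of calling the UDF from psql (the output uses the
type's (logical,counter) text representation; the value shown is illustrative):

    SELECT citus_get_cluster_clock();   -- e.g. (1658887880235,9)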


@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_get_cluster_clock()
RETURNS pg_catalog.cluster_clock
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_get_cluster_clock$$;
COMMENT ON FUNCTION pg_catalog.citus_get_cluster_clock()
IS 'Returns a monotonically increasing timestamp with a logical clock value as close to the epoch value (in milliseconds) as possible, and a counter for ticks (maximum of 4 million) within the logical clock';


@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
RETURNS void
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_internal_adjust_local_clock_to_remote$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
IS 'Internal UDF used to adjust the local clock to the maximum clock value among the nodes in the cluster';
REVOKE ALL ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock) FROM PUBLIC;
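During commit, the coordinator invokes this function on each participant; the
command it sends looks like the following (the clock value shown is illustrative):

    SELECT pg_catalog.citus_internal_adjust_local_clock_to_remote(
        '(1658887880235, 9)'::pg_catalog.cluster_clock);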


@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
RETURNS void
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_internal_adjust_local_clock_to_remote$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock)
IS 'Internal UDF used to adjust the local clock to the maximum clock value among the nodes in the cluster';
REVOKE ALL ON FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(pg_catalog.cluster_clock) FROM PUBLIC;


@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_is_clock_after(clock_one pg_catalog.cluster_clock, clock_two pg_catalog.cluster_clock)
RETURNS BOOL
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_is_clock_after$$;
COMMENT ON FUNCTION pg_catalog.citus_is_clock_after(pg_catalog.cluster_clock, pg_catalog.cluster_clock)
IS 'Accepts logical clock timestamps of two causally related events and returns true if the event of the first argument happened after the event of the second argument';
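A hedged example (values illustrative; the two clocks share the same logical
component, so the counter decides):

    SELECT citus_is_clock_after('(1658887880235, 9)'::cluster_clock,
                                '(1658887880235, 3)'::cluster_clock);   -- true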


@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_is_clock_after(clock_one pg_catalog.cluster_clock, clock_two pg_catalog.cluster_clock)
RETURNS BOOL
LANGUAGE C STABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME',$$citus_is_clock_after$$;
COMMENT ON FUNCTION pg_catalog.citus_is_clock_after(pg_catalog.cluster_clock, pg_catalog.cluster_clock)
IS 'Accepts logical clock timestamps of two causally related events and returns true if the event of the first argument happened after the event of the second argument';


@ -0,0 +1,78 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
BEGIN
DELETE FROM pg_depend WHERE
objid IN (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') AND
refobjid IN (select oid from pg_extension where extname = 'citus');
--
-- We are dropping the aggregates because postgres 14 changed
-- array_cat type from anyarray to anycompatiblearray. When
-- upgrading to pg14, specifically when running pg_restore on
-- array_cat_agg we would get an error. So we drop the aggregate
-- and create the right one on citus_finish_pg_upgrade.
DROP AGGREGATE IF EXISTS array_cat_agg(anyarray);
DROP AGGREGATE IF EXISTS array_cat_agg(anycompatiblearray);
--
-- Drop existing backup tables
--
DROP TABLE IF EXISTS public.pg_dist_partition;
DROP TABLE IF EXISTS public.pg_dist_shard;
DROP TABLE IF EXISTS public.pg_dist_placement;
DROP TABLE IF EXISTS public.pg_dist_node_metadata;
DROP TABLE IF EXISTS public.pg_dist_node;
DROP TABLE IF EXISTS public.pg_dist_local_group;
DROP TABLE IF EXISTS public.pg_dist_transaction;
DROP TABLE IF EXISTS public.pg_dist_colocation;
DROP TABLE IF EXISTS public.pg_dist_authinfo;
DROP TABLE IF EXISTS public.pg_dist_poolinfo;
DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy;
DROP TABLE IF EXISTS public.pg_dist_object;
DROP TABLE IF EXISTS public.pg_dist_cleanup;
DROP TABLE IF EXISTS public.pg_dist_commit_transaction;
--
-- backup citus catalog tables
--
CREATE TABLE public.pg_dist_partition AS SELECT * FROM pg_catalog.pg_dist_partition;
CREATE TABLE public.pg_dist_shard AS SELECT * FROM pg_catalog.pg_dist_shard;
CREATE TABLE public.pg_dist_placement AS SELECT * FROM pg_catalog.pg_dist_placement;
CREATE TABLE public.pg_dist_node_metadata AS SELECT * FROM pg_catalog.pg_dist_node_metadata;
CREATE TABLE public.pg_dist_node AS SELECT * FROM pg_catalog.pg_dist_node;
CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group;
CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction;
CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation;
CREATE TABLE public.pg_dist_cleanup AS SELECT * FROM pg_catalog.pg_dist_cleanup;
CREATE TABLE public.pg_dist_commit_transaction AS SELECT * FROM pg_catalog.pg_dist_commit_transaction;
-- enterprise catalog tables
CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT
name,
default_strategy,
shard_cost_function::regprocedure::text,
node_capacity_function::regprocedure::text,
shard_allowed_on_node_function::regprocedure::text,
default_threshold,
minimum_threshold,
improvement_threshold
FROM pg_catalog.pg_dist_rebalance_strategy;
-- store upgrade stable identifiers on pg_dist_object catalog
CREATE TABLE public.pg_dist_object AS SELECT
address.type,
address.object_names,
address.object_args,
objects.distribution_argument_index,
objects.colocationid
FROM pg_catalog.pg_dist_object objects,
pg_catalog.pg_identify_object_as_address(objects.classid, objects.objid, objects.objsubid) address;
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_prepare_pg_upgrade()
IS 'perform tasks to copy citus settings to a location that could later be restored after pg_upgrade is done';


@ -33,6 +33,7 @@ BEGIN
DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy;
DROP TABLE IF EXISTS public.pg_dist_object;
DROP TABLE IF EXISTS public.pg_dist_cleanup;
DROP TABLE IF EXISTS public.pg_dist_commit_transaction;
--
-- backup citus catalog tables
@ -46,6 +47,7 @@ BEGIN
CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction;
CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation;
CREATE TABLE public.pg_dist_cleanup AS SELECT * FROM pg_catalog.pg_dist_cleanup;
CREATE TABLE public.pg_dist_commit_transaction AS SELECT * FROM pg_catalog.pg_dist_commit_transaction;
-- enterprise catalog tables
CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;


@ -26,6 +26,7 @@
#include "datatype/timestamp.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
#include "distributed/function_utils.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
@ -168,6 +169,33 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
}
/*
* GetCurrentTransactionIdString returns the string representation of the
* UDF get_current_transaction_id output.
*
* Note: This routine calls the UDF get_current_transaction_id directly to
* keep the output/format coherent; otherwise, any changes to the UDF's
* parameters or output format could diverge from this routine.
*/
char *
GetCurrentTransactionIdString(void)
{
/*
* Call get_current_transaction_id UDF to get the current
* distributed transaction id.
*/
Oid transactionFuncOid = FunctionOid("pg_catalog", "get_current_transaction_id", 0);
Datum transactionIdHeapDatum = OidFunctionCall0(transactionFuncOid);
/* Now, call the datatype output function on the tuple */
FmgrInfo *outputFunction = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
Oid outputFunctionId = FunctionOid("pg_catalog", "record_out", 1);
fmgr_info(outputFunctionId, outputFunction);
return (OutputFunctionCall(outputFunction, transactionIdHeapDatum));
}
/*
* get_current_transaction_id returns a tuple with (databaseId, processId,
* initiatorNodeIdentifier, transactionNumber, timestamp) that exists in the


@ -16,6 +16,7 @@
#include "access/xact.h"
#include "distributed/backend_data.h"
#include "distributed/causal_clock.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
@ -841,6 +842,12 @@ CoordinatedRemoteTransactionsPrepare(void)
}
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
/*
* If "citus.enable_cluster_clock" is enabled, timestamp the transaction
* with the cluster clock and persist its id along with the timestamp.
*/
PrepareAndSetTransactionClock(connectionList);
}


@ -0,0 +1,208 @@
# Cluster Clock
### Requirement:
Many distributed applications need to track changes in the same order as they are applied to the database. The changes may be to databases or to objects within them, either on a single node or across the sharded cluster.
### Definitions
**Total ordering** - Every pair of change events can be placed in some order.
**Causal ordering** - Only events that are causally related (an event A caused an event B) can be ordered; i.e., it is only a partial order. Some events happen independently, with no possible causal relationship, and such events are treated as concurrent.
**Sequential consistency** - All writes must be seen in the same order by all processes.
**Causal consistency** - Causally related writes must be seen in the same order.
Transactions on a single-node system naturally provide total and sequential ordering guarantees for client read and write operations, as all operations are routed to the same node; a multi-node distributed system such as Citus faces additional challenges.
One possible way to totally order all the changes in the system is to timestamp every event with a global physical clock or a centralized logical clock; observing the events in increasing timestamp order then gives the total ordering of events. For both performance and cost reasons, such solutions are impractical. In the absence of total ordering, a slightly weaker ordering is the **causal order**.
Causal order is defined as a model that preserves a partial order of events in a distributed system:
1. If an event A causes another event B, every other process in the system observes event A before observing event B.
2. Causal order is transitive: if A causes B, and B causes C, then A causes C.
3. Events that are not causally ordered are treated as concurrent.
Causal consistency is a weak form of consistency that preserves the order of causally related operations. The causal consistency model can be refined into four session guarantees.
1. Read Your Writes: If a process performs a write, the same process later observes the result of its write.
2. Monotonic Reads: The set of writes observed (read) by a process is guaranteed to be monotonically increasing.
3. Writes Follow Reads: If some process performs a read followed by a write, and another process observes the result of the write, then it can also observe the read.
4. Monotonic Writes: If some process performs a write, followed sometime later by another write, other processes will observe them in the same order.
### Hybrid Logical Clock (HLC)
HLC provides a way to capture the causality relationship, like a logical clock; because its logical component is kept close to wall-clock time, it can also be used for backup/recovery. An HLC consists of
LC - Logical clock
C - Counter
Clock components - Unsigned 64 bit <LC, C>

| Epoch Milliseconds (LC) | Logical counter (C) |
|--|--|
| 42 bits | 22 bits |

2^42 milliseconds = 4398046511104 milliseconds, which is ~139 years.
2^22 ticks = a maximum of four million operations per millisecond.
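To make the layout concrete, the SQL type prints as (logical, counter); a hedged
example (values illustrative):

    SELECT '(1658887880235, 9)'::cluster_clock;   -- logical = epoch ms, counter = ticks
    SELECT cluster_clock_diff_in_ms('(1658887880235, 9)'::cluster_clock);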
### New catalog
All committed transactions are persisted with the transaction id and the commit clock time in a new catalog,
**pg_dist_commit_transaction**
The transaction identifier is
(database_id, process_id, initiator_node_identifier, transaction_number, transaction_stamp)
Assuming the timestamp never jumps back, this id is globally unique across the cluster and across restarts.
| TransactionId | CommitClock (LC, C) | timestamp (epoch) |
|--|--|--|
| (13090,1077913,2,5,"2022-07-26 19:05:09.290597-07") | (1658887880235, 9) | 2022-07-26 19:05:09.290597-07 |
### GUC
A new GUC parameter, "**citus.enable_cluster_clock**", enables a cluster-wide timestamp for all transactions and persists them in the table.
### Pseudo code
WC - Current Wall Clock in milliseconds
HLC - Current Hybrid Logical Clock in shared memory
MAXTICKS - Four million

/* Tick for each event */
GetClock()
{
    IF (HLC.LC < WC)
    {
        HLC.LC = WC;
        HLC.C = 0;
    }
    ELSE IF (HLC.C == MAXTICKS)
    {
        HLC.LC = HLC.LC + 1;
        HLC.C = 0;
    }
    ELSE
    {
        HLC.C = HLC.C + 1;
    }
    return HLC;
}

/* Returns true if clock1 is after clock2 */
IsEventAfter(HLC1, HLC2)
{
    IF (HLC1.LC != HLC2.LC)
        return (HLC1.LC > HLC2.LC);
    ELSE
        return (HLC1.C > HLC2.C);
}

/* Simply returns the highest cluster clock value */
CurrentClusterClock()
{
    For each node
    {
        NodeClock[N] = GetClock();
    }
    /* Return the highest clock value of all the nodes */
    return max(NodeClock[N]);
}

/* Adjust the local shared memory clock to the
   received value from the remote node */
ReceiveClock(RHLC)
{
    IF (RHLC.LC < HLC.LC)
        return; /* local clock is ahead, do nothing */

    IF (RHLC.LC > HLC.LC)
    {
        HLC.LC = RHLC.LC;
        HLC.C = RHLC.C;
        return;
    }

    IF (RHLC.LC == HLC.LC)
        HLC.C = (RHLC.C > HLC.C) ? RHLC.C : HLC.C;
}

/* All the nodes adjust their clocks to the
   highest of the newly committed 2PC */
AdjustClocks(HLC)
{
    For each node
    {
        SendCommand("select ReceiveClock(%s)", HLC);
    }
}

/* During prepare, once all the nodes acknowledge
   commit, persist the current transaction id along
   with the clock value in the catalog */
PrepareTransaction()
{
    HLC = CurrentClusterClock();
    PersistCatalog(get_current_transaction_id(), HLC);
    AdjustClocks(HLC);
}

/* Initialize the shared memory clock value to the
   highest clock persisted */
InitClockAtBoot()
{
    HLC.LC = WC;
    HLC.C = 0;
    MAX_LC = "SELECT max(CommitClock.LC) FROM catalog";
    IF (MAX_LC != NULL)
    {
        /* There are prior commits, adjust to that value */
        ReceiveClock(MAX_LC);
    }
}
#### Usage
**Step 1**
In the application, track individual changes with the current transaction id
UPDATE track_table
SET TransactionId = get_current_transaction_id(), operation = <>, row_key = <>,....;
**Step 2**
As the transaction commits, if the GUC is enabled, the engine internally calls `citus_get_cluster_clock()` on all participating nodes and persists the current transaction id along with the commit cluster clock.
INSERT INTO pg_dist_commit_transaction(TransactionId, CommitClock, timestamp) VALUES (current_transactionId, commit_clock, now())
**Step 3**
How to get all the events in the causal order?
SELECT tt.row_key, tt.operation
FROM track_table tt, pg_dist_commit_transaction cc
WHERE tt.TransactionId = cc.TransactionId
ORDER BY cc.CommitClock
Events for an object
SELECT tt.row_key, tt.operation
FROM track_table tt, pg_dist_commit_transaction cc
WHERE tt.TransactionId = cc.TransactionId
and row_key = $1 ORDER BY cc.CommitClock
Events in the last one hour
SELECT tt.row_key, tt.operation
FROM track_table tt, pg_dist_commit_transaction cc
WHERE cc.timestamp >= now() - interval '1 hour'
and tt.TransactionId = cc.TransactionId
Note: Citus uses 2PC; if any node goes down after the PREPARE and before the COMMIT, we might have partially committed changes. Citus tracks such transactions in **pg_dist_transaction**, and they are eventually committed when the node becomes healthy again; until then, when we track data from the committed transactions in **pg_dist_commit_transaction**, we will miss the changes from the failed node.
One way to avoid this anomaly is to take only the transactions from **pg_dist_commit_transaction** whose clock value is less than the minimum clock of the transactions in **pg_dist_transaction**. The caveat is that if the node takes long to recover and the 2PC takes long to fully complete, the visibility of the committed transactions might stall.
### Catalog pruning
The data in **pg_dist_commit_transaction** is ephemeral, i.e., rows eventually have to be deleted. Users can install a pg_cron job to prune the catalog regularly (note that the timestamp column holds epoch milliseconds, so it must be compared against a bigint):
DELETE FROM pg_dist_commit_transaction
WHERE timestamp < (extract(epoch FROM now() - interval '7 days') * 1000)::bigint;
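A hedged sketch of scheduling this with pg_cron (assumes the pg_cron extension is
installed; the job name and schedule are illustrative):

    SELECT cron.schedule('prune-pg-dist-commit-transaction', '0 3 * * *',
        $$DELETE FROM pg_dist_commit_transaction
          WHERE timestamp < (extract(epoch FROM now() - interval '7 days') * 1000)::bigint$$);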
### Limitations of Citus
Using this transaction commit clock ordering to build a secondary that is a mirror copy of the original may not be feasible at this time, for the following reasons.
Given that there is no well-defined order between concurrent distributed transactions in Citus, we cannot retroactively apply a transaction order that leads to an exact replica of the primary unless we preserve the original object-level ordering as it happened on individual nodes.
For instance, if a multi-shard insert (transaction A) happens concurrently with a multi-shard update (transaction B) and the WHERE clause of the update matches inserted rows in multiple shards, we could have a scenario in which only a subset of the inserted rows gets updated. Effectively, transaction A might happen before transaction B on node 1, while transaction B happens before transaction A on node 2. While unfortunate, we cannot simply claim that the changes made by transaction A happened first based on commit timestamps, because that would lead us to reorder changes to the same object ID on node 2, which might lead to a different outcome when replayed.
In such a scenario, even if we use the causal commit clock to order changes, it is essential that the order of modifications to an object matches the original order. Otherwise, we could have scenarios like the above, where an insert happens before an update in the primary cluster but the update is replayed before the insert; replaying the changes would then lead to a different database.
In the absence of coherent transaction-ordering semantics in a distributed cluster, the best we can do is ensure that changes to the same object are applied in the correct order and delivered exactly once (correct pagination).


@ -0,0 +1,734 @@
/*-------------------------------------------------------------------------
* causal_clock.c
*
* Core function definitions to implement the hybrid logical clock.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include <sys/time.h>
#include "postgres.h"
#include "miscadmin.h"
#include "fmgr.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/numeric.h"
#include "utils/typcache.h"
#include "nodes/pg_list.h"
#include "catalog/namespace.h"
#include "commands/extension.h"
#include "executor/spi.h"
#include "postmaster/postmaster.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "storage/s_lock.h"
#include "distributed/causal_clock.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/pg_dist_commit_transaction.h"
#include "distributed/remote_commands.h"
#include "distributed/citus_safe_lib.h"
PG_FUNCTION_INFO_V1(citus_get_cluster_clock);
PG_FUNCTION_INFO_V1(citus_internal_adjust_local_clock_to_remote);
PG_FUNCTION_INFO_V1(citus_is_clock_after);
/*
* Current state of the logical clock
*/
typedef enum ClockState
{
CLOCKSTATE_INITIALIZED,
CLOCKSTATE_UNINITIALIZED,
CLOCKSTATE_INIT_INPROGRESS,
} ClockState;
/*
* Holds the cluster clock variables in shared memory.
*/
typedef struct LogicalClockShmemData
{
slock_t clockMutex;
/* Current logical clock value of this node */
ClusterClock clusterClockValue;
/* Tracks initialization at boot */
ClockState clockInitialized;
} LogicalClockShmemData;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static LogicalClockShmemData *logicalClockShmem = NULL;
static uint64 GetEpochTimeMs(void);
static void AdjustLocalClock(ClusterClock *remoteClock);
static void GetNextClusterClockValue(ClusterClock *nextClusterClockValue);
static ClusterClock * GetHighestClockInTransaction(List *nodeConnectionList);
static void AdjustClocksToTransactionHighest(List *nodeConnectionList,
ClusterClock *transactionClockValue);
static void LogTransactionCommitClock(char *transactionId, ClusterClock
*transactionClockValue);
static char * ExecuteQueryAndReturnStringResult(char *query, int spiok_type);
static void InitClockAtFirstUse(void);
static void IncrementClusterClock(ClusterClock *clusterClock);
static ClusterClock * LargerClock(ClusterClock *clusterClock1,
ClusterClock *clusterClock2);
bool EnableClusterClock = false;
/*
* GetEpochTimeAsClock returns the epoch value in milliseconds, used as the logical value in ClusterClock.
*/
ClusterClock *
GetEpochTimeAsClock(void)
{
struct timeval tp;
gettimeofday(&tp, NULL);
uint64 result = (uint64) (tp.tv_sec) * 1000;
result = result + (uint64) (tp.tv_usec) / 1000;
ClusterClock *epochClock = (ClusterClock *) palloc(sizeof(ClusterClock));
epochClock->logical = result;
epochClock->counter = 0;
return epochClock;
}
/*
* GetEpochTimeMs returns the epoch value in milliseconds, used as wall-clock timestamp.
*/
static uint64
GetEpochTimeMs(void)
{
struct timeval tp;
gettimeofday(&tp, NULL);
uint64 result = (uint64) (tp.tv_sec) * 1000;
result = result + (uint64) (tp.tv_usec) / 1000;
return result;
}
/*
* LogicalClockShmemSize returns the size that should be allocated
* in the shared memory for logical clock management.
*/
size_t
LogicalClockShmemSize(void)
{
Size size = 0;
size = add_size(size, sizeof(LogicalClockShmemData));
return size;
}
/*
* InitializeClusterClockMem reserves shared-memory space needed to
* store LogicalClockShmemData, and sets the hook for initialization
* of the same.
*/
void
InitializeClusterClockMem(void)
{
/* On PG 15 and above, we use shmem_request_hook_type */
#if PG_VERSION_NUM < PG_VERSION_15
/* allocate shared memory for pre PG-15 versions */
if (!IsUnderPostmaster)
{
RequestAddinShmemSpace(LogicalClockShmemSize());
}
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = LogicalClockShmemInit;
}
/*
* LogicalClockShmemInit Allocates and initializes shared memory for
* cluster clock related variables.
*/
void
LogicalClockShmemInit(void)
{
bool alreadyInitialized = false;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
logicalClockShmem = (LogicalClockShmemData *)
ShmemInitStruct("Logical Clock Shmem",
LogicalClockShmemSize(),
&alreadyInitialized);
if (!alreadyInitialized)
{
/* A zero value indicates that the clock is not adjusted yet */
memset(&logicalClockShmem->clusterClockValue, 0, sizeof(ClusterClock));
SpinLockInit(&logicalClockShmem->clockMutex);
logicalClockShmem->clockInitialized = CLOCKSTATE_UNINITIALIZED;
}
LWLockRelease(AddinShmemInitLock);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
/*
* IncrementClusterClock increments the ClusterClock by 1.
*/
static void
IncrementClusterClock(ClusterClock *clusterClock)
{
/*
* It's the counter that always ticks, once it reaches the maximum, reset the
* counter and increment the logical clock.
*/
if (clusterClock->counter == MAX_COUNTER)
{
clusterClock->logical++;
clusterClock->counter = 0;
return;
}
clusterClock->counter++;
}
/*
* LargerClock compares two ClusterClock(s) and returns a pointer to a copy of the larger one.
*/
static ClusterClock *
LargerClock(ClusterClock *clusterClock1, ClusterClock *clusterClock2)
{
ClusterClock *maxClock = (ClusterClock *) palloc(sizeof(ClusterClock));
ClusterClock *sourceClock = NULL;
if (clusterClock1->logical != clusterClock2->logical)
{
sourceClock = (clusterClock1->logical > clusterClock2->logical) ? clusterClock1 :
clusterClock2;
}
else
{
sourceClock = (clusterClock1->counter > clusterClock2->counter) ? clusterClock1 :
clusterClock2;
}
memcpy(maxClock, sourceClock, sizeof(ClusterClock));
return maxClock;
}
/*
* GetNextClusterClockValue implements the internal guts of the UDF citus_get_cluster_clock()
*/
static void
GetNextClusterClockValue(ClusterClock *nextClusterClockValue)
{
ClusterClock *epochValue = GetEpochTimeAsClock();
SpinLockAcquire(&logicalClockShmem->clockMutex);
/* Check if the clock is adjusted after the boot */
if (logicalClockShmem->clockInitialized == CLOCKSTATE_UNINITIALIZED)
{
SpinLockRelease(&logicalClockShmem->clockMutex);
Assert(logicalClockShmem->clusterClockValue.logical == 0);
InitClockAtFirstUse();
SpinLockAcquire(&logicalClockShmem->clockMutex);
if (logicalClockShmem->clockInitialized == CLOCKSTATE_INIT_INPROGRESS ||
logicalClockShmem->clockInitialized == CLOCKSTATE_UNINITIALIZED)
{
/* Either we lost the initialization-race or there was an exception */
SpinLockRelease(&logicalClockShmem->clockMutex);
ereport(ERROR, (errmsg("Clock is in the process of getting initialized, "
"please retry")));
}
SpinLockRelease(&logicalClockShmem->clockMutex);
}
Assert(logicalClockShmem->clockInitialized == CLOCKSTATE_INITIALIZED);
IncrementClusterClock(&logicalClockShmem->clusterClockValue);
ClusterClock *clockValue = LargerClock(&logicalClockShmem->clusterClockValue,
epochValue);
logicalClockShmem->clusterClockValue = *clockValue;
SpinLockRelease(&logicalClockShmem->clockMutex);
nextClusterClockValue->logical = clockValue->logical;
nextClusterClockValue->counter = clockValue->counter;
}
/*
* AdjustLocalClock Adjusts the local shared memory clock to the
* received value from the remote node.
*/
void
AdjustLocalClock(ClusterClock *remoteClock)
{
SpinLockAcquire(&logicalClockShmem->clockMutex);
if (remoteClock->logical < logicalClockShmem->clusterClockValue.logical)
{
/* local clock is ahead, do nothing */
SpinLockRelease(&logicalClockShmem->clockMutex);
return;
}
if (remoteClock->logical > logicalClockShmem->clusterClockValue.logical)
{
ereport(DEBUG1, (errmsg("adjusting to remote clock "
"logical(%lu) counter(%u)",
remoteClock->logical,
remoteClock->counter)));
/* Pick the remote value */
logicalClockShmem->clusterClockValue.logical = remoteClock->logical;
logicalClockShmem->clusterClockValue.counter = remoteClock->counter;
SpinLockRelease(&logicalClockShmem->clockMutex);
return;
}
/*
* Both the logical clock values are equal, pick the larger counter.
*/
if (remoteClock->counter > logicalClockShmem->clusterClockValue.counter)
{
ereport(DEBUG1, (errmsg("both logical clock values are "
"equal(%lu), pick remote's counter (%u) "
"since it's greater",
remoteClock->logical, remoteClock->counter)));
logicalClockShmem->clusterClockValue.logical = remoteClock->logical;
logicalClockShmem->clusterClockValue.counter = remoteClock->counter;
}
SpinLockRelease(&logicalClockShmem->clockMutex);
}
/*
* GetHighestClockInTransaction takes the connection list of participating nodes in the
* current transaction and polls the logical clock value of all the nodes. Returns the
* highest logical clock value of all the nodes in the current distributed transaction,
* which may be used as commit order for individual objects in the transaction.
*/
static ClusterClock *
GetHighestClockInTransaction(List *nodeConnectionList)
{
/* get clock value from each node including the transaction coordinator */
int connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags, LocalHostName,
PostPortNumber);
nodeConnectionList = lappend(nodeConnectionList, connection);
foreach_ptr(connection, nodeConnectionList)
{
int querySent = SendRemoteCommand(connection,
"SELECT citus_get_cluster_clock();");
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
ClusterClock *globalClockValue = NULL;
/* fetch the results and pick the highest clock value of all the nodes */
foreach_ptr(connection, nodeConnectionList)
{
bool raiseInterrupts = true;
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
ereport(ERROR, (errmsg("connection to %s:%d failed when "
"fetching logical clock value",
connection->hostname, connection->port)));
}
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
int32 rowCount = PQntuples(result);
int32 colCount = PQnfields(result);
/* A single row with a single column is expected; anything else is an error */
if (colCount != 1 || rowCount != 1)
{
ereport(ERROR,
(errmsg("unexpected result from citus_get_cluster_clock()")));
}
ClusterClock *nodeClockValue = ParseClusterClockPGresult(result, 0, 0);
ereport(DEBUG1, (errmsg(
"node(%lu:%u) transaction clock %lu:%u",
connection->connectionId, connection->port,
nodeClockValue->logical, nodeClockValue->counter)));
if (globalClockValue)
{
globalClockValue = LargerClock(globalClockValue, nodeClockValue);
}
else
{
globalClockValue = nodeClockValue;
}
PQclear(result);
ForgetResults(connection);
}
ereport(DEBUG1,
(errmsg("final global transaction clock %lu:%u",
globalClockValue->logical,
globalClockValue->counter)));
return globalClockValue;
}
/*
* AdjustClocksToTransactionHighest sets the clock value of all the nodes that
* participated in the PREPARE of the transaction to the highest clock value of all the nodes.
*/
static void
AdjustClocksToTransactionHighest(List *nodeConnectionList, ClusterClock
*transactionClockValue)
{
StringInfo queryToSend = makeStringInfo();
/* Set the adjusted value locally */
AdjustLocalClock(transactionClockValue);
/* Set the clock value on participating worker nodes */
MultiConnection *connection = NULL;
appendStringInfo(queryToSend,
"SELECT pg_catalog.citus_internal_adjust_local_clock_to_remote"
"('(%lu, %u)'::pg_catalog.cluster_clock);",
transactionClockValue->logical, transactionClockValue->counter);
foreach_ptr(connection, nodeConnectionList)
{
int querySent = SendRemoteCommand(connection, queryToSend->data);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
/* Process the result */
foreach_ptr(connection, nodeConnectionList)
{
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
ForgetResults(connection);
}
}
/*
* During prepare, once all the nodes acknowledge commit, persist the current
* transaction id along with the clock value in the catalog.
*/
void
PrepareAndSetTransactionClock(List *transactionNodeList)
{
if (!EnableClusterClock)
{
/* citus.enable_cluster_clock is false */
return;
}
char *transactionId = GetCurrentTransactionIdString();
/* Pick the highest logical clock value among all transaction-nodes */
ClusterClock *transactionClockValue =
GetHighestClockInTransaction(transactionNodeList);
/* Persist the transactionId along with the logical commit-clock timestamp */
LogTransactionCommitClock(transactionId, transactionClockValue);
/* Adjust all the nodes with the new clock value */
AdjustClocksToTransactionHighest(transactionNodeList, transactionClockValue);
}
/*
* LogTransactionCommitClock registers the committed transaction along
* with the commit clock.
*/
static void
LogTransactionCommitClock(char *transactionId, ClusterClock *transactionClockValue)
{
ereport(DEBUG1, (errmsg("persisting transaction %s with "
"clock logical (%lu) and counter(%u)",
transactionId, transactionClockValue->logical,
transactionClockValue->counter)));
uint64 transactionTimestamp = GetEpochTimeMs();
char *clockString = psprintf("(%lu,%u)",
transactionClockValue->logical,
transactionClockValue->counter);
Datum values[Natts_pg_dist_commit_transaction] = { 0 };
bool isNulls[Natts_pg_dist_commit_transaction] = { 0 };
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_commit_transaction_transaction_id - 1] =
CStringGetTextDatum(transactionId);
values[Anum_pg_dist_commit_transaction_cluster_clock - 1] =
CStringGetDatum(clockString);
values[Anum_pg_dist_commit_transaction_timestamp - 1] =
Int64GetDatum(transactionTimestamp);
/* open pg_dist_commit_transaction and insert new tuple */
Relation pgDistCommitTransaction =
table_open(DistCommitTransactionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistCommitTransaction);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistCommitTransaction, heapTuple);
CommandCounterIncrement();
/* close relation and invalidate previous cache entry */
table_close(pgDistCommitTransaction, NoLock);
}
/*
* Initialize the shared memory clock value to the highest clock
* persisted. This will protect from any clock drifts.
*/
static void
InitClockAtFirstUse(void)
{
if (creating_extension)
{
/* No catalog present yet */
return;
}
SpinLockAcquire(&logicalClockShmem->clockMutex);
/* Avoid repeated and parallel initialization */
if (logicalClockShmem->clockInitialized == CLOCKSTATE_INITIALIZED ||
logicalClockShmem->clockInitialized == CLOCKSTATE_INIT_INPROGRESS)
{
SpinLockRelease(&logicalClockShmem->clockMutex);
return;
}
Assert(logicalClockShmem->clockInitialized == CLOCKSTATE_UNINITIALIZED);
/*
* Set the flag before executing a distributed query
* (else it might trigger this routine recursively.)
*/
logicalClockShmem->clockInitialized = CLOCKSTATE_INIT_INPROGRESS;
SpinLockRelease(&logicalClockShmem->clockMutex);
PG_TRY();
{
/* Start with the wall clock value */
ClusterClock *epochValue = GetEpochTimeAsClock();
logicalClockShmem->clusterClockValue = *epochValue;
/*
* Select the highest clock value persisted in the catalog.
*/
char *results = ExecuteQueryAndReturnStringResult(
"SELECT cluster_clock_value FROM "
"pg_catalog.pg_dist_commit_transaction ORDER BY 1 DESC LIMIT 1;",
SPI_OK_SELECT);
if (results != NULL)
{
ClusterClock *persistedMaxClock = ParseClusterClockFields(results);
ereport(DEBUG1, (errmsg("adjusted the clock with value persisted"
"logical(%lu) and counter(%u)",
persistedMaxClock->logical,
persistedMaxClock->counter)));
/*
* Adjust the local clock according to the most recent
* clock stamp value persisted in the catalog.
*/
AdjustLocalClock(persistedMaxClock);
}
/*
* NULL results indicate no prior commit timestamps on this node, retain
* the wall clock.
*/
}
PG_CATCH();
{
SpinLockAcquire(&logicalClockShmem->clockMutex);
logicalClockShmem->clockInitialized = CLOCKSTATE_UNINITIALIZED;
memset(&logicalClockShmem->clusterClockValue, 0, sizeof(ClusterClock));
SpinLockRelease(&logicalClockShmem->clockMutex);
PG_RE_THROW();
}
PG_END_TRY();
SpinLockAcquire(&logicalClockShmem->clockMutex);
logicalClockShmem->clockInitialized = CLOCKSTATE_INITIALIZED;
SpinLockRelease(&logicalClockShmem->clockMutex);
}
/*
* ExecuteQueryAndReturnStringResult connects to SPI, executes the query, and
* checks that SPI returned the expected status. Returns the single string
* result in the caller's memory context, or NULL when no row is found.
*/
static char *
ExecuteQueryAndReturnStringResult(char *query, int spiok_type)
{
/*
* Allocate in the caller's context; should hold the max value
* of 42 bits (13 chars) and 22 bits (7 chars) + 3 delimiters.
*/
char *clockString = (char *) palloc(24 * sizeof(char));
int spiResult = SPI_connect();
if (spiResult != SPI_OK_CONNECT)
{
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
spiResult = SPI_execute(query, false, 0);
if (spiResult != spiok_type)
{
ereport(ERROR, (errmsg("could not run SPI query")));
}
if (SPI_processed > 1)
{
ereport(ERROR, (errmsg("query(%s) unexpectedly returned "
"more than one row", query)));
}
if (SPI_processed != 1)
{
/* No rows found, it's up to the caller to handle it */
SPI_finish();
return NULL;
}
char *results = DatumGetCString(
SPI_getvalue(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc, 1));
strcpy_s(clockString, 24, results);
spiResult = SPI_finish();
if (spiResult != SPI_OK_FINISH)
{
ereport(ERROR, (errmsg("could not finish SPI connection")));
}
return clockString;
}
/*
* citus_get_cluster_clock() is a UDF that returns a monotonically increasing
* logical clock. Clock guarantees to never go back in value after restarts, and
* makes best attempt to keep the value close to unix epoch time in milliseconds.
*/
Datum
citus_get_cluster_clock(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ClusterClock *clusterClockValue = (ClusterClock *) palloc(sizeof(ClusterClock));
GetNextClusterClockValue(clusterClockValue);
PG_RETURN_POINTER(clusterClockValue);
}
/*
* citus_internal_adjust_local_clock_to_remote is an internal UDF to adjust
* the local clock to the highest in the cluster.
*/
Datum
citus_internal_adjust_local_clock_to_remote(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ClusterClock *remoteClock = (ClusterClock *) PG_GETARG_POINTER(0);
AdjustLocalClock(remoteClock);
PG_RETURN_VOID();
}
/*
* citus_is_clock_after is a UDF that accepts logical clock timestamps of
* two causally related events and returns true if the event of the first
* argument happened after the event of the second argument.
*/
Datum
citus_is_clock_after(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
/* Fetch both the arguments */
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
ereport(DEBUG1, (errmsg(
"clock1 @ LC:%lu, C:%u, "
"clock2 @ LC:%lu, C:%u",
clock1->logical, clock1->counter,
clock2->logical, clock2->counter)));
bool result = (cluster_clock_cmp_internal(clock1, clock2) > 0);
PG_RETURN_BOOL(result);
}


@ -0,0 +1,337 @@
/*-------------------------------------------------------------------------
*
* type_utils.c
*
* Utility functions related to types.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "libpq-fe.h"
#include "catalog/pg_type.h"
#include "nodes/pg_list.h"
#include "utils/syscache.h"
#include "libpq/pqformat.h"
#include "distributed/causal_clock.h"
#define NUM_CLUSTER_CLOCK_ARGS 2
#define LDELIM '('
#define RDELIM ')'
#define DELIM ','
PG_FUNCTION_INFO_V1(cluster_clock_in);
PG_FUNCTION_INFO_V1(cluster_clock_out);
PG_FUNCTION_INFO_V1(cluster_clock_recv);
PG_FUNCTION_INFO_V1(cluster_clock_send);
/*
* cluster_clock_in converts the cstring input format to the ClusterClock type.
*/
Datum
cluster_clock_in(PG_FUNCTION_ARGS)
{
char *clockString = PG_GETARG_CSTRING(0);
char *clockFields[NUM_CLUSTER_CLOCK_ARGS];
int numClockFields = 0;
char *currentChar = clockString;
for (; *currentChar && numClockFields < NUM_CLUSTER_CLOCK_ARGS && *currentChar !=
RDELIM; currentChar++)
{
if (*currentChar == DELIM || (*currentChar == LDELIM && !numClockFields))
{
clockFields[numClockFields++] = currentChar + 1;
}
}
if (numClockFields < NUM_CLUSTER_CLOCK_ARGS)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"cluster_clock",
clockString)));
}
char *endingChar = NULL;
errno = 0;
int64 logical = strtoul(clockFields[0], &endingChar, 10);
if (errno || *endingChar != DELIM || logical > MAX_LOGICAL || logical < 0)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"cluster_clock",
clockString)));
}
int64 counter = strtol(clockFields[1], &endingChar, 10);
if (errno || *endingChar != RDELIM || counter > MAX_COUNTER || counter < 0)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"cluster_clock",
clockString)));
}
ClusterClock *clusterClock = (ClusterClock *) palloc(sizeof(ClusterClock));
clusterClock->logical = logical;
clusterClock->counter = counter;
PG_RETURN_POINTER(clusterClock);
}
/*
* cluster_clock_out converts the internal ClusterClock format to cstring output.
*/
Datum
cluster_clock_out(PG_FUNCTION_ARGS)
{
ClusterClock *clusterClock = (ClusterClock *) PG_GETARG_POINTER(0);
char *clockString = psprintf("(%lu,%u)", clusterClock->logical,
clusterClock->counter);
PG_RETURN_CSTRING(clockString);
}
/*
* cluster_clock_recv converts external binary format to ClusterClock.
*/
Datum
cluster_clock_recv(PG_FUNCTION_ARGS)
{
StringInfo clockBuffer = (StringInfo) PG_GETARG_POINTER(0);
ClusterClock *clusterClock = (ClusterClock *) palloc(sizeof(ClusterClock));
clusterClock->logical = pq_getmsgint64(clockBuffer);
clusterClock->counter = pq_getmsgint(clockBuffer, sizeof(int));
PG_RETURN_POINTER(clusterClock);
}
/*
* cluster_clock_send converts ClusterClock to binary format.
*/
Datum
cluster_clock_send(PG_FUNCTION_ARGS)
{
ClusterClock *clusterClock = (ClusterClock *) PG_GETARG_POINTER(0);
StringInfoData clockBuffer;
pq_begintypsend(&clockBuffer);
pq_sendint64(&clockBuffer, clusterClock->logical);
pq_sendint32(&clockBuffer, clusterClock->counter);
PG_RETURN_BYTEA_P(pq_endtypsend(&clockBuffer));
}
/*****************************************************************************
* PUBLIC ROUTINES *
*****************************************************************************/
PG_FUNCTION_INFO_V1(cluster_clock_lt);
PG_FUNCTION_INFO_V1(cluster_clock_le);
PG_FUNCTION_INFO_V1(cluster_clock_eq);
PG_FUNCTION_INFO_V1(cluster_clock_ne);
PG_FUNCTION_INFO_V1(cluster_clock_gt);
PG_FUNCTION_INFO_V1(cluster_clock_ge);
PG_FUNCTION_INFO_V1(cluster_clock_cmp);
PG_FUNCTION_INFO_V1(cluster_clock_diff_in_ms);
/*
* cluster_clock_cmp_internal is the generic compare routine and must be used by all
* operators, including btree indexes, when comparing the cluster_clock data type.
* Return values are
* 1 -- clock1 is > clock2
* 0 -- clock1 is = clock2
* -1 -- clock1 is < clock2
*/
int
cluster_clock_cmp_internal(ClusterClock *clusterClock1, ClusterClock *clusterClock2)
{
int retcode = 0;
/* Logical value takes precedence when comparing two clocks */
if (clusterClock1->logical != clusterClock2->logical)
{
retcode = (clusterClock1->logical > clusterClock2->logical) ? 1 : -1;
return retcode;
}
/* Logical values are equal, let's compare ticks */
if (clusterClock1->counter != clusterClock2->counter)
{
retcode = (clusterClock1->counter > clusterClock2->counter) ? 1 : -1;
return retcode;
}
/* Ticks are equal too, return zero */
return retcode;
}
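
As a worked example of the precedence rule, here is a standalone sketch that mirrors the routine above (the struct and compare logic are duplicated under assumed names purely so the snippet compiles on its own); the inputs match the citus_is_clock_after tests below:

#include <stdio.h>
#include <stdint.h>

/* standalone mirror of cluster_clock_cmp_internal, renamed for this sketch */
typedef struct ClusterClockExample
{
	uint64_t logical;
	uint32_t counter;
} ClusterClockExample;

static int
ClusterClockCmpExample(const ClusterClockExample *clock1,
					   const ClusterClockExample *clock2)
{
	/* the logical value takes precedence; the counter only breaks ties */
	if (clock1->logical != clock2->logical)
	{
		return (clock1->logical > clock2->logical) ? 1 : -1;
	}
	if (clock1->counter != clock2->counter)
	{
		return (clock1->counter > clock2->counter) ? 1 : -1;
	}
	return 0;
}

int
main(void)
{
	ClusterClockExample a = { 5, 1 };
	ClusterClockExample b = { 3, 6 };
	ClusterClockExample c = { 5, 6 };

	printf("%d\n", ClusterClockCmpExample(&a, &b));	/*  1: logical wins over counter */
	printf("%d\n", ClusterClockCmpExample(&a, &c));	/* -1: equal logicals, counter decides */
	return 0;
}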
/*
* cluster_clock_lt returns true if clock1 is less than clock2.
*/
Datum
cluster_clock_lt(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) < 0);
}
/*
* cluster_clock_le returns true if clock1 is less than or equal to clock2.
*/
Datum
cluster_clock_le(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) <= 0);
}
/*
* cluster_clock_eq returns true if clock1 is equal to clock2.
*/
Datum
cluster_clock_eq(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) == 0);
}
/*
* cluster_clock_ne returns true if clock1 is not equal to clock2.
*/
Datum
cluster_clock_ne(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) != 0);
}
/*
* cluster_clock_gt returns true if clock1 is greater than clock2.
*/
Datum
cluster_clock_gt(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) > 0);
}
/*
 * cluster_clock_ge returns true if clock1 is greater than or equal to clock2.
 */
Datum
cluster_clock_ge(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_BOOL(cluster_clock_cmp_internal(clock1, clock2) >= 0);
}
/*
* cluster_clock_cmp returns 1 if clock1 is greater than clock2, returns -1 if
* clock1 is less than clock2, and zero if they are equal.
*/
Datum
cluster_clock_cmp(PG_FUNCTION_ARGS)
{
ClusterClock *clock1 = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *clock2 = (ClusterClock *) PG_GETARG_POINTER(1);
PG_RETURN_INT32(cluster_clock_cmp_internal(clock1, clock2));
}
/*
 * cluster_clock_diff_in_ms takes a clock value as input and returns its
 * difference, in milliseconds, from the current wall clock epoch time.
 */
Datum
cluster_clock_diff_in_ms(PG_FUNCTION_ARGS)
{
ClusterClock *clusterClock = (ClusterClock *) PG_GETARG_POINTER(0);
ClusterClock *epochClock = GetEpochTimeAsClock();
PG_RETURN_INT64(epochClock->logical - clusterClock->logical);
}
/*
 * ParseClusterClockFields takes an input string in the format (logical, counter)
 * and returns the in-memory representation of ClusterClock.
 */
ClusterClock *
ParseClusterClockFields(char *clockString)
{
uint64 logical = 0;
uint32 counter = 0;
if (sscanf(clockString, " ( %lu , %u )", &logical, &counter) !=
NUM_CLUSTER_CLOCK_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid output for type %s: \"%s\"", "cluster_clock",
clockString)));
}
ClusterClock *resultClock = (ClusterClock *) palloc(sizeof(ClusterClock));
resultClock->logical = logical;
resultClock->counter = counter;
return resultClock;
}
/*
 * ParseClusterClockPGresult parses a ClusterClock remote result and returns the
 * value, or returns NULL if the result is NULL.
 */
ClusterClock *
ParseClusterClockPGresult(PGresult *result, int rowIndex, int colIndex)
{
if (PQgetisnull(result, rowIndex, colIndex))
{
return NULL;
}
char *resultString = PQgetvalue(result, rowIndex, colIndex);
return ParseClusterClockFields(resultString);
}
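
The text parse above boils down to a single sscanf pattern; here is a standalone sketch of that parse (error handling elided, the input value is illustrative):

#include <stdio.h>

int
main(void)
{
	unsigned long logical = 0;
	unsigned int counter = 0;

	/* the same pattern ParseClusterClockFields uses: " ( %lu , %u )" */
	if (sscanf("(100, 7)", " ( %lu , %u )", &logical, &counter) == 2)
	{
		printf("logical=%lu counter=%u\n", logical, counter);	/* logical=100 counter=7 */
	}

	return 0;
}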


@ -80,6 +80,8 @@ extern bool IsRebalancerInternalBackend(void);
extern bool IsCitusRunCommandBackend(void);
extern bool IsExternalClientBackend(void);
extern void ResetCitusBackendType(void);
extern char * GetCurrentTransactionIdString(void);
extern Datum get_current_transaction_id(PG_FUNCTION_ARGS);
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999


@ -0,0 +1,45 @@
/*
* causal_clock.h
*
* Data structure definitions for managing hybrid logical clock and
* related function declarations.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CAUSAL_CLOCK_H
#define CAUSAL_CLOCK_H
#include "distributed/type_utils.h"
/*
* Clock components - Unsigned 64 bit <LC, C>
* Logical clock (LC): 42 bits
* Counter (C): 22 bits
*
 * 2^42 milliseconds = 4398046511104 milliseconds, which is ~139 years.
 * 2^22 ticks = a maximum of ~4.2 million operations per millisecond.
*
*/
#define LOGICAL_BITS 42
#define COUNTER_BITS 22
#define LOGICAL_MASK ((1U << COUNTER_BITS) - 1)
#define MAX_LOGICAL ((UINT64CONST(1) << LOGICAL_BITS) - 1) /* 64-bit constant; 1LU is only 32 bits on LLP64 platforms */
#define MAX_COUNTER LOGICAL_MASK
#define GET_LOGICAL(x) ((x) >> COUNTER_BITS)
#define GET_COUNTER(x) ((x) & LOGICAL_MASK)
extern bool EnableClusterClock;
extern void LogicalClockShmemInit(void);
extern size_t LogicalClockShmemSize(void);
extern void InitializeClusterClockMem(void);
extern void PrepareAndSetTransactionClock(List *transactionNodeList);
extern ClusterClock * GetEpochTimeAsClock(void);
#endif /* CAUSAL_CLOCK_H */
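
To make the 42/22-bit split concrete, below is a standalone sketch (illustrative, not part of the patch) that packs a <LC, C> pair into one unsigned 64-bit value and unpacks it with the same macros:

#include <stdio.h>
#include <stdint.h>

#define COUNTER_BITS 22
#define LOGICAL_MASK ((1U << COUNTER_BITS) - 1)
#define GET_LOGICAL(x) ((x) >> COUNTER_BITS)
#define GET_COUNTER(x) ((x) & LOGICAL_MASK)

int
main(void)
{
	/* pack <LC=100, C=7> into a single unsigned 64-bit value */
	uint64_t packed = ((uint64_t) 100 << COUNTER_BITS) | 7;

	printf("logical=%llu counter=%llu\n",
		   (unsigned long long) GET_LOGICAL(packed),
		   (unsigned long long) GET_COUNTER(packed));	/* logical=100 counter=7 */
	return 0;
}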


@ -254,6 +254,7 @@ extern Oid DistPlacementShardidIndexId(void);
extern Oid DistPlacementPlacementidIndexId(void);
extern Oid DistColocationIndexId(void);
extern Oid DistTransactionRelationId(void);
extern Oid DistCommitTransactionRelationId(void);
extern Oid DistTransactionGroupIndexId(void);
extern Oid DistPlacementGroupidIndexId(void);
extern Oid DistObjectPrimaryKeyIndexId(void);


@ -0,0 +1,43 @@
/*-------------------------------------------------------------------------
*
* pg_dist_commit_transaction.h
* definition of the "commit-transaction" relation (pg_dist_commit_transaction).
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_COMMIT_TRANSACTION_H
#define PG_DIST_COMMIT_TRANSACTION_H
/* ----------------
* pg_dist_commit_transaction definition.
* ----------------
*/
typedef struct FormData_pg_dist_commit_transaction
{
text transaction_id; /* id of the current transaction committed */
ClusterClock cluster_clock_value; /* logical clock timestamp */
uint64 timestamp; /* epoch timestamp in milliseconds */
} FormData_pg_dist_commit_transaction;
/* ----------------
 * Form_pg_dist_commit_transaction corresponds to a pointer to a tuple with
 * the format of the pg_dist_commit_transaction relation.
 * ----------------
*/
typedef FormData_pg_dist_commit_transaction *Form_pg_dist_commit_transaction;
/* ----------------
* compiler constants for pg_dist_commit_transaction
* ----------------
*/
#define Natts_pg_dist_commit_transaction 3
#define Anum_pg_dist_commit_transaction_transaction_id 1
#define Anum_pg_dist_commit_transaction_cluster_clock 2
#define Anum_pg_dist_commit_transaction_timestamp 3
#endif /* PG_DIST_COMMIT_TRANSACTION_H */


@ -0,0 +1,26 @@
/*-------------------------------------------------------------------------
*
* type_utils.h
* Utility functions related to types.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef TYPE_UTILS_H
#define TYPE_UTILS_H

#include "libpq-fe.h"
typedef struct ClusterClock
{
uint64 logical; /* cluster clock logical timestamp at the commit */
uint32 counter; /* cluster clock counter value at the commit */
} ClusterClock;
extern ClusterClock * ParseClusterClockPGresult(PGresult *result, int rowIndex,
                                                int colIndex);
extern ClusterClock * ParseClusterClockFields(char *clockString);
extern int cluster_clock_cmp_internal(ClusterClock *clusterClock1,
ClusterClock *clusterClock2);
#endif /* TYPE_UTILS_H */


@ -288,3 +288,9 @@ s/^(WARNING|ERROR)(: "[a-z\ ]+ .*" has dependency on unsupported object) "schem
s/^ERROR: A rebalance is already running as job [0-9]+$/ERROR: A rebalance is already running as job xxx/g
s/^NOTICE: Scheduled ([0-9]+) moves as job [0-9]+$/NOTICE: Scheduled \1 moves as job xxx/g
s/^HINT: (.*) job_id = [0-9]+ (.*)$/HINT: \1 job_id = xxx \2/g
# In clock tests, normalize epoch value(s) and the DEBUG messages printed
s/^(DEBUG: |LOG: )(coordinator|node\([0-9]+:[0-9]+\)|final global|Set) transaction clock [0-9]+.*$/\1\2 transaction clock xxxxxx/g
s/^(NOTICE: )(clock).*LC:[0-9]+,.*C:[0-9]+,.*$/\1\2 xxxxxx/g
/^(DEBUG: )(adjusting to remote clock logical)\([0-9]+\) counter\([0-9]+\)$/d
/^DEBUG: persisting transaction.*counter.*$/d
/^DEBUG: both logical clock values are equal\([0-9]+\), pick remote.*$/d
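For instance, these rules rewrite a raw line such as DEBUG: node(1:57637) transaction clock <clock-value> to DEBUG: node(1:57637) transaction clock xxxxxx, which is the form that appears in the expected output below.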


@ -0,0 +1,325 @@
CREATE SCHEMA clock;
SET search_path TO clock;
SHOW citus.enable_cluster_clock;
citus.enable_cluster_clock
---------------------------------------------------------------------
off
(1 row)
SET citus.enable_cluster_clock to ON;
SHOW citus.enable_cluster_clock;
citus.enable_cluster_clock
---------------------------------------------------------------------
on
(1 row)
CREATE TABLE clock_test (id int, nonid int);
SELECT create_distributed_table('clock_test', 'id', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
--
-- Compare <logical, counter> pairs
--
-- Returns true
SELECT citus_is_clock_after('(5,1)', '(3,6)');
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
-- Returns false
SELECT citus_is_clock_after('(2,9)', '(3,0)');
citus_is_clock_after
---------------------------------------------------------------------
f
(1 row)
-- Returns true
SELECT citus_is_clock_after('(5,6)', '(5,1)');
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
-- Returns false
SELECT citus_is_clock_after('(5,6)', '(5,6)');
citus_is_clock_after
---------------------------------------------------------------------
f
(1 row)
--
-- Check the clock is *monotonically increasing*
--
SELECT citus_get_cluster_clock() \gset t1
SELECT citus_get_cluster_clock() \gset t2
SELECT citus_get_cluster_clock() \gset t3
-- Both should return true
SELECT citus_is_clock_after(:'t2citus_get_cluster_clock', :'t1citus_get_cluster_clock');
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
SELECT citus_is_clock_after(:'t3citus_get_cluster_clock', :'t2citus_get_cluster_clock');
citus_is_clock_after
---------------------------------------------------------------------
t
(1 row)
-- Returns false
SELECT citus_is_clock_after(:'t1citus_get_cluster_clock', :'t3citus_get_cluster_clock');
citus_is_clock_after
---------------------------------------------------------------------
f
(1 row)
CREATE TABLE cluster_clock_type(cc cluster_clock);
INSERT INTO cluster_clock_type values('(0, 100)');
INSERT INTO cluster_clock_type values('(100, 0)');
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 2)');
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 100)');
INSERT INTO cluster_clock_type values('(200, 20)');
INSERT INTO cluster_clock_type values('(200, 3)');
INSERT INTO cluster_clock_type values('(200, 400)');
INSERT INTO cluster_clock_type values('(500, 600)');
INSERT INTO cluster_clock_type values('(500, 0)');
SELECT cc FROM cluster_clock_type ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,100)
(100,200)
(200,3)
(200,20)
(200,400)
(500,0)
(500,600)
(11 rows)
SELECT cc FROM cluster_clock_type where cc = '(200, 400)';
cc
---------------------------------------------------------------------
(200,400)
(1 row)
SELECT cc FROM cluster_clock_type where cc <> '(500, 600)';
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,200)
(100,100)
(200,20)
(200,3)
(200,400)
(500,0)
(10 rows)
SELECT cc FROM cluster_clock_type where cc != '(500, 600)';
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,200)
(100,100)
(200,20)
(200,3)
(200,400)
(500,0)
(10 rows)
SELECT cc FROM cluster_clock_type where cc < '(200, 20)' ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,100)
(100,200)
(200,3)
(7 rows)
SELECT cc FROM cluster_clock_type where cc <= '(200, 20)' ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(0,100)
(100,0)
(100,1)
(100,2)
(100,100)
(100,200)
(200,3)
(200,20)
(8 rows)
SELECT cc FROM cluster_clock_type where cc > '(200, 20)' ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(200,400)
(500,0)
(500,600)
(3 rows)
SELECT cc FROM cluster_clock_type where cc >= '(200, 20)' ORDER BY 1 ASC;
cc
---------------------------------------------------------------------
(200,20)
(200,400)
(500,0)
(500,600)
(4 rows)
CREATE INDEX cc_idx on cluster_clock_type(cc);
-- Multiply rows to check index usage
INSERT INTO cluster_clock_type SELECT a.cc FROM cluster_clock_type a, cluster_clock_type b;
INSERT INTO cluster_clock_type SELECT a.cc FROM cluster_clock_type a, cluster_clock_type b;
EXPLAIN SELECT cc FROM cluster_clock_type ORDER BY 1 ASC LIMIT 1;
QUERY PLAN
---------------------------------------------------------------------
Limit (cost=0.28..0.92 rows=1 width=12)
-> Index Only Scan using cc_idx on cluster_clock_type (cost=0.28..667.95 rows=1045 width=12)
(2 rows)
SELECT cc FROM cluster_clock_type ORDER BY 1 ASC LIMIT 1;
cc
---------------------------------------------------------------------
(0,100)
(1 row)
EXPLAIN SELECT cc FROM cluster_clock_type where cc = '(200, 20)' LIMIT 5;
QUERY PLAN
---------------------------------------------------------------------
Limit (cost=4.32..20.94 rows=5 width=12)
-> Bitmap Heap Scan on cluster_clock_type (cost=4.32..20.94 rows=5 width=12)
Recheck Cond: (cc = '(200,20)'::cluster_clock)
-> Bitmap Index Scan on cc_idx (cost=0.00..4.31 rows=5 width=0)
Index Cond: (cc = '(200,20)'::cluster_clock)
(5 rows)
SELECT cc FROM cluster_clock_type where cc = '(200, 20)' LIMIT 5;
cc
---------------------------------------------------------------------
(200,20)
(200,20)
(200,20)
(200,20)
(200,20)
(5 rows)
-- Max limits
INSERT INTO cluster_clock_type values('(4398046511103, 0)');
INSERT INTO cluster_clock_type values('(0, 4194303)');
INSERT INTO cluster_clock_type values('(4398046511103, 4194303)');
-- Bad input
INSERT INTO cluster_clock_type values('(-1, 100)');
ERROR: invalid input syntax for type cluster_clock: "(-1, 100)"
INSERT INTO cluster_clock_type values('(100, -1)');
ERROR: invalid input syntax for type cluster_clock: "(100, -1)"
INSERT INTO cluster_clock_type values('(4398046511104, 100)'); -- too big to fit into 42 bits
ERROR: invalid input syntax for type cluster_clock: "(4398046511104, 100)"
INSERT INTO cluster_clock_type values('(0, 4194304)'); -- too big to fit into 22 bits
ERROR: invalid input syntax for type cluster_clock: "(0, 4194304)"
DROP TABLE cluster_clock_type;
CREATE TABLE cluster_clock_type(cc cluster_clock UNIQUE);
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 1)');
ERROR: duplicate key value violates unique constraint "cluster_clock_type_cc_key"
DETAIL: Key (cc)=((100,1)) already exists.
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 200)');
ERROR: duplicate key value violates unique constraint "cluster_clock_type_cc_key"
DETAIL: Key (cc)=((100,200)) already exists.
INSERT INTO cluster_clock_type values('(100, 100)');
INSERT INTO cluster_clock_type values('(100, 100)');
ERROR: duplicate key value violates unique constraint "cluster_clock_type_cc_key"
DETAIL: Key (cc)=((100,100)) already exists.
-- Get wall clock epoch in milliseconds
SELECT (extract(epoch from now()) * 1000)::bigint AS epoch \gset
-- This should return the epoch value as there is no difference from (0,0)
SELECT cluster_clock_diff_in_ms('(0,0)') epoch_diff \gset
-- Returns false
SELECT (:epoch - :epoch_diff) > 10 as epoch_diff;
epoch_diff
---------------------------------------------------------------------
f
(1 row)
--
-- Check the value returned by citus_get_cluster_clock is close to epoch in ms
--
SELECT citus_get_cluster_clock() AS latest_clock \gset
-- Returns the clock difference from epoch in ms
SELECT cluster_clock_diff_in_ms(:'latest_clock') as epoch_diff_in_ms \gset
-- Returns false
SELECT :epoch_diff_in_ms > 10 as epoch_diff_in_ms;
epoch_diff_in_ms
---------------------------------------------------------------------
f
(1 row)
-- Transaction that accesses multiple nodes
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
COMMIT;
DEBUG: node(1:57637) transaction clock xxxxxx
DEBUG: node(2:57638) transaction clock xxxxxx
DEBUG: node(4:57637) transaction clock xxxxxx
DEBUG: node(5:57638) transaction clock xxxxxx
DEBUG: node(6:57637) transaction clock xxxxxx
DEBUG: node(7:57638) transaction clock xxxxxx
DEBUG: node(8:57637) transaction clock xxxxxx
DEBUG: node(9:57638) transaction clock xxxxxx
DEBUG: node(3:57636) transaction clock xxxxxx
DEBUG: final global transaction clock xxxxxx
--
-- Check to see if the transaction is indeed persisted in the catalog
--
SELECT count(*)
FROM pg_dist_commit_transaction commit_clock
WHERE transaction_id = :'tidget_current_transaction_id';
count
---------------------------------------------------------------------
1
(1 row)
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
ROLLBACK;
RESET client_min_messages;
--
-- Check that the transaction is not persisted
--
SELECT count(*)
FROM pg_dist_commit_transaction commit_clock
WHERE transaction_id = :'tidget_current_transaction_id';
count
---------------------------------------------------------------------
0
(1 row)
RESET client_min_messages;
RESET citus.enable_cluster_clock;
DROP SCHEMA clock CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table clock_test
drop cascades to table cluster_clock_type


@ -1184,9 +1184,14 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.2-1
ALTER EXTENSION citus UPDATE TO '11.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
| function citus_get_cluster_clock() cluster_clock
| function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
| function citus_is_clock_after(cluster_clock,cluster_clock) boolean
| table pg_dist_commit_transaction
| type cluster_clock
(5 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version


@ -53,6 +53,7 @@ ORDER BY 1;
function citus_finish_citus_upgrade()
function citus_finish_pg_upgrade()
function citus_get_active_worker_nodes()
function citus_get_cluster_clock()
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_trigger_func()
@ -65,6 +66,7 @@ ORDER BY 1;
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
function citus_internal_adjust_local_clock_to_remote(cluster_clock)
function citus_internal_delete_colocation_metadata(integer)
function citus_internal_delete_partition_metadata(regclass)
function citus_internal_delete_shard_metadata(bigint)
@ -72,6 +74,7 @@ ORDER BY 1;
function citus_internal_local_blocked_processes()
function citus_internal_update_placement_metadata(bigint,integer,integer)
function citus_internal_update_relation_colocation(oid,integer)
function citus_is_clock_after(cluster_clock,cluster_clock)
function citus_is_coordinator()
function citus_isolation_test_session_is_blocked(integer,integer[])
function citus_job_cancel(bigint)
@ -122,6 +125,18 @@ ORDER BY 1;
function citus_update_table_statistics(regclass)
function citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
function citus_version()
function cluster_clock_cmp(cluster_clock,cluster_clock)
function cluster_clock_diff_in_ms(cluster_clock)
function cluster_clock_eq(cluster_clock,cluster_clock)
function cluster_clock_ge(cluster_clock,cluster_clock)
function cluster_clock_gt(cluster_clock,cluster_clock)
function cluster_clock_in(cstring)
function cluster_clock_le(cluster_clock,cluster_clock)
function cluster_clock_lt(cluster_clock,cluster_clock)
function cluster_clock_ne(cluster_clock,cluster_clock)
function cluster_clock_out(cluster_clock)
function cluster_clock_recv(internal)
function cluster_clock_send(cluster_clock)
function column_name_to_column(regclass,text)
function column_to_column_name(regclass,text)
function coord_combine_agg(oid,cstring,anyelement)
@ -239,6 +254,14 @@ ORDER BY 1;
function worker_split_copy(bigint,text,split_copy_info[])
function worker_split_shard_release_dsm()
function worker_split_shard_replication_setup(split_shard_info[])
operator <(cluster_clock,cluster_clock)
operator <=(cluster_clock,cluster_clock)
operator <>(cluster_clock,cluster_clock)
operator =(cluster_clock,cluster_clock)
operator >(cluster_clock,cluster_clock)
operator >=(cluster_clock,cluster_clock)
operator class cluster_clock_ops for access method btree
operator family cluster_clock_ops for access method btree
schema citus
schema citus_internal
sequence pg_dist_background_job_job_id_seq
@ -256,6 +279,7 @@ ORDER BY 1;
table pg_dist_background_task_depend
table pg_dist_cleanup
table pg_dist_colocation
table pg_dist_commit_transaction
table pg_dist_local_group
table pg_dist_node
table pg_dist_node_metadata
@ -271,6 +295,7 @@ ORDER BY 1;
type citus_copy_format
type citus_job_status
type citus_task_status
type cluster_clock
type noderole
type replication_slot_info
type split_copy_info
@ -286,5 +311,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(278 rows)
(283 rows)


@ -80,7 +80,7 @@ test: multi_reference_table multi_select_for_update relation_access_tracking pg1
test: custom_aggregate_support aggregate_support tdigest_aggregate_support
test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join anonymous_columns
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join anonymous_columns clock
test: ch_bench_subquery_repartition
test: multi_agg_type_conversion multi_count_type_conversion recursive_relation_planning_restriction_pushdown
test: multi_partition_pruning single_hash_repartition_join unsupported_lateral_subqueries


@ -0,0 +1,145 @@
CREATE SCHEMA clock;
SET search_path TO clock;
SHOW citus.enable_cluster_clock;
SET citus.enable_cluster_clock to ON;
SHOW citus.enable_cluster_clock;
CREATE TABLE clock_test (id int, nonid int);
SELECT create_distributed_table('clock_test', 'id', colocate_with := 'none');
--
-- Compare <logical, counter> pairs
--
-- Returns true
SELECT citus_is_clock_after('(5,1)', '(3,6)');
-- Returns false
SELECT citus_is_clock_after('(2,9)', '(3,0)');
-- Returns true
SELECT citus_is_clock_after('(5,6)', '(5,1)');
-- Returns false
SELECT citus_is_clock_after('(5,6)', '(5,6)');
--
-- Check the clock is *monotonically increasing*
--
SELECT citus_get_cluster_clock() \gset t1
SELECT citus_get_cluster_clock() \gset t2
SELECT citus_get_cluster_clock() \gset t3
-- Both should return true
SELECT citus_is_clock_after(:'t2citus_get_cluster_clock', :'t1citus_get_cluster_clock');
SELECT citus_is_clock_after(:'t3citus_get_cluster_clock', :'t2citus_get_cluster_clock');
-- Returns false
SELECT citus_is_clock_after(:'t1citus_get_cluster_clock', :'t3citus_get_cluster_clock');
CREATE TABLE cluster_clock_type(cc cluster_clock);
INSERT INTO cluster_clock_type values('(0, 100)');
INSERT INTO cluster_clock_type values('(100, 0)');
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 2)');
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 100)');
INSERT INTO cluster_clock_type values('(200, 20)');
INSERT INTO cluster_clock_type values('(200, 3)');
INSERT INTO cluster_clock_type values('(200, 400)');
INSERT INTO cluster_clock_type values('(500, 600)');
INSERT INTO cluster_clock_type values('(500, 0)');
SELECT cc FROM cluster_clock_type ORDER BY 1 ASC;
SELECT cc FROM cluster_clock_type where cc = '(200, 400)';
SELECT cc FROM cluster_clock_type where cc <> '(500, 600)';
SELECT cc FROM cluster_clock_type where cc != '(500, 600)';
SELECT cc FROM cluster_clock_type where cc < '(200, 20)' ORDER BY 1 ASC;
SELECT cc FROM cluster_clock_type where cc <= '(200, 20)' ORDER BY 1 ASC;
SELECT cc FROM cluster_clock_type where cc > '(200, 20)' ORDER BY 1 ASC;
SELECT cc FROM cluster_clock_type where cc >= '(200, 20)' ORDER BY 1 ASC;
CREATE INDEX cc_idx on cluster_clock_type(cc);
-- Multiply rows to check index usage
INSERT INTO cluster_clock_type SELECT a.cc FROM cluster_clock_type a, cluster_clock_type b;
INSERT INTO cluster_clock_type SELECT a.cc FROM cluster_clock_type a, cluster_clock_type b;
EXPLAIN SELECT cc FROM cluster_clock_type ORDER BY 1 ASC LIMIT 1;
SELECT cc FROM cluster_clock_type ORDER BY 1 ASC LIMIT 1;
EXPLAIN SELECT cc FROM cluster_clock_type where cc = '(200, 20)' LIMIT 5;
SELECT cc FROM cluster_clock_type where cc = '(200, 20)' LIMIT 5;
-- Max limits
INSERT INTO cluster_clock_type values('(4398046511103, 0)');
INSERT INTO cluster_clock_type values('(0, 4194303)');
INSERT INTO cluster_clock_type values('(4398046511103, 4194303)');
-- Bad input
INSERT INTO cluster_clock_type values('(-1, 100)');
INSERT INTO cluster_clock_type values('(100, -1)');
INSERT INTO cluster_clock_type values('(4398046511104, 100)'); -- too big to fit into 42 bits
INSERT INTO cluster_clock_type values('(0, 4194304)'); -- too big to fit into 22 bits
DROP TABLE cluster_clock_type;
CREATE TABLE cluster_clock_type(cc cluster_clock UNIQUE);
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 1)');
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 200)');
INSERT INTO cluster_clock_type values('(100, 100)');
INSERT INTO cluster_clock_type values('(100, 100)');
-- Get wall clock epoch in milliseconds
SELECT (extract(epoch from now()) * 1000)::bigint AS epoch \gset
-- This should return the epoch value as there is no difference from (0,0)
SELECT cluster_clock_diff_in_ms('(0,0)') epoch_diff \gset
-- Returns false
SELECT (:epoch - :epoch_diff) > 10 as epoch_diff;
--
-- Check the value returned by citus_get_cluster_clock is close to epoch in ms
--
SELECT citus_get_cluster_clock() AS latest_clock \gset
-- Returns the clock difference from epoch in ms
SELECT cluster_clock_diff_in_ms(:'latest_clock') as epoch_diff_in_ms \gset
-- Returns false
SELECT :epoch_diff_in_ms > 10 as epoch_diff_in_ms;
-- Transaction that accesses multiple nodes
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
COMMIT;
--
-- Check to see if the transaction is indeed persisted in the catalog
--
SELECT count(*)
FROM pg_dist_commit_transaction commit_clock
WHERE transaction_id = :'tidget_current_transaction_id';
BEGIN;
INSERT INTO clock_test SELECT generate_series(1, 10000, 1), 0;
SELECT get_current_transaction_id() \gset tid
SET client_min_messages TO DEBUG1;
ROLLBACK;
RESET client_min_messages;
--
-- Check that the transaction is not persisted
--
SELECT count(*)
FROM pg_dist_commit_transaction commit_clock
WHERE transaction_id = :'tidget_current_transaction_id';
RESET client_min_messages;
RESET citus.enable_cluster_clock;
DROP SCHEMA clock CASCADE;