From d203e0da9fa62115c8c1494495c50a704bf8afb2 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Fri, 30 Sep 2022 12:53:18 +0200 Subject: [PATCH] implement native cluster clock type --- .../distributed/sql/citus--11.0-4--11.1-1.sql | 20 +++++ src/backend/distributed/utils/causal_clock.c | 78 +++++++++++++++++++ src/include/distributed/causal_clock.h | 4 +- 3 files changed, 101 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 503abcc4c..a936f7c6c 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -176,6 +176,26 @@ GRANT SELECT ON pg_catalog.pg_dist_background_task_depend TO PUBLIC; CREATE TYPE citus.cluster_clock AS (logical bigint, counter int); ALTER TYPE citus.cluster_clock SET SCHEMA pg_catalog; +CREATE TYPE citus.cluster_clock_native; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_cluster_clock_native_in(pg_catalog.cstring) + RETURNS citus.cluster_clock_native + LANGUAGE C IMMUTABLE + AS 'MODULE_PATHNAME',$$citus_cluster_clock_native_in$$; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_cluster_clock_native_out(citus.cluster_clock_native) + RETURNS pg_catalog.cstring + LANGUAGE C IMMUTABLE + AS 'MODULE_PATHNAME',$$citus_cluster_clock_native_out$$; + +CREATE TYPE citus.cluster_clock_native ( + INPUT = pg_catalog.citus_cluster_clock_native_in, + OUTPUT = pg_catalog.citus_cluster_clock_native_out, + INTERNALLENGTH = 8 +); + +ALTER TYPE citus.cluster_clock_native SET SCHEMA pg_catalog; + CREATE TABLE citus.pg_dist_commit_transaction( transaction_id TEXT NOT NULL CONSTRAINT pg_dist_commit_transactionId_unique_constraint UNIQUE, cluster_clock_value cluster_clock NOT NULL, diff --git a/src/backend/distributed/utils/causal_clock.c b/src/backend/distributed/utils/causal_clock.c index 0978d186e..aeadfaeef 100644 --- a/src/backend/distributed/utils/causal_clock.c +++ b/src/backend/distributed/utils/causal_clock.c @@ -38,6 +38,8 @@ #include "distributed/remote_commands.h" #include "distributed/type_utils.h" +PG_FUNCTION_INFO_V1(citus_cluster_clock_native_in); +PG_FUNCTION_INFO_V1(citus_cluster_clock_native_out); PG_FUNCTION_INFO_V1(citus_get_cluster_clock); PG_FUNCTION_INFO_V1(citus_internal_adjust_local_clock_to_remote); PG_FUNCTION_INFO_V1(citus_is_clock_after); @@ -83,6 +85,82 @@ static bool IsClockAfter(uint64 logicalClock1, uint32 counterClock1, uint64 logicalClock2, uint32 counterClock2); bool EnableClusterClock = false; +#define LDELIM '(' +#define RDELIM ')' +#define DELIM ',' +#define NTIDARGS 2 + +Datum +citus_cluster_clock_native_in(PG_FUNCTION_ARGS) +{ + char *str = PG_GETARG_CSTRING(0); + char *p; + char *coord[NTIDARGS]; + int i; + ClusterClockNative *result; + char *badp; + + for (i = 0, p = str; *p && i < NTIDARGS && *p != RDELIM; p++) + { + if (*p == DELIM || (*p == LDELIM && !i)) + { + coord[i++] = p + 1; + } + } + + if (i < NTIDARGS) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid input syntax for type %s: \"%s\"", + "cluster_clock_native", str))); + } + + errno = 0; + uint64 logical = strtou64(coord[0], &badp, 10); + if (errno || *badp != DELIM) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid input syntax for type %s: \"%s\"", + "cluster_clock_native", str))); + } + + uint32 counter = strtoul(coord[1], &badp, 10); + if (errno || *badp != RDELIM || + counter > MAX_COUNTER || counter < 0) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid input syntax for type %s: \"%s\"", + "tid", str))); + } + + + result = (ClusterClockNative *) palloc(sizeof(ClusterClockNative)); + + SET_CLOCK(*result, logical, counter); + + return PointerGetDatum(result); +} + + +Datum +citus_cluster_clock_native_out(PG_FUNCTION_ARGS) +{ + ClusterClockNative *clockPtr = + (ClusterClockNative *) DatumGetPointer(PG_GETARG_DATUM(0)); + + uint64 logical = GET_LOGICAL(*clockPtr); + uint32 counter = GET_COUNTER(*clockPtr); + + /* Perhaps someday we should output this as a record. */ + char buf[32] = { 0 }; + snprintf(buf, sizeof(buf), "(%lu,%u)", logical, counter); + + PG_RETURN_CSTRING(pstrdup(buf)); +} + /* * GetEpochTimeMs returns the epoch value in milliseconds. diff --git a/src/include/distributed/causal_clock.h b/src/include/distributed/causal_clock.h index 8a652b67b..bf510660d 100644 --- a/src/include/distributed/causal_clock.h +++ b/src/include/distributed/causal_clock.h @@ -14,6 +14,8 @@ extern size_t LogicalClockShmemSize(void); +typedef uint64 ClusterClockNative; + /* * Clock components - Unsigned 64 bit * Logical clock (LC): 42 bits @@ -32,7 +34,7 @@ extern size_t LogicalClockShmemSize(void); #define GET_COUNTER(x) ((x) & LOGICAL_MASK) /* concatenate logical and counter to form a 64 bit clock value */ -#define SET_CLOCK(var, lc, c) var = (((lc) << COUNTER_BITS) | (c)) +#define SET_CLOCK(var, lc, c) (var) = (((lc) << COUNTER_BITS) | ((c) & LOGICAL_MASK)) extern bool EnableClusterClock; extern void LogicalClockShmemInit(void);