implement native cluster clock type

causal-order-clock-native
Nils Dijk 2022-09-30 12:53:18 +02:00
parent 40f5c2bab3
commit d203e0da9f
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
3 changed files with 101 additions and 1 deletions

View File

@ -176,6 +176,26 @@ GRANT SELECT ON pg_catalog.pg_dist_background_task_depend TO PUBLIC;
CREATE TYPE citus.cluster_clock AS (logical bigint, counter int);
ALTER TYPE citus.cluster_clock SET SCHEMA pg_catalog;
CREATE TYPE citus.cluster_clock_native;
CREATE OR REPLACE FUNCTION pg_catalog.citus_cluster_clock_native_in(pg_catalog.cstring)
RETURNS citus.cluster_clock_native
LANGUAGE C IMMUTABLE
AS 'MODULE_PATHNAME',$$citus_cluster_clock_native_in$$;
CREATE OR REPLACE FUNCTION pg_catalog.citus_cluster_clock_native_out(citus.cluster_clock_native)
RETURNS pg_catalog.cstring
LANGUAGE C IMMUTABLE
AS 'MODULE_PATHNAME',$$citus_cluster_clock_native_out$$;
CREATE TYPE citus.cluster_clock_native (
INPUT = pg_catalog.citus_cluster_clock_native_in,
OUTPUT = pg_catalog.citus_cluster_clock_native_out,
INTERNALLENGTH = 8
);
ALTER TYPE citus.cluster_clock_native SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_commit_transaction(
transaction_id TEXT NOT NULL CONSTRAINT pg_dist_commit_transactionId_unique_constraint UNIQUE,
cluster_clock_value cluster_clock NOT NULL,

View File

@ -38,6 +38,8 @@
#include "distributed/remote_commands.h"
#include "distributed/type_utils.h"
PG_FUNCTION_INFO_V1(citus_cluster_clock_native_in);
PG_FUNCTION_INFO_V1(citus_cluster_clock_native_out);
PG_FUNCTION_INFO_V1(citus_get_cluster_clock);
PG_FUNCTION_INFO_V1(citus_internal_adjust_local_clock_to_remote);
PG_FUNCTION_INFO_V1(citus_is_clock_after);
@ -83,6 +85,82 @@ static bool IsClockAfter(uint64 logicalClock1, uint32 counterClock1,
uint64 logicalClock2, uint32 counterClock2);
bool EnableClusterClock = false;
#define LDELIM '('
#define RDELIM ')'
#define DELIM ','
#define NTIDARGS 2
Datum
citus_cluster_clock_native_in(PG_FUNCTION_ARGS)
{
char *str = PG_GETARG_CSTRING(0);
char *p;
char *coord[NTIDARGS];
int i;
ClusterClockNative *result;
char *badp;
for (i = 0, p = str; *p && i < NTIDARGS && *p != RDELIM; p++)
{
if (*p == DELIM || (*p == LDELIM && !i))
{
coord[i++] = p + 1;
}
}
if (i < NTIDARGS)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"cluster_clock_native", str)));
}
errno = 0;
uint64 logical = strtou64(coord[0], &badp, 10);
if (errno || *badp != DELIM)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"cluster_clock_native", str)));
}
uint32 counter = strtoul(coord[1], &badp, 10);
if (errno || *badp != RDELIM ||
counter > MAX_COUNTER || counter < 0)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"",
"tid", str)));
}
result = (ClusterClockNative *) palloc(sizeof(ClusterClockNative));
SET_CLOCK(*result, logical, counter);
return PointerGetDatum(result);
}
Datum
citus_cluster_clock_native_out(PG_FUNCTION_ARGS)
{
ClusterClockNative *clockPtr =
(ClusterClockNative *) DatumGetPointer(PG_GETARG_DATUM(0));
uint64 logical = GET_LOGICAL(*clockPtr);
uint32 counter = GET_COUNTER(*clockPtr);
/* Perhaps someday we should output this as a record. */
char buf[32] = { 0 };
snprintf(buf, sizeof(buf), "(%lu,%u)", logical, counter);
PG_RETURN_CSTRING(pstrdup(buf));
}
/*
* GetEpochTimeMs returns the epoch value in milliseconds.

View File

@ -14,6 +14,8 @@
extern size_t LogicalClockShmemSize(void);
typedef uint64 ClusterClockNative;
/*
* Clock components - Unsigned 64 bit <LC, C>
* Logical clock (LC): 42 bits
@ -32,7 +34,7 @@ extern size_t LogicalClockShmemSize(void);
#define GET_COUNTER(x) ((x) & LOGICAL_MASK)
/* concatenate logical and counter to form a 64 bit clock value */
#define SET_CLOCK(var, lc, c) var = (((lc) << COUNTER_BITS) | (c))
#define SET_CLOCK(var, lc, c) (var) = (((lc) << COUNTER_BITS) | ((c) & LOGICAL_MASK))
extern bool EnableClusterClock;
extern void LogicalClockShmemInit(void);