diff --git a/src/decoderbufs.c b/src/decoderbufs.c index 4c9b226..3482d66 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -51,8 +51,15 @@ #include "utils/syscache.h" #include "utils/typcache.h" #include "utils/uuid.h" +#include "utils/timestamp.h" +#include "utils/date.h" +#include "utils/cash.h" #include "proto/pg_logicaldec.pb-c.h" +#ifndef HAVE_INT64_TIMESTAMP +#error Expecting timestamps to be represented as integers, not as floating-point. +#endif + /* POSTGIS version define so it doesn't redef macros */ #define POSTGIS_PGSQL_VERSION 94 #include "liblwgeom.h" @@ -60,15 +67,11 @@ PG_MODULE_MAGIC; /* define a time macro to convert TimestampTz into something more sane, - * which in this case is microseconds since epoch + * which in this case is microseconds since epoch. This is because PG stores internally the timestamps relative to + * 2000-01-01T00:00:00Z and not the Unix epoch. */ -#ifdef HAVE_INT64_TIMESTAMP -#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \ - t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY *USECS_PER_SEC); -#else -#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \ - (t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY)) * 1000.0; -#endif +#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) t + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; +#define DATE_TO_DAYS_SINCE_EPOCH(t) t + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE); typedef struct { MemoryContext context; @@ -395,6 +398,11 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, bytea *valptr = NULL; const char *output = NULL; Point *p = NULL; + Timestamp ts = NULL; + double duration; + TimeTzADT *timetz = NULL; + Interval *interval = NULL; + int size = 0; switch (typid) { case BOOLOID: @@ -429,25 +437,67 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; } break; + case CASHOID: + datum_msg->datum_int64 = DatumGetCash(datum); + datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; + break; case CHAROID: case VARCHAROID: case BPCHAROID: case TEXTOID: case JSONOID: + case JSONBOID: case XMLOID: + case BITOID: + case VARBITOID: case UUIDOID: output = OidOutputFunctionCall(typoutput, datum); datum_msg->datum_string = pnstrdup(output, strlen(output)); datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING; break; case TIMESTAMPOID: - /* - * THIS FALLTHROUGH IS MAKING THE ASSUMPTION WE ARE ON UTC - */ + ts = DatumGetTimestamp(datum); + if (TIMESTAMP_NOT_FINITE(ts)) { + ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE), + errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)"))); + } else { + datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); + datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; + break; + } case TIMESTAMPTZOID: - output = timestamptz_to_str(DatumGetTimestampTz(datum)); - datum_msg->datum_string = pnstrdup(output, strlen(output)); - datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING; + ts = DatumGetTimestampTz(datum); + if (TIMESTAMP_NOT_FINITE(ts)) { + ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE), + errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)"))); + } else { + datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); + datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; + break; + } + case DATEOID: + /* simply get the number of days as the stored 32 bit value and convert to EPOCH */ + datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum)); + datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32; + break; + case TIMEOID: + datum_msg->datum_int64 = DatumGetTimeADT(datum); + datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; + break; + case TIMETZOID: + timetz = DatumGetTimeTzADTP(datum); + /* use GMT-equivalent time */ + datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0)); + datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; + break; + case INTERVALOID: + interval = DatumGetIntervalP(datum); + /* + Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days. + */ + duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY); + datum_msg->datum_double = duration; + datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; break; case BYTEAOID: valptr = DatumGetByteaPCopy(datum);