Merge pull request #3 from hchiorean/DBZ-3

DBZ-3 Adds support for the CASH, BIT, JSONB, VARBIT, DATE,TIME,TIMETZ and INTERVAL oids
pull/4/head v0.2.0
Randall Hauch 2016-11-04 09:28:52 -05:00 committed by GitHub
commit 1baad6ecc6
1 changed files with 64 additions and 14 deletions

View File

@ -51,8 +51,15 @@
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/typcache.h" #include "utils/typcache.h"
#include "utils/uuid.h" #include "utils/uuid.h"
#include "utils/timestamp.h"
#include "utils/date.h"
#include "utils/cash.h"
#include "proto/pg_logicaldec.pb-c.h" #include "proto/pg_logicaldec.pb-c.h"
#ifndef HAVE_INT64_TIMESTAMP
#error Expecting timestamps to be represented as integers, not as floating-point.
#endif
/* POSTGIS version define so it doesn't redef macros */ /* POSTGIS version define so it doesn't redef macros */
#define POSTGIS_PGSQL_VERSION 94 #define POSTGIS_PGSQL_VERSION 94
#include "liblwgeom.h" #include "liblwgeom.h"
@ -60,15 +67,11 @@
PG_MODULE_MAGIC; PG_MODULE_MAGIC;
/* define a time macro to convert TimestampTz into something more sane, /* define a time macro to convert TimestampTz into something more sane,
* which in this case is microseconds since epoch * which in this case is microseconds since epoch. This is because PG stores internally the timestamps relative to
* 2000-01-01T00:00:00Z and not the Unix epoch.
*/ */
#ifdef HAVE_INT64_TIMESTAMP #define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) t + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY;
#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \ #define DATE_TO_DAYS_SINCE_EPOCH(t) t + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE);
t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY *USECS_PER_SEC);
#else
#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \
(t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY)) * 1000.0;
#endif
typedef struct { typedef struct {
MemoryContext context; MemoryContext context;
@ -395,6 +398,11 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
bytea *valptr = NULL; bytea *valptr = NULL;
const char *output = NULL; const char *output = NULL;
Point *p = NULL; Point *p = NULL;
Timestamp ts = NULL;
double duration;
TimeTzADT *timetz = NULL;
Interval *interval = NULL;
int size = 0; int size = 0;
switch (typid) { switch (typid) {
case BOOLOID: case BOOLOID:
@ -429,25 +437,67 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
} }
break; break;
case CASHOID:
datum_msg->datum_int64 = DatumGetCash(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case CHAROID: case CHAROID:
case VARCHAROID: case VARCHAROID:
case BPCHAROID: case BPCHAROID:
case TEXTOID: case TEXTOID:
case JSONOID: case JSONOID:
case JSONBOID:
case XMLOID: case XMLOID:
case BITOID:
case VARBITOID:
case UUIDOID: case UUIDOID:
output = OidOutputFunctionCall(typoutput, datum); output = OidOutputFunctionCall(typoutput, datum);
datum_msg->datum_string = pnstrdup(output, strlen(output)); datum_msg->datum_string = pnstrdup(output, strlen(output));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING;
break; break;
case TIMESTAMPOID: case TIMESTAMPOID:
/* ts = DatumGetTimestamp(datum);
* THIS FALLTHROUGH IS MAKING THE ASSUMPTION WE ARE ON UTC if (TIMESTAMP_NOT_FINITE(ts)) {
*/ ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
}
case TIMESTAMPTZOID: case TIMESTAMPTZOID:
output = timestamptz_to_str(DatumGetTimestampTz(datum)); ts = DatumGetTimestampTz(datum);
datum_msg->datum_string = pnstrdup(output, strlen(output)); if (TIMESTAMP_NOT_FINITE(ts)) {
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING; ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
}
case DATEOID:
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
break;
case TIMEOID:
datum_msg->datum_int64 = DatumGetTimeADT(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case TIMETZOID:
timetz = DatumGetTimeTzADTP(datum);
/* use GMT-equivalent time */
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case INTERVALOID:
interval = DatumGetIntervalP(datum);
/*
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
*/
duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY);
datum_msg->datum_double = duration;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break; break;
case BYTEAOID: case BYTEAOID:
valptr = DatumGetByteaPCopy(datum); valptr = DatumGetByteaPCopy(datum);