From be9ef38f1d5081b4313c35c35a1b9ed9ad1ca1e2 Mon Sep 17 00:00:00 2001 From: Xavier Stevens Date: Tue, 23 Sep 2014 12:19:29 -0700 Subject: [PATCH] Adding timestamp oid support --- proto/pg_logicaldec.proto | 4 ++-- src/decoderbufs.c | 27 +++++++++++++++++---------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/proto/pg_logicaldec.proto b/proto/pg_logicaldec.proto index 070c34c..cafdd71 100644 --- a/proto/pg_logicaldec.proto +++ b/proto/pg_logicaldec.proto @@ -1,7 +1,7 @@ package decoderbufs; option java_package="dedoderbufs.proto"; -option java_outer_classname = "TxnProto"; +option java_outer_classname = "PgldProto"; option optimize_for = SPEED; enum Op { @@ -12,7 +12,7 @@ enum Op { message DatumMessage { optional string column_name = 1; - optional int64 column_type = 2; + optional int64 column_type = 2; optional int32 datum_int32 = 3; optional int64 datum_int64 = 4; optional float datum_float = 5; diff --git a/src/decoderbufs.c b/src/decoderbufs.c index e17473f..fb0bacf 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -61,7 +61,7 @@ typedef struct { bool debug_mode; } DecoderData; -/* These must be available to pg_dlsym() */ +/* these must be available to pg_dlsym() */ extern void _PG_init(void); extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); @@ -134,7 +134,8 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) { /* BEGIN callback */ static void pg_decode_begin_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn) {} + ReorderBufferTXN *txn) { +} /* COMMIT callback */ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, @@ -180,6 +181,7 @@ static void free_row_msg_subs(Decoderbufs__RowMessage *msg) { } } +/* only used for debug-mode (currently not all OIDs are currently supported) */ static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup, size_t n) { if (tup) { @@ -205,6 +207,8 @@ static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup, appendStringInfo(out, ", datum[%f]", dmsg->datum_double); break; case TEXTOID: + case TIMESTAMPOID: + case TIMESTAMPTZOID: appendStringInfo(out, ", datum[%s]", dmsg->datum_string); break; default: @@ -239,10 +243,10 @@ static double numeric_to_double_no_overflow(Numeric num) { return val; } +/* set a datum value based on its OID specified by typid */ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, Oid typoutput, Datum datum) { Numeric num; - char c; bytea *valptr; char *output; switch (typid) { @@ -279,14 +283,18 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, } break; case CHAROID: - c = DatumGetChar(datum); - datum_msg->datum_string = &c; - break; case VARCHAROID: + case BPCHAROID: case TEXTOID: + case JSONOID: + case XMLOID: output = OidOutputFunctionCall(typoutput, datum); datum_msg->datum_string = pnstrdup(output, strlen(output)); break; + case TIMESTAMPOID: + case TIMESTAMPTZOID: + datum_msg->datum_string = pstrdup(timestamptz_to_str(DatumGetTimestampTz(datum))); + break; case BYTEAOID: valptr = DatumGetByteaPCopy(datum); int size = VARSIZE(valptr); @@ -307,6 +315,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, } } +/* convert a PG tuple to an array of DatumMessage(s) */ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Relation relation, HeapTuple tuple, TupleDesc tupdesc) { @@ -340,7 +349,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Oid typoutput; bool typisvarlena; - /* Query output function */ + /* query output function */ getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); if (!isnull) { if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) { @@ -359,9 +368,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, } } -/* - * callback for individual changed tuples - */ +/* callback for individual changed tuples */ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { DecoderData *data;