From 4f29f5c47f3cbf68efa57281eeda071f6fc1cbe1 Mon Sep 17 00:00:00 2001 From: Xavier Stevens Date: Tue, 30 Sep 2014 11:35:07 -0700 Subject: [PATCH] Adding frame encoding and fixing handling of some types --- src/decoderbufs.c | 57 +++++++++++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/src/decoderbufs.c b/src/decoderbufs.c index fb0bacf..cacea4c 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -12,8 +12,7 @@ * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in - *all - * copies or substantial portions of the Software. + * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, @@ -24,6 +23,13 @@ * SOFTWARE. */ +#if defined(__linux__) + #include +#elif defined(__APPLE__) + #include + #include + #define htobe64(x) OSSwapHostToBigInt64(x) +#endif #include #include "postgres.h" @@ -75,7 +81,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); -void _PG_init(void) {} +void _PG_init(void) { +} /* specify output plugin callbacks */ void _PG_output_plugin_init(OutputPluginCallbacks *cb) { @@ -97,7 +104,7 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, data->context = AllocSetContextCreate( ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - + data->debug_mode = false; ctx->output_plugin_private = data; opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; @@ -106,16 +113,21 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, Assert(elem->arg == NULL || IsA(elem->arg, String)); if (strcmp(elem->defname, "debug-mode") == 0) { - if (elem->arg == NULL) + if (elem->arg == NULL) { data->debug_mode = false; - else if (!parse_bool(strVal(elem->arg), &data->debug_mode)) + } else if (!parse_bool(strVal(elem->arg), &data->debug_mode)) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); + } - if (data->debug_mode) + if (data->debug_mode) { + fprintf(stderr, "Decoderbufs DEBUG MODE is ON."); opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; + } else { + fprintf(stderr, "Decoderbufs DEBUG MODE is OFF."); + } } else { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, @@ -248,7 +260,8 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, Oid typoutput, Datum datum) { Numeric num; bytea *valptr; - char *output; + const char *output; + int size = 0; switch (typid) { case BOOLOID: datum_msg->datum_bool = DatumGetBool(datum); @@ -292,24 +305,27 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, datum_msg->datum_string = pnstrdup(output, strlen(output)); break; case TIMESTAMPOID: + /* + * THIS FALLTHROUGH IS MAKING THE ASSUMPTION WE ARE ON UTC + */ case TIMESTAMPTZOID: - datum_msg->datum_string = pstrdup(timestamptz_to_str(DatumGetTimestampTz(datum))); + output = timestamptz_to_str(DatumGetTimestampTz(datum)); + datum_msg->datum_string = pnstrdup(output, strlen(output)); break; case BYTEAOID: valptr = DatumGetByteaPCopy(datum); - int size = VARSIZE(valptr); - datum_msg->datum_bytes = - *((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData))); - datum_msg->datum_bytes.data = (uint8_t *)VARDATA(valptr); - datum_msg->datum_bytes.len = size - VARHDRSZ; + size = VARSIZE(valptr) - VARHDRSZ; + datum_msg->datum_bytes.data = palloc(size); + memcpy(datum_msg->datum_bytes.data, (uint8_t *)VARDATA(valptr), size); + datum_msg->datum_bytes.len = size; datum_msg->has_datum_bytes = true; break; default: output = OidOutputFunctionCall(typoutput, datum); - datum_msg->datum_bytes = - *((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData))); - datum_msg->datum_bytes.data = (uint8_t *)output; - datum_msg->datum_bytes.len = sizeof(output); + size = sizeof(output); + datum_msg->datum_bytes.data = palloc(size); + memcpy(datum_msg->datum_bytes.data, (uint8_t *)output, size); + datum_msg->datum_bytes.len = size; datum_msg->has_datum_bytes = true; break; } @@ -475,7 +491,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, size_t psize = decoderbufs__row_message__get_packed_size(&rmsg); void *packed = palloc(psize); size_t ssize = decoderbufs__row_message__pack(&rmsg, packed); - // appendBinaryStringInfo(ctx->out, (void *)psize, sizeof(psize)); + uint64_t flen = htobe64(ssize); + /* frame encoding size */ + appendBinaryStringInfo(ctx->out, (char *) &flen, sizeof(flen)); + /* frame encoding payload */ appendBinaryStringInfo(ctx->out, packed, ssize); OutputPluginWrite(ctx, true);