Adding frame encoding and fixing handling of some types

pull/1/head
Xavier Stevens 2014-09-30 11:35:07 -07:00
parent be9ef38f1d
commit 4f29f5c47f
1 changed files with 38 additions and 19 deletions

View File

@ -12,8 +12,7 @@
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
*all
* copies or substantial portions of the Software.
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
@ -24,6 +23,13 @@
* SOFTWARE.
*/
#if defined(__linux__)
#include <endian.h>
#elif defined(__APPLE__)
#include <machine/endian.h>
#include <libkern/OSByteOrder.h>
#define htobe64(x) OSSwapHostToBigInt64(x)
#endif
#include <inttypes.h>
#include "postgres.h"
@ -75,7 +81,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation rel, ReorderBufferChange *change);
void _PG_init(void) {}
void _PG_init(void) {
}
/* specify output plugin callbacks */
void _PG_output_plugin_init(OutputPluginCallbacks *cb) {
@ -97,7 +104,7 @@ static void pg_decode_startup(LogicalDecodingContext *ctx,
data->context = AllocSetContextCreate(
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
data->debug_mode = false;
ctx->output_plugin_private = data;
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
@ -106,16 +113,21 @@ static void pg_decode_startup(LogicalDecodingContext *ctx,
Assert(elem->arg == NULL || IsA(elem->arg, String));
if (strcmp(elem->defname, "debug-mode") == 0) {
if (elem->arg == NULL)
if (elem->arg == NULL) {
data->debug_mode = false;
else if (!parse_bool(strVal(elem->arg), &data->debug_mode))
} else if (!parse_bool(strVal(elem->arg), &data->debug_mode)) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
if (data->debug_mode)
if (data->debug_mode) {
fprintf(stderr, "Decoderbufs DEBUG MODE is ON.");
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
} else {
fprintf(stderr, "Decoderbufs DEBUG MODE is OFF.");
}
} else {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("option \"%s\" = \"%s\" is unknown", elem->defname,
@ -248,7 +260,8 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
Oid typoutput, Datum datum) {
Numeric num;
bytea *valptr;
char *output;
const char *output;
int size = 0;
switch (typid) {
case BOOLOID:
datum_msg->datum_bool = DatumGetBool(datum);
@ -292,24 +305,27 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_string = pnstrdup(output, strlen(output));
break;
case TIMESTAMPOID:
/*
* THIS FALLTHROUGH IS MAKING THE ASSUMPTION WE ARE ON UTC
*/
case TIMESTAMPTZOID:
datum_msg->datum_string = pstrdup(timestamptz_to_str(DatumGetTimestampTz(datum)));
output = timestamptz_to_str(DatumGetTimestampTz(datum));
datum_msg->datum_string = pnstrdup(output, strlen(output));
break;
case BYTEAOID:
valptr = DatumGetByteaPCopy(datum);
int size = VARSIZE(valptr);
datum_msg->datum_bytes =
*((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData)));
datum_msg->datum_bytes.data = (uint8_t *)VARDATA(valptr);
datum_msg->datum_bytes.len = size - VARHDRSZ;
size = VARSIZE(valptr) - VARHDRSZ;
datum_msg->datum_bytes.data = palloc(size);
memcpy(datum_msg->datum_bytes.data, (uint8_t *)VARDATA(valptr), size);
datum_msg->datum_bytes.len = size;
datum_msg->has_datum_bytes = true;
break;
default:
output = OidOutputFunctionCall(typoutput, datum);
datum_msg->datum_bytes =
*((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData)));
datum_msg->datum_bytes.data = (uint8_t *)output;
datum_msg->datum_bytes.len = sizeof(output);
size = sizeof(output);
datum_msg->datum_bytes.data = palloc(size);
memcpy(datum_msg->datum_bytes.data, (uint8_t *)output, size);
datum_msg->datum_bytes.len = size;
datum_msg->has_datum_bytes = true;
break;
}
@ -475,7 +491,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
// appendBinaryStringInfo(ctx->out, (void *)psize, sizeof(psize));
uint64_t flen = htobe64(ssize);
/* frame encoding size */
appendBinaryStringInfo(ctx->out, (char *) &flen, sizeof(flen));
/* frame encoding payload */
appendBinaryStringInfo(ctx->out, packed, ssize);
OutputPluginWrite(ctx, true);