From 6a212e9e4cc8f6b140a1396cb295dc162e898cc6 Mon Sep 17 00:00:00 2001 From: Xavier Stevens Date: Mon, 22 Sep 2014 16:21:47 -0700 Subject: [PATCH] First working version for debug text output at least --- .clang-format | 47 ++++ proto/pg_logicaldec.proto | 13 +- src/decoderbufs.c | 467 ++++++++++++++++++++------------- src/proto/pg_logicaldec.pb-c.c | 127 ++++----- src/proto/pg_logicaldec.pb-c.h | 54 ++-- 5 files changed, 425 insertions(+), 283 deletions(-) create mode 100644 .clang-format diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..f3bf35a --- /dev/null +++ b/.clang-format @@ -0,0 +1,47 @@ +--- +# BasedOnStyle: Mozilla +AccessModifierOffset: -2 +ConstructorInitializerIndentWidth: 4 +AlignEscapedNewlinesLeft: false +AlignTrailingComments: true +AllowAllParametersOfDeclarationOnNextLine: false +AllowShortIfStatementsOnASingleLine: false +AllowShortLoopsOnASingleLine: false +AlwaysBreakTemplateDeclarations: false +AlwaysBreakBeforeMultilineStrings: false +BreakBeforeBinaryOperators: false +BreakBeforeTernaryOperators: true +BreakConstructorInitializersBeforeComma: false +BinPackParameters: true +ColumnLimit: 80 +ConstructorInitializerAllOnOneLineOrOnePerLine: true +DerivePointerBinding: true +ExperimentalAutoDetectBinPacking: false +IndentCaseLabels: true +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCSpaceBeforeProtocolList: false +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 60 +PenaltyBreakString: 1000 +PenaltyBreakFirstLessLess: 120 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 200 +PointerBindsToType: true +SpacesBeforeTrailingComments: 1 +Cpp11BracedListStyle: false +Standard: Cpp03 +IndentWidth: 2 +TabWidth: 8 +UseTab: Never +BreakBeforeBraces: Attach +IndentFunctionDeclarationAfterType: false +SpacesInParentheses: false +SpacesInAngles: false +SpaceInEmptyParentheses: false +SpacesInCStyleCastParentheses: false +SpaceAfterControlStatementKeyword: true +SpaceBeforeAssignmentOperators: true +ContinuationIndentWidth: 4 +... + diff --git a/proto/pg_logicaldec.proto b/proto/pg_logicaldec.proto index 2286fe0..070c34c 100644 --- a/proto/pg_logicaldec.proto +++ b/proto/pg_logicaldec.proto @@ -22,11 +22,10 @@ message DatumMessage { optional bytes datum_bytes = 9; } -message TxnMessage { - optional sint64 timestamp = 1; - optional int64 xid = 2; - optional string table = 3; - optional Op op = 4; - optional DatumMessage new_datum = 5; - optional DatumMessage old_datum = 6; +message RowMessage { + optional sint64 commit_time = 1; + optional string table = 2; + optional Op op = 3; + repeated DatumMessage new_tuple = 4; + repeated DatumMessage old_tuple = 5; } diff --git a/src/decoderbufs.c b/src/decoderbufs.c index e94b9b3..36a7450 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -1,5 +1,5 @@ /* - * decoderbufs - PostgreSQL output plug-in for logical replication to Protocol + * decoderbufs - PostgreSQL output plug-in for logical replication to Protocol * Buffers * * Copyright (c) 2014 Xavier Stevens @@ -24,6 +24,9 @@ * SOFTWARE. */ +#include +#include + #include "postgres.h" #include "funcapi.h" #include "catalog/pg_class.h" @@ -43,9 +46,19 @@ PG_MODULE_MAGIC; +/* define a time macro to convert TimestampTz into something more sane, + * which in this case is microseconds since epoch + */ +#ifdef HAVE_INT64_TIMESTAMP +#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \ + t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY * USECS_PER_SEC); +#else +#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \ + (t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY)) * 1000.0; +#endif + typedef struct { MemoryContext context; - Decoderbufs__TxnMessage *txn_msg; bool debug_mode; } DecoderData; @@ -82,12 +95,11 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, DecoderData *data; data = palloc(sizeof(DecoderData)); - data->context = AllocSetContextCreate(ctx->context, "decoderbufs context", - ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + data->context = AllocSetContextCreate( + ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); ctx->output_plugin_private = data; - opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; foreach(option, ctx->output_plugin_options) { @@ -95,17 +107,16 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, Assert(elem->arg == NULL || IsA(elem->arg, String)); if (strcmp(elem->defname, "debug-mode") == 0) { - bool debug_mode; if (elem->arg == NULL) - debug_mode = false; - else if (!parse_bool(strVal(elem->arg), &debug_mode)) + data->debug_mode = false; + else if (!parse_bool(strVal(elem->arg), &data->debug_mode)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); - if (debug_mode) - opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; + if (data->debug_mode) + opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; } else { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, @@ -114,25 +125,6 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, } } -static void free_txn_msg_datums(Decoderbufs__TxnMessage *msg) { - if (msg->new_datum) { - if (msg->new_datum->has_datum_bytes) { - pfree(msg->new_datum->datum_bytes.data); - msg->new_datum->datum_bytes.data = NULL; - msg->new_datum->datum_bytes.len = 0; - } - pfree(msg->new_datum); - } - if (msg->old_datum) { - if (msg->old_datum->has_datum_bytes) { - pfree(msg->old_datum->datum_bytes.data); - msg->old_datum->datum_bytes.data = NULL; - msg->old_datum->datum_bytes.len = 0; - } - pfree(msg->old_datum); - } -} - /* cleanup this plugin's resources */ static void pg_decode_shutdown(LogicalDecodingContext *ctx) { DecoderData *data = ctx->output_plugin_private; @@ -144,154 +136,228 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) { /* BEGIN callback */ static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { - DecoderData *data = ctx->output_plugin_private; - Decoderbufs__TxnMessage msg = DECODERBUFS__TXN_MESSAGE__INIT; - data->txn_msg = &msg; - data->txn_msg->xid = txn->xid; } /* COMMIT callback */ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - DecoderData *data = ctx->output_plugin_private; - Decoderbufs__TxnMessage *msg = data->txn_msg; - msg->timestamp = txn->commit_time; +} - OutputPluginPrepareWrite(ctx, true); - size_t psize = decoderbufs__txn_message__get_packed_size(msg); - void *packed = palloc(psize); - size_t ssize = decoderbufs__txn_message__pack(msg, packed); - appendBinaryStringInfo(ctx->out, packed, ssize); - OutputPluginWrite(ctx, true); +/* convenience method to free up sub-messages */ +static void free_row_msg_subs(Decoderbufs__RowMessage *msg) { + if (!msg) { + return; + } - pfree(packed); - free_txn_msg_datums(msg); + pfree(msg->table); + if (msg->n_new_tuple > 0) { + for (int i=0; i < msg->n_new_tuple; i++) { + if (msg->new_tuple[i]) { + if (msg->new_tuple[i]->datum_string) { + pfree(msg->new_tuple[i]->datum_string); + } else if (msg->new_tuple[i]->has_datum_bytes) { + pfree(msg->new_tuple[i]->datum_bytes.data); + msg->new_tuple[i]->datum_bytes.data = NULL; + msg->new_tuple[i]->datum_bytes.len = 0; + } + pfree(msg->new_tuple[i]); + } + } + pfree(msg->new_tuple); + } + if (msg->n_old_tuple > 0) { + for (int i=0; i < msg->n_old_tuple; i++) { + if (msg->old_tuple[i]) { + if (msg->old_tuple[i]->datum_string) { + pfree(msg->old_tuple[i]->datum_string); + } else if (msg->old_tuple[i]->has_datum_bytes) { + pfree(msg->old_tuple[i]->datum_bytes.data); + msg->old_tuple[i]->datum_bytes.data = NULL; + msg->old_tuple[i]->datum_bytes.len = 0; + } + pfree(msg->old_tuple[i]); + } + } + pfree(msg->old_tuple); + } +} + +static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup, size_t n) { + if (tup) { + for (int i=0; i < n; i++) { + Decoderbufs__DatumMessage *dmsg = tup[i]; + if (dmsg->column_name) + appendStringInfo(out, "column_name[%s]", dmsg->column_name); + if (dmsg->has_column_type) { + appendStringInfo(out, ", column_type[%" PRId64 "]", dmsg->column_type); + switch (dmsg->column_type) { + case INT2OID: + case INT4OID: + appendStringInfo(out, ", datum[%d]", dmsg->datum_int32); + break; + case INT8OID: + appendStringInfo(out, ", datum[%" PRId64 "]", dmsg->datum_int64); + break; + case FLOAT4OID: + appendStringInfo(out, ", datum[%f]", dmsg->datum_float); + break; + case FLOAT8OID: + case NUMERICOID: + appendStringInfo(out, ", datum[%f]", dmsg->datum_double); + break; + case TEXTOID: + appendStringInfo(out, ", datum[%s]", dmsg->datum_string); + break; + default: + break; + } + appendStringInfo(out, "\n"); + } + } + } } /* this doesn't seem to be available in the public api (unfortunate) */ static double numeric_to_double_no_overflow(Numeric num) { - char *tmp; - double val; - char *endptr; + char *tmp; + double val; + char *endptr; - tmp = DatumGetCString(DirectFunctionCall1(numeric_out, - NumericGetDatum(num))); + tmp = DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(num))); - /* unlike float8in, we ignore ERANGE from strtod */ - val = strtod(tmp, &endptr); - if (*endptr != '\0') - { - /* shouldn't happen ... */ - ereport(ERROR, - (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), - errmsg("invalid input syntax for type double precision: \"%s\"", - tmp))); - } + /* unlike float8in, we ignore ERANGE from strtod */ + val = strtod(tmp, &endptr); + if (*endptr != '\0') { + /* shouldn't happen ... */ + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid input syntax for type double precision: \"%s\"", + tmp))); + } - pfree(tmp); + pfree(tmp); - return val; + return val; } -static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, Oid typoutput, Datum datum) { - Numeric num; - char c; - bytea *valptr; - char *output; - switch (typid) { - case BOOLOID: - datum_msg->datum_bool = DatumGetBool(datum); - break; - case INT2OID: - datum_msg->datum_int32 = DatumGetInt16(datum); - break; - case INT4OID: - datum_msg->datum_int32 = DatumGetInt32(datum); - break; - case INT8OID: - datum_msg->datum_int64 = DatumGetInt64(datum); - break; - case OIDOID: - break; - case FLOAT4OID: - datum_msg->datum_float = DatumGetFloat4(datum); - break; - case FLOAT8OID: - datum_msg->datum_double = DatumGetFloat8(datum); - break; - case NUMERICOID: - num = DatumGetNumeric(datum); - if (!numeric_is_nan(num)) { - datum_msg->datum_double = numeric_to_double_no_overflow(num); - } - break; - case CHAROID: - c = DatumGetChar(datum); - datum_msg->datum_string = &c; - break; - case VARCHAROID: - case TEXTOID: - datum_msg->datum_string = DatumGetCString(datum); - break; - case BYTEAOID: - valptr = DatumGetByteaPCopy(datum); - int size = VARSIZE(valptr); - datum_msg->datum_bytes = *((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData))); - datum_msg->datum_bytes.data = (uint8_t *)VARDATA(valptr); - datum_msg->datum_bytes.len = size - VARHDRSZ; - break; - default: - output = OidOutputFunctionCall(typoutput, datum); - datum_msg->datum_bytes = *((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData))); - datum_msg->datum_bytes.data = (uint8_t *)output; - datum_msg->datum_bytes.len = sizeof(output); - break; - } +static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, + Oid typoutput, Datum datum) { + Numeric num; + char c; + bytea *valptr; + char *output; + switch (typid) { + case BOOLOID: + datum_msg->datum_bool = DatumGetBool(datum); + datum_msg->has_datum_bool = true; + break; + case INT2OID: + datum_msg->datum_int32 = DatumGetInt16(datum); + datum_msg->has_datum_int32 = true; + break; + case INT4OID: + datum_msg->datum_int32 = DatumGetInt32(datum); + datum_msg->has_datum_int32 = true; + break; + case INT8OID: + case OIDOID: + datum_msg->datum_int64 = DatumGetInt64(datum); + datum_msg->has_datum_int64 = true; + break; + case FLOAT4OID: + datum_msg->datum_float = DatumGetFloat4(datum); + datum_msg->has_datum_float = true; + break; + case FLOAT8OID: + datum_msg->datum_double = DatumGetFloat8(datum); + datum_msg->has_datum_double = true; + break; + case NUMERICOID: + num = DatumGetNumeric(datum); + if (!numeric_is_nan(num)) { + datum_msg->datum_double = numeric_to_double_no_overflow(num); + datum_msg->has_datum_double = true; + } + break; + case CHAROID: + c = DatumGetChar(datum); + datum_msg->datum_string = &c; + break; + case VARCHAROID: + case TEXTOID: + output = OidOutputFunctionCall(typoutput, datum); + datum_msg->datum_string = pnstrdup(output, strlen(output)); + break; + case BYTEAOID: + valptr = DatumGetByteaPCopy(datum); + int size = VARSIZE(valptr); + datum_msg->datum_bytes = + *((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData))); + datum_msg->datum_bytes.data = (uint8_t *)VARDATA(valptr); + datum_msg->datum_bytes.len = size - VARHDRSZ; + datum_msg->has_datum_bytes = true; + break; + default: + output = OidOutputFunctionCall(typoutput, datum); + datum_msg->datum_bytes = + *((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData))); + datum_msg->datum_bytes.data = (uint8_t *)output; + datum_msg->datum_bytes.len = sizeof(output); + datum_msg->has_datum_bytes = true; + break; + } } -static Decoderbufs__DatumMessage tuple_to_datum_msg(Relation relation, HeapTuple tuple) { - TupleDesc tupdesc = RelationGetDescr(relation); - int natt; - Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; +static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Relation relation, + HeapTuple tuple, TupleDesc tupdesc) { + int natt; - /* Build column names and values */ - for (natt = 0; natt < tupdesc->natts; natt++) { - Form_pg_attribute attr; - Datum origval; - bool isnull; + /* build column names and values */ + for (natt = 0; natt < tupdesc->natts; natt++) { + Form_pg_attribute attr; + Datum origval; + bool isnull; - attr = tupdesc->attrs[natt]; + attr = tupdesc->attrs[natt]; - /* Skip dropped columns and system columns */ - if (attr->attisdropped || attr->attnum < 0) - continue; + /* skip dropped columns and system columns */ + if (attr->attisdropped || attr->attnum < 0) { + continue; + } - /* Set the column name */ - datum_msg.column_name = quote_identifier(NameStr(attr->attname)); + Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; - /* Get Datum from tuple */ - origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull); + /* set the column name */ + const char *col_name = quote_identifier(NameStr(attr->attname)); + datum_msg.column_name = col_name; - /* Get output function */ - datum_msg.column_type = attr->atttypid; + /* set datum from tuple */ + origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull); - Oid typoutput; - bool typisvarlena; - /* Query output function */ - getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); - if (!isnull) { - if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) { - // what to do if anything? - } else if (!typisvarlena) { - set_datum_value(&datum_msg, attr->atttypid, typoutput, origval); - } else { - Datum val = PointerGetDatum(PG_DETOAST_DATUM(origval)); - set_datum_value(&datum_msg, attr->atttypid, typoutput, val); - } - } - } + /* get output function */ + datum_msg.column_type = attr->atttypid; + datum_msg.has_column_type = true; + + Oid typoutput; + bool typisvarlena; + /* Query output function */ + getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); + if (!isnull) { + if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) { + // TODO: Is there a way we can handle this? + elog(WARNING, "Not handling external on disk varlena at the moment."); + } else if (!typisvarlena) { + set_datum_value(&datum_msg, attr->atttypid, typoutput, origval); + } else { + Datum val = PointerGetDatum(PG_DETOAST_DATUM(origval)); + set_datum_value(&datum_msg, attr->atttypid, typoutput, val); + } + } + + tmsg[natt] = palloc(sizeof(datum_msg)); + memcpy(tmsg[natt], &datum_msg, sizeof(datum_msg)); + } - return datum_msg; } /* @@ -305,6 +371,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Form_pg_class class_form; char replident = relation->rd_rel->relreplident; bool is_rel_non_selective; + Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT; class_form = RelationGetForm(relation); data = ctx->output_plugin_private; @@ -314,48 +381,55 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, RelationGetIndexList(relation); is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING || - (replident == REPLICA_IDENTITY_DEFAULT && - !OidIsValid(relation->rd_replidindex))); + (replident == REPLICA_IDENTITY_DEFAULT && + !OidIsValid(relation->rd_replidindex))); /* set common fields */ - data->txn_msg->table = quote_qualified_identifier( - get_namespace_name( - get_rel_namespace(RelationGetRelid(relation))), - NameStr(class_form->relname)); + rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time); + rmsg.has_commit_time = true; + rmsg.table = quote_qualified_identifier( + get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + NameStr(class_form->relname)); /* decode different operation types */ switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: - data->txn_msg->op = DECODERBUFS__OP__INSERT; + rmsg.op = DECODERBUFS__OP__INSERT; + rmsg.has_op = true; if (change->data.tp.newtuple != NULL) { - HeapTupleGetDatum(&change->data.tp.newtuple->tuple); - Decoderbufs__DatumMessage new_datum = tuple_to_datum_msg(relation, - &change->data.tp.newtuple->tuple); - data->txn_msg->new_datum = &new_datum; + TupleDesc tupdesc = RelationGetDescr(relation); + rmsg.n_new_tuple = tupdesc->natts; + rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); } break; case REORDER_BUFFER_CHANGE_UPDATE: - data->txn_msg->op = DECODERBUFS__OP__UPDATE; - if (is_rel_non_selective) { - if (change->data.tp.oldtuple != NULL) { - Decoderbufs__DatumMessage old_datum = tuple_to_datum_msg(relation, - &change->data.tp.oldtuple->tuple); - data->txn_msg->old_datum = &old_datum; - } - if (change->data.tp.newtuple != NULL) { - Decoderbufs__DatumMessage new_datum = tuple_to_datum_msg(relation, - &change->data.tp.newtuple->tuple); - data->txn_msg->new_datum = &new_datum; - } + rmsg.op = DECODERBUFS__OP__UPDATE; + rmsg.has_op = true; + if (!is_rel_non_selective) { + if (change->data.tp.oldtuple != NULL) { + TupleDesc tupdesc = RelationGetDescr(relation); + rmsg.n_old_tuple = tupdesc->natts; + rmsg.old_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc); + } + if (change->data.tp.newtuple != NULL) { + TupleDesc tupdesc = RelationGetDescr(relation); + rmsg.n_new_tuple = tupdesc->natts; + rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); + } } break; case REORDER_BUFFER_CHANGE_DELETE: - data->txn_msg->op = DECODERBUFS__OP__DELETE; + rmsg.op = DECODERBUFS__OP__DELETE; + rmsg.has_op = true; /* if there was no PK, we only know that a delete happened */ - if (is_rel_non_selective && change->data.tp.oldtuple != NULL) { - Decoderbufs__DatumMessage old_datum = tuple_to_datum_msg(relation, - &change->data.tp.oldtuple->tuple); - data->txn_msg->old_datum = &old_datum; + if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) { + TupleDesc tupdesc = RelationGetDescr(relation); + rmsg.n_old_tuple = tupdesc->natts; + rmsg.old_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc); } break; default: @@ -363,6 +437,41 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; } + if (data->debug_mode) { + OutputPluginPrepareWrite(ctx, true); + if (rmsg.has_commit_time) + appendStringInfo(ctx->out, "commit_time[%" PRId64 "]", rmsg.commit_time); + if (rmsg.table) + appendStringInfo(ctx->out, ", table[%s]", rmsg.table); + if (rmsg.has_op) + appendStringInfo(ctx->out, ", op[%d]", rmsg.op); + if (rmsg.old_tuple) { + appendStringInfo(ctx->out, "\nOLD TUPLE: \n"); + print_tuple_msg(ctx->out, rmsg.old_tuple, rmsg.n_old_tuple); + appendStringInfo(ctx->out, "\n"); + } + if (rmsg.new_tuple) { + appendStringInfo(ctx->out, "\nNEW TUPLE: \n"); + print_tuple_msg(ctx->out, rmsg.new_tuple, rmsg.n_new_tuple); + appendStringInfo(ctx->out, "\n"); + } + OutputPluginWrite(ctx, true); + } else { + OutputPluginPrepareWrite(ctx, true); + size_t psize = decoderbufs__row_message__get_packed_size(&rmsg); + void *packed = palloc(psize); + size_t ssize = decoderbufs__row_message__pack(&rmsg, packed); + // appendBinaryStringInfo(ctx->out, (void *)psize, sizeof(psize)); + appendBinaryStringInfo(ctx->out, packed, ssize); + OutputPluginWrite(ctx, true); + + /* free packed buffer */ + pfree(packed); + } + + /* cleanup msg */ + free_row_msg_subs(&rmsg); + MemoryContextSwitchTo(old); MemoryContextReset(data->context); } diff --git a/src/proto/pg_logicaldec.pb-c.c b/src/proto/pg_logicaldec.pb-c.c index b997a4a..4dd9d5c 100644 --- a/src/proto/pg_logicaldec.pb-c.c +++ b/src/proto/pg_logicaldec.pb-c.c @@ -6,7 +6,7 @@ #define PROTOBUF_C__NO_DEPRECATED #endif -#include "proto/pg_logicaldec.pb-c.h" +#include "pg_logicaldec.pb-c.h" void decoderbufs__datum_message__init (Decoderbufs__DatumMessage *message) { @@ -50,47 +50,47 @@ void decoderbufs__datum_message__free_unpacked assert(message->base.descriptor == &decoderbufs__datum_message__descriptor); protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); } -void decoderbufs__txn_message__init - (Decoderbufs__TxnMessage *message) +void decoderbufs__row_message__init + (Decoderbufs__RowMessage *message) { - static Decoderbufs__TxnMessage init_value = DECODERBUFS__TXN_MESSAGE__INIT; + static Decoderbufs__RowMessage init_value = DECODERBUFS__ROW_MESSAGE__INIT; *message = init_value; } -size_t decoderbufs__txn_message__get_packed_size - (const Decoderbufs__TxnMessage *message) +size_t decoderbufs__row_message__get_packed_size + (const Decoderbufs__RowMessage *message) { - assert(message->base.descriptor == &decoderbufs__txn_message__descriptor); + assert(message->base.descriptor == &decoderbufs__row_message__descriptor); return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message)); } -size_t decoderbufs__txn_message__pack - (const Decoderbufs__TxnMessage *message, +size_t decoderbufs__row_message__pack + (const Decoderbufs__RowMessage *message, uint8_t *out) { - assert(message->base.descriptor == &decoderbufs__txn_message__descriptor); + assert(message->base.descriptor == &decoderbufs__row_message__descriptor); return protobuf_c_message_pack ((const ProtobufCMessage*)message, out); } -size_t decoderbufs__txn_message__pack_to_buffer - (const Decoderbufs__TxnMessage *message, +size_t decoderbufs__row_message__pack_to_buffer + (const Decoderbufs__RowMessage *message, ProtobufCBuffer *buffer) { - assert(message->base.descriptor == &decoderbufs__txn_message__descriptor); + assert(message->base.descriptor == &decoderbufs__row_message__descriptor); return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer); } -Decoderbufs__TxnMessage * - decoderbufs__txn_message__unpack +Decoderbufs__RowMessage * + decoderbufs__row_message__unpack (ProtobufCAllocator *allocator, size_t len, const uint8_t *data) { - return (Decoderbufs__TxnMessage *) - protobuf_c_message_unpack (&decoderbufs__txn_message__descriptor, + return (Decoderbufs__RowMessage *) + protobuf_c_message_unpack (&decoderbufs__row_message__descriptor, allocator, len, data); } -void decoderbufs__txn_message__free_unpacked - (Decoderbufs__TxnMessage *message, +void decoderbufs__row_message__free_unpacked + (Decoderbufs__RowMessage *message, ProtobufCAllocator *allocator) { - assert(message->base.descriptor == &decoderbufs__txn_message__descriptor); + assert(message->base.descriptor == &decoderbufs__row_message__descriptor); protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); } static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[9] = @@ -235,27 +235,15 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = (ProtobufCMessageInit) decoderbufs__datum_message__init, NULL,NULL,NULL /* reserved[123] */ }; -static const ProtobufCFieldDescriptor decoderbufs__txn_message__field_descriptors[6] = +static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[5] = { { - "timestamp", + "commit_time", 1, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_SINT64, - offsetof(Decoderbufs__TxnMessage, has_timestamp), - offsetof(Decoderbufs__TxnMessage, timestamp), - NULL, - NULL, - 0, /* flags */ - 0,NULL,NULL /* reserved1,reserved2, etc */ - }, - { - "xid", - 2, - PROTOBUF_C_LABEL_OPTIONAL, - PROTOBUF_C_TYPE_INT64, - offsetof(Decoderbufs__TxnMessage, has_xid), - offsetof(Decoderbufs__TxnMessage, xid), + offsetof(Decoderbufs__RowMessage, has_commit_time), + offsetof(Decoderbufs__RowMessage, commit_time), NULL, NULL, 0, /* flags */ @@ -263,11 +251,11 @@ static const ProtobufCFieldDescriptor decoderbufs__txn_message__field_descriptor }, { "table", - 3, + 2, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_STRING, 0, /* quantifier_offset */ - offsetof(Decoderbufs__TxnMessage, table), + offsetof(Decoderbufs__RowMessage, table), NULL, NULL, 0, /* flags */ @@ -275,67 +263,66 @@ static const ProtobufCFieldDescriptor decoderbufs__txn_message__field_descriptor }, { "op", - 4, + 3, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_ENUM, - offsetof(Decoderbufs__TxnMessage, has_op), - offsetof(Decoderbufs__TxnMessage, op), + offsetof(Decoderbufs__RowMessage, has_op), + offsetof(Decoderbufs__RowMessage, op), &decoderbufs__op__descriptor, NULL, 0, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, { - "new_datum", - 5, - PROTOBUF_C_LABEL_OPTIONAL, + "new_tuple", + 4, + PROTOBUF_C_LABEL_REPEATED, PROTOBUF_C_TYPE_MESSAGE, - 0, /* quantifier_offset */ - offsetof(Decoderbufs__TxnMessage, new_datum), + offsetof(Decoderbufs__RowMessage, n_new_tuple), + offsetof(Decoderbufs__RowMessage, new_tuple), &decoderbufs__datum_message__descriptor, NULL, 0, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, { - "old_datum", - 6, - PROTOBUF_C_LABEL_OPTIONAL, + "old_tuple", + 5, + PROTOBUF_C_LABEL_REPEATED, PROTOBUF_C_TYPE_MESSAGE, - 0, /* quantifier_offset */ - offsetof(Decoderbufs__TxnMessage, old_datum), + offsetof(Decoderbufs__RowMessage, n_old_tuple), + offsetof(Decoderbufs__RowMessage, old_tuple), &decoderbufs__datum_message__descriptor, NULL, 0, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, }; -static const unsigned decoderbufs__txn_message__field_indices_by_name[] = { - 4, /* field[4] = new_datum */ - 5, /* field[5] = old_datum */ - 3, /* field[3] = op */ - 2, /* field[2] = table */ - 0, /* field[0] = timestamp */ - 1, /* field[1] = xid */ +static const unsigned decoderbufs__row_message__field_indices_by_name[] = { + 0, /* field[0] = commit_time */ + 3, /* field[3] = new_tuple */ + 4, /* field[4] = old_tuple */ + 2, /* field[2] = op */ + 1, /* field[1] = table */ }; -static const ProtobufCIntRange decoderbufs__txn_message__number_ranges[1 + 1] = +static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] = { { 1, 0 }, - { 0, 6 } + { 0, 5 } }; -const ProtobufCMessageDescriptor decoderbufs__txn_message__descriptor = +const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor = { PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC, - "decoderbufs.TxnMessage", - "TxnMessage", - "Decoderbufs__TxnMessage", + "decoderbufs.RowMessage", + "RowMessage", + "Decoderbufs__RowMessage", "decoderbufs", - sizeof(Decoderbufs__TxnMessage), - 6, - decoderbufs__txn_message__field_descriptors, - decoderbufs__txn_message__field_indices_by_name, - 1, decoderbufs__txn_message__number_ranges, - (ProtobufCMessageInit) decoderbufs__txn_message__init, + sizeof(Decoderbufs__RowMessage), + 5, + decoderbufs__row_message__field_descriptors, + decoderbufs__row_message__field_indices_by_name, + 1, decoderbufs__row_message__number_ranges, + (ProtobufCMessageInit) decoderbufs__row_message__init, NULL,NULL,NULL /* reserved[123] */ }; const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] = diff --git a/src/proto/pg_logicaldec.pb-c.h b/src/proto/pg_logicaldec.pb-c.h index a898805..dfbbaf4 100644 --- a/src/proto/pg_logicaldec.pb-c.h +++ b/src/proto/pg_logicaldec.pb-c.h @@ -16,7 +16,7 @@ PROTOBUF_C__BEGIN_DECLS typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage; -typedef struct _Decoderbufs__TxnMessage Decoderbufs__TxnMessage; +typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage; /* --- enums --- */ @@ -55,22 +55,22 @@ struct _Decoderbufs__DatumMessage , NULL, 0,0, 0,0, 0,0, 0,0, 0,0, 0,0, NULL, 0,{0,NULL} } -struct _Decoderbufs__TxnMessage +struct _Decoderbufs__RowMessage { ProtobufCMessage base; - protobuf_c_boolean has_timestamp; - int64_t timestamp; - protobuf_c_boolean has_xid; - int64_t xid; + protobuf_c_boolean has_commit_time; + int64_t commit_time; char *table; protobuf_c_boolean has_op; Decoderbufs__Op op; - Decoderbufs__DatumMessage *new_datum; - Decoderbufs__DatumMessage *old_datum; + size_t n_new_tuple; + Decoderbufs__DatumMessage **new_tuple; + size_t n_old_tuple; + Decoderbufs__DatumMessage **old_tuple; }; -#define DECODERBUFS__TXN_MESSAGE__INIT \ - { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__txn_message__descriptor) \ - , 0,0, 0,0, NULL, 0,0, NULL, NULL } +#define DECODERBUFS__ROW_MESSAGE__INIT \ + { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \ + , 0,0, NULL, 0,0, 0,NULL, 0,NULL } /* Decoderbufs__DatumMessage methods */ @@ -92,32 +92,32 @@ Decoderbufs__DatumMessage * void decoderbufs__datum_message__free_unpacked (Decoderbufs__DatumMessage *message, ProtobufCAllocator *allocator); -/* Decoderbufs__TxnMessage methods */ -void decoderbufs__txn_message__init - (Decoderbufs__TxnMessage *message); -size_t decoderbufs__txn_message__get_packed_size - (const Decoderbufs__TxnMessage *message); -size_t decoderbufs__txn_message__pack - (const Decoderbufs__TxnMessage *message, +/* Decoderbufs__RowMessage methods */ +void decoderbufs__row_message__init + (Decoderbufs__RowMessage *message); +size_t decoderbufs__row_message__get_packed_size + (const Decoderbufs__RowMessage *message); +size_t decoderbufs__row_message__pack + (const Decoderbufs__RowMessage *message, uint8_t *out); -size_t decoderbufs__txn_message__pack_to_buffer - (const Decoderbufs__TxnMessage *message, +size_t decoderbufs__row_message__pack_to_buffer + (const Decoderbufs__RowMessage *message, ProtobufCBuffer *buffer); -Decoderbufs__TxnMessage * - decoderbufs__txn_message__unpack +Decoderbufs__RowMessage * + decoderbufs__row_message__unpack (ProtobufCAllocator *allocator, size_t len, const uint8_t *data); -void decoderbufs__txn_message__free_unpacked - (Decoderbufs__TxnMessage *message, +void decoderbufs__row_message__free_unpacked + (Decoderbufs__RowMessage *message, ProtobufCAllocator *allocator); /* --- per-message closures --- */ typedef void (*Decoderbufs__DatumMessage_Closure) (const Decoderbufs__DatumMessage *message, void *closure_data); -typedef void (*Decoderbufs__TxnMessage_Closure) - (const Decoderbufs__TxnMessage *message, +typedef void (*Decoderbufs__RowMessage_Closure) + (const Decoderbufs__RowMessage *message, void *closure_data); /* --- services --- */ @@ -127,7 +127,7 @@ typedef void (*Decoderbufs__TxnMessage_Closure) extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor; -extern const ProtobufCMessageDescriptor decoderbufs__txn_message__descriptor; +extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor; PROTOBUF_C__END_DECLS