From 0241c144fc12a57a51fef991580b4ebabb7170a1 Mon Sep 17 00:00:00 2001 From: Xavier Stevens Date: Thu, 18 Sep 2014 17:33:49 -0700 Subject: [PATCH] Initial commit but not compiling yet --- .gitignore | 11 +- Makefile | 14 ++ proto/pg_logicaldec.proto | 32 +++ src/decoderbufs.c | 368 ++++++++++++++++++++++++++++++++ src/proto/pg_logicaldec.pb-c.c | 370 +++++++++++++++++++++++++++++++++ src/proto/pg_logicaldec.pb-c.h | 135 ++++++++++++ 6 files changed, 924 insertions(+), 6 deletions(-) create mode 100644 Makefile create mode 100644 proto/pg_logicaldec.proto create mode 100644 src/decoderbufs.c create mode 100644 src/proto/pg_logicaldec.pb-c.c create mode 100644 src/proto/pg_logicaldec.pb-c.h diff --git a/.gitignore b/.gitignore index edf6645..496f7d7 100644 --- a/.gitignore +++ b/.gitignore @@ -4,15 +4,9 @@ *.obj *.elf -# Precompiled Headers -*.gch -*.pch - # Libraries *.lib *.a -*.la -*.lo # Shared objects (inc. Windows DLLs) *.dll @@ -27,3 +21,8 @@ *.i*86 *.x86_64 *.hex + +# Eclipse +.cproject +.project +.settings diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..465a200 --- /dev/null +++ b/Makefile @@ -0,0 +1,14 @@ +MODULES = decoderbufs + +PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0') +PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0') + +PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) +SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) + +MODULE_big = $(patsubst src/%.c,%,$(wildcard src/*.c)) +OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o + +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) \ No newline at end of file diff --git a/proto/pg_logicaldec.proto b/proto/pg_logicaldec.proto new file mode 100644 index 0000000..2286fe0 --- /dev/null +++ b/proto/pg_logicaldec.proto @@ -0,0 +1,32 @@ +package decoderbufs; + +option java_package="dedoderbufs.proto"; +option java_outer_classname = "TxnProto"; +option optimize_for = SPEED; + +enum Op { + INSERT = 0; + UPDATE = 1; + DELETE = 2; +} + +message DatumMessage { + optional string column_name = 1; + optional int64 column_type = 2; + optional int32 datum_int32 = 3; + optional int64 datum_int64 = 4; + optional float datum_float = 5; + optional double datum_double = 6; + optional bool datum_bool = 7; + optional string datum_string = 8; + optional bytes datum_bytes = 9; +} + +message TxnMessage { + optional sint64 timestamp = 1; + optional int64 xid = 2; + optional string table = 3; + optional Op op = 4; + optional DatumMessage new_datum = 5; + optional DatumMessage old_datum = 6; +} diff --git a/src/decoderbufs.c b/src/decoderbufs.c new file mode 100644 index 0000000..e94b9b3 --- /dev/null +++ b/src/decoderbufs.c @@ -0,0 +1,368 @@ +/* + * decoderbufs - PostgreSQL output plug-in for logical replication to Protocol + * Buffers + * + * Copyright (c) 2014 Xavier Stevens + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + *all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "postgres.h" +#include "funcapi.h" +#include "catalog/pg_class.h" +#include "catalog/pg_type.h" +#include "replication/output_plugin.h" +#include "replication/logical.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/json.h" +#include "utils/memutils.h" +#include "utils/numeric.h" +#include "utils/rel.h" +#include "utils/relcache.h" +#include "utils/syscache.h" +#include "utils/typcache.h" +#include "proto/pg_logicaldec.pb-c.h" + +PG_MODULE_MAGIC; + +typedef struct { + MemoryContext context; + Decoderbufs__TxnMessage *txn_msg; + bool debug_mode; +} DecoderData; + +/* These must be available to pg_dlsym() */ +extern void _PG_init(void); +extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); + +static void pg_decode_startup(LogicalDecodingContext *ctx, + OutputPluginOptions *opt, bool is_init); +static void pg_decode_shutdown(LogicalDecodingContext *ctx); +static void pg_decode_begin_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pg_decode_commit_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation rel, ReorderBufferChange *change); + +void _PG_init(void) {} + +/* specify output plugin callbacks */ +void _PG_output_plugin_init(OutputPluginCallbacks *cb) { + AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit); + cb->startup_cb = pg_decode_startup; + cb->begin_cb = pg_decode_begin_txn; + cb->change_cb = pg_decode_change; + cb->commit_cb = pg_decode_commit_txn; + cb->shutdown_cb = pg_decode_shutdown; +} + +/* initialize this plugin */ +static void pg_decode_startup(LogicalDecodingContext *ctx, + OutputPluginOptions *opt, bool is_init) { + ListCell *option; + DecoderData *data; + + data = palloc(sizeof(DecoderData)); + data->context = AllocSetContextCreate(ctx->context, "decoderbufs context", + ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + ctx->output_plugin_private = data; + + opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; + + foreach(option, ctx->output_plugin_options) { + DefElem *elem = lfirst(option); + Assert(elem->arg == NULL || IsA(elem->arg, String)); + + if (strcmp(elem->defname, "debug-mode") == 0) { + bool debug_mode; + if (elem->arg == NULL) + debug_mode = false; + else if (!parse_bool(strVal(elem->arg), &debug_mode)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + + if (debug_mode) + opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; + } else { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, + elem->arg ? strVal(elem->arg) : "(null)"))); + } + } +} + +static void free_txn_msg_datums(Decoderbufs__TxnMessage *msg) { + if (msg->new_datum) { + if (msg->new_datum->has_datum_bytes) { + pfree(msg->new_datum->datum_bytes.data); + msg->new_datum->datum_bytes.data = NULL; + msg->new_datum->datum_bytes.len = 0; + } + pfree(msg->new_datum); + } + if (msg->old_datum) { + if (msg->old_datum->has_datum_bytes) { + pfree(msg->old_datum->datum_bytes.data); + msg->old_datum->datum_bytes.data = NULL; + msg->old_datum->datum_bytes.len = 0; + } + pfree(msg->old_datum); + } +} + +/* cleanup this plugin's resources */ +static void pg_decode_shutdown(LogicalDecodingContext *ctx) { + DecoderData *data = ctx->output_plugin_private; + + /* cleanup our own resources via memory context reset */ + MemoryContextDelete(data->context); +} + +/* BEGIN callback */ +static void pg_decode_begin_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) { + DecoderData *data = ctx->output_plugin_private; + Decoderbufs__TxnMessage msg = DECODERBUFS__TXN_MESSAGE__INIT; + data->txn_msg = &msg; + data->txn_msg->xid = txn->xid; +} + +/* COMMIT callback */ +static void pg_decode_commit_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + DecoderData *data = ctx->output_plugin_private; + Decoderbufs__TxnMessage *msg = data->txn_msg; + msg->timestamp = txn->commit_time; + + OutputPluginPrepareWrite(ctx, true); + size_t psize = decoderbufs__txn_message__get_packed_size(msg); + void *packed = palloc(psize); + size_t ssize = decoderbufs__txn_message__pack(msg, packed); + appendBinaryStringInfo(ctx->out, packed, ssize); + OutputPluginWrite(ctx, true); + + pfree(packed); + free_txn_msg_datums(msg); +} + +/* this doesn't seem to be available in the public api (unfortunate) */ +static double numeric_to_double_no_overflow(Numeric num) { + char *tmp; + double val; + char *endptr; + + tmp = DatumGetCString(DirectFunctionCall1(numeric_out, + NumericGetDatum(num))); + + /* unlike float8in, we ignore ERANGE from strtod */ + val = strtod(tmp, &endptr); + if (*endptr != '\0') + { + /* shouldn't happen ... */ + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid input syntax for type double precision: \"%s\"", + tmp))); + } + + pfree(tmp); + + return val; +} + +static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, Oid typoutput, Datum datum) { + Numeric num; + char c; + bytea *valptr; + char *output; + switch (typid) { + case BOOLOID: + datum_msg->datum_bool = DatumGetBool(datum); + break; + case INT2OID: + datum_msg->datum_int32 = DatumGetInt16(datum); + break; + case INT4OID: + datum_msg->datum_int32 = DatumGetInt32(datum); + break; + case INT8OID: + datum_msg->datum_int64 = DatumGetInt64(datum); + break; + case OIDOID: + break; + case FLOAT4OID: + datum_msg->datum_float = DatumGetFloat4(datum); + break; + case FLOAT8OID: + datum_msg->datum_double = DatumGetFloat8(datum); + break; + case NUMERICOID: + num = DatumGetNumeric(datum); + if (!numeric_is_nan(num)) { + datum_msg->datum_double = numeric_to_double_no_overflow(num); + } + break; + case CHAROID: + c = DatumGetChar(datum); + datum_msg->datum_string = &c; + break; + case VARCHAROID: + case TEXTOID: + datum_msg->datum_string = DatumGetCString(datum); + break; + case BYTEAOID: + valptr = DatumGetByteaPCopy(datum); + int size = VARSIZE(valptr); + datum_msg->datum_bytes = *((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData))); + datum_msg->datum_bytes.data = (uint8_t *)VARDATA(valptr); + datum_msg->datum_bytes.len = size - VARHDRSZ; + break; + default: + output = OidOutputFunctionCall(typoutput, datum); + datum_msg->datum_bytes = *((ProtobufCBinaryData *)palloc(sizeof(ProtobufCBinaryData))); + datum_msg->datum_bytes.data = (uint8_t *)output; + datum_msg->datum_bytes.len = sizeof(output); + break; + } +} + +static Decoderbufs__DatumMessage tuple_to_datum_msg(Relation relation, HeapTuple tuple) { + TupleDesc tupdesc = RelationGetDescr(relation); + int natt; + Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; + + /* Build column names and values */ + for (natt = 0; natt < tupdesc->natts; natt++) { + Form_pg_attribute attr; + Datum origval; + bool isnull; + + attr = tupdesc->attrs[natt]; + + /* Skip dropped columns and system columns */ + if (attr->attisdropped || attr->attnum < 0) + continue; + + /* Set the column name */ + datum_msg.column_name = quote_identifier(NameStr(attr->attname)); + + /* Get Datum from tuple */ + origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull); + + /* Get output function */ + datum_msg.column_type = attr->atttypid; + + Oid typoutput; + bool typisvarlena; + /* Query output function */ + getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); + if (!isnull) { + if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) { + // what to do if anything? + } else if (!typisvarlena) { + set_datum_value(&datum_msg, attr->atttypid, typoutput, origval); + } else { + Datum val = PointerGetDatum(PG_DETOAST_DATUM(origval)); + set_datum_value(&datum_msg, attr->atttypid, typoutput, val); + } + } + } + + return datum_msg; +} + +/* + * callback for individual changed tuples + */ +static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) { + DecoderData *data; + MemoryContext old; + + Form_pg_class class_form; + char replident = relation->rd_rel->relreplident; + bool is_rel_non_selective; + class_form = RelationGetForm(relation); + + data = ctx->output_plugin_private; + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + RelationGetIndexList(relation); + is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING || + (replident == REPLICA_IDENTITY_DEFAULT && + !OidIsValid(relation->rd_replidindex))); + + /* set common fields */ + data->txn_msg->table = quote_qualified_identifier( + get_namespace_name( + get_rel_namespace(RelationGetRelid(relation))), + NameStr(class_form->relname)); + + /* decode different operation types */ + switch (change->action) { + case REORDER_BUFFER_CHANGE_INSERT: + data->txn_msg->op = DECODERBUFS__OP__INSERT; + if (change->data.tp.newtuple != NULL) { + HeapTupleGetDatum(&change->data.tp.newtuple->tuple); + Decoderbufs__DatumMessage new_datum = tuple_to_datum_msg(relation, + &change->data.tp.newtuple->tuple); + data->txn_msg->new_datum = &new_datum; + } + break; + case REORDER_BUFFER_CHANGE_UPDATE: + data->txn_msg->op = DECODERBUFS__OP__UPDATE; + if (is_rel_non_selective) { + if (change->data.tp.oldtuple != NULL) { + Decoderbufs__DatumMessage old_datum = tuple_to_datum_msg(relation, + &change->data.tp.oldtuple->tuple); + data->txn_msg->old_datum = &old_datum; + } + if (change->data.tp.newtuple != NULL) { + Decoderbufs__DatumMessage new_datum = tuple_to_datum_msg(relation, + &change->data.tp.newtuple->tuple); + data->txn_msg->new_datum = &new_datum; + } + } + break; + case REORDER_BUFFER_CHANGE_DELETE: + data->txn_msg->op = DECODERBUFS__OP__DELETE; + /* if there was no PK, we only know that a delete happened */ + if (is_rel_non_selective && change->data.tp.oldtuple != NULL) { + Decoderbufs__DatumMessage old_datum = tuple_to_datum_msg(relation, + &change->data.tp.oldtuple->tuple); + data->txn_msg->old_datum = &old_datum; + } + break; + default: + Assert(0); + break; + } + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); +} diff --git a/src/proto/pg_logicaldec.pb-c.c b/src/proto/pg_logicaldec.pb-c.c new file mode 100644 index 0000000..b997a4a --- /dev/null +++ b/src/proto/pg_logicaldec.pb-c.c @@ -0,0 +1,370 @@ +/* Generated by the protocol buffer compiler. DO NOT EDIT! */ +/* Generated from: proto/pg_logicaldec.proto */ + +/* Do not generate deprecated warnings for self */ +#ifndef PROTOBUF_C__NO_DEPRECATED +#define PROTOBUF_C__NO_DEPRECATED +#endif + +#include "proto/pg_logicaldec.pb-c.h" +void decoderbufs__datum_message__init + (Decoderbufs__DatumMessage *message) +{ + static Decoderbufs__DatumMessage init_value = DECODERBUFS__DATUM_MESSAGE__INIT; + *message = init_value; +} +size_t decoderbufs__datum_message__get_packed_size + (const Decoderbufs__DatumMessage *message) +{ + assert(message->base.descriptor == &decoderbufs__datum_message__descriptor); + return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message)); +} +size_t decoderbufs__datum_message__pack + (const Decoderbufs__DatumMessage *message, + uint8_t *out) +{ + assert(message->base.descriptor == &decoderbufs__datum_message__descriptor); + return protobuf_c_message_pack ((const ProtobufCMessage*)message, out); +} +size_t decoderbufs__datum_message__pack_to_buffer + (const Decoderbufs__DatumMessage *message, + ProtobufCBuffer *buffer) +{ + assert(message->base.descriptor == &decoderbufs__datum_message__descriptor); + return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer); +} +Decoderbufs__DatumMessage * + decoderbufs__datum_message__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data) +{ + return (Decoderbufs__DatumMessage *) + protobuf_c_message_unpack (&decoderbufs__datum_message__descriptor, + allocator, len, data); +} +void decoderbufs__datum_message__free_unpacked + (Decoderbufs__DatumMessage *message, + ProtobufCAllocator *allocator) +{ + assert(message->base.descriptor == &decoderbufs__datum_message__descriptor); + protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); +} +void decoderbufs__txn_message__init + (Decoderbufs__TxnMessage *message) +{ + static Decoderbufs__TxnMessage init_value = DECODERBUFS__TXN_MESSAGE__INIT; + *message = init_value; +} +size_t decoderbufs__txn_message__get_packed_size + (const Decoderbufs__TxnMessage *message) +{ + assert(message->base.descriptor == &decoderbufs__txn_message__descriptor); + return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message)); +} +size_t decoderbufs__txn_message__pack + (const Decoderbufs__TxnMessage *message, + uint8_t *out) +{ + assert(message->base.descriptor == &decoderbufs__txn_message__descriptor); + return protobuf_c_message_pack ((const ProtobufCMessage*)message, out); +} +size_t decoderbufs__txn_message__pack_to_buffer + (const Decoderbufs__TxnMessage *message, + ProtobufCBuffer *buffer) +{ + assert(message->base.descriptor == &decoderbufs__txn_message__descriptor); + return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer); +} +Decoderbufs__TxnMessage * + decoderbufs__txn_message__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data) +{ + return (Decoderbufs__TxnMessage *) + protobuf_c_message_unpack (&decoderbufs__txn_message__descriptor, + allocator, len, data); +} +void decoderbufs__txn_message__free_unpacked + (Decoderbufs__TxnMessage *message, + ProtobufCAllocator *allocator) +{ + assert(message->base.descriptor == &decoderbufs__txn_message__descriptor); + protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); +} +static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[9] = +{ + { + "column_name", + 1, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_STRING, + 0, /* quantifier_offset */ + offsetof(Decoderbufs__DatumMessage, column_name), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "column_type", + 2, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_INT64, + offsetof(Decoderbufs__DatumMessage, has_column_type), + offsetof(Decoderbufs__DatumMessage, column_type), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "datum_int32", + 3, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_INT32, + offsetof(Decoderbufs__DatumMessage, has_datum_int32), + offsetof(Decoderbufs__DatumMessage, datum_int32), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "datum_int64", + 4, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_INT64, + offsetof(Decoderbufs__DatumMessage, has_datum_int64), + offsetof(Decoderbufs__DatumMessage, datum_int64), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "datum_float", + 5, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_FLOAT, + offsetof(Decoderbufs__DatumMessage, has_datum_float), + offsetof(Decoderbufs__DatumMessage, datum_float), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "datum_double", + 6, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_DOUBLE, + offsetof(Decoderbufs__DatumMessage, has_datum_double), + offsetof(Decoderbufs__DatumMessage, datum_double), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "datum_bool", + 7, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_BOOL, + offsetof(Decoderbufs__DatumMessage, has_datum_bool), + offsetof(Decoderbufs__DatumMessage, datum_bool), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "datum_string", + 8, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_STRING, + 0, /* quantifier_offset */ + offsetof(Decoderbufs__DatumMessage, datum_string), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "datum_bytes", + 9, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_BYTES, + offsetof(Decoderbufs__DatumMessage, has_datum_bytes), + offsetof(Decoderbufs__DatumMessage, datum_bytes), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, +}; +static const unsigned decoderbufs__datum_message__field_indices_by_name[] = { + 0, /* field[0] = column_name */ + 1, /* field[1] = column_type */ + 6, /* field[6] = datum_bool */ + 8, /* field[8] = datum_bytes */ + 5, /* field[5] = datum_double */ + 4, /* field[4] = datum_float */ + 2, /* field[2] = datum_int32 */ + 3, /* field[3] = datum_int64 */ + 7, /* field[7] = datum_string */ +}; +static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] = +{ + { 1, 0 }, + { 0, 9 } +}; +const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = +{ + PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC, + "decoderbufs.DatumMessage", + "DatumMessage", + "Decoderbufs__DatumMessage", + "decoderbufs", + sizeof(Decoderbufs__DatumMessage), + 9, + decoderbufs__datum_message__field_descriptors, + decoderbufs__datum_message__field_indices_by_name, + 1, decoderbufs__datum_message__number_ranges, + (ProtobufCMessageInit) decoderbufs__datum_message__init, + NULL,NULL,NULL /* reserved[123] */ +}; +static const ProtobufCFieldDescriptor decoderbufs__txn_message__field_descriptors[6] = +{ + { + "timestamp", + 1, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_SINT64, + offsetof(Decoderbufs__TxnMessage, has_timestamp), + offsetof(Decoderbufs__TxnMessage, timestamp), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "xid", + 2, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_INT64, + offsetof(Decoderbufs__TxnMessage, has_xid), + offsetof(Decoderbufs__TxnMessage, xid), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "table", + 3, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_STRING, + 0, /* quantifier_offset */ + offsetof(Decoderbufs__TxnMessage, table), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "op", + 4, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_ENUM, + offsetof(Decoderbufs__TxnMessage, has_op), + offsetof(Decoderbufs__TxnMessage, op), + &decoderbufs__op__descriptor, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "new_datum", + 5, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_MESSAGE, + 0, /* quantifier_offset */ + offsetof(Decoderbufs__TxnMessage, new_datum), + &decoderbufs__datum_message__descriptor, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "old_datum", + 6, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_MESSAGE, + 0, /* quantifier_offset */ + offsetof(Decoderbufs__TxnMessage, old_datum), + &decoderbufs__datum_message__descriptor, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, +}; +static const unsigned decoderbufs__txn_message__field_indices_by_name[] = { + 4, /* field[4] = new_datum */ + 5, /* field[5] = old_datum */ + 3, /* field[3] = op */ + 2, /* field[2] = table */ + 0, /* field[0] = timestamp */ + 1, /* field[1] = xid */ +}; +static const ProtobufCIntRange decoderbufs__txn_message__number_ranges[1 + 1] = +{ + { 1, 0 }, + { 0, 6 } +}; +const ProtobufCMessageDescriptor decoderbufs__txn_message__descriptor = +{ + PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC, + "decoderbufs.TxnMessage", + "TxnMessage", + "Decoderbufs__TxnMessage", + "decoderbufs", + sizeof(Decoderbufs__TxnMessage), + 6, + decoderbufs__txn_message__field_descriptors, + decoderbufs__txn_message__field_indices_by_name, + 1, decoderbufs__txn_message__number_ranges, + (ProtobufCMessageInit) decoderbufs__txn_message__init, + NULL,NULL,NULL /* reserved[123] */ +}; +const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] = +{ + { "INSERT", "DECODERBUFS__OP__INSERT", 0 }, + { "UPDATE", "DECODERBUFS__OP__UPDATE", 1 }, + { "DELETE", "DECODERBUFS__OP__DELETE", 2 }, +}; +static const ProtobufCIntRange decoderbufs__op__value_ranges[] = { +{0, 0},{0, 3} +}; +const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] = +{ + { "DELETE", 2 }, + { "INSERT", 0 }, + { "UPDATE", 1 }, +}; +const ProtobufCEnumDescriptor decoderbufs__op__descriptor = +{ + PROTOBUF_C__ENUM_DESCRIPTOR_MAGIC, + "decoderbufs.Op", + "Op", + "Decoderbufs__Op", + "decoderbufs", + 3, + decoderbufs__op__enum_values_by_number, + 3, + decoderbufs__op__enum_values_by_name, + 1, + decoderbufs__op__value_ranges, + NULL,NULL,NULL,NULL /* reserved[1234] */ +}; diff --git a/src/proto/pg_logicaldec.pb-c.h b/src/proto/pg_logicaldec.pb-c.h new file mode 100644 index 0000000..a898805 --- /dev/null +++ b/src/proto/pg_logicaldec.pb-c.h @@ -0,0 +1,135 @@ +/* Generated by the protocol buffer compiler. DO NOT EDIT! */ +/* Generated from: proto/pg_logicaldec.proto */ + +#ifndef PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED +#define PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED + +#include + +PROTOBUF_C__BEGIN_DECLS + +#if PROTOBUF_C_VERSION_NUMBER < 1000000 +# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers. +#elif 1000001 < PROTOBUF_C_MIN_COMPILER_VERSION +# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c. +#endif + + +typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage; +typedef struct _Decoderbufs__TxnMessage Decoderbufs__TxnMessage; + + +/* --- enums --- */ + +typedef enum _Decoderbufs__Op { + DECODERBUFS__OP__INSERT = 0, + DECODERBUFS__OP__UPDATE = 1, + DECODERBUFS__OP__DELETE = 2 + PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP) +} Decoderbufs__Op; + +/* --- messages --- */ + +struct _Decoderbufs__DatumMessage +{ + ProtobufCMessage base; + char *column_name; + protobuf_c_boolean has_column_type; + int64_t column_type; + protobuf_c_boolean has_datum_int32; + int32_t datum_int32; + protobuf_c_boolean has_datum_int64; + int64_t datum_int64; + protobuf_c_boolean has_datum_float; + float datum_float; + protobuf_c_boolean has_datum_double; + double datum_double; + protobuf_c_boolean has_datum_bool; + protobuf_c_boolean datum_bool; + char *datum_string; + protobuf_c_boolean has_datum_bytes; + ProtobufCBinaryData datum_bytes; +}; +#define DECODERBUFS__DATUM_MESSAGE__INIT \ + { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \ + , NULL, 0,0, 0,0, 0,0, 0,0, 0,0, 0,0, NULL, 0,{0,NULL} } + + +struct _Decoderbufs__TxnMessage +{ + ProtobufCMessage base; + protobuf_c_boolean has_timestamp; + int64_t timestamp; + protobuf_c_boolean has_xid; + int64_t xid; + char *table; + protobuf_c_boolean has_op; + Decoderbufs__Op op; + Decoderbufs__DatumMessage *new_datum; + Decoderbufs__DatumMessage *old_datum; +}; +#define DECODERBUFS__TXN_MESSAGE__INIT \ + { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__txn_message__descriptor) \ + , 0,0, 0,0, NULL, 0,0, NULL, NULL } + + +/* Decoderbufs__DatumMessage methods */ +void decoderbufs__datum_message__init + (Decoderbufs__DatumMessage *message); +size_t decoderbufs__datum_message__get_packed_size + (const Decoderbufs__DatumMessage *message); +size_t decoderbufs__datum_message__pack + (const Decoderbufs__DatumMessage *message, + uint8_t *out); +size_t decoderbufs__datum_message__pack_to_buffer + (const Decoderbufs__DatumMessage *message, + ProtobufCBuffer *buffer); +Decoderbufs__DatumMessage * + decoderbufs__datum_message__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data); +void decoderbufs__datum_message__free_unpacked + (Decoderbufs__DatumMessage *message, + ProtobufCAllocator *allocator); +/* Decoderbufs__TxnMessage methods */ +void decoderbufs__txn_message__init + (Decoderbufs__TxnMessage *message); +size_t decoderbufs__txn_message__get_packed_size + (const Decoderbufs__TxnMessage *message); +size_t decoderbufs__txn_message__pack + (const Decoderbufs__TxnMessage *message, + uint8_t *out); +size_t decoderbufs__txn_message__pack_to_buffer + (const Decoderbufs__TxnMessage *message, + ProtobufCBuffer *buffer); +Decoderbufs__TxnMessage * + decoderbufs__txn_message__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data); +void decoderbufs__txn_message__free_unpacked + (Decoderbufs__TxnMessage *message, + ProtobufCAllocator *allocator); +/* --- per-message closures --- */ + +typedef void (*Decoderbufs__DatumMessage_Closure) + (const Decoderbufs__DatumMessage *message, + void *closure_data); +typedef void (*Decoderbufs__TxnMessage_Closure) + (const Decoderbufs__TxnMessage *message, + void *closure_data); + +/* --- services --- */ + + +/* --- descriptors --- */ + +extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor; +extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor; +extern const ProtobufCMessageDescriptor decoderbufs__txn_message__descriptor; + +PROTOBUF_C__END_DECLS + + +#endif /* PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED */