diff --git a/Makefile b/Makefile index 04ff70c..a8342e5 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ -MODULES = decoderbufs +MODULE_big = decoderbufs +EXTENSION = decoderbufs PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0') PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0') @@ -6,8 +7,7 @@ PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0') PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) -L/usr/local/lib -llwgeom -MODULE_big = $(patsubst src/%.c,%,$(wildcard src/*.c)) -OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o src/protobuf-c-text.o +OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/README.md b/README.md index 565ffb6..a98d618 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ A PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers # decoderbufs -Version: 0.0.1 +Version: 0.1.0 **decoderbufs** is a PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers. diff --git a/decoderbufs.control b/decoderbufs.control new file mode 100644 index 0000000..3132c55 --- /dev/null +++ b/decoderbufs.control @@ -0,0 +1,3 @@ +comment = 'Logical decoding plugin that delivers WAL stream changes using a Protocol Buffer format' +default_version = '0.1.0' +relocatable = true diff --git a/src/decoderbufs.c b/src/decoderbufs.c index b2251b2..4c9b226 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -52,12 +52,10 @@ #include "utils/typcache.h" #include "utils/uuid.h" #include "proto/pg_logicaldec.pb-c.h" -#include "protobuf-c/protobuf-c.h" -#include "protobuf-c-text.h" /* POSTGIS version define so it doesn't redef macros */ #define POSTGIS_PGSQL_VERSION 94 -#include "libpgcommon/lwgeom_pg.h" +#include "liblwgeom.h" PG_MODULE_MAGIC; @@ -256,65 +254,86 @@ static void row_message_destroy(Decoderbufs__RowMessage *msg) { } } - -/* only used for debug-mode (currently not all OIDs are currently supported) */ -static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup, - size_t n) { +/* print tuple datums (only used for debug-mode) */ +static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup, + size_t n) { if (tup) { for (int i = 0; i < n; i++) { Decoderbufs__DatumMessage *dmsg = tup[i]; + if (dmsg->column_name) appendStringInfo(out, "column_name[%s]", dmsg->column_name); - if (dmsg->has_column_type) { + + if (dmsg->has_column_type) appendStringInfo(out, ", column_type[%" PRId64 "]", dmsg->column_type); - switch (dmsg->column_type) { - case INT2OID: - case INT4OID: - appendStringInfo(out, ", datum[%d]", dmsg->datum_int32); - break; - case INT8OID: - appendStringInfo(out, ", datum[%" PRId64 "]", dmsg->datum_int64); - break; - case FLOAT4OID: - appendStringInfo(out, ", datum[%f]", dmsg->datum_float); - break; - case FLOAT8OID: - case NUMERICOID: - appendStringInfo(out, ", datum[%f]", dmsg->datum_double); - break; - case CHAROID: - case VARCHAROID: - case BPCHAROID: - case TEXTOID: - case JSONOID: - case XMLOID: - case UUIDOID: - case TIMESTAMPOID: - case TIMESTAMPTZOID: - appendStringInfo(out, ", datum[%s]", dmsg->datum_string); - break; - case POINTOID: - appendStringInfo(out, ", datum[POINT(%f, %f)]", - dmsg->datum_point->x, dmsg->datum_point->y); - break; - default: - if (dmsg->column_type == geometry_oid && - dmsg->datum_point != NULL) { - appendStringInfo(out, ", datum[GEOMETRY(POINT(%f,%f))]", - dmsg->datum_point->x, dmsg->datum_point->y); - } else if (dmsg->column_type == geography_oid && - dmsg->datum_point != NULL) { - appendStringInfo(out, ", datum[GEOGRAPHY(POINT(%f,%f))]", - dmsg->datum_point->x, dmsg->datum_point->y); - } - break; - } - appendStringInfo(out, "\n"); + + switch (dmsg->datum_case) { + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32: + appendStringInfo(out, ", datum[%d]", dmsg->datum_int32); + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64: + appendStringInfo(out, ", datum[%" PRId64 "]", dmsg->datum_int64); + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT: + appendStringInfo(out, ", datum[%f]", dmsg->datum_float); + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE: + appendStringInfo(out, ", datum[%f]", dmsg->datum_double); + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BOOL: + appendStringInfo(out, ", datum[%d]", dmsg->datum_bool); + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING: + appendStringInfo(out, ", datum[%s]", dmsg->datum_string); + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES: + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT: + appendStringInfo(out, ", datum[POINT(%f, %f)]", + dmsg->datum_point->x, dmsg->datum_point->y); + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET: + // intentional fall-through + default: + appendStringInfo(out, ", datum[!NOT SET!]"); + break; } + appendStringInfo(out, "\n"); } } } +/* print a row message (only used for debug-mode) */ +static void print_row_msg(StringInfo out, Decoderbufs__RowMessage *rmsg) { + if (!rmsg) + return; + + if (rmsg->has_transaction_id) + appendStringInfo(out, "txid[%d]", rmsg->transaction_id); + + if (rmsg->has_commit_time) + appendStringInfo(out, ", commit_time[%" PRId64 "]", rmsg->commit_time); + + if (rmsg->table) + appendStringInfo(out, ", table[%s]", rmsg->table); + + if (rmsg->has_op) + appendStringInfo(out, ", op[%d]", rmsg->op); + + if (rmsg->old_tuple) { + appendStringInfo(out, "\nOLD TUPLE: \n"); + print_tuple_datums(out, rmsg->old_tuple, rmsg->n_old_tuple); + appendStringInfo(out, "\n"); + } + + if (rmsg->new_tuple) { + appendStringInfo(out, "\nNEW TUPLE: \n"); + print_tuple_datums(out, rmsg->new_tuple, rmsg->n_new_tuple); + appendStringInfo(out, "\n"); + } + +} + /* this doesn't seem to be available in the public api (unfortunate) */ static double numeric_to_double_no_overflow(Numeric num) { char *tmp; @@ -603,25 +622,20 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; } + /* write msg */ + OutputPluginPrepareWrite(ctx, true); if (data->debug_mode) { - OutputPluginPrepareWrite(ctx, true); - protobuf_c_text_to_string_internal(ctx->out, 0, (ProtobufCMessage*)&rmsg, &decoderbufs__row_message__descriptor); - OutputPluginWrite(ctx, true); + //protobuf_c_text_to_string(ctx->out, (ProtobufCMessage*)&rmsg); + print_row_msg(ctx->out, &rmsg); } else { - OutputPluginPrepareWrite(ctx, true); size_t psize = decoderbufs__row_message__get_packed_size(&rmsg); void *packed = palloc(psize); size_t ssize = decoderbufs__row_message__pack(&rmsg, packed); - uint64_t flen = htobe64(ssize); - /* frame encoding size */ - appendBinaryStringInfo(ctx->out, (char *)&flen, sizeof(flen)); - /* frame encoding payload */ appendBinaryStringInfo(ctx->out, packed, ssize); - OutputPluginWrite(ctx, true); - /* free packed buffer */ pfree(packed); } + OutputPluginWrite(ctx, true); /* cleanup msg */ row_message_destroy(&rmsg); diff --git a/src/protobuf-c-text.c b/src/protobuf-c-text.c deleted file mode 100644 index 008a8f2..0000000 --- a/src/protobuf-c-text.c +++ /dev/null @@ -1,348 +0,0 @@ -#include "protobuf-c-text.h" - -/** Escape string. - * - * Add escape characters to strings for problematic characters. - * - * \param[in] src The unescaped string to process. - * \param[in] len Length of \c src. Note that \c src might have ASCII - * \c NULs so strlen() isn't good enough here. - * \return The fully escaped string, or \c NULL if there has been an - * allocation error. - */ -static char* esc_str(char *src, int len) { - int i, escapes = 0, dst_len = 0; - unsigned char *dst; - - for (i = 0; i < len; i++) { - if (!isprint(src[i])) { - escapes++; - } - } - dst = palloc((escapes * 4) + ((len - escapes) * 2) + 1); - if (!dst) { - return NULL; - } - - for (i = 0; i < len; i++) { - switch (src[i]) { - /* Special cases. */ - case '\'': - dst[dst_len++] = '\\'; - dst[dst_len++] = '\''; - break; - case '\"': - dst[dst_len++] = '\\'; - dst[dst_len++] = '\"'; - break; - case '\\': - dst[dst_len++] = '\\'; - dst[dst_len++] = '\\'; - break; - case '\n': - dst[dst_len++] = '\\'; - dst[dst_len++] = 'n'; - break; - case '\r': - dst[dst_len++] = '\\'; - dst[dst_len++] = 'r'; - break; - case '\t': - dst[dst_len++] = '\\'; - dst[dst_len++] = 't'; - break; - - /* Escape with octal if !isprint. */ - default: - if (!isprint(src[i])) { - dst_len += snprintf("\\%03o", dst + dst_len, src[i]); - } else { - dst[dst_len++] = src[i]; - } - break; - } - } - dst[dst_len] = '\0'; - - return dst; -} - - -/** Internal function to back API function. - * - * Has a few extra params to better enable recursion. This function gets - * called for each nested message as the \c ProtobufCMessage struct is - * traversed. - * - * \param[in,out] out The string being built up for the text format protobuf. - * \param[in] level Indent level - increments in 2's. - * \param[in] m The \c ProtobufCMessage being serialised. - * \param[in] d The descriptor for the \c ProtobufCMessage. - */ -static void protobuf_c_text_to_string_internal(StringInfo out, - int level, - ProtobufCMessage *m, - const ProtobufCMessageDescriptor *d) { - int i; - size_t j, quantifier_offset; - double float_var; - const ProtobufCFieldDescriptor *f; - ProtobufCEnumDescriptor *enumd; - const ProtobufCEnumValue *enumv; - - f = d->fields; - for (i = 0; i < d->n_fields; i++) { - /* Decide if something needs to be done for this field. */ - switch (f[i].label) { - case PROTOBUF_C_LABEL_OPTIONAL: - if (f[i].type == PROTOBUF_C_TYPE_STRING) { - if (!STRUCT_MEMBER(char *, m, f[i].offset) || (STRUCT_MEMBER(char *, m, f[i].offset) == (char *)f[i].default_value)) { - continue; - } - } else if (f[i].type == PROTOBUF_C_TYPE_MESSAGE) { - if (!STRUCT_MEMBER(char *, m, f[i].offset)) { - continue; - } - } else { - if (!STRUCT_MEMBER(protobuf_c_boolean, m, f[i].quantifier_offset)) { - continue; - } - } - break; - case PROTOBUF_C_LABEL_REPEATED: - if (!STRUCT_MEMBER(size_t, m, f[i].quantifier_offset)) { - continue; - } - break; - } - - quantifier_offset = STRUCT_MEMBER(size_t, m, f[i].quantifier_offset); - /* Field exists and has data, dump it. */ - switch (f[i].type) { - case PROTOBUF_C_TYPE_INT32: - case PROTOBUF_C_TYPE_UINT32: - case PROTOBUF_C_TYPE_FIXED32: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - appendStringInfo(out, - "%*s%s: %u\n", - level, "", f[i].name, - STRUCT_MEMBER(uint32_t *, m, f[i].offset)[j]); - } - } else { - appendStringInfo(out, - "%*s%s: %u\n", - level, "", f[i].name, - STRUCT_MEMBER(uint32_t, m, f[i].offset)); - } - break; - case PROTOBUF_C_TYPE_SINT32: - case PROTOBUF_C_TYPE_SFIXED32: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - appendStringInfo(out, - "%*s%s: %d\n", - level, "", f[i].name, - STRUCT_MEMBER(int32_t *, m, f[i].offset)[j]); - } - } else { - appendStringInfo(out, - "%*s%s: %d\n", - level, "", f[i].name, - STRUCT_MEMBER(int32_t, m, f[i].offset)); - } - break; - case PROTOBUF_C_TYPE_INT64: - case PROTOBUF_C_TYPE_UINT64: - case PROTOBUF_C_TYPE_FIXED64: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - appendStringInfo(out, - "%*s%s: %lu\n", - level, "", f[i].name, - STRUCT_MEMBER(uint64_t *, m, f[i].offset)[j]); - } - } else { - appendStringInfo(out, - "%*s%s: %lu\n", - level, "", f[i].name, - STRUCT_MEMBER(uint64_t, m, f[i].offset)); - } - break; - case PROTOBUF_C_TYPE_SINT64: - case PROTOBUF_C_TYPE_SFIXED64: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - appendStringInfo(out, - "%*s%s: %ld\n", - level, "", f[i].name, - STRUCT_MEMBER(int64_t *, m, f[i].offset)[j]); - } - } else { - appendStringInfo(out, - "%*s%s: %ld\n", - level, "", f[i].name, - STRUCT_MEMBER(int64_t, m, f[i].offset)); - } - break; - case PROTOBUF_C_TYPE_FLOAT: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - float_var = STRUCT_MEMBER(float *, m, f[i].offset)[j]; - appendStringInfo(out, - "%*s%s: %g\n", - level, "", f[i].name, - float_var); - } - } else { - float_var = STRUCT_MEMBER(float, m, f[i].offset); - appendStringInfo(out, - "%*s%s: %g\n", - level, "", f[i].name, - float_var); - } - break; - case PROTOBUF_C_TYPE_DOUBLE: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - appendStringInfo(out, - "%*s%s: %g\n", - level, "", f[i].name, - STRUCT_MEMBER(double *, m, f[i].offset)[j]); - } - } else { - appendStringInfo(out, - "%*s%s: %g\n", - level, "", f[i].name, - STRUCT_MEMBER(double, m, f[i].offset)); - } - break; - case PROTOBUF_C_TYPE_BOOL: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - appendStringInfo(out, - "%*s%s: %s\n", - level, "", f[i].name, - STRUCT_MEMBER(protobuf_c_boolean *, m, f[i].offset)[j]? - "true": "false"); - } - } else { - appendStringInfo(out, - "%*s%s: %s\n", - level, "", f[i].name, - STRUCT_MEMBER(protobuf_c_boolean, m, f[i].offset)? - "true": "false"); - } - break; - case PROTOBUF_C_TYPE_ENUM: - enumd = (ProtobufCEnumDescriptor *)f[i].descriptor; - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - enumv = protobuf_c_enum_descriptor_get_value( - enumd, STRUCT_MEMBER(int *, m, f[i].offset)[j]); - appendStringInfo(out, - "%*s%s: %s\n", - level, "", f[i].name, - enumv? enumv->name: "unknown"); - } - } else { - enumv = protobuf_c_enum_descriptor_get_value( - enumd, STRUCT_MEMBER(int, m, f[i].offset)); - appendStringInfo(out, - "%*s%s: %s\n", - level, "", f[i].name, - enumv? enumv->name: "unknown"); - } - break; - case PROTOBUF_C_TYPE_STRING: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - unsigned char *escaped; - - escaped = esc_str( - STRUCT_MEMBER(unsigned char **, m, f[i].offset)[j], - strlen(STRUCT_MEMBER(unsigned char **, m, f[i].offset)[j])); - if (!escaped) { - return; - } - appendStringInfo(out, - "%*s%s: \"%s\"\n", level, "", f[i].name, escaped); - pfree(escaped); - } - } else { - unsigned char *escaped; - - escaped = esc_str(STRUCT_MEMBER(unsigned char *, m, f[i].offset), - strlen(STRUCT_MEMBER(unsigned char *, m, f[i].offset))); - if (!escaped) { - return; - } - appendStringInfo(out, - "%*s%s: \"%s\"\n", level, "", f[i].name, escaped); - pfree(escaped); - } - break; - case PROTOBUF_C_TYPE_BYTES: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; j < quantifier_offset; j++) { - unsigned char *escaped; - - escaped = esc_str( - STRUCT_MEMBER(ProtobufCBinaryData *, m, f[i].offset)[j].data, - STRUCT_MEMBER(ProtobufCBinaryData *, m, f[i].offset)[j].len); - if (!escaped) { - return; - } - appendStringInfo(out, - "%*s%s: \"%s\"\n", level, "", f[i].name, escaped); - pfree(escaped); - } - } else { - unsigned char *escaped; - - escaped = esc_str( - STRUCT_MEMBER(ProtobufCBinaryData, m, f[i].offset).data, - STRUCT_MEMBER(ProtobufCBinaryData, m, f[i].offset).len); - if (!escaped) { - return; - } - appendStringInfo(out, - "%*s%s: \"%s\"\n", level, "", f[i].name, escaped); - pfree(escaped); - } - break; - - case PROTOBUF_C_TYPE_MESSAGE: - if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { - for (j = 0; - j < STRUCT_MEMBER(size_t, m, f[i].quantifier_offset); - j++) { - appendStringInfo(out, - "%*s%s {\n", level, "", f[i].name); - protobuf_c_text_to_string_internal(out, level + 2, - STRUCT_MEMBER(ProtobufCMessage **, m, f[i].offset)[j], - (ProtobufCMessageDescriptor *)f[i].descriptor); - appendStringInfo(out, - "%*s}\n", level, ""); - } - } else { - appendStringInfo(out, - "%*s%s {\n", level, "", f[i].name); - protobuf_c_text_to_string_internal(out, level + 2, - STRUCT_MEMBER(ProtobufCMessage *, m, f[i].offset), - (ProtobufCMessageDescriptor *)f[i].descriptor); - appendStringInfo(out, - "%*s}\n", level, ""); - } - break; - default: - return; - break; - } - } -} - - -static void protobuf_c_text_to_string(StringInfo out, ProtobufCMessage *m) { - protobuf_c_text_to_string_internal(out, 0, m, m->descriptor); -} diff --git a/src/protobuf-c-text.h b/src/protobuf-c-text.h deleted file mode 100644 index 42dca77..0000000 --- a/src/protobuf-c-text.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef PROTOBUF_C_TEXT_H -#define PROTOBUF_C_TEXT_H - -#include "postgres.h" -#include "lib/stringinfo.h" -#include "protobuf-c/protobuf-c.h" - - -/* BEGIN MACROS TAKEN FROM protobuf-c/protobuf-c.c */ -/** - * Internal `ProtobufCMessage` manipulation macro. - * - * Base macro for manipulating a `ProtobufCMessage`. Used by STRUCT_MEMBER() and - * STRUCT_MEMBER_PTR(). - */ -#define STRUCT_MEMBER_P(struct_p, struct_offset) \ - ((void *) ((uint8_t *) (struct_p) + (struct_offset))) - -/** - * Return field in a `ProtobufCMessage` based on offset. - * - * Take a pointer to a `ProtobufCMessage` and find the field at the offset. - * Cast it to the passed type. - */ -#define STRUCT_MEMBER(member_type, struct_p, struct_offset) \ - (*(member_type *) STRUCT_MEMBER_P((struct_p), (struct_offset))) - -/** - * Return field in a `ProtobufCMessage` based on offset. - * - * Take a pointer to a `ProtobufCMessage` and find the field at the offset. Cast - * it to a pointer to the passed type. - */ -#define STRUCT_MEMBER_PTR(member_type, struct_p, struct_offset) \ - ((member_type *) STRUCT_MEMBER_P((struct_p), (struct_offset))) -/* END MACROS FROM protobuf-c/protobuf-c.c */ - - -/** Internal function to back API function. - * - * Has a few extra params to better enable recursion. This function gets - * called for each nested message as the \c ProtobufCMessage struct is - * traversed. - * - * \param[in,out] out The string being used for the text format protobuf. - * \param[in] level Indent level - increments in 2's. - * \param[in] m The \c ProtobufCMessage being serialised. - * \param[in] d The descriptor for the \c ProtobufCMessage. - */ -extern void protobuf_c_text_to_string_internal(StringInfo out, - int level, - ProtobufCMessage *m, - const ProtobufCMessageDescriptor *d); - -extern void protobuf_c_text_to_string(StringInfo out, ProtobufCMessage *m); - -#endif /* PROTOBUF_C_TEXT_H */