diff --git a/Makefile b/Makefile index daefde4..04ff70c 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) -L/usr/local/lib -llwgeom MODULE_big = $(patsubst src/%.c,%,$(wildcard src/*.c)) -OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o +OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o src/protobuf-c-text.o PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/src/decoderbufs.c b/src/decoderbufs.c index ba34503..b2251b2 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -52,6 +52,8 @@ #include "utils/typcache.h" #include "utils/uuid.h" #include "proto/pg_logicaldec.pb-c.h" +#include "protobuf-c/protobuf-c.h" +#include "protobuf-c-text.h" /* POSTGIS version define so it doesn't redef macros */ #define POSTGIS_PGSQL_VERSION 94 @@ -254,6 +256,7 @@ static void row_message_destroy(Decoderbufs__RowMessage *msg) { } } + /* only used for debug-mode (currently not all OIDs are currently supported) */ static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup, size_t n) { @@ -602,22 +605,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (data->debug_mode) { OutputPluginPrepareWrite(ctx, true); - if (rmsg.has_commit_time) - appendStringInfo(ctx->out, "commit_time[%" PRId64 "]", rmsg.commit_time); - if (rmsg.table) - appendStringInfo(ctx->out, ", table[%s]", rmsg.table); - if (rmsg.has_op) - appendStringInfo(ctx->out, ", op[%d]", rmsg.op); - if (rmsg.old_tuple) { - appendStringInfo(ctx->out, "\nOLD TUPLE: \n"); - print_tuple_msg(ctx->out, rmsg.old_tuple, rmsg.n_old_tuple); - appendStringInfo(ctx->out, "\n"); - } - if (rmsg.new_tuple) { - appendStringInfo(ctx->out, "\nNEW TUPLE: \n"); - print_tuple_msg(ctx->out, rmsg.new_tuple, rmsg.n_new_tuple); - appendStringInfo(ctx->out, "\n"); - } + protobuf_c_text_to_string_internal(ctx->out, 0, (ProtobufCMessage*)&rmsg, &decoderbufs__row_message__descriptor); OutputPluginWrite(ctx, true); } else { OutputPluginPrepareWrite(ctx, true); diff --git a/src/protobuf-c-text.c b/src/protobuf-c-text.c new file mode 100644 index 0000000..008a8f2 --- /dev/null +++ b/src/protobuf-c-text.c @@ -0,0 +1,348 @@ +#include "protobuf-c-text.h" + +/** Escape string. + * + * Add escape characters to strings for problematic characters. + * + * \param[in] src The unescaped string to process. + * \param[in] len Length of \c src. Note that \c src might have ASCII + * \c NULs so strlen() isn't good enough here. + * \return The fully escaped string, or \c NULL if there has been an + * allocation error. + */ +static char* esc_str(char *src, int len) { + int i, escapes = 0, dst_len = 0; + unsigned char *dst; + + for (i = 0; i < len; i++) { + if (!isprint(src[i])) { + escapes++; + } + } + dst = palloc((escapes * 4) + ((len - escapes) * 2) + 1); + if (!dst) { + return NULL; + } + + for (i = 0; i < len; i++) { + switch (src[i]) { + /* Special cases. */ + case '\'': + dst[dst_len++] = '\\'; + dst[dst_len++] = '\''; + break; + case '\"': + dst[dst_len++] = '\\'; + dst[dst_len++] = '\"'; + break; + case '\\': + dst[dst_len++] = '\\'; + dst[dst_len++] = '\\'; + break; + case '\n': + dst[dst_len++] = '\\'; + dst[dst_len++] = 'n'; + break; + case '\r': + dst[dst_len++] = '\\'; + dst[dst_len++] = 'r'; + break; + case '\t': + dst[dst_len++] = '\\'; + dst[dst_len++] = 't'; + break; + + /* Escape with octal if !isprint. */ + default: + if (!isprint(src[i])) { + dst_len += snprintf("\\%03o", dst + dst_len, src[i]); + } else { + dst[dst_len++] = src[i]; + } + break; + } + } + dst[dst_len] = '\0'; + + return dst; +} + + +/** Internal function to back API function. + * + * Has a few extra params to better enable recursion. This function gets + * called for each nested message as the \c ProtobufCMessage struct is + * traversed. + * + * \param[in,out] out The string being built up for the text format protobuf. + * \param[in] level Indent level - increments in 2's. + * \param[in] m The \c ProtobufCMessage being serialised. + * \param[in] d The descriptor for the \c ProtobufCMessage. + */ +static void protobuf_c_text_to_string_internal(StringInfo out, + int level, + ProtobufCMessage *m, + const ProtobufCMessageDescriptor *d) { + int i; + size_t j, quantifier_offset; + double float_var; + const ProtobufCFieldDescriptor *f; + ProtobufCEnumDescriptor *enumd; + const ProtobufCEnumValue *enumv; + + f = d->fields; + for (i = 0; i < d->n_fields; i++) { + /* Decide if something needs to be done for this field. */ + switch (f[i].label) { + case PROTOBUF_C_LABEL_OPTIONAL: + if (f[i].type == PROTOBUF_C_TYPE_STRING) { + if (!STRUCT_MEMBER(char *, m, f[i].offset) || (STRUCT_MEMBER(char *, m, f[i].offset) == (char *)f[i].default_value)) { + continue; + } + } else if (f[i].type == PROTOBUF_C_TYPE_MESSAGE) { + if (!STRUCT_MEMBER(char *, m, f[i].offset)) { + continue; + } + } else { + if (!STRUCT_MEMBER(protobuf_c_boolean, m, f[i].quantifier_offset)) { + continue; + } + } + break; + case PROTOBUF_C_LABEL_REPEATED: + if (!STRUCT_MEMBER(size_t, m, f[i].quantifier_offset)) { + continue; + } + break; + } + + quantifier_offset = STRUCT_MEMBER(size_t, m, f[i].quantifier_offset); + /* Field exists and has data, dump it. */ + switch (f[i].type) { + case PROTOBUF_C_TYPE_INT32: + case PROTOBUF_C_TYPE_UINT32: + case PROTOBUF_C_TYPE_FIXED32: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + appendStringInfo(out, + "%*s%s: %u\n", + level, "", f[i].name, + STRUCT_MEMBER(uint32_t *, m, f[i].offset)[j]); + } + } else { + appendStringInfo(out, + "%*s%s: %u\n", + level, "", f[i].name, + STRUCT_MEMBER(uint32_t, m, f[i].offset)); + } + break; + case PROTOBUF_C_TYPE_SINT32: + case PROTOBUF_C_TYPE_SFIXED32: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + appendStringInfo(out, + "%*s%s: %d\n", + level, "", f[i].name, + STRUCT_MEMBER(int32_t *, m, f[i].offset)[j]); + } + } else { + appendStringInfo(out, + "%*s%s: %d\n", + level, "", f[i].name, + STRUCT_MEMBER(int32_t, m, f[i].offset)); + } + break; + case PROTOBUF_C_TYPE_INT64: + case PROTOBUF_C_TYPE_UINT64: + case PROTOBUF_C_TYPE_FIXED64: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + appendStringInfo(out, + "%*s%s: %lu\n", + level, "", f[i].name, + STRUCT_MEMBER(uint64_t *, m, f[i].offset)[j]); + } + } else { + appendStringInfo(out, + "%*s%s: %lu\n", + level, "", f[i].name, + STRUCT_MEMBER(uint64_t, m, f[i].offset)); + } + break; + case PROTOBUF_C_TYPE_SINT64: + case PROTOBUF_C_TYPE_SFIXED64: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + appendStringInfo(out, + "%*s%s: %ld\n", + level, "", f[i].name, + STRUCT_MEMBER(int64_t *, m, f[i].offset)[j]); + } + } else { + appendStringInfo(out, + "%*s%s: %ld\n", + level, "", f[i].name, + STRUCT_MEMBER(int64_t, m, f[i].offset)); + } + break; + case PROTOBUF_C_TYPE_FLOAT: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + float_var = STRUCT_MEMBER(float *, m, f[i].offset)[j]; + appendStringInfo(out, + "%*s%s: %g\n", + level, "", f[i].name, + float_var); + } + } else { + float_var = STRUCT_MEMBER(float, m, f[i].offset); + appendStringInfo(out, + "%*s%s: %g\n", + level, "", f[i].name, + float_var); + } + break; + case PROTOBUF_C_TYPE_DOUBLE: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + appendStringInfo(out, + "%*s%s: %g\n", + level, "", f[i].name, + STRUCT_MEMBER(double *, m, f[i].offset)[j]); + } + } else { + appendStringInfo(out, + "%*s%s: %g\n", + level, "", f[i].name, + STRUCT_MEMBER(double, m, f[i].offset)); + } + break; + case PROTOBUF_C_TYPE_BOOL: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + appendStringInfo(out, + "%*s%s: %s\n", + level, "", f[i].name, + STRUCT_MEMBER(protobuf_c_boolean *, m, f[i].offset)[j]? + "true": "false"); + } + } else { + appendStringInfo(out, + "%*s%s: %s\n", + level, "", f[i].name, + STRUCT_MEMBER(protobuf_c_boolean, m, f[i].offset)? + "true": "false"); + } + break; + case PROTOBUF_C_TYPE_ENUM: + enumd = (ProtobufCEnumDescriptor *)f[i].descriptor; + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + enumv = protobuf_c_enum_descriptor_get_value( + enumd, STRUCT_MEMBER(int *, m, f[i].offset)[j]); + appendStringInfo(out, + "%*s%s: %s\n", + level, "", f[i].name, + enumv? enumv->name: "unknown"); + } + } else { + enumv = protobuf_c_enum_descriptor_get_value( + enumd, STRUCT_MEMBER(int, m, f[i].offset)); + appendStringInfo(out, + "%*s%s: %s\n", + level, "", f[i].name, + enumv? enumv->name: "unknown"); + } + break; + case PROTOBUF_C_TYPE_STRING: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + unsigned char *escaped; + + escaped = esc_str( + STRUCT_MEMBER(unsigned char **, m, f[i].offset)[j], + strlen(STRUCT_MEMBER(unsigned char **, m, f[i].offset)[j])); + if (!escaped) { + return; + } + appendStringInfo(out, + "%*s%s: \"%s\"\n", level, "", f[i].name, escaped); + pfree(escaped); + } + } else { + unsigned char *escaped; + + escaped = esc_str(STRUCT_MEMBER(unsigned char *, m, f[i].offset), + strlen(STRUCT_MEMBER(unsigned char *, m, f[i].offset))); + if (!escaped) { + return; + } + appendStringInfo(out, + "%*s%s: \"%s\"\n", level, "", f[i].name, escaped); + pfree(escaped); + } + break; + case PROTOBUF_C_TYPE_BYTES: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; j < quantifier_offset; j++) { + unsigned char *escaped; + + escaped = esc_str( + STRUCT_MEMBER(ProtobufCBinaryData *, m, f[i].offset)[j].data, + STRUCT_MEMBER(ProtobufCBinaryData *, m, f[i].offset)[j].len); + if (!escaped) { + return; + } + appendStringInfo(out, + "%*s%s: \"%s\"\n", level, "", f[i].name, escaped); + pfree(escaped); + } + } else { + unsigned char *escaped; + + escaped = esc_str( + STRUCT_MEMBER(ProtobufCBinaryData, m, f[i].offset).data, + STRUCT_MEMBER(ProtobufCBinaryData, m, f[i].offset).len); + if (!escaped) { + return; + } + appendStringInfo(out, + "%*s%s: \"%s\"\n", level, "", f[i].name, escaped); + pfree(escaped); + } + break; + + case PROTOBUF_C_TYPE_MESSAGE: + if (f[i].label == PROTOBUF_C_LABEL_REPEATED) { + for (j = 0; + j < STRUCT_MEMBER(size_t, m, f[i].quantifier_offset); + j++) { + appendStringInfo(out, + "%*s%s {\n", level, "", f[i].name); + protobuf_c_text_to_string_internal(out, level + 2, + STRUCT_MEMBER(ProtobufCMessage **, m, f[i].offset)[j], + (ProtobufCMessageDescriptor *)f[i].descriptor); + appendStringInfo(out, + "%*s}\n", level, ""); + } + } else { + appendStringInfo(out, + "%*s%s {\n", level, "", f[i].name); + protobuf_c_text_to_string_internal(out, level + 2, + STRUCT_MEMBER(ProtobufCMessage *, m, f[i].offset), + (ProtobufCMessageDescriptor *)f[i].descriptor); + appendStringInfo(out, + "%*s}\n", level, ""); + } + break; + default: + return; + break; + } + } +} + + +static void protobuf_c_text_to_string(StringInfo out, ProtobufCMessage *m) { + protobuf_c_text_to_string_internal(out, 0, m, m->descriptor); +} diff --git a/src/protobuf-c-text.h b/src/protobuf-c-text.h new file mode 100644 index 0000000..42dca77 --- /dev/null +++ b/src/protobuf-c-text.h @@ -0,0 +1,57 @@ +#ifndef PROTOBUF_C_TEXT_H +#define PROTOBUF_C_TEXT_H + +#include "postgres.h" +#include "lib/stringinfo.h" +#include "protobuf-c/protobuf-c.h" + + +/* BEGIN MACROS TAKEN FROM protobuf-c/protobuf-c.c */ +/** + * Internal `ProtobufCMessage` manipulation macro. + * + * Base macro for manipulating a `ProtobufCMessage`. Used by STRUCT_MEMBER() and + * STRUCT_MEMBER_PTR(). + */ +#define STRUCT_MEMBER_P(struct_p, struct_offset) \ + ((void *) ((uint8_t *) (struct_p) + (struct_offset))) + +/** + * Return field in a `ProtobufCMessage` based on offset. + * + * Take a pointer to a `ProtobufCMessage` and find the field at the offset. + * Cast it to the passed type. + */ +#define STRUCT_MEMBER(member_type, struct_p, struct_offset) \ + (*(member_type *) STRUCT_MEMBER_P((struct_p), (struct_offset))) + +/** + * Return field in a `ProtobufCMessage` based on offset. + * + * Take a pointer to a `ProtobufCMessage` and find the field at the offset. Cast + * it to a pointer to the passed type. + */ +#define STRUCT_MEMBER_PTR(member_type, struct_p, struct_offset) \ + ((member_type *) STRUCT_MEMBER_P((struct_p), (struct_offset))) +/* END MACROS FROM protobuf-c/protobuf-c.c */ + + +/** Internal function to back API function. + * + * Has a few extra params to better enable recursion. This function gets + * called for each nested message as the \c ProtobufCMessage struct is + * traversed. + * + * \param[in,out] out The string being used for the text format protobuf. + * \param[in] level Indent level - increments in 2's. + * \param[in] m The \c ProtobufCMessage being serialised. + * \param[in] d The descriptor for the \c ProtobufCMessage. + */ +extern void protobuf_c_text_to_string_internal(StringInfo out, + int level, + ProtobufCMessage *m, + const ProtobufCMessageDescriptor *d); + +extern void protobuf_c_text_to_string(StringInfo out, ProtobufCMessage *m); + +#endif /* PROTOBUF_C_TEXT_H */