Cleaned up debug print functions

pull/1/head
Xavier Stevens 2015-04-28 12:46:18 -07:00
parent 913b176e5d
commit 610ea41409
6 changed files with 82 additions and 470 deletions

View File

@ -1,4 +1,5 @@
MODULES = decoderbufs
MODULE_big = decoderbufs
EXTENSION = decoderbufs
PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0')
PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0')
@ -6,8 +7,7 @@ PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0')
PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) -L/usr/local/lib -llwgeom
MODULE_big = $(patsubst src/%.c,%,$(wildcard src/*.c))
OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o src/protobuf-c-text.o
OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)

View File

@ -5,7 +5,7 @@ A PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers
# decoderbufs
Version: 0.0.1
Version: 0.1.0
**decoderbufs** is a PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers.

3
decoderbufs.control Normal file
View File

@ -0,0 +1,3 @@
comment = 'Logical decoding plugin that delivers WAL stream changes using a Protocol Buffer format'
default_version = '0.1.0'
relocatable = true

View File

@ -52,12 +52,10 @@
#include "utils/typcache.h"
#include "utils/uuid.h"
#include "proto/pg_logicaldec.pb-c.h"
#include "protobuf-c/protobuf-c.h"
#include "protobuf-c-text.h"
/* POSTGIS version define so it doesn't redef macros */
#define POSTGIS_PGSQL_VERSION 94
#include "libpgcommon/lwgeom_pg.h"
#include "liblwgeom.h"
PG_MODULE_MAGIC;
@ -256,65 +254,86 @@ static void row_message_destroy(Decoderbufs__RowMessage *msg) {
}
}
/* only used for debug-mode (currently not all OIDs are currently supported) */
static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup,
size_t n) {
/* print tuple datums (only used for debug-mode) */
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
size_t n) {
if (tup) {
for (int i = 0; i < n; i++) {
Decoderbufs__DatumMessage *dmsg = tup[i];
if (dmsg->column_name)
appendStringInfo(out, "column_name[%s]", dmsg->column_name);
if (dmsg->has_column_type) {
if (dmsg->has_column_type)
appendStringInfo(out, ", column_type[%" PRId64 "]", dmsg->column_type);
switch (dmsg->column_type) {
case INT2OID:
case INT4OID:
appendStringInfo(out, ", datum[%d]", dmsg->datum_int32);
break;
case INT8OID:
appendStringInfo(out, ", datum[%" PRId64 "]", dmsg->datum_int64);
break;
case FLOAT4OID:
appendStringInfo(out, ", datum[%f]", dmsg->datum_float);
break;
case FLOAT8OID:
case NUMERICOID:
appendStringInfo(out, ", datum[%f]", dmsg->datum_double);
break;
case CHAROID:
case VARCHAROID:
case BPCHAROID:
case TEXTOID:
case JSONOID:
case XMLOID:
case UUIDOID:
case TIMESTAMPOID:
case TIMESTAMPTZOID:
appendStringInfo(out, ", datum[%s]", dmsg->datum_string);
break;
case POINTOID:
appendStringInfo(out, ", datum[POINT(%f, %f)]",
dmsg->datum_point->x, dmsg->datum_point->y);
break;
default:
if (dmsg->column_type == geometry_oid &&
dmsg->datum_point != NULL) {
appendStringInfo(out, ", datum[GEOMETRY(POINT(%f,%f))]",
dmsg->datum_point->x, dmsg->datum_point->y);
} else if (dmsg->column_type == geography_oid &&
dmsg->datum_point != NULL) {
appendStringInfo(out, ", datum[GEOGRAPHY(POINT(%f,%f))]",
dmsg->datum_point->x, dmsg->datum_point->y);
}
break;
}
appendStringInfo(out, "\n");
switch (dmsg->datum_case) {
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32:
appendStringInfo(out, ", datum[%d]", dmsg->datum_int32);
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64:
appendStringInfo(out, ", datum[%" PRId64 "]", dmsg->datum_int64);
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT:
appendStringInfo(out, ", datum[%f]", dmsg->datum_float);
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE:
appendStringInfo(out, ", datum[%f]", dmsg->datum_double);
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BOOL:
appendStringInfo(out, ", datum[%d]", dmsg->datum_bool);
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
appendStringInfo(out, ", datum[%s]", dmsg->datum_string);
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
appendStringInfo(out, ", datum[POINT(%f, %f)]",
dmsg->datum_point->x, dmsg->datum_point->y);
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET:
// intentional fall-through
default:
appendStringInfo(out, ", datum[!NOT SET!]");
break;
}
appendStringInfo(out, "\n");
}
}
}
/* print a row message (only used for debug-mode) */
static void print_row_msg(StringInfo out, Decoderbufs__RowMessage *rmsg) {
if (!rmsg)
return;
if (rmsg->has_transaction_id)
appendStringInfo(out, "txid[%d]", rmsg->transaction_id);
if (rmsg->has_commit_time)
appendStringInfo(out, ", commit_time[%" PRId64 "]", rmsg->commit_time);
if (rmsg->table)
appendStringInfo(out, ", table[%s]", rmsg->table);
if (rmsg->has_op)
appendStringInfo(out, ", op[%d]", rmsg->op);
if (rmsg->old_tuple) {
appendStringInfo(out, "\nOLD TUPLE: \n");
print_tuple_datums(out, rmsg->old_tuple, rmsg->n_old_tuple);
appendStringInfo(out, "\n");
}
if (rmsg->new_tuple) {
appendStringInfo(out, "\nNEW TUPLE: \n");
print_tuple_datums(out, rmsg->new_tuple, rmsg->n_new_tuple);
appendStringInfo(out, "\n");
}
}
/* this doesn't seem to be available in the public api (unfortunate) */
static double numeric_to_double_no_overflow(Numeric num) {
char *tmp;
@ -603,25 +622,20 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
break;
}
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
OutputPluginPrepareWrite(ctx, true);
protobuf_c_text_to_string_internal(ctx->out, 0, (ProtobufCMessage*)&rmsg, &decoderbufs__row_message__descriptor);
OutputPluginWrite(ctx, true);
//protobuf_c_text_to_string(ctx->out, (ProtobufCMessage*)&rmsg);
print_row_msg(ctx->out, &rmsg);
} else {
OutputPluginPrepareWrite(ctx, true);
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
uint64_t flen = htobe64(ssize);
/* frame encoding size */
appendBinaryStringInfo(ctx->out, (char *)&flen, sizeof(flen));
/* frame encoding payload */
appendBinaryStringInfo(ctx->out, packed, ssize);
OutputPluginWrite(ctx, true);
/* free packed buffer */
pfree(packed);
}
OutputPluginWrite(ctx, true);
/* cleanup msg */
row_message_destroy(&rmsg);

View File

@ -1,348 +0,0 @@
#include "protobuf-c-text.h"
/** Escape string.
*
* Add escape characters to strings for problematic characters.
*
* \param[in] src The unescaped string to process.
* \param[in] len Length of \c src. Note that \c src might have ASCII
* \c NULs so strlen() isn't good enough here.
* \return The fully escaped string, or \c NULL if there has been an
* allocation error.
*/
static char* esc_str(char *src, int len) {
int i, escapes = 0, dst_len = 0;
unsigned char *dst;
for (i = 0; i < len; i++) {
if (!isprint(src[i])) {
escapes++;
}
}
dst = palloc((escapes * 4) + ((len - escapes) * 2) + 1);
if (!dst) {
return NULL;
}
for (i = 0; i < len; i++) {
switch (src[i]) {
/* Special cases. */
case '\'':
dst[dst_len++] = '\\';
dst[dst_len++] = '\'';
break;
case '\"':
dst[dst_len++] = '\\';
dst[dst_len++] = '\"';
break;
case '\\':
dst[dst_len++] = '\\';
dst[dst_len++] = '\\';
break;
case '\n':
dst[dst_len++] = '\\';
dst[dst_len++] = 'n';
break;
case '\r':
dst[dst_len++] = '\\';
dst[dst_len++] = 'r';
break;
case '\t':
dst[dst_len++] = '\\';
dst[dst_len++] = 't';
break;
/* Escape with octal if !isprint. */
default:
if (!isprint(src[i])) {
dst_len += snprintf("\\%03o", dst + dst_len, src[i]);
} else {
dst[dst_len++] = src[i];
}
break;
}
}
dst[dst_len] = '\0';
return dst;
}
/** Internal function to back API function.
*
* Has a few extra params to better enable recursion. This function gets
* called for each nested message as the \c ProtobufCMessage struct is
* traversed.
*
* \param[in,out] out The string being built up for the text format protobuf.
* \param[in] level Indent level - increments in 2's.
* \param[in] m The \c ProtobufCMessage being serialised.
* \param[in] d The descriptor for the \c ProtobufCMessage.
*/
static void protobuf_c_text_to_string_internal(StringInfo out,
int level,
ProtobufCMessage *m,
const ProtobufCMessageDescriptor *d) {
int i;
size_t j, quantifier_offset;
double float_var;
const ProtobufCFieldDescriptor *f;
ProtobufCEnumDescriptor *enumd;
const ProtobufCEnumValue *enumv;
f = d->fields;
for (i = 0; i < d->n_fields; i++) {
/* Decide if something needs to be done for this field. */
switch (f[i].label) {
case PROTOBUF_C_LABEL_OPTIONAL:
if (f[i].type == PROTOBUF_C_TYPE_STRING) {
if (!STRUCT_MEMBER(char *, m, f[i].offset) || (STRUCT_MEMBER(char *, m, f[i].offset) == (char *)f[i].default_value)) {
continue;
}
} else if (f[i].type == PROTOBUF_C_TYPE_MESSAGE) {
if (!STRUCT_MEMBER(char *, m, f[i].offset)) {
continue;
}
} else {
if (!STRUCT_MEMBER(protobuf_c_boolean, m, f[i].quantifier_offset)) {
continue;
}
}
break;
case PROTOBUF_C_LABEL_REPEATED:
if (!STRUCT_MEMBER(size_t, m, f[i].quantifier_offset)) {
continue;
}
break;
}
quantifier_offset = STRUCT_MEMBER(size_t, m, f[i].quantifier_offset);
/* Field exists and has data, dump it. */
switch (f[i].type) {
case PROTOBUF_C_TYPE_INT32:
case PROTOBUF_C_TYPE_UINT32:
case PROTOBUF_C_TYPE_FIXED32:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
appendStringInfo(out,
"%*s%s: %u\n",
level, "", f[i].name,
STRUCT_MEMBER(uint32_t *, m, f[i].offset)[j]);
}
} else {
appendStringInfo(out,
"%*s%s: %u\n",
level, "", f[i].name,
STRUCT_MEMBER(uint32_t, m, f[i].offset));
}
break;
case PROTOBUF_C_TYPE_SINT32:
case PROTOBUF_C_TYPE_SFIXED32:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
appendStringInfo(out,
"%*s%s: %d\n",
level, "", f[i].name,
STRUCT_MEMBER(int32_t *, m, f[i].offset)[j]);
}
} else {
appendStringInfo(out,
"%*s%s: %d\n",
level, "", f[i].name,
STRUCT_MEMBER(int32_t, m, f[i].offset));
}
break;
case PROTOBUF_C_TYPE_INT64:
case PROTOBUF_C_TYPE_UINT64:
case PROTOBUF_C_TYPE_FIXED64:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
appendStringInfo(out,
"%*s%s: %lu\n",
level, "", f[i].name,
STRUCT_MEMBER(uint64_t *, m, f[i].offset)[j]);
}
} else {
appendStringInfo(out,
"%*s%s: %lu\n",
level, "", f[i].name,
STRUCT_MEMBER(uint64_t, m, f[i].offset));
}
break;
case PROTOBUF_C_TYPE_SINT64:
case PROTOBUF_C_TYPE_SFIXED64:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
appendStringInfo(out,
"%*s%s: %ld\n",
level, "", f[i].name,
STRUCT_MEMBER(int64_t *, m, f[i].offset)[j]);
}
} else {
appendStringInfo(out,
"%*s%s: %ld\n",
level, "", f[i].name,
STRUCT_MEMBER(int64_t, m, f[i].offset));
}
break;
case PROTOBUF_C_TYPE_FLOAT:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
float_var = STRUCT_MEMBER(float *, m, f[i].offset)[j];
appendStringInfo(out,
"%*s%s: %g\n",
level, "", f[i].name,
float_var);
}
} else {
float_var = STRUCT_MEMBER(float, m, f[i].offset);
appendStringInfo(out,
"%*s%s: %g\n",
level, "", f[i].name,
float_var);
}
break;
case PROTOBUF_C_TYPE_DOUBLE:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
appendStringInfo(out,
"%*s%s: %g\n",
level, "", f[i].name,
STRUCT_MEMBER(double *, m, f[i].offset)[j]);
}
} else {
appendStringInfo(out,
"%*s%s: %g\n",
level, "", f[i].name,
STRUCT_MEMBER(double, m, f[i].offset));
}
break;
case PROTOBUF_C_TYPE_BOOL:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
appendStringInfo(out,
"%*s%s: %s\n",
level, "", f[i].name,
STRUCT_MEMBER(protobuf_c_boolean *, m, f[i].offset)[j]?
"true": "false");
}
} else {
appendStringInfo(out,
"%*s%s: %s\n",
level, "", f[i].name,
STRUCT_MEMBER(protobuf_c_boolean, m, f[i].offset)?
"true": "false");
}
break;
case PROTOBUF_C_TYPE_ENUM:
enumd = (ProtobufCEnumDescriptor *)f[i].descriptor;
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
enumv = protobuf_c_enum_descriptor_get_value(
enumd, STRUCT_MEMBER(int *, m, f[i].offset)[j]);
appendStringInfo(out,
"%*s%s: %s\n",
level, "", f[i].name,
enumv? enumv->name: "unknown");
}
} else {
enumv = protobuf_c_enum_descriptor_get_value(
enumd, STRUCT_MEMBER(int, m, f[i].offset));
appendStringInfo(out,
"%*s%s: %s\n",
level, "", f[i].name,
enumv? enumv->name: "unknown");
}
break;
case PROTOBUF_C_TYPE_STRING:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
unsigned char *escaped;
escaped = esc_str(
STRUCT_MEMBER(unsigned char **, m, f[i].offset)[j],
strlen(STRUCT_MEMBER(unsigned char **, m, f[i].offset)[j]));
if (!escaped) {
return;
}
appendStringInfo(out,
"%*s%s: \"%s\"\n", level, "", f[i].name, escaped);
pfree(escaped);
}
} else {
unsigned char *escaped;
escaped = esc_str(STRUCT_MEMBER(unsigned char *, m, f[i].offset),
strlen(STRUCT_MEMBER(unsigned char *, m, f[i].offset)));
if (!escaped) {
return;
}
appendStringInfo(out,
"%*s%s: \"%s\"\n", level, "", f[i].name, escaped);
pfree(escaped);
}
break;
case PROTOBUF_C_TYPE_BYTES:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0; j < quantifier_offset; j++) {
unsigned char *escaped;
escaped = esc_str(
STRUCT_MEMBER(ProtobufCBinaryData *, m, f[i].offset)[j].data,
STRUCT_MEMBER(ProtobufCBinaryData *, m, f[i].offset)[j].len);
if (!escaped) {
return;
}
appendStringInfo(out,
"%*s%s: \"%s\"\n", level, "", f[i].name, escaped);
pfree(escaped);
}
} else {
unsigned char *escaped;
escaped = esc_str(
STRUCT_MEMBER(ProtobufCBinaryData, m, f[i].offset).data,
STRUCT_MEMBER(ProtobufCBinaryData, m, f[i].offset).len);
if (!escaped) {
return;
}
appendStringInfo(out,
"%*s%s: \"%s\"\n", level, "", f[i].name, escaped);
pfree(escaped);
}
break;
case PROTOBUF_C_TYPE_MESSAGE:
if (f[i].label == PROTOBUF_C_LABEL_REPEATED) {
for (j = 0;
j < STRUCT_MEMBER(size_t, m, f[i].quantifier_offset);
j++) {
appendStringInfo(out,
"%*s%s {\n", level, "", f[i].name);
protobuf_c_text_to_string_internal(out, level + 2,
STRUCT_MEMBER(ProtobufCMessage **, m, f[i].offset)[j],
(ProtobufCMessageDescriptor *)f[i].descriptor);
appendStringInfo(out,
"%*s}\n", level, "");
}
} else {
appendStringInfo(out,
"%*s%s {\n", level, "", f[i].name);
protobuf_c_text_to_string_internal(out, level + 2,
STRUCT_MEMBER(ProtobufCMessage *, m, f[i].offset),
(ProtobufCMessageDescriptor *)f[i].descriptor);
appendStringInfo(out,
"%*s}\n", level, "");
}
break;
default:
return;
break;
}
}
}
static void protobuf_c_text_to_string(StringInfo out, ProtobufCMessage *m) {
protobuf_c_text_to_string_internal(out, 0, m, m->descriptor);
}

View File

@ -1,57 +0,0 @@
#ifndef PROTOBUF_C_TEXT_H
#define PROTOBUF_C_TEXT_H
#include "postgres.h"
#include "lib/stringinfo.h"
#include "protobuf-c/protobuf-c.h"
/* BEGIN MACROS TAKEN FROM protobuf-c/protobuf-c.c */
/**
* Internal `ProtobufCMessage` manipulation macro.
*
* Base macro for manipulating a `ProtobufCMessage`. Used by STRUCT_MEMBER() and
* STRUCT_MEMBER_PTR().
*/
#define STRUCT_MEMBER_P(struct_p, struct_offset) \
((void *) ((uint8_t *) (struct_p) + (struct_offset)))
/**
* Return field in a `ProtobufCMessage` based on offset.
*
* Take a pointer to a `ProtobufCMessage` and find the field at the offset.
* Cast it to the passed type.
*/
#define STRUCT_MEMBER(member_type, struct_p, struct_offset) \
(*(member_type *) STRUCT_MEMBER_P((struct_p), (struct_offset)))
/**
* Return field in a `ProtobufCMessage` based on offset.
*
* Take a pointer to a `ProtobufCMessage` and find the field at the offset. Cast
* it to a pointer to the passed type.
*/
#define STRUCT_MEMBER_PTR(member_type, struct_p, struct_offset) \
((member_type *) STRUCT_MEMBER_P((struct_p), (struct_offset)))
/* END MACROS FROM protobuf-c/protobuf-c.c */
/** Internal function to back API function.
*
* Has a few extra params to better enable recursion. This function gets
* called for each nested message as the \c ProtobufCMessage struct is
* traversed.
*
* \param[in,out] out The string being used for the text format protobuf.
* \param[in] level Indent level - increments in 2's.
* \param[in] m The \c ProtobufCMessage being serialised.
* \param[in] d The descriptor for the \c ProtobufCMessage.
*/
extern void protobuf_c_text_to_string_internal(StringInfo out,
int level,
ProtobufCMessage *m,
const ProtobufCMessageDescriptor *d);
extern void protobuf_c_text_to_string(StringInfo out, ProtobufCMessage *m);
#endif /* PROTOBUF_C_TEXT_H */