From f9d69b3f1f4f1681b43b15ac98e84530cf803e78 Mon Sep 17 00:00:00 2001 From: Xavier Stevens Date: Mon, 22 Sep 2014 16:26:50 -0700 Subject: [PATCH] Using clang-format to style nazi myself --- src/decoderbufs.c | 227 ++++++++++++++++++++++++---------------------- 1 file changed, 117 insertions(+), 110 deletions(-) diff --git a/src/decoderbufs.c b/src/decoderbufs.c index 36a7450..e17473f 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -25,7 +25,6 @@ */ #include -#include #include "postgres.h" #include "funcapi.h" @@ -50,11 +49,11 @@ PG_MODULE_MAGIC; * which in this case is microseconds since epoch */ #ifdef HAVE_INT64_TIMESTAMP -#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \ - t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY * USECS_PER_SEC); +#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \ + t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY *USECS_PER_SEC); #else -#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \ - (t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY)) * 1000.0; +#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \ + (t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY)) * 1000.0; #endif typedef struct { @@ -135,8 +134,7 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) { /* BEGIN callback */ static void pg_decode_begin_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn) { -} + ReorderBufferTXN *txn) {} /* COMMIT callback */ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, @@ -146,75 +144,76 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, /* convenience method to free up sub-messages */ static void free_row_msg_subs(Decoderbufs__RowMessage *msg) { if (!msg) { - return; + return; } pfree(msg->table); if (msg->n_new_tuple > 0) { - for (int i=0; i < msg->n_new_tuple; i++) { - if (msg->new_tuple[i]) { - if (msg->new_tuple[i]->datum_string) { - pfree(msg->new_tuple[i]->datum_string); - } else if (msg->new_tuple[i]->has_datum_bytes) { - pfree(msg->new_tuple[i]->datum_bytes.data); - msg->new_tuple[i]->datum_bytes.data = NULL; - msg->new_tuple[i]->datum_bytes.len = 0; - } - pfree(msg->new_tuple[i]); - } - } - pfree(msg->new_tuple); + for (int i = 0; i < msg->n_new_tuple; i++) { + if (msg->new_tuple[i]) { + if (msg->new_tuple[i]->datum_string) { + pfree(msg->new_tuple[i]->datum_string); + } else if (msg->new_tuple[i]->has_datum_bytes) { + pfree(msg->new_tuple[i]->datum_bytes.data); + msg->new_tuple[i]->datum_bytes.data = NULL; + msg->new_tuple[i]->datum_bytes.len = 0; + } + pfree(msg->new_tuple[i]); + } + } + pfree(msg->new_tuple); } if (msg->n_old_tuple > 0) { - for (int i=0; i < msg->n_old_tuple; i++) { - if (msg->old_tuple[i]) { - if (msg->old_tuple[i]->datum_string) { - pfree(msg->old_tuple[i]->datum_string); - } else if (msg->old_tuple[i]->has_datum_bytes) { - pfree(msg->old_tuple[i]->datum_bytes.data); - msg->old_tuple[i]->datum_bytes.data = NULL; - msg->old_tuple[i]->datum_bytes.len = 0; - } - pfree(msg->old_tuple[i]); - } - } - pfree(msg->old_tuple); + for (int i = 0; i < msg->n_old_tuple; i++) { + if (msg->old_tuple[i]) { + if (msg->old_tuple[i]->datum_string) { + pfree(msg->old_tuple[i]->datum_string); + } else if (msg->old_tuple[i]->has_datum_bytes) { + pfree(msg->old_tuple[i]->datum_bytes.data); + msg->old_tuple[i]->datum_bytes.data = NULL; + msg->old_tuple[i]->datum_bytes.len = 0; + } + pfree(msg->old_tuple[i]); + } + } + pfree(msg->old_tuple); } } -static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup, size_t n) { - if (tup) { - for (int i=0; i < n; i++) { - Decoderbufs__DatumMessage *dmsg = tup[i]; - if (dmsg->column_name) - appendStringInfo(out, "column_name[%s]", dmsg->column_name); - if (dmsg->has_column_type) { - appendStringInfo(out, ", column_type[%" PRId64 "]", dmsg->column_type); - switch (dmsg->column_type) { - case INT2OID: - case INT4OID: - appendStringInfo(out, ", datum[%d]", dmsg->datum_int32); - break; - case INT8OID: - appendStringInfo(out, ", datum[%" PRId64 "]", dmsg->datum_int64); - break; - case FLOAT4OID: - appendStringInfo(out, ", datum[%f]", dmsg->datum_float); - break; - case FLOAT8OID: - case NUMERICOID: - appendStringInfo(out, ", datum[%f]", dmsg->datum_double); - break; - case TEXTOID: - appendStringInfo(out, ", datum[%s]", dmsg->datum_string); - break; - default: - break; - } - appendStringInfo(out, "\n"); - } - } - } +static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup, + size_t n) { + if (tup) { + for (int i = 0; i < n; i++) { + Decoderbufs__DatumMessage *dmsg = tup[i]; + if (dmsg->column_name) + appendStringInfo(out, "column_name[%s]", dmsg->column_name); + if (dmsg->has_column_type) { + appendStringInfo(out, ", column_type[%" PRId64 "]", dmsg->column_type); + switch (dmsg->column_type) { + case INT2OID: + case INT4OID: + appendStringInfo(out, ", datum[%d]", dmsg->datum_int32); + break; + case INT8OID: + appendStringInfo(out, ", datum[%" PRId64 "]", dmsg->datum_int64); + break; + case FLOAT4OID: + appendStringInfo(out, ", datum[%f]", dmsg->datum_float); + break; + case FLOAT8OID: + case NUMERICOID: + appendStringInfo(out, ", datum[%f]", dmsg->datum_double); + break; + case TEXTOID: + appendStringInfo(out, ", datum[%s]", dmsg->datum_string); + break; + default: + break; + } + appendStringInfo(out, "\n"); + } + } + } } /* this doesn't seem to be available in the public api (unfortunate) */ @@ -308,8 +307,9 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, } } -static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Relation relation, - HeapTuple tuple, TupleDesc tupdesc) { +static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, + Relation relation, HeapTuple tuple, + TupleDesc tupdesc) { int natt; /* build column names and values */ @@ -345,7 +345,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Relation relati if (!isnull) { if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) { // TODO: Is there a way we can handle this? - elog(WARNING, "Not handling external on disk varlena at the moment."); + elog(WARNING, "Not handling external on disk varlena at the moment."); } else if (!typisvarlena) { set_datum_value(&datum_msg, attr->atttypid, typoutput, origval); } else { @@ -357,7 +357,6 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Relation relati tmsg[natt] = palloc(sizeof(datum_msg)); memcpy(tmsg[natt], &datum_msg, sizeof(datum_msg)); } - } /* @@ -397,10 +396,12 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, rmsg.op = DECODERBUFS__OP__INSERT; rmsg.has_op = true; if (change->data.tp.newtuple != NULL) { - TupleDesc tupdesc = RelationGetDescr(relation); - rmsg.n_new_tuple = tupdesc->natts; - rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); - tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); + TupleDesc tupdesc = RelationGetDescr(relation); + rmsg.n_new_tuple = tupdesc->natts; + rmsg.new_tuple = + palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + tuple_to_tuple_msg(rmsg.new_tuple, relation, + &change->data.tp.newtuple->tuple, tupdesc); } break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -410,14 +411,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (change->data.tp.oldtuple != NULL) { TupleDesc tupdesc = RelationGetDescr(relation); rmsg.n_old_tuple = tupdesc->natts; - rmsg.old_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); - tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc); + rmsg.old_tuple = + palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + tuple_to_tuple_msg(rmsg.old_tuple, relation, + &change->data.tp.oldtuple->tuple, tupdesc); } if (change->data.tp.newtuple != NULL) { TupleDesc tupdesc = RelationGetDescr(relation); rmsg.n_new_tuple = tupdesc->natts; - rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); - tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); + rmsg.new_tuple = + palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + tuple_to_tuple_msg(rmsg.new_tuple, relation, + &change->data.tp.newtuple->tuple, tupdesc); } } break; @@ -426,10 +431,12 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, rmsg.has_op = true; /* if there was no PK, we only know that a delete happened */ if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) { - TupleDesc tupdesc = RelationGetDescr(relation); - rmsg.n_old_tuple = tupdesc->natts; - rmsg.old_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); - tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc); + TupleDesc tupdesc = RelationGetDescr(relation); + rmsg.n_old_tuple = tupdesc->natts; + rmsg.old_tuple = + palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + tuple_to_tuple_msg(rmsg.old_tuple, relation, + &change->data.tp.oldtuple->tuple, tupdesc); } break; default: @@ -438,35 +445,35 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } if (data->debug_mode) { - OutputPluginPrepareWrite(ctx, true); - if (rmsg.has_commit_time) - appendStringInfo(ctx->out, "commit_time[%" PRId64 "]", rmsg.commit_time); - if (rmsg.table) - appendStringInfo(ctx->out, ", table[%s]", rmsg.table); - if (rmsg.has_op) - appendStringInfo(ctx->out, ", op[%d]", rmsg.op); - if (rmsg.old_tuple) { - appendStringInfo(ctx->out, "\nOLD TUPLE: \n"); - print_tuple_msg(ctx->out, rmsg.old_tuple, rmsg.n_old_tuple); - appendStringInfo(ctx->out, "\n"); - } - if (rmsg.new_tuple) { - appendStringInfo(ctx->out, "\nNEW TUPLE: \n"); - print_tuple_msg(ctx->out, rmsg.new_tuple, rmsg.n_new_tuple); - appendStringInfo(ctx->out, "\n"); - } - OutputPluginWrite(ctx, true); + OutputPluginPrepareWrite(ctx, true); + if (rmsg.has_commit_time) + appendStringInfo(ctx->out, "commit_time[%" PRId64 "]", rmsg.commit_time); + if (rmsg.table) + appendStringInfo(ctx->out, ", table[%s]", rmsg.table); + if (rmsg.has_op) + appendStringInfo(ctx->out, ", op[%d]", rmsg.op); + if (rmsg.old_tuple) { + appendStringInfo(ctx->out, "\nOLD TUPLE: \n"); + print_tuple_msg(ctx->out, rmsg.old_tuple, rmsg.n_old_tuple); + appendStringInfo(ctx->out, "\n"); + } + if (rmsg.new_tuple) { + appendStringInfo(ctx->out, "\nNEW TUPLE: \n"); + print_tuple_msg(ctx->out, rmsg.new_tuple, rmsg.n_new_tuple); + appendStringInfo(ctx->out, "\n"); + } + OutputPluginWrite(ctx, true); } else { - OutputPluginPrepareWrite(ctx, true); - size_t psize = decoderbufs__row_message__get_packed_size(&rmsg); - void *packed = palloc(psize); - size_t ssize = decoderbufs__row_message__pack(&rmsg, packed); - // appendBinaryStringInfo(ctx->out, (void *)psize, sizeof(psize)); - appendBinaryStringInfo(ctx->out, packed, ssize); - OutputPluginWrite(ctx, true); + OutputPluginPrepareWrite(ctx, true); + size_t psize = decoderbufs__row_message__get_packed_size(&rmsg); + void *packed = palloc(psize); + size_t ssize = decoderbufs__row_message__pack(&rmsg, packed); + // appendBinaryStringInfo(ctx->out, (void *)psize, sizeof(psize)); + appendBinaryStringInfo(ctx->out, packed, ssize); + OutputPluginWrite(ctx, true); - /* free packed buffer */ - pfree(packed); + /* free packed buffer */ + pfree(packed); } /* cleanup msg */