Using clang-format to style nazi myself
parent
6a212e9e4c
commit
f9d69b3f1f
|
@ -25,7 +25,6 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <inttypes.h>
|
#include <inttypes.h>
|
||||||
#include <stdarg.h>
|
|
||||||
|
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
#include "funcapi.h"
|
#include "funcapi.h"
|
||||||
|
@ -50,11 +49,11 @@ PG_MODULE_MAGIC;
|
||||||
* which in this case is microseconds since epoch
|
* which in this case is microseconds since epoch
|
||||||
*/
|
*/
|
||||||
#ifdef HAVE_INT64_TIMESTAMP
|
#ifdef HAVE_INT64_TIMESTAMP
|
||||||
#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \
|
#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \
|
||||||
t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY * USECS_PER_SEC);
|
t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY *USECS_PER_SEC);
|
||||||
#else
|
#else
|
||||||
#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \
|
#define TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(t) \
|
||||||
(t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY)) * 1000.0;
|
(t + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY)) * 1000.0;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -135,8 +134,7 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
|
||||||
|
|
||||||
/* BEGIN callback */
|
/* BEGIN callback */
|
||||||
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
||||||
ReorderBufferTXN *txn) {
|
ReorderBufferTXN *txn) {}
|
||||||
}
|
|
||||||
|
|
||||||
/* COMMIT callback */
|
/* COMMIT callback */
|
||||||
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
||||||
|
@ -146,75 +144,76 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
||||||
/* convenience method to free up sub-messages */
|
/* convenience method to free up sub-messages */
|
||||||
static void free_row_msg_subs(Decoderbufs__RowMessage *msg) {
|
static void free_row_msg_subs(Decoderbufs__RowMessage *msg) {
|
||||||
if (!msg) {
|
if (!msg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pfree(msg->table);
|
pfree(msg->table);
|
||||||
if (msg->n_new_tuple > 0) {
|
if (msg->n_new_tuple > 0) {
|
||||||
for (int i=0; i < msg->n_new_tuple; i++) {
|
for (int i = 0; i < msg->n_new_tuple; i++) {
|
||||||
if (msg->new_tuple[i]) {
|
if (msg->new_tuple[i]) {
|
||||||
if (msg->new_tuple[i]->datum_string) {
|
if (msg->new_tuple[i]->datum_string) {
|
||||||
pfree(msg->new_tuple[i]->datum_string);
|
pfree(msg->new_tuple[i]->datum_string);
|
||||||
} else if (msg->new_tuple[i]->has_datum_bytes) {
|
} else if (msg->new_tuple[i]->has_datum_bytes) {
|
||||||
pfree(msg->new_tuple[i]->datum_bytes.data);
|
pfree(msg->new_tuple[i]->datum_bytes.data);
|
||||||
msg->new_tuple[i]->datum_bytes.data = NULL;
|
msg->new_tuple[i]->datum_bytes.data = NULL;
|
||||||
msg->new_tuple[i]->datum_bytes.len = 0;
|
msg->new_tuple[i]->datum_bytes.len = 0;
|
||||||
}
|
}
|
||||||
pfree(msg->new_tuple[i]);
|
pfree(msg->new_tuple[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pfree(msg->new_tuple);
|
pfree(msg->new_tuple);
|
||||||
}
|
}
|
||||||
if (msg->n_old_tuple > 0) {
|
if (msg->n_old_tuple > 0) {
|
||||||
for (int i=0; i < msg->n_old_tuple; i++) {
|
for (int i = 0; i < msg->n_old_tuple; i++) {
|
||||||
if (msg->old_tuple[i]) {
|
if (msg->old_tuple[i]) {
|
||||||
if (msg->old_tuple[i]->datum_string) {
|
if (msg->old_tuple[i]->datum_string) {
|
||||||
pfree(msg->old_tuple[i]->datum_string);
|
pfree(msg->old_tuple[i]->datum_string);
|
||||||
} else if (msg->old_tuple[i]->has_datum_bytes) {
|
} else if (msg->old_tuple[i]->has_datum_bytes) {
|
||||||
pfree(msg->old_tuple[i]->datum_bytes.data);
|
pfree(msg->old_tuple[i]->datum_bytes.data);
|
||||||
msg->old_tuple[i]->datum_bytes.data = NULL;
|
msg->old_tuple[i]->datum_bytes.data = NULL;
|
||||||
msg->old_tuple[i]->datum_bytes.len = 0;
|
msg->old_tuple[i]->datum_bytes.len = 0;
|
||||||
}
|
}
|
||||||
pfree(msg->old_tuple[i]);
|
pfree(msg->old_tuple[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pfree(msg->old_tuple);
|
pfree(msg->old_tuple);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup, size_t n) {
|
static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup,
|
||||||
if (tup) {
|
size_t n) {
|
||||||
for (int i=0; i < n; i++) {
|
if (tup) {
|
||||||
Decoderbufs__DatumMessage *dmsg = tup[i];
|
for (int i = 0; i < n; i++) {
|
||||||
if (dmsg->column_name)
|
Decoderbufs__DatumMessage *dmsg = tup[i];
|
||||||
appendStringInfo(out, "column_name[%s]", dmsg->column_name);
|
if (dmsg->column_name)
|
||||||
if (dmsg->has_column_type) {
|
appendStringInfo(out, "column_name[%s]", dmsg->column_name);
|
||||||
appendStringInfo(out, ", column_type[%" PRId64 "]", dmsg->column_type);
|
if (dmsg->has_column_type) {
|
||||||
switch (dmsg->column_type) {
|
appendStringInfo(out, ", column_type[%" PRId64 "]", dmsg->column_type);
|
||||||
case INT2OID:
|
switch (dmsg->column_type) {
|
||||||
case INT4OID:
|
case INT2OID:
|
||||||
appendStringInfo(out, ", datum[%d]", dmsg->datum_int32);
|
case INT4OID:
|
||||||
break;
|
appendStringInfo(out, ", datum[%d]", dmsg->datum_int32);
|
||||||
case INT8OID:
|
break;
|
||||||
appendStringInfo(out, ", datum[%" PRId64 "]", dmsg->datum_int64);
|
case INT8OID:
|
||||||
break;
|
appendStringInfo(out, ", datum[%" PRId64 "]", dmsg->datum_int64);
|
||||||
case FLOAT4OID:
|
break;
|
||||||
appendStringInfo(out, ", datum[%f]", dmsg->datum_float);
|
case FLOAT4OID:
|
||||||
break;
|
appendStringInfo(out, ", datum[%f]", dmsg->datum_float);
|
||||||
case FLOAT8OID:
|
break;
|
||||||
case NUMERICOID:
|
case FLOAT8OID:
|
||||||
appendStringInfo(out, ", datum[%f]", dmsg->datum_double);
|
case NUMERICOID:
|
||||||
break;
|
appendStringInfo(out, ", datum[%f]", dmsg->datum_double);
|
||||||
case TEXTOID:
|
break;
|
||||||
appendStringInfo(out, ", datum[%s]", dmsg->datum_string);
|
case TEXTOID:
|
||||||
break;
|
appendStringInfo(out, ", datum[%s]", dmsg->datum_string);
|
||||||
default:
|
break;
|
||||||
break;
|
default:
|
||||||
}
|
break;
|
||||||
appendStringInfo(out, "\n");
|
}
|
||||||
}
|
appendStringInfo(out, "\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* this doesn't seem to be available in the public api (unfortunate) */
|
/* this doesn't seem to be available in the public api (unfortunate) */
|
||||||
|
@ -308,8 +307,9 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Relation relation,
|
static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
||||||
HeapTuple tuple, TupleDesc tupdesc) {
|
Relation relation, HeapTuple tuple,
|
||||||
|
TupleDesc tupdesc) {
|
||||||
int natt;
|
int natt;
|
||||||
|
|
||||||
/* build column names and values */
|
/* build column names and values */
|
||||||
|
@ -345,7 +345,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Relation relati
|
||||||
if (!isnull) {
|
if (!isnull) {
|
||||||
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
|
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
|
||||||
// TODO: Is there a way we can handle this?
|
// TODO: Is there a way we can handle this?
|
||||||
elog(WARNING, "Not handling external on disk varlena at the moment.");
|
elog(WARNING, "Not handling external on disk varlena at the moment.");
|
||||||
} else if (!typisvarlena) {
|
} else if (!typisvarlena) {
|
||||||
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
|
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
|
||||||
} else {
|
} else {
|
||||||
|
@ -357,7 +357,6 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Relation relati
|
||||||
tmsg[natt] = palloc(sizeof(datum_msg));
|
tmsg[natt] = palloc(sizeof(datum_msg));
|
||||||
memcpy(tmsg[natt], &datum_msg, sizeof(datum_msg));
|
memcpy(tmsg[natt], &datum_msg, sizeof(datum_msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -397,10 +396,12 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
rmsg.op = DECODERBUFS__OP__INSERT;
|
rmsg.op = DECODERBUFS__OP__INSERT;
|
||||||
rmsg.has_op = true;
|
rmsg.has_op = true;
|
||||||
if (change->data.tp.newtuple != NULL) {
|
if (change->data.tp.newtuple != NULL) {
|
||||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||||
rmsg.n_new_tuple = tupdesc->natts;
|
rmsg.n_new_tuple = tupdesc->natts;
|
||||||
rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts);
|
rmsg.new_tuple =
|
||||||
tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc);
|
palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts);
|
||||||
|
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||||
|
&change->data.tp.newtuple->tuple, tupdesc);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||||
|
@ -410,14 +411,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
if (change->data.tp.oldtuple != NULL) {
|
if (change->data.tp.oldtuple != NULL) {
|
||||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||||
rmsg.n_old_tuple = tupdesc->natts;
|
rmsg.n_old_tuple = tupdesc->natts;
|
||||||
rmsg.old_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts);
|
rmsg.old_tuple =
|
||||||
tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc);
|
palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts);
|
||||||
|
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||||
|
&change->data.tp.oldtuple->tuple, tupdesc);
|
||||||
}
|
}
|
||||||
if (change->data.tp.newtuple != NULL) {
|
if (change->data.tp.newtuple != NULL) {
|
||||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||||
rmsg.n_new_tuple = tupdesc->natts;
|
rmsg.n_new_tuple = tupdesc->natts;
|
||||||
rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts);
|
rmsg.new_tuple =
|
||||||
tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc);
|
palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts);
|
||||||
|
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||||
|
&change->data.tp.newtuple->tuple, tupdesc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -426,10 +431,12 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
rmsg.has_op = true;
|
rmsg.has_op = true;
|
||||||
/* if there was no PK, we only know that a delete happened */
|
/* if there was no PK, we only know that a delete happened */
|
||||||
if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) {
|
if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) {
|
||||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||||
rmsg.n_old_tuple = tupdesc->natts;
|
rmsg.n_old_tuple = tupdesc->natts;
|
||||||
rmsg.old_tuple = palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts);
|
rmsg.old_tuple =
|
||||||
tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc);
|
palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts);
|
||||||
|
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||||
|
&change->data.tp.oldtuple->tuple, tupdesc);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -438,35 +445,35 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data->debug_mode) {
|
if (data->debug_mode) {
|
||||||
OutputPluginPrepareWrite(ctx, true);
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
if (rmsg.has_commit_time)
|
if (rmsg.has_commit_time)
|
||||||
appendStringInfo(ctx->out, "commit_time[%" PRId64 "]", rmsg.commit_time);
|
appendStringInfo(ctx->out, "commit_time[%" PRId64 "]", rmsg.commit_time);
|
||||||
if (rmsg.table)
|
if (rmsg.table)
|
||||||
appendStringInfo(ctx->out, ", table[%s]", rmsg.table);
|
appendStringInfo(ctx->out, ", table[%s]", rmsg.table);
|
||||||
if (rmsg.has_op)
|
if (rmsg.has_op)
|
||||||
appendStringInfo(ctx->out, ", op[%d]", rmsg.op);
|
appendStringInfo(ctx->out, ", op[%d]", rmsg.op);
|
||||||
if (rmsg.old_tuple) {
|
if (rmsg.old_tuple) {
|
||||||
appendStringInfo(ctx->out, "\nOLD TUPLE: \n");
|
appendStringInfo(ctx->out, "\nOLD TUPLE: \n");
|
||||||
print_tuple_msg(ctx->out, rmsg.old_tuple, rmsg.n_old_tuple);
|
print_tuple_msg(ctx->out, rmsg.old_tuple, rmsg.n_old_tuple);
|
||||||
appendStringInfo(ctx->out, "\n");
|
appendStringInfo(ctx->out, "\n");
|
||||||
}
|
}
|
||||||
if (rmsg.new_tuple) {
|
if (rmsg.new_tuple) {
|
||||||
appendStringInfo(ctx->out, "\nNEW TUPLE: \n");
|
appendStringInfo(ctx->out, "\nNEW TUPLE: \n");
|
||||||
print_tuple_msg(ctx->out, rmsg.new_tuple, rmsg.n_new_tuple);
|
print_tuple_msg(ctx->out, rmsg.new_tuple, rmsg.n_new_tuple);
|
||||||
appendStringInfo(ctx->out, "\n");
|
appendStringInfo(ctx->out, "\n");
|
||||||
}
|
}
|
||||||
OutputPluginWrite(ctx, true);
|
OutputPluginWrite(ctx, true);
|
||||||
} else {
|
} else {
|
||||||
OutputPluginPrepareWrite(ctx, true);
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
|
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
|
||||||
void *packed = palloc(psize);
|
void *packed = palloc(psize);
|
||||||
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
||||||
// appendBinaryStringInfo(ctx->out, (void *)psize, sizeof(psize));
|
// appendBinaryStringInfo(ctx->out, (void *)psize, sizeof(psize));
|
||||||
appendBinaryStringInfo(ctx->out, packed, ssize);
|
appendBinaryStringInfo(ctx->out, packed, ssize);
|
||||||
OutputPluginWrite(ctx, true);
|
OutputPluginWrite(ctx, true);
|
||||||
|
|
||||||
/* free packed buffer */
|
/* free packed buffer */
|
||||||
pfree(packed);
|
pfree(packed);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* cleanup msg */
|
/* cleanup msg */
|
||||||
|
|
Loading…
Reference in New Issue