DBZ-3 Changes the output of the plugin to include the FQN of the table and fixes plugin issues regarding dropped columns

pull/4/head
Horia Chiorean 2016-11-08 17:08:59 +02:00
parent 1baad6ecc6
commit 69ff80aee0
1 changed files with 55 additions and 18 deletions

View File

@ -111,7 +111,7 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) {
/* initialize this plugin */
static void pg_decode_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt, bool is_init) {
elog(INFO, "Entering startup callback");
elog(DEBUG1, "Entering startup callback");
ListCell *option;
DecoderData *data;
@ -157,6 +157,7 @@ static void pg_decode_startup(LogicalDecodingContext *ctx,
/* cleanup this plugin's resources */
static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
elog(DEBUG1, "Entering decode_shutdown callback");
DecoderData *data = ctx->output_plugin_private;
/* cleanup our own resources via memory context reset */
@ -537,32 +538,52 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
}
}
/* return the number of valid attributes from the tuple */
static int valid_attributes_count_from(TupleDesc tupdesc) {
int natt;
int count = 0;
for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr = tupdesc->attrs[natt];
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
continue;
}
count++;
}
return count;
}
/* convert a PG tuple to an array of DatumMessage(s) */
static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
Relation relation, HeapTuple tuple,
TupleDesc tupdesc) {
int natt;
elog(DEBUG1, "processing tuple with %d columns", tupdesc->natts);
/* build column names and values */
for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr;
Datum origval;
bool isnull;
const char *attrName;
attr = tupdesc->attrs[natt];
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
continue;
}
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
attrName = quote_identifier(NameStr(attr->attname));
elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName);
/* set the column name */
datum_msg.column_name = quote_identifier(NameStr(attr->attname));
datum_msg.column_name = attrName;
/* set datum from tuple */
origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull);
origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
/* get output function */
datum_msg.column_type = attr->atttypid;
@ -582,6 +603,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
Datum val = PointerGetDatum(PG_DETOAST_DATUM(origval));
set_datum_value(&datum_msg, attr->atttypid, typoutput, val);
}
} else {
elog(DEBUG1, "column %s is null, ignoring value", attrName);
}
tmsg[natt] = palloc(sizeof(datum_msg));
@ -592,12 +615,14 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
/* callback for individual changed tuples */
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) {
elog(DEBUG1, "Entering decode_change callback");
DecoderData *data;
MemoryContext old;
Form_pg_class class_form;
char replident = relation->rd_rel->relreplident;
bool is_rel_non_selective;
const char *selectiveInfo = is_rel_non_selective ? "non selective" : "selective";
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
class_form = RelationGetForm(relation);
@ -616,18 +641,22 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.has_transaction_id = true;
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
rmsg.has_commit_time = true;
rmsg.table = pstrdup(NameStr(class_form->relname));
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
NameStr(class_form->relname)));
/* decode different operation types */
switch (change->action) {
case REORDER_BUFFER_CHANGE_INSERT:
elog(DEBUG1, "decoding INSERT for table %s; relation is %s", rmsg.table, selectiveInfo);
rmsg.op = DECODERBUFS__OP__INSERT;
rmsg.has_op = true;
if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information");
TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = tupdesc->natts;
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc);
}
@ -635,20 +664,23 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_UPDATE:
rmsg.op = DECODERBUFS__OP__UPDATE;
rmsg.has_op = true;
elog(DEBUG1, "decoding UPDATE for table %s; relation is %s", rmsg.table, selectiveInfo);
if (!is_rel_non_selective) {
if (change->data.tp.oldtuple != NULL) {
elog(DEBUG1, "decoding old tuple information");
TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_old_tuple = tupdesc->natts;
rmsg.n_old_tuple = valid_attributes_count_from(tupdesc);
rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation,
&change->data.tp.oldtuple->tuple, tupdesc);
}
if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information");
TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = tupdesc->natts;
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc);
}
@ -657,17 +689,22 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_DELETE:
rmsg.op = DECODERBUFS__OP__DELETE;
rmsg.has_op = true;
elog(DEBUG1, "decoding DELETE for table %s; relation is %s", rmsg.table, selectiveInfo);
/* if there was no PK, we only know that a delete happened */
if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) {
elog(DEBUG1, "decoding old tuple information");
TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_old_tuple = tupdesc->natts;
rmsg.n_old_tuple = valid_attributes_count_from(tupdesc);
rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation,
&change->data.tp.oldtuple->tuple, tupdesc);
} else {
elog(WARNING, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
}
break;
default:
elog(WARNING, "unknown change action");
Assert(0);
break;
}