Merge pull request #4 from hchiorean/DBZ-3
DBZ-3 Changes the output of the plugin to include the FQN of the table and fixes various plugin issuespull/5/head v0.3.0
commit
04f1dc3c9e
|
@ -111,7 +111,7 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) {
|
|||
/* initialize this plugin */
|
||||
static void pg_decode_startup(LogicalDecodingContext *ctx,
|
||||
OutputPluginOptions *opt, bool is_init) {
|
||||
elog(INFO, "Entering startup callback");
|
||||
elog(DEBUG1, "Entering startup callback");
|
||||
|
||||
ListCell *option;
|
||||
DecoderData *data;
|
||||
|
@ -157,6 +157,7 @@ static void pg_decode_startup(LogicalDecodingContext *ctx,
|
|||
|
||||
/* cleanup this plugin's resources */
|
||||
static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
|
||||
elog(DEBUG1, "Entering decode_shutdown callback");
|
||||
DecoderData *data = ctx->output_plugin_private;
|
||||
|
||||
/* cleanup our own resources via memory context reset */
|
||||
|
@ -537,32 +538,52 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
|||
}
|
||||
}
|
||||
|
||||
/* convert a PG tuple to an array of DatumMessage(s) */
|
||||
static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
||||
Relation relation, HeapTuple tuple,
|
||||
TupleDesc tupdesc) {
|
||||
/* return the number of valid attributes from the tuple */
|
||||
static int valid_attributes_count_from(TupleDesc tupdesc) {
|
||||
int natt;
|
||||
|
||||
/* build column names and values */
|
||||
int count = 0;
|
||||
for (natt = 0; natt < tupdesc->natts; natt++) {
|
||||
Form_pg_attribute attr;
|
||||
Datum origval;
|
||||
bool isnull;
|
||||
|
||||
attr = tupdesc->attrs[natt];
|
||||
Form_pg_attribute attr = tupdesc->attrs[natt];
|
||||
|
||||
/* skip dropped columns and system columns */
|
||||
if (attr->attisdropped || attr->attnum < 0) {
|
||||
continue;
|
||||
}
|
||||
count++;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
/* convert a PG tuple to an array of DatumMessage(s) */
|
||||
static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
||||
Relation relation, HeapTuple tuple,
|
||||
TupleDesc tupdesc) {
|
||||
int natt;
|
||||
elog(DEBUG1, "processing tuple with %d columns", tupdesc->natts);
|
||||
/* build column names and values */
|
||||
for (natt = 0; natt < tupdesc->natts; natt++) {
|
||||
Form_pg_attribute attr;
|
||||
Datum origval;
|
||||
bool isnull;
|
||||
const char *attrName;
|
||||
|
||||
attr = tupdesc->attrs[natt];
|
||||
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
|
||||
|
||||
/* skip dropped columns and system columns */
|
||||
if (attr->attisdropped || attr->attnum < 0) {
|
||||
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
|
||||
continue;
|
||||
}
|
||||
|
||||
attrName = quote_identifier(NameStr(attr->attname));
|
||||
elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName);
|
||||
|
||||
/* set the column name */
|
||||
datum_msg.column_name = quote_identifier(NameStr(attr->attname));
|
||||
datum_msg.column_name = attrName;
|
||||
|
||||
/* set datum from tuple */
|
||||
origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull);
|
||||
origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
|
||||
|
||||
/* get output function */
|
||||
datum_msg.column_type = attr->atttypid;
|
||||
|
@ -582,6 +603,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
|||
Datum val = PointerGetDatum(PG_DETOAST_DATUM(origval));
|
||||
set_datum_value(&datum_msg, attr->atttypid, typoutput, val);
|
||||
}
|
||||
} else {
|
||||
elog(DEBUG1, "column %s is null, ignoring value", attrName);
|
||||
}
|
||||
|
||||
tmsg[natt] = palloc(sizeof(datum_msg));
|
||||
|
@ -592,12 +615,14 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
|||
/* callback for individual changed tuples */
|
||||
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change) {
|
||||
elog(DEBUG1, "Entering decode_change callback");
|
||||
DecoderData *data;
|
||||
MemoryContext old;
|
||||
|
||||
Form_pg_class class_form;
|
||||
char replident = relation->rd_rel->relreplident;
|
||||
bool is_rel_non_selective;
|
||||
const char *selectiveInfo = is_rel_non_selective ? "non selective" : "selective";
|
||||
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
||||
class_form = RelationGetForm(relation);
|
||||
|
||||
|
@ -616,18 +641,22 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
rmsg.has_transaction_id = true;
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||
rmsg.has_commit_time = true;
|
||||
rmsg.table = pstrdup(NameStr(class_form->relname));
|
||||
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
|
||||
NameStr(class_form->relname)));
|
||||
|
||||
|
||||
/* decode different operation types */
|
||||
switch (change->action) {
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
elog(DEBUG1, "decoding INSERT for table %s; relation is %s", rmsg.table, selectiveInfo);
|
||||
rmsg.op = DECODERBUFS__OP__INSERT;
|
||||
rmsg.has_op = true;
|
||||
if (change->data.tp.newtuple != NULL) {
|
||||
elog(DEBUG1, "decoding new tuple information");
|
||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||
rmsg.n_new_tuple = tupdesc->natts;
|
||||
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
|
||||
rmsg.new_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
}
|
||||
|
@ -635,20 +664,23 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
rmsg.op = DECODERBUFS__OP__UPDATE;
|
||||
rmsg.has_op = true;
|
||||
elog(DEBUG1, "decoding UPDATE for table %s; relation is %s", rmsg.table, selectiveInfo);
|
||||
if (!is_rel_non_selective) {
|
||||
if (change->data.tp.oldtuple != NULL) {
|
||||
elog(DEBUG1, "decoding old tuple information");
|
||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||
rmsg.n_old_tuple = tupdesc->natts;
|
||||
rmsg.n_old_tuple = valid_attributes_count_from(tupdesc);
|
||||
rmsg.old_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
||||
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||
&change->data.tp.oldtuple->tuple, tupdesc);
|
||||
}
|
||||
if (change->data.tp.newtuple != NULL) {
|
||||
elog(DEBUG1, "decoding new tuple information");
|
||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||
rmsg.n_new_tuple = tupdesc->natts;
|
||||
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
|
||||
rmsg.new_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
}
|
||||
|
@ -657,17 +689,22 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
rmsg.op = DECODERBUFS__OP__DELETE;
|
||||
rmsg.has_op = true;
|
||||
elog(DEBUG1, "decoding DELETE for table %s; relation is %s", rmsg.table, selectiveInfo);
|
||||
/* if there was no PK, we only know that a delete happened */
|
||||
if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) {
|
||||
elog(DEBUG1, "decoding old tuple information");
|
||||
TupleDesc tupdesc = RelationGetDescr(relation);
|
||||
rmsg.n_old_tuple = tupdesc->natts;
|
||||
rmsg.n_old_tuple = valid_attributes_count_from(tupdesc);
|
||||
rmsg.old_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
||||
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||
&change->data.tp.oldtuple->tuple, tupdesc);
|
||||
} else {
|
||||
elog(WARNING, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
elog(WARNING, "unknown change action");
|
||||
Assert(0);
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue