From 3c910fff12ad68db0f2872d48ff06a04b4356465 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Mon, 1 Jul 2019 12:54:52 +0200 Subject: [PATCH] DB-1367 Sending marker for unchanged TOAST columns --- proto/pg_logicaldec.proto | 1 + src/decoderbufs.c | 57 +++++++++++++++++----------------- src/proto/pg_logicaldec.pb-c.c | 19 ++++++++++-- src/proto/pg_logicaldec.pb-c.h | 2 ++ 4 files changed, 48 insertions(+), 31 deletions(-) diff --git a/proto/pg_logicaldec.proto b/proto/pg_logicaldec.proto index aad1363..2f4a861 100644 --- a/proto/pg_logicaldec.proto +++ b/proto/pg_logicaldec.proto @@ -27,6 +27,7 @@ message DatumMessage { string datum_string = 8; bytes datum_bytes = 9; Point datum_point = 10; + bool datum_missing = 11; } } diff --git a/src/decoderbufs.c b/src/decoderbufs.c index 812defc..a63f83a 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -265,7 +265,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, TimeTzADT *timetz = NULL; Interval *interval = NULL; Decoderbufs__Point dp = DECODERBUFS__POINT__INIT; - + int size = 0; switch (typid) { case BOOLOID: @@ -293,7 +293,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, datum_msg->datum_double = DatumGetFloat8(datum); datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; break; - case CASHOID: + case CASHOID: datum_msg->datum_int64 = DatumGetCash(datum); datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; break; @@ -303,7 +303,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, case BPCHAROID: case TEXTOID: case JSONOID: - case JSONBOID: + case JSONBOID: case XMLOID: case BITOID: case VARBITOID: @@ -320,7 +320,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, } else { datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; - break; + break; } case TIMESTAMPTZOID: ts = DatumGetTimestampTz(datum); @@ -330,9 +330,9 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, } else { datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; - break; + break; } - case DATEOID: + case DATEOID: /* simply get the number of days as the stored 32 bit value and convert to EPOCH */ datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum)); datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32; @@ -340,18 +340,18 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, case TIMEOID: datum_msg->datum_int64 = DatumGetTimeADT(datum); datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; - break; + break; case TIMETZOID: timetz = DatumGetTimeTzADTP(datum); - /* use GMT-equivalent time */ + /* use GMT-equivalent time */ datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0)); datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; - break; + break; case INTERVALOID: interval = DatumGetIntervalP(datum); - /* - Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days. - */ + /* + Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days. + */ duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY); datum_msg->datum_double = duration; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; @@ -394,14 +394,14 @@ static int valid_attributes_count_from(TupleDesc tupdesc) { int count = 0; for (natt = 0; natt < tupdesc->natts; natt++) { Form_pg_attribute attr = TupleDescAttr(tupdesc, natt); - + /* skip dropped columns and system columns */ - if (attr->attisdropped || attr->attnum < 0) { + if (attr->attisdropped || attr->attnum < 0) { continue; - } - count++; + } + count++; } - return count; + return count; } /* convert a PG tuple to an array of DatumMessage(s) */ @@ -422,19 +422,19 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; attr = TupleDescAttr(tupdesc, natt); - + /* skip dropped columns and system columns */ if (attr->attisdropped || attr->attnum < 0) { elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column"); continue; - } + } - attrName = quote_identifier(NameStr(attr->attname)); - elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName); + attrName = quote_identifier(NameStr(attr->attname)); + elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName); /* set the column name */ datum_msg.column_name = (char *)attrName; - + /* set datum from tuple */ origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull); @@ -446,7 +446,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); if (!isnull) { if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) { - // TODO: Is there a way we can handle this? + datum_msg.datum_missing = true; + datum_msg.datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING; elog(DEBUG1, "Not handling external on disk varlena at the moment."); } else if (!typisvarlena) { set_datum_value(&datum_msg, attr->atttypid, typoutput, origval); @@ -455,7 +456,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, set_datum_value(&datum_msg, attr->atttypid, typoutput, val); } } else { - elog(DEBUG1, "column %s is null, ignoring value", attrName); + elog(DEBUG1, "column %s is null, ignoring value", attrName); } tmsg[valid_attr_cnt] = palloc(sizeof(datum_msg)); @@ -534,10 +535,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, rmsg.has_transaction_id = true; rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time); rmsg.has_commit_time = true; - rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), NameStr(class_form->relname))); - - + + /* decode different operation types */ switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -611,7 +612,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } break; default: - elog(WARNING, "unknown change action"); + elog(WARNING, "unknown change action"); Assert(0); break; } diff --git a/src/proto/pg_logicaldec.pb-c.c b/src/proto/pg_logicaldec.pb-c.c index ea776f7..7f85065 100644 --- a/src/proto/pg_logicaldec.pb-c.c +++ b/src/proto/pg_logicaldec.pb-c.c @@ -230,7 +230,7 @@ const ProtobufCMessageDescriptor decoderbufs__point__descriptor = (ProtobufCMessageInit) decoderbufs__point__init, NULL,NULL,NULL /* reserved[123] */ }; -static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] = +static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[11] = { { "column_name", @@ -352,6 +352,18 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, + { + "datum_missing", + 11, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_BOOL, + offsetof(Decoderbufs__DatumMessage, datum_case), + offsetof(Decoderbufs__DatumMessage, datum_missing), + NULL, + NULL, + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, }; static const unsigned decoderbufs__datum_message__field_indices_by_name[] = { 0, /* field[0] = column_name */ @@ -362,13 +374,14 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = { 4, /* field[4] = datum_float */ 2, /* field[2] = datum_int32 */ 3, /* field[3] = datum_int64 */ + 10, /* field[10] = datum_missing */ 9, /* field[9] = datum_point */ 7, /* field[7] = datum_string */ }; static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] = { { 1, 0 }, - { 0, 10 } + { 0, 11 } }; const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = { @@ -378,7 +391,7 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = "Decoderbufs__DatumMessage", "decoderbufs", sizeof(Decoderbufs__DatumMessage), - 10, + 11, decoderbufs__datum_message__field_descriptors, decoderbufs__datum_message__field_indices_by_name, 1, decoderbufs__datum_message__number_ranges, diff --git a/src/proto/pg_logicaldec.pb-c.h b/src/proto/pg_logicaldec.pb-c.h index fe7978f..7082bdd 100644 --- a/src/proto/pg_logicaldec.pb-c.h +++ b/src/proto/pg_logicaldec.pb-c.h @@ -53,6 +53,7 @@ typedef enum { DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10, + DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING = 11, } Decoderbufs__DatumMessage__DatumCase; struct _Decoderbufs__DatumMessage @@ -71,6 +72,7 @@ struct _Decoderbufs__DatumMessage char *datum_string; ProtobufCBinaryData datum_bytes; Decoderbufs__Point *datum_point; + protobuf_c_boolean datum_missing; }; }; #define DECODERBUFS__DATUM_MESSAGE__INIT \