DB-1367 Sending marker for unchanged TOAST columns

pull/21/head v0.10.0.CR2
Gunnar Morling 2019-07-01 12:54:52 +02:00
parent 44cf35d4b5
commit 3c910fff12
4 changed files with 48 additions and 31 deletions

View File

@ -27,6 +27,7 @@ message DatumMessage {
string datum_string = 8;
bytes datum_bytes = 9;
Point datum_point = 10;
bool datum_missing = 11;
}
}

View File

@ -265,7 +265,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
TimeTzADT *timetz = NULL;
Interval *interval = NULL;
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
int size = 0;
switch (typid) {
case BOOLOID:
@ -293,7 +293,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_double = DatumGetFloat8(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case CASHOID:
case CASHOID:
datum_msg->datum_int64 = DatumGetCash(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
@ -303,7 +303,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case BPCHAROID:
case TEXTOID:
case JSONOID:
case JSONBOID:
case JSONBOID:
case XMLOID:
case BITOID:
case VARBITOID:
@ -320,7 +320,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
break;
}
case TIMESTAMPTZOID:
ts = DatumGetTimestampTz(datum);
@ -330,9 +330,9 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
break;
}
case DATEOID:
case DATEOID:
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
@ -340,18 +340,18 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case TIMEOID:
datum_msg->datum_int64 = DatumGetTimeADT(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
break;
case TIMETZOID:
timetz = DatumGetTimeTzADTP(datum);
/* use GMT-equivalent time */
/* use GMT-equivalent time */
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
break;
case INTERVALOID:
interval = DatumGetIntervalP(datum);
/*
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
*/
/*
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
*/
duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY);
datum_msg->datum_double = duration;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
@ -394,14 +394,14 @@ static int valid_attributes_count_from(TupleDesc tupdesc) {
int count = 0;
for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr = TupleDescAttr(tupdesc, natt);
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
if (attr->attisdropped || attr->attnum < 0) {
continue;
}
count++;
}
count++;
}
return count;
return count;
}
/* convert a PG tuple to an array of DatumMessage(s) */
@ -422,19 +422,19 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
attr = TupleDescAttr(tupdesc, natt);
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
continue;
}
}
attrName = quote_identifier(NameStr(attr->attname));
elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName);
attrName = quote_identifier(NameStr(attr->attname));
elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName);
/* set the column name */
datum_msg.column_name = (char *)attrName;
/* set datum from tuple */
origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
@ -446,7 +446,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
if (!isnull) {
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
// TODO: Is there a way we can handle this?
datum_msg.datum_missing = true;
datum_msg.datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING;
elog(DEBUG1, "Not handling external on disk varlena at the moment.");
} else if (!typisvarlena) {
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
@ -455,7 +456,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
set_datum_value(&datum_msg, attr->atttypid, typoutput, val);
}
} else {
elog(DEBUG1, "column %s is null, ignoring value", attrName);
elog(DEBUG1, "column %s is null, ignoring value", attrName);
}
tmsg[valid_attr_cnt] = palloc(sizeof(datum_msg));
@ -534,10 +535,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.has_transaction_id = true;
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
rmsg.has_commit_time = true;
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
NameStr(class_form->relname)));
/* decode different operation types */
switch (change->action) {
case REORDER_BUFFER_CHANGE_INSERT:
@ -611,7 +612,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
break;
default:
elog(WARNING, "unknown change action");
elog(WARNING, "unknown change action");
Assert(0);
break;
}

View File

@ -230,7 +230,7 @@ const ProtobufCMessageDescriptor decoderbufs__point__descriptor =
(ProtobufCMessageInit) decoderbufs__point__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] =
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[11] =
{
{
"column_name",
@ -352,6 +352,18 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"datum_missing",
11,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_BOOL,
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_missing),
NULL,
NULL,
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
0, /* field[0] = column_name */
@ -362,13 +374,14 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
4, /* field[4] = datum_float */
2, /* field[2] = datum_int32 */
3, /* field[3] = datum_int64 */
10, /* field[10] = datum_missing */
9, /* field[9] = datum_point */
7, /* field[7] = datum_string */
};
static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 10 }
{ 0, 11 }
};
const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
{
@ -378,7 +391,7 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
"Decoderbufs__DatumMessage",
"decoderbufs",
sizeof(Decoderbufs__DatumMessage),
10,
11,
decoderbufs__datum_message__field_descriptors,
decoderbufs__datum_message__field_indices_by_name,
1, decoderbufs__datum_message__number_ranges,

View File

@ -53,6 +53,7 @@ typedef enum {
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING = 11,
} Decoderbufs__DatumMessage__DatumCase;
struct _Decoderbufs__DatumMessage
@ -71,6 +72,7 @@ struct _Decoderbufs__DatumMessage
char *datum_string;
ProtobufCBinaryData datum_bytes;
Decoderbufs__Point *datum_point;
protobuf_c_boolean datum_missing;
};
};
#define DECODERBUFS__DATUM_MESSAGE__INIT \