DB-1367 Sending marker for unchanged TOAST columns
parent
44cf35d4b5
commit
d01282a180
|
@ -27,6 +27,7 @@ message DatumMessage {
|
|||
string datum_string = 8;
|
||||
bytes datum_bytes = 9;
|
||||
Point datum_point = 10;
|
||||
bool datum_missing = 11;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -265,7 +265,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
|||
TimeTzADT *timetz = NULL;
|
||||
Interval *interval = NULL;
|
||||
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
|
||||
|
||||
|
||||
int size = 0;
|
||||
switch (typid) {
|
||||
case BOOLOID:
|
||||
|
@ -293,7 +293,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
|||
datum_msg->datum_double = DatumGetFloat8(datum);
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
|
||||
break;
|
||||
case CASHOID:
|
||||
case CASHOID:
|
||||
datum_msg->datum_int64 = DatumGetCash(datum);
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||
break;
|
||||
|
@ -303,7 +303,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
|||
case BPCHAROID:
|
||||
case TEXTOID:
|
||||
case JSONOID:
|
||||
case JSONBOID:
|
||||
case JSONBOID:
|
||||
case XMLOID:
|
||||
case BITOID:
|
||||
case VARBITOID:
|
||||
|
@ -320,7 +320,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
|||
} else {
|
||||
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||
break;
|
||||
break;
|
||||
}
|
||||
case TIMESTAMPTZOID:
|
||||
ts = DatumGetTimestampTz(datum);
|
||||
|
@ -330,9 +330,9 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
|||
} else {
|
||||
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||
break;
|
||||
break;
|
||||
}
|
||||
case DATEOID:
|
||||
case DATEOID:
|
||||
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */
|
||||
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
|
||||
|
@ -340,18 +340,18 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
|||
case TIMEOID:
|
||||
datum_msg->datum_int64 = DatumGetTimeADT(datum);
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||
break;
|
||||
break;
|
||||
case TIMETZOID:
|
||||
timetz = DatumGetTimeTzADTP(datum);
|
||||
/* use GMT-equivalent time */
|
||||
/* use GMT-equivalent time */
|
||||
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
|
||||
break;
|
||||
break;
|
||||
case INTERVALOID:
|
||||
interval = DatumGetIntervalP(datum);
|
||||
/*
|
||||
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
|
||||
*/
|
||||
/*
|
||||
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
|
||||
*/
|
||||
duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY);
|
||||
datum_msg->datum_double = duration;
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
|
||||
|
@ -394,14 +394,14 @@ static int valid_attributes_count_from(TupleDesc tupdesc) {
|
|||
int count = 0;
|
||||
for (natt = 0; natt < tupdesc->natts; natt++) {
|
||||
Form_pg_attribute attr = TupleDescAttr(tupdesc, natt);
|
||||
|
||||
|
||||
/* skip dropped columns and system columns */
|
||||
if (attr->attisdropped || attr->attnum < 0) {
|
||||
if (attr->attisdropped || attr->attnum < 0) {
|
||||
continue;
|
||||
}
|
||||
count++;
|
||||
}
|
||||
count++;
|
||||
}
|
||||
return count;
|
||||
return count;
|
||||
}
|
||||
|
||||
/* convert a PG tuple to an array of DatumMessage(s) */
|
||||
|
@ -422,19 +422,19 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
|||
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
|
||||
|
||||
attr = TupleDescAttr(tupdesc, natt);
|
||||
|
||||
|
||||
/* skip dropped columns and system columns */
|
||||
if (attr->attisdropped || attr->attnum < 0) {
|
||||
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
attrName = quote_identifier(NameStr(attr->attname));
|
||||
elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName);
|
||||
attrName = quote_identifier(NameStr(attr->attname));
|
||||
elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName);
|
||||
|
||||
/* set the column name */
|
||||
datum_msg.column_name = (char *)attrName;
|
||||
|
||||
|
||||
/* set datum from tuple */
|
||||
origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
|
||||
|
||||
|
@ -446,7 +446,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
|||
getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
|
||||
if (!isnull) {
|
||||
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
|
||||
// TODO: Is there a way we can handle this?
|
||||
datum_msg.datum_missing = true;
|
||||
datum_msg.datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING;
|
||||
elog(DEBUG1, "Not handling external on disk varlena at the moment.");
|
||||
} else if (!typisvarlena) {
|
||||
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
|
||||
|
@ -455,7 +456,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
|||
set_datum_value(&datum_msg, attr->atttypid, typoutput, val);
|
||||
}
|
||||
} else {
|
||||
elog(DEBUG1, "column %s is null, ignoring value", attrName);
|
||||
elog(DEBUG1, "column %s is null, ignoring value", attrName);
|
||||
}
|
||||
|
||||
tmsg[valid_attr_cnt] = palloc(sizeof(datum_msg));
|
||||
|
@ -534,10 +535,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
rmsg.has_transaction_id = true;
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||
rmsg.has_commit_time = true;
|
||||
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
|
||||
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
|
||||
NameStr(class_form->relname)));
|
||||
|
||||
|
||||
|
||||
|
||||
/* decode different operation types */
|
||||
switch (change->action) {
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
|
@ -611,7 +612,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
}
|
||||
break;
|
||||
default:
|
||||
elog(WARNING, "unknown change action");
|
||||
elog(WARNING, "unknown change action");
|
||||
Assert(0);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -230,7 +230,7 @@ const ProtobufCMessageDescriptor decoderbufs__point__descriptor =
|
|||
(ProtobufCMessageInit) decoderbufs__point__init,
|
||||
NULL,NULL,NULL /* reserved[123] */
|
||||
};
|
||||
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] =
|
||||
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[11] =
|
||||
{
|
||||
{
|
||||
"column_name",
|
||||
|
@ -352,6 +352,18 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
|
|||
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
|
||||
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||
},
|
||||
{
|
||||
"datum_missing",
|
||||
11,
|
||||
PROTOBUF_C_LABEL_OPTIONAL,
|
||||
PROTOBUF_C_TYPE_BOOL,
|
||||
offsetof(Decoderbufs__DatumMessage, datum_case),
|
||||
offsetof(Decoderbufs__DatumMessage, datum_missing),
|
||||
NULL,
|
||||
NULL,
|
||||
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
|
||||
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||
},
|
||||
};
|
||||
static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
|
||||
0, /* field[0] = column_name */
|
||||
|
@ -362,13 +374,14 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
|
|||
4, /* field[4] = datum_float */
|
||||
2, /* field[2] = datum_int32 */
|
||||
3, /* field[3] = datum_int64 */
|
||||
10, /* field[10] = datum_missing */
|
||||
9, /* field[9] = datum_point */
|
||||
7, /* field[7] = datum_string */
|
||||
};
|
||||
static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] =
|
||||
{
|
||||
{ 1, 0 },
|
||||
{ 0, 10 }
|
||||
{ 0, 11 }
|
||||
};
|
||||
const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
|
||||
{
|
||||
|
@ -378,7 +391,7 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
|
|||
"Decoderbufs__DatumMessage",
|
||||
"decoderbufs",
|
||||
sizeof(Decoderbufs__DatumMessage),
|
||||
10,
|
||||
11,
|
||||
decoderbufs__datum_message__field_descriptors,
|
||||
decoderbufs__datum_message__field_indices_by_name,
|
||||
1, decoderbufs__datum_message__number_ranges,
|
||||
|
|
|
@ -53,6 +53,7 @@ typedef enum {
|
|||
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
|
||||
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
|
||||
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
|
||||
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING = 11,
|
||||
} Decoderbufs__DatumMessage__DatumCase;
|
||||
|
||||
struct _Decoderbufs__DatumMessage
|
||||
|
@ -71,6 +72,7 @@ struct _Decoderbufs__DatumMessage
|
|||
char *datum_string;
|
||||
ProtobufCBinaryData datum_bytes;
|
||||
Decoderbufs__Point *datum_point;
|
||||
protobuf_c_boolean datum_missing;
|
||||
};
|
||||
};
|
||||
#define DECODERBUFS__DATUM_MESSAGE__INIT \
|
||||
|
|
Loading…
Reference in New Issue