From 88dfd7e594d600f703b1d0764523a2b0370ee3fd Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Mon, 27 Nov 2017 14:18:23 +0100 Subject: [PATCH] DBZ-485 Replication message provides full type info --- proto/pg_logicaldec.proto | 5 ++ src/decoderbufs.c | 46 +++++++++++++++ src/proto/pg_logicaldec.pb-c.c | 100 ++++++++++++++++++++++++++++++++- src/proto/pg_logicaldec.pb-c.h | 42 +++++++++++++- 4 files changed, 187 insertions(+), 6 deletions(-) diff --git a/proto/pg_logicaldec.proto b/proto/pg_logicaldec.proto index e9b8a75..5a4504c 100644 --- a/proto/pg_logicaldec.proto +++ b/proto/pg_logicaldec.proto @@ -30,6 +30,10 @@ message DatumMessage { } } +message TypeInfo { + required string modifier = 1; +} + message RowMessage { optional uint32 transaction_id = 1; optional uint64 commit_time = 2; @@ -37,4 +41,5 @@ message RowMessage { optional Op op = 4; repeated DatumMessage new_tuple = 5; repeated DatumMessage old_tuple = 6; + repeated TypeInfo new_typeinfo = 7; } diff --git a/src/decoderbufs.c b/src/decoderbufs.c index f31ef3b..9c29864 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -628,6 +628,38 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, } } +/* provide a metadata for new tuple */ +static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg, + Relation relation, HeapTuple tuple, + TupleDesc tupdesc) { + int natt; + int valid_attr_cnt = 0; + elog(DEBUG1, "Adding metadata for %d columns", tupdesc->natts); + /* build column names and values */ + for (natt = 0; natt < tupdesc->natts; natt++) { + Form_pg_attribute attr; + char *typ_mod; + Decoderbufs__TypeInfo typeinfo = DECODERBUFS__TYPE_INFO__INIT; + + attr = tupdesc->attrs[natt]; + + /* skip dropped columns and system columns */ + if (attr->attisdropped || attr->attnum < 0) { + elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column"); + continue; + } + + typ_mod = TextDatumGetCString(DirectFunctionCall2(format_type, attr->atttypid, attr->atttypmod)); + elog(DEBUG1, "Adding typemodifier '%s' for column %d", typ_mod, natt); + + typeinfo.modifier = typ_mod; + tmsg[valid_attr_cnt] = palloc(sizeof(typeinfo)); + memcpy(tmsg[valid_attr_cnt], &typeinfo, sizeof(typeinfo)); + + valid_attr_cnt++; + } +} + /* callback for individual changed tuples */ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { @@ -676,11 +708,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (change->data.tp.newtuple != NULL) { elog(DEBUG1, "decoding new tuple information"); tupdesc = RelationGetDescr(relation); + rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); + + rmsg.n_new_typeinfo = rmsg.n_new_tuple; + rmsg.new_typeinfo = + palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo); + add_metadata_to_msg(rmsg.new_typeinfo, relation, + &change->data.tp.newtuple->tuple, tupdesc); } break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -700,11 +739,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (change->data.tp.newtuple != NULL) { elog(DEBUG1, "decoding new tuple information"); tupdesc = RelationGetDescr(relation); + rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.new_tuple = palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); + + rmsg.n_new_typeinfo = rmsg.n_new_tuple; + rmsg.new_typeinfo = + palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo); + add_metadata_to_msg(rmsg.new_typeinfo, relation, + &change->data.tp.newtuple->tuple, tupdesc); } } break; diff --git a/src/proto/pg_logicaldec.pb-c.c b/src/proto/pg_logicaldec.pb-c.c index 99c4779..98226e2 100644 --- a/src/proto/pg_logicaldec.pb-c.c +++ b/src/proto/pg_logicaldec.pb-c.c @@ -93,6 +93,49 @@ void decoderbufs__datum_message__free_unpacked assert(message->base.descriptor == &decoderbufs__datum_message__descriptor); protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); } +void decoderbufs__type_info__init + (Decoderbufs__TypeInfo *message) +{ + static Decoderbufs__TypeInfo init_value = DECODERBUFS__TYPE_INFO__INIT; + *message = init_value; +} +size_t decoderbufs__type_info__get_packed_size + (const Decoderbufs__TypeInfo *message) +{ + assert(message->base.descriptor == &decoderbufs__type_info__descriptor); + return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message)); +} +size_t decoderbufs__type_info__pack + (const Decoderbufs__TypeInfo *message, + uint8_t *out) +{ + assert(message->base.descriptor == &decoderbufs__type_info__descriptor); + return protobuf_c_message_pack ((const ProtobufCMessage*)message, out); +} +size_t decoderbufs__type_info__pack_to_buffer + (const Decoderbufs__TypeInfo *message, + ProtobufCBuffer *buffer) +{ + assert(message->base.descriptor == &decoderbufs__type_info__descriptor); + return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer); +} +Decoderbufs__TypeInfo * + decoderbufs__type_info__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data) +{ + return (Decoderbufs__TypeInfo *) + protobuf_c_message_unpack (&decoderbufs__type_info__descriptor, + allocator, len, data); +} +void decoderbufs__type_info__free_unpacked + (Decoderbufs__TypeInfo *message, + ProtobufCAllocator *allocator) +{ + assert(message->base.descriptor == &decoderbufs__type_info__descriptor); + protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); +} void decoderbufs__row_message__init (Decoderbufs__RowMessage *message) { @@ -342,7 +385,45 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = (ProtobufCMessageInit) decoderbufs__datum_message__init, NULL,NULL,NULL /* reserved[123] */ }; -static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[6] = +static const ProtobufCFieldDescriptor decoderbufs__type_info__field_descriptors[1] = +{ + { + "modifier", + 1, + PROTOBUF_C_LABEL_REQUIRED, + PROTOBUF_C_TYPE_STRING, + 0, /* quantifier_offset */ + offsetof(Decoderbufs__TypeInfo, modifier), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, +}; +static const unsigned decoderbufs__type_info__field_indices_by_name[] = { + 0, /* field[0] = modifier */ +}; +static const ProtobufCIntRange decoderbufs__type_info__number_ranges[1 + 1] = +{ + { 1, 0 }, + { 0, 1 } +}; +const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor = +{ + PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC, + "decoderbufs.TypeInfo", + "TypeInfo", + "Decoderbufs__TypeInfo", + "decoderbufs", + sizeof(Decoderbufs__TypeInfo), + 1, + decoderbufs__type_info__field_descriptors, + decoderbufs__type_info__field_indices_by_name, + 1, decoderbufs__type_info__number_ranges, + (ProtobufCMessageInit) decoderbufs__type_info__init, + NULL,NULL,NULL /* reserved[123] */ +}; +static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[7] = { { "transaction_id", @@ -416,10 +497,23 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor 0, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, + { + "new_typeinfo", + 7, + PROTOBUF_C_LABEL_REPEATED, + PROTOBUF_C_TYPE_MESSAGE, + offsetof(Decoderbufs__RowMessage, n_new_typeinfo), + offsetof(Decoderbufs__RowMessage, new_typeinfo), + &decoderbufs__type_info__descriptor, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, }; static const unsigned decoderbufs__row_message__field_indices_by_name[] = { 1, /* field[1] = commit_time */ 4, /* field[4] = new_tuple */ + 6, /* field[6] = new_typeinfo */ 5, /* field[5] = old_tuple */ 3, /* field[3] = op */ 2, /* field[2] = table */ @@ -428,7 +522,7 @@ static const unsigned decoderbufs__row_message__field_indices_by_name[] = { static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] = { { 1, 0 }, - { 0, 6 } + { 0, 7 } }; const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor = { @@ -438,7 +532,7 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor = "Decoderbufs__RowMessage", "decoderbufs", sizeof(Decoderbufs__RowMessage), - 6, + 7, decoderbufs__row_message__field_descriptors, decoderbufs__row_message__field_indices_by_name, 1, decoderbufs__row_message__number_ranges, diff --git a/src/proto/pg_logicaldec.pb-c.h b/src/proto/pg_logicaldec.pb-c.h index 17206dd..4e17513 100644 --- a/src/proto/pg_logicaldec.pb-c.h +++ b/src/proto/pg_logicaldec.pb-c.h @@ -10,13 +10,14 @@ PROTOBUF_C__BEGIN_DECLS #if PROTOBUF_C_VERSION_NUMBER < 1000000 # error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers. -#elif 1001001 < PROTOBUF_C_MIN_COMPILER_VERSION +#elif 1002001 < PROTOBUF_C_MIN_COMPILER_VERSION # error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c. #endif typedef struct _Decoderbufs__Point Decoderbufs__Point; typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage; +typedef struct _Decoderbufs__TypeInfo Decoderbufs__TypeInfo; typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage; @@ -74,7 +75,17 @@ struct _Decoderbufs__DatumMessage }; #define DECODERBUFS__DATUM_MESSAGE__INIT \ { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \ - , NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {} } + , NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {0} } + + +struct _Decoderbufs__TypeInfo +{ + ProtobufCMessage base; + char *modifier; +}; +#define DECODERBUFS__TYPE_INFO__INIT \ + { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__type_info__descriptor) \ + , NULL } struct _Decoderbufs__RowMessage @@ -91,10 +102,12 @@ struct _Decoderbufs__RowMessage Decoderbufs__DatumMessage **new_tuple; size_t n_old_tuple; Decoderbufs__DatumMessage **old_tuple; + size_t n_new_typeinfo; + Decoderbufs__TypeInfo **new_typeinfo; }; #define DECODERBUFS__ROW_MESSAGE__INIT \ { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \ - , 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL } + , 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL, 0,NULL } /* Decoderbufs__Point methods */ @@ -135,6 +148,25 @@ Decoderbufs__DatumMessage * void decoderbufs__datum_message__free_unpacked (Decoderbufs__DatumMessage *message, ProtobufCAllocator *allocator); +/* Decoderbufs__TypeInfo methods */ +void decoderbufs__type_info__init + (Decoderbufs__TypeInfo *message); +size_t decoderbufs__type_info__get_packed_size + (const Decoderbufs__TypeInfo *message); +size_t decoderbufs__type_info__pack + (const Decoderbufs__TypeInfo *message, + uint8_t *out); +size_t decoderbufs__type_info__pack_to_buffer + (const Decoderbufs__TypeInfo *message, + ProtobufCBuffer *buffer); +Decoderbufs__TypeInfo * + decoderbufs__type_info__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data); +void decoderbufs__type_info__free_unpacked + (Decoderbufs__TypeInfo *message, + ProtobufCAllocator *allocator); /* Decoderbufs__RowMessage methods */ void decoderbufs__row_message__init (Decoderbufs__RowMessage *message); @@ -162,6 +194,9 @@ typedef void (*Decoderbufs__Point_Closure) typedef void (*Decoderbufs__DatumMessage_Closure) (const Decoderbufs__DatumMessage *message, void *closure_data); +typedef void (*Decoderbufs__TypeInfo_Closure) + (const Decoderbufs__TypeInfo *message, + void *closure_data); typedef void (*Decoderbufs__RowMessage_Closure) (const Decoderbufs__RowMessage *message, void *closure_data); @@ -174,6 +209,7 @@ typedef void (*Decoderbufs__RowMessage_Closure) extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor; +extern const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor; PROTOBUF_C__END_DECLS