DBZ-485 Replication message provides full type info

pull/10/head
Jiri Pechanec 2017-11-27 14:18:23 +01:00
parent 6c1723a179
commit 88dfd7e594
4 changed files with 187 additions and 6 deletions

View File

@ -30,6 +30,10 @@ message DatumMessage {
} }
} }
message TypeInfo {
required string modifier = 1;
}
message RowMessage { message RowMessage {
optional uint32 transaction_id = 1; optional uint32 transaction_id = 1;
optional uint64 commit_time = 2; optional uint64 commit_time = 2;
@ -37,4 +41,5 @@ message RowMessage {
optional Op op = 4; optional Op op = 4;
repeated DatumMessage new_tuple = 5; repeated DatumMessage new_tuple = 5;
repeated DatumMessage old_tuple = 6; repeated DatumMessage old_tuple = 6;
repeated TypeInfo new_typeinfo = 7;
} }

View File

@ -628,6 +628,38 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
} }
} }
/* provide a metadata for new tuple */
static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
Relation relation, HeapTuple tuple,
TupleDesc tupdesc) {
int natt;
int valid_attr_cnt = 0;
elog(DEBUG1, "Adding metadata for %d columns", tupdesc->natts);
/* build column names and values */
for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr;
char *typ_mod;
Decoderbufs__TypeInfo typeinfo = DECODERBUFS__TYPE_INFO__INIT;
attr = tupdesc->attrs[natt];
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
continue;
}
typ_mod = TextDatumGetCString(DirectFunctionCall2(format_type, attr->atttypid, attr->atttypmod));
elog(DEBUG1, "Adding typemodifier '%s' for column %d", typ_mod, natt);
typeinfo.modifier = typ_mod;
tmsg[valid_attr_cnt] = palloc(sizeof(typeinfo));
memcpy(tmsg[valid_attr_cnt], &typeinfo, sizeof(typeinfo));
valid_attr_cnt++;
}
}
/* callback for individual changed tuples */ /* callback for individual changed tuples */
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) { Relation relation, ReorderBufferChange *change) {
@ -676,11 +708,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (change->data.tp.newtuple != NULL) { if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information"); elog(DEBUG1, "decoding new tuple information");
tupdesc = RelationGetDescr(relation); tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
&change->data.tp.newtuple->tuple, tupdesc);
} }
break; break;
case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_UPDATE:
@ -700,11 +739,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (change->data.tp.newtuple != NULL) { if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information"); elog(DEBUG1, "decoding new tuple information");
tupdesc = RelationGetDescr(relation); tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
&change->data.tp.newtuple->tuple, tupdesc);
} }
} }
break; break;

View File

@ -93,6 +93,49 @@ void decoderbufs__datum_message__free_unpacked
assert(message->base.descriptor == &decoderbufs__datum_message__descriptor); assert(message->base.descriptor == &decoderbufs__datum_message__descriptor);
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
} }
void decoderbufs__type_info__init
(Decoderbufs__TypeInfo *message)
{
static Decoderbufs__TypeInfo init_value = DECODERBUFS__TYPE_INFO__INIT;
*message = init_value;
}
size_t decoderbufs__type_info__get_packed_size
(const Decoderbufs__TypeInfo *message)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
}
size_t decoderbufs__type_info__pack
(const Decoderbufs__TypeInfo *message,
uint8_t *out)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
}
size_t decoderbufs__type_info__pack_to_buffer
(const Decoderbufs__TypeInfo *message,
ProtobufCBuffer *buffer)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
}
Decoderbufs__TypeInfo *
decoderbufs__type_info__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data)
{
return (Decoderbufs__TypeInfo *)
protobuf_c_message_unpack (&decoderbufs__type_info__descriptor,
allocator, len, data);
}
void decoderbufs__type_info__free_unpacked
(Decoderbufs__TypeInfo *message,
ProtobufCAllocator *allocator)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
void decoderbufs__row_message__init void decoderbufs__row_message__init
(Decoderbufs__RowMessage *message) (Decoderbufs__RowMessage *message)
{ {
@ -342,7 +385,45 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
(ProtobufCMessageInit) decoderbufs__datum_message__init, (ProtobufCMessageInit) decoderbufs__datum_message__init,
NULL,NULL,NULL /* reserved[123] */ NULL,NULL,NULL /* reserved[123] */
}; };
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[6] = static const ProtobufCFieldDescriptor decoderbufs__type_info__field_descriptors[1] =
{
{
"modifier",
1,
PROTOBUF_C_LABEL_REQUIRED,
PROTOBUF_C_TYPE_STRING,
0, /* quantifier_offset */
offsetof(Decoderbufs__TypeInfo, modifier),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
static const unsigned decoderbufs__type_info__field_indices_by_name[] = {
0, /* field[0] = modifier */
};
static const ProtobufCIntRange decoderbufs__type_info__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 1 }
};
const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor =
{
PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
"decoderbufs.TypeInfo",
"TypeInfo",
"Decoderbufs__TypeInfo",
"decoderbufs",
sizeof(Decoderbufs__TypeInfo),
1,
decoderbufs__type_info__field_descriptors,
decoderbufs__type_info__field_indices_by_name,
1, decoderbufs__type_info__number_ranges,
(ProtobufCMessageInit) decoderbufs__type_info__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[7] =
{ {
{ {
"transaction_id", "transaction_id",
@ -416,10 +497,23 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
0, /* flags */ 0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */ 0,NULL,NULL /* reserved1,reserved2, etc */
}, },
{
"new_typeinfo",
7,
PROTOBUF_C_LABEL_REPEATED,
PROTOBUF_C_TYPE_MESSAGE,
offsetof(Decoderbufs__RowMessage, n_new_typeinfo),
offsetof(Decoderbufs__RowMessage, new_typeinfo),
&decoderbufs__type_info__descriptor,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
}; };
static const unsigned decoderbufs__row_message__field_indices_by_name[] = { static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
1, /* field[1] = commit_time */ 1, /* field[1] = commit_time */
4, /* field[4] = new_tuple */ 4, /* field[4] = new_tuple */
6, /* field[6] = new_typeinfo */
5, /* field[5] = old_tuple */ 5, /* field[5] = old_tuple */
3, /* field[3] = op */ 3, /* field[3] = op */
2, /* field[2] = table */ 2, /* field[2] = table */
@ -428,7 +522,7 @@ static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] = static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] =
{ {
{ 1, 0 }, { 1, 0 },
{ 0, 6 } { 0, 7 }
}; };
const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor = const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
{ {
@ -438,7 +532,7 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
"Decoderbufs__RowMessage", "Decoderbufs__RowMessage",
"decoderbufs", "decoderbufs",
sizeof(Decoderbufs__RowMessage), sizeof(Decoderbufs__RowMessage),
6, 7,
decoderbufs__row_message__field_descriptors, decoderbufs__row_message__field_descriptors,
decoderbufs__row_message__field_indices_by_name, decoderbufs__row_message__field_indices_by_name,
1, decoderbufs__row_message__number_ranges, 1, decoderbufs__row_message__number_ranges,

View File

@ -10,13 +10,14 @@ PROTOBUF_C__BEGIN_DECLS
#if PROTOBUF_C_VERSION_NUMBER < 1000000 #if PROTOBUF_C_VERSION_NUMBER < 1000000
# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers. # error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
#elif 1001001 < PROTOBUF_C_MIN_COMPILER_VERSION #elif 1002001 < PROTOBUF_C_MIN_COMPILER_VERSION
# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c. # error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
#endif #endif
typedef struct _Decoderbufs__Point Decoderbufs__Point; typedef struct _Decoderbufs__Point Decoderbufs__Point;
typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage; typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage;
typedef struct _Decoderbufs__TypeInfo Decoderbufs__TypeInfo;
typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage; typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
@ -74,7 +75,17 @@ struct _Decoderbufs__DatumMessage
}; };
#define DECODERBUFS__DATUM_MESSAGE__INIT \ #define DECODERBUFS__DATUM_MESSAGE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \ { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \
, NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {} } , NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {0} }
struct _Decoderbufs__TypeInfo
{
ProtobufCMessage base;
char *modifier;
};
#define DECODERBUFS__TYPE_INFO__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__type_info__descriptor) \
, NULL }
struct _Decoderbufs__RowMessage struct _Decoderbufs__RowMessage
@ -91,10 +102,12 @@ struct _Decoderbufs__RowMessage
Decoderbufs__DatumMessage **new_tuple; Decoderbufs__DatumMessage **new_tuple;
size_t n_old_tuple; size_t n_old_tuple;
Decoderbufs__DatumMessage **old_tuple; Decoderbufs__DatumMessage **old_tuple;
size_t n_new_typeinfo;
Decoderbufs__TypeInfo **new_typeinfo;
}; };
#define DECODERBUFS__ROW_MESSAGE__INIT \ #define DECODERBUFS__ROW_MESSAGE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \ { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \
, 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL } , 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL, 0,NULL }
/* Decoderbufs__Point methods */ /* Decoderbufs__Point methods */
@ -135,6 +148,25 @@ Decoderbufs__DatumMessage *
void decoderbufs__datum_message__free_unpacked void decoderbufs__datum_message__free_unpacked
(Decoderbufs__DatumMessage *message, (Decoderbufs__DatumMessage *message,
ProtobufCAllocator *allocator); ProtobufCAllocator *allocator);
/* Decoderbufs__TypeInfo methods */
void decoderbufs__type_info__init
(Decoderbufs__TypeInfo *message);
size_t decoderbufs__type_info__get_packed_size
(const Decoderbufs__TypeInfo *message);
size_t decoderbufs__type_info__pack
(const Decoderbufs__TypeInfo *message,
uint8_t *out);
size_t decoderbufs__type_info__pack_to_buffer
(const Decoderbufs__TypeInfo *message,
ProtobufCBuffer *buffer);
Decoderbufs__TypeInfo *
decoderbufs__type_info__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data);
void decoderbufs__type_info__free_unpacked
(Decoderbufs__TypeInfo *message,
ProtobufCAllocator *allocator);
/* Decoderbufs__RowMessage methods */ /* Decoderbufs__RowMessage methods */
void decoderbufs__row_message__init void decoderbufs__row_message__init
(Decoderbufs__RowMessage *message); (Decoderbufs__RowMessage *message);
@ -162,6 +194,9 @@ typedef void (*Decoderbufs__Point_Closure)
typedef void (*Decoderbufs__DatumMessage_Closure) typedef void (*Decoderbufs__DatumMessage_Closure)
(const Decoderbufs__DatumMessage *message, (const Decoderbufs__DatumMessage *message,
void *closure_data); void *closure_data);
typedef void (*Decoderbufs__TypeInfo_Closure)
(const Decoderbufs__TypeInfo *message,
void *closure_data);
typedef void (*Decoderbufs__RowMessage_Closure) typedef void (*Decoderbufs__RowMessage_Closure)
(const Decoderbufs__RowMessage *message, (const Decoderbufs__RowMessage *message,
void *closure_data); void *closure_data);
@ -174,6 +209,7 @@ typedef void (*Decoderbufs__RowMessage_Closure)
extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor; extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor;
PROTOBUF_C__END_DECLS PROTOBUF_C__END_DECLS