Merge 88dfd7e594
into 6c1723a179
commit
95e4a09a77
|
@ -30,6 +30,10 @@ message DatumMessage {
|
|||
}
|
||||
}
|
||||
|
||||
message TypeInfo {
|
||||
required string modifier = 1;
|
||||
}
|
||||
|
||||
message RowMessage {
|
||||
optional uint32 transaction_id = 1;
|
||||
optional uint64 commit_time = 2;
|
||||
|
@ -37,4 +41,5 @@ message RowMessage {
|
|||
optional Op op = 4;
|
||||
repeated DatumMessage new_tuple = 5;
|
||||
repeated DatumMessage old_tuple = 6;
|
||||
repeated TypeInfo new_typeinfo = 7;
|
||||
}
|
||||
|
|
|
@ -628,6 +628,38 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
|||
}
|
||||
}
|
||||
|
||||
/* provide a metadata for new tuple */
|
||||
static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
|
||||
Relation relation, HeapTuple tuple,
|
||||
TupleDesc tupdesc) {
|
||||
int natt;
|
||||
int valid_attr_cnt = 0;
|
||||
elog(DEBUG1, "Adding metadata for %d columns", tupdesc->natts);
|
||||
/* build column names and values */
|
||||
for (natt = 0; natt < tupdesc->natts; natt++) {
|
||||
Form_pg_attribute attr;
|
||||
char *typ_mod;
|
||||
Decoderbufs__TypeInfo typeinfo = DECODERBUFS__TYPE_INFO__INIT;
|
||||
|
||||
attr = tupdesc->attrs[natt];
|
||||
|
||||
/* skip dropped columns and system columns */
|
||||
if (attr->attisdropped || attr->attnum < 0) {
|
||||
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
|
||||
continue;
|
||||
}
|
||||
|
||||
typ_mod = TextDatumGetCString(DirectFunctionCall2(format_type, attr->atttypid, attr->atttypmod));
|
||||
elog(DEBUG1, "Adding typemodifier '%s' for column %d", typ_mod, natt);
|
||||
|
||||
typeinfo.modifier = typ_mod;
|
||||
tmsg[valid_attr_cnt] = palloc(sizeof(typeinfo));
|
||||
memcpy(tmsg[valid_attr_cnt], &typeinfo, sizeof(typeinfo));
|
||||
|
||||
valid_attr_cnt++;
|
||||
}
|
||||
}
|
||||
|
||||
/* callback for individual changed tuples */
|
||||
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change) {
|
||||
|
@ -676,11 +708,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
if (change->data.tp.newtuple != NULL) {
|
||||
elog(DEBUG1, "decoding new tuple information");
|
||||
tupdesc = RelationGetDescr(relation);
|
||||
|
||||
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
|
||||
rmsg.new_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
|
||||
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
||||
rmsg.new_typeinfo =
|
||||
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
||||
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
}
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
|
@ -700,11 +739,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
if (change->data.tp.newtuple != NULL) {
|
||||
elog(DEBUG1, "decoding new tuple information");
|
||||
tupdesc = RelationGetDescr(relation);
|
||||
|
||||
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
|
||||
rmsg.new_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
|
||||
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
||||
rmsg.new_typeinfo =
|
||||
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
||||
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -93,6 +93,49 @@ void decoderbufs__datum_message__free_unpacked
|
|||
assert(message->base.descriptor == &decoderbufs__datum_message__descriptor);
|
||||
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
|
||||
}
|
||||
void decoderbufs__type_info__init
|
||||
(Decoderbufs__TypeInfo *message)
|
||||
{
|
||||
static Decoderbufs__TypeInfo init_value = DECODERBUFS__TYPE_INFO__INIT;
|
||||
*message = init_value;
|
||||
}
|
||||
size_t decoderbufs__type_info__get_packed_size
|
||||
(const Decoderbufs__TypeInfo *message)
|
||||
{
|
||||
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
|
||||
return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
|
||||
}
|
||||
size_t decoderbufs__type_info__pack
|
||||
(const Decoderbufs__TypeInfo *message,
|
||||
uint8_t *out)
|
||||
{
|
||||
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
|
||||
return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
|
||||
}
|
||||
size_t decoderbufs__type_info__pack_to_buffer
|
||||
(const Decoderbufs__TypeInfo *message,
|
||||
ProtobufCBuffer *buffer)
|
||||
{
|
||||
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
|
||||
return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
|
||||
}
|
||||
Decoderbufs__TypeInfo *
|
||||
decoderbufs__type_info__unpack
|
||||
(ProtobufCAllocator *allocator,
|
||||
size_t len,
|
||||
const uint8_t *data)
|
||||
{
|
||||
return (Decoderbufs__TypeInfo *)
|
||||
protobuf_c_message_unpack (&decoderbufs__type_info__descriptor,
|
||||
allocator, len, data);
|
||||
}
|
||||
void decoderbufs__type_info__free_unpacked
|
||||
(Decoderbufs__TypeInfo *message,
|
||||
ProtobufCAllocator *allocator)
|
||||
{
|
||||
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
|
||||
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
|
||||
}
|
||||
void decoderbufs__row_message__init
|
||||
(Decoderbufs__RowMessage *message)
|
||||
{
|
||||
|
@ -342,7 +385,45 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
|
|||
(ProtobufCMessageInit) decoderbufs__datum_message__init,
|
||||
NULL,NULL,NULL /* reserved[123] */
|
||||
};
|
||||
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[6] =
|
||||
static const ProtobufCFieldDescriptor decoderbufs__type_info__field_descriptors[1] =
|
||||
{
|
||||
{
|
||||
"modifier",
|
||||
1,
|
||||
PROTOBUF_C_LABEL_REQUIRED,
|
||||
PROTOBUF_C_TYPE_STRING,
|
||||
0, /* quantifier_offset */
|
||||
offsetof(Decoderbufs__TypeInfo, modifier),
|
||||
NULL,
|
||||
NULL,
|
||||
0, /* flags */
|
||||
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||
},
|
||||
};
|
||||
static const unsigned decoderbufs__type_info__field_indices_by_name[] = {
|
||||
0, /* field[0] = modifier */
|
||||
};
|
||||
static const ProtobufCIntRange decoderbufs__type_info__number_ranges[1 + 1] =
|
||||
{
|
||||
{ 1, 0 },
|
||||
{ 0, 1 }
|
||||
};
|
||||
const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor =
|
||||
{
|
||||
PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
|
||||
"decoderbufs.TypeInfo",
|
||||
"TypeInfo",
|
||||
"Decoderbufs__TypeInfo",
|
||||
"decoderbufs",
|
||||
sizeof(Decoderbufs__TypeInfo),
|
||||
1,
|
||||
decoderbufs__type_info__field_descriptors,
|
||||
decoderbufs__type_info__field_indices_by_name,
|
||||
1, decoderbufs__type_info__number_ranges,
|
||||
(ProtobufCMessageInit) decoderbufs__type_info__init,
|
||||
NULL,NULL,NULL /* reserved[123] */
|
||||
};
|
||||
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[7] =
|
||||
{
|
||||
{
|
||||
"transaction_id",
|
||||
|
@ -416,10 +497,23 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
|
|||
0, /* flags */
|
||||
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||
},
|
||||
{
|
||||
"new_typeinfo",
|
||||
7,
|
||||
PROTOBUF_C_LABEL_REPEATED,
|
||||
PROTOBUF_C_TYPE_MESSAGE,
|
||||
offsetof(Decoderbufs__RowMessage, n_new_typeinfo),
|
||||
offsetof(Decoderbufs__RowMessage, new_typeinfo),
|
||||
&decoderbufs__type_info__descriptor,
|
||||
NULL,
|
||||
0, /* flags */
|
||||
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||
},
|
||||
};
|
||||
static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
|
||||
1, /* field[1] = commit_time */
|
||||
4, /* field[4] = new_tuple */
|
||||
6, /* field[6] = new_typeinfo */
|
||||
5, /* field[5] = old_tuple */
|
||||
3, /* field[3] = op */
|
||||
2, /* field[2] = table */
|
||||
|
@ -428,7 +522,7 @@ static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
|
|||
static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] =
|
||||
{
|
||||
{ 1, 0 },
|
||||
{ 0, 6 }
|
||||
{ 0, 7 }
|
||||
};
|
||||
const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
|
||||
{
|
||||
|
@ -438,7 +532,7 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
|
|||
"Decoderbufs__RowMessage",
|
||||
"decoderbufs",
|
||||
sizeof(Decoderbufs__RowMessage),
|
||||
6,
|
||||
7,
|
||||
decoderbufs__row_message__field_descriptors,
|
||||
decoderbufs__row_message__field_indices_by_name,
|
||||
1, decoderbufs__row_message__number_ranges,
|
||||
|
|
|
@ -10,13 +10,14 @@ PROTOBUF_C__BEGIN_DECLS
|
|||
|
||||
#if PROTOBUF_C_VERSION_NUMBER < 1000000
|
||||
# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
|
||||
#elif 1001001 < PROTOBUF_C_MIN_COMPILER_VERSION
|
||||
#elif 1002001 < PROTOBUF_C_MIN_COMPILER_VERSION
|
||||
# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
|
||||
#endif
|
||||
|
||||
|
||||
typedef struct _Decoderbufs__Point Decoderbufs__Point;
|
||||
typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage;
|
||||
typedef struct _Decoderbufs__TypeInfo Decoderbufs__TypeInfo;
|
||||
typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
|
||||
|
||||
|
||||
|
@ -74,7 +75,17 @@ struct _Decoderbufs__DatumMessage
|
|||
};
|
||||
#define DECODERBUFS__DATUM_MESSAGE__INIT \
|
||||
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \
|
||||
, NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {} }
|
||||
, NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {0} }
|
||||
|
||||
|
||||
struct _Decoderbufs__TypeInfo
|
||||
{
|
||||
ProtobufCMessage base;
|
||||
char *modifier;
|
||||
};
|
||||
#define DECODERBUFS__TYPE_INFO__INIT \
|
||||
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__type_info__descriptor) \
|
||||
, NULL }
|
||||
|
||||
|
||||
struct _Decoderbufs__RowMessage
|
||||
|
@ -91,10 +102,12 @@ struct _Decoderbufs__RowMessage
|
|||
Decoderbufs__DatumMessage **new_tuple;
|
||||
size_t n_old_tuple;
|
||||
Decoderbufs__DatumMessage **old_tuple;
|
||||
size_t n_new_typeinfo;
|
||||
Decoderbufs__TypeInfo **new_typeinfo;
|
||||
};
|
||||
#define DECODERBUFS__ROW_MESSAGE__INIT \
|
||||
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \
|
||||
, 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL }
|
||||
, 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL, 0,NULL }
|
||||
|
||||
|
||||
/* Decoderbufs__Point methods */
|
||||
|
@ -135,6 +148,25 @@ Decoderbufs__DatumMessage *
|
|||
void decoderbufs__datum_message__free_unpacked
|
||||
(Decoderbufs__DatumMessage *message,
|
||||
ProtobufCAllocator *allocator);
|
||||
/* Decoderbufs__TypeInfo methods */
|
||||
void decoderbufs__type_info__init
|
||||
(Decoderbufs__TypeInfo *message);
|
||||
size_t decoderbufs__type_info__get_packed_size
|
||||
(const Decoderbufs__TypeInfo *message);
|
||||
size_t decoderbufs__type_info__pack
|
||||
(const Decoderbufs__TypeInfo *message,
|
||||
uint8_t *out);
|
||||
size_t decoderbufs__type_info__pack_to_buffer
|
||||
(const Decoderbufs__TypeInfo *message,
|
||||
ProtobufCBuffer *buffer);
|
||||
Decoderbufs__TypeInfo *
|
||||
decoderbufs__type_info__unpack
|
||||
(ProtobufCAllocator *allocator,
|
||||
size_t len,
|
||||
const uint8_t *data);
|
||||
void decoderbufs__type_info__free_unpacked
|
||||
(Decoderbufs__TypeInfo *message,
|
||||
ProtobufCAllocator *allocator);
|
||||
/* Decoderbufs__RowMessage methods */
|
||||
void decoderbufs__row_message__init
|
||||
(Decoderbufs__RowMessage *message);
|
||||
|
@ -162,6 +194,9 @@ typedef void (*Decoderbufs__Point_Closure)
|
|||
typedef void (*Decoderbufs__DatumMessage_Closure)
|
||||
(const Decoderbufs__DatumMessage *message,
|
||||
void *closure_data);
|
||||
typedef void (*Decoderbufs__TypeInfo_Closure)
|
||||
(const Decoderbufs__TypeInfo *message,
|
||||
void *closure_data);
|
||||
typedef void (*Decoderbufs__RowMessage_Closure)
|
||||
(const Decoderbufs__RowMessage *message,
|
||||
void *closure_data);
|
||||
|
@ -174,6 +209,7 @@ typedef void (*Decoderbufs__RowMessage_Closure)
|
|||
extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor;
|
||||
extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor;
|
||||
extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor;
|
||||
extern const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor;
|
||||
extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor;
|
||||
|
||||
PROTOBUF_C__END_DECLS
|
||||
|
|
Loading…
Reference in New Issue