diff --git a/proto/pg_logicaldec.proto b/proto/pg_logicaldec.proto index dc17724..8586487 100644 --- a/proto/pg_logicaldec.proto +++ b/proto/pg_logicaldec.proto @@ -1,7 +1,7 @@ package decoderbufs; option java_package="decoderbufs.proto"; -option java_outer_classname = "PgldProto"; +option java_outer_classname = "PgldProtos"; option optimize_for = SPEED; enum Op { @@ -18,20 +18,23 @@ message Point { message DatumMessage { optional string column_name = 1; optional int64 column_type = 2; - optional int32 datum_int32 = 3; - optional int64 datum_int64 = 4; - optional float datum_float = 5; - optional double datum_double = 6; - optional bool datum_bool = 7; - optional string datum_string = 8; - optional bytes datum_bytes = 9; - optional Point datum_point = 10; + oneof datum { + int32 datum_int32 = 3; + int64 datum_int64 = 4; + float datum_float = 5; + double datum_double = 6; + bool datum_bool = 7; + string datum_string = 8; + bytes datum_bytes = 9; + Point datum_point = 10; + } } message RowMessage { - optional sint64 commit_time = 1; - optional string table = 2; - optional Op op = 3; - repeated DatumMessage new_tuple = 4; - repeated DatumMessage old_tuple = 5; + optional uint32 transaction_id = 1; + optional uint64 commit_time = 2; + optional string table = 3; + optional Op op = 4; + repeated DatumMessage new_tuple = 5; + repeated DatumMessage old_tuple = 6; } diff --git a/src/decoderbufs.c b/src/decoderbufs.c index 77e48bd..1c5e5e4 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -190,18 +190,33 @@ static void row_message_destroy(Decoderbufs__RowMessage *msg) { return; } - pfree(msg->table); + if (msg->table) { + pfree(msg->table); + } + if (msg->n_new_tuple > 0) { for (int i = 0; i < msg->n_new_tuple; i++) { if (msg->new_tuple[i]) { - if (msg->new_tuple[i]->datum_string) { - pfree(msg->new_tuple[i]->datum_string); - } else if (msg->new_tuple[i]->has_datum_bytes) { - pfree(msg->new_tuple[i]->datum_bytes.data); - msg->new_tuple[i]->datum_bytes.data = NULL; - msg->new_tuple[i]->datum_bytes.len = 0; - } else if (msg->new_tuple[i]->datum_point) { - pfree(msg->new_tuple[i]->datum_point); + switch (msg->new_tuple[i]->datum_case) { + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING: + if (msg->new_tuple[i]->datum_string) { + pfree(msg->new_tuple[i]->datum_string); + } + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES: + if (msg->new_tuple[i]->datum_bytes.data) { + pfree(msg->new_tuple[i]->datum_bytes.data); + msg->new_tuple[i]->datum_bytes.data = NULL; + msg->new_tuple[i]->datum_bytes.len = 0; + } + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT: + if (msg->new_tuple[i]->datum_point) { + pfree(msg->new_tuple[i]->datum_point); + } + break; + default: + break; } pfree(msg->new_tuple[i]); } @@ -211,14 +226,26 @@ static void row_message_destroy(Decoderbufs__RowMessage *msg) { if (msg->n_old_tuple > 0) { for (int i = 0; i < msg->n_old_tuple; i++) { if (msg->old_tuple[i]) { - if (msg->old_tuple[i]->datum_string) { - pfree(msg->old_tuple[i]->datum_string); - } else if (msg->old_tuple[i]->has_datum_bytes) { - pfree(msg->old_tuple[i]->datum_bytes.data); - msg->old_tuple[i]->datum_bytes.data = NULL; - msg->old_tuple[i]->datum_bytes.len = 0; - } else if (msg->old_tuple[i]->datum_point) { - pfree(msg->old_tuple[i]->datum_point); + switch (msg->old_tuple[i]->datum_case) { + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING: + if (msg->old_tuple[i]->datum_string) { + pfree(msg->old_tuple[i]->datum_string); + } + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES: + if (msg->old_tuple[i]->datum_bytes.data) { + pfree(msg->old_tuple[i]->datum_bytes.data); + msg->old_tuple[i]->datum_bytes.data = NULL; + msg->old_tuple[i]->datum_bytes.len = 0; + } + break; + case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT: + if (msg->old_tuple[i]->datum_point) { + pfree(msg->old_tuple[i]->datum_point); + } + break; + default: + break; } pfree(msg->old_tuple[i]); } @@ -350,34 +377,27 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, switch (typid) { case BOOLOID: datum_msg->datum_bool = DatumGetBool(datum); - datum_msg->has_datum_bool = true; break; case INT2OID: datum_msg->datum_int32 = DatumGetInt16(datum); - datum_msg->has_datum_int32 = true; break; case INT4OID: datum_msg->datum_int32 = DatumGetInt32(datum); - datum_msg->has_datum_int32 = true; break; case INT8OID: case OIDOID: datum_msg->datum_int64 = DatumGetInt64(datum); - datum_msg->has_datum_int64 = true; break; case FLOAT4OID: datum_msg->datum_float = DatumGetFloat4(datum); - datum_msg->has_datum_float = true; break; case FLOAT8OID: datum_msg->datum_double = DatumGetFloat8(datum); - datum_msg->has_datum_double = true; break; case NUMERICOID: num = DatumGetNumeric(datum); if (!numeric_is_nan(num)) { datum_msg->datum_double = numeric_to_double_no_overflow(num); - datum_msg->has_datum_double = true; } break; case CHAROID: @@ -404,7 +424,6 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, datum_msg->datum_bytes.data = palloc(size); memcpy(datum_msg->datum_bytes.data, (uint8_t *)VARDATA(valptr), size); datum_msg->datum_bytes.len = size; - datum_msg->has_datum_bytes = true; break; case POINTOID: p = DatumGetPointP(datum); @@ -428,7 +447,6 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, datum_msg->datum_bytes.data = palloc(size); memcpy(datum_msg->datum_bytes.data, (uint8_t *)output, size); datum_msg->datum_bytes.len = len; - datum_msg->has_datum_bytes = true; } break; } @@ -509,6 +527,8 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, !OidIsValid(relation->rd_replidindex))); /* set common fields */ + rmsg.transaction_id = txn->xid; + rmsg.has_transaction_id = true; rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time); rmsg.has_commit_time = true; rmsg.table = pstrdup(NameStr(class_form->relname)); diff --git a/src/proto/pg_logicaldec.pb-c.c b/src/proto/pg_logicaldec.pb-c.c index 46a7af1..99c4779 100644 --- a/src/proto/pg_logicaldec.pb-c.c +++ b/src/proto/pg_logicaldec.pb-c.c @@ -218,11 +218,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 3, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_INT32, - offsetof(Decoderbufs__DatumMessage, has_datum_int32), + offsetof(Decoderbufs__DatumMessage, datum_case), offsetof(Decoderbufs__DatumMessage, datum_int32), NULL, NULL, - 0, /* flags */ + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, { @@ -230,11 +230,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 4, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_INT64, - offsetof(Decoderbufs__DatumMessage, has_datum_int64), + offsetof(Decoderbufs__DatumMessage, datum_case), offsetof(Decoderbufs__DatumMessage, datum_int64), NULL, NULL, - 0, /* flags */ + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, { @@ -242,11 +242,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 5, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_FLOAT, - offsetof(Decoderbufs__DatumMessage, has_datum_float), + offsetof(Decoderbufs__DatumMessage, datum_case), offsetof(Decoderbufs__DatumMessage, datum_float), NULL, NULL, - 0, /* flags */ + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, { @@ -254,11 +254,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 6, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_DOUBLE, - offsetof(Decoderbufs__DatumMessage, has_datum_double), + offsetof(Decoderbufs__DatumMessage, datum_case), offsetof(Decoderbufs__DatumMessage, datum_double), NULL, NULL, - 0, /* flags */ + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, { @@ -266,11 +266,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 7, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_BOOL, - offsetof(Decoderbufs__DatumMessage, has_datum_bool), + offsetof(Decoderbufs__DatumMessage, datum_case), offsetof(Decoderbufs__DatumMessage, datum_bool), NULL, NULL, - 0, /* flags */ + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, { @@ -278,11 +278,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 8, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_STRING, - 0, /* quantifier_offset */ + offsetof(Decoderbufs__DatumMessage, datum_case), offsetof(Decoderbufs__DatumMessage, datum_string), NULL, NULL, - 0, /* flags */ + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, { @@ -290,11 +290,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 9, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_BYTES, - offsetof(Decoderbufs__DatumMessage, has_datum_bytes), + offsetof(Decoderbufs__DatumMessage, datum_case), offsetof(Decoderbufs__DatumMessage, datum_bytes), NULL, NULL, - 0, /* flags */ + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, { @@ -302,11 +302,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 10, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_MESSAGE, - 0, /* quantifier_offset */ + offsetof(Decoderbufs__DatumMessage, datum_case), offsetof(Decoderbufs__DatumMessage, datum_point), &decoderbufs__point__descriptor, NULL, - 0, /* flags */ + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, }; @@ -342,13 +342,25 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = (ProtobufCMessageInit) decoderbufs__datum_message__init, NULL,NULL,NULL /* reserved[123] */ }; -static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[5] = +static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[6] = { { - "commit_time", + "transaction_id", 1, PROTOBUF_C_LABEL_OPTIONAL, - PROTOBUF_C_TYPE_SINT64, + PROTOBUF_C_TYPE_UINT32, + offsetof(Decoderbufs__RowMessage, has_transaction_id), + offsetof(Decoderbufs__RowMessage, transaction_id), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "commit_time", + 2, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_UINT64, offsetof(Decoderbufs__RowMessage, has_commit_time), offsetof(Decoderbufs__RowMessage, commit_time), NULL, @@ -358,7 +370,7 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor }, { "table", - 2, + 3, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_STRING, 0, /* quantifier_offset */ @@ -370,7 +382,7 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor }, { "op", - 3, + 4, PROTOBUF_C_LABEL_OPTIONAL, PROTOBUF_C_TYPE_ENUM, offsetof(Decoderbufs__RowMessage, has_op), @@ -382,7 +394,7 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor }, { "new_tuple", - 4, + 5, PROTOBUF_C_LABEL_REPEATED, PROTOBUF_C_TYPE_MESSAGE, offsetof(Decoderbufs__RowMessage, n_new_tuple), @@ -394,7 +406,7 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor }, { "old_tuple", - 5, + 6, PROTOBUF_C_LABEL_REPEATED, PROTOBUF_C_TYPE_MESSAGE, offsetof(Decoderbufs__RowMessage, n_old_tuple), @@ -406,16 +418,17 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor }, }; static const unsigned decoderbufs__row_message__field_indices_by_name[] = { - 0, /* field[0] = commit_time */ - 3, /* field[3] = new_tuple */ - 4, /* field[4] = old_tuple */ - 2, /* field[2] = op */ - 1, /* field[1] = table */ + 1, /* field[1] = commit_time */ + 4, /* field[4] = new_tuple */ + 5, /* field[5] = old_tuple */ + 3, /* field[3] = op */ + 2, /* field[2] = table */ + 0, /* field[0] = transaction_id */ }; static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] = { { 1, 0 }, - { 0, 5 } + { 0, 6 } }; const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor = { @@ -425,14 +438,14 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor = "Decoderbufs__RowMessage", "decoderbufs", sizeof(Decoderbufs__RowMessage), - 5, + 6, decoderbufs__row_message__field_descriptors, decoderbufs__row_message__field_indices_by_name, 1, decoderbufs__row_message__number_ranges, (ProtobufCMessageInit) decoderbufs__row_message__init, NULL,NULL,NULL /* reserved[123] */ }; -const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] = +static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] = { { "INSERT", "DECODERBUFS__OP__INSERT", 0 }, { "UPDATE", "DECODERBUFS__OP__UPDATE", 1 }, @@ -441,7 +454,7 @@ const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] = static const ProtobufCIntRange decoderbufs__op__value_ranges[] = { {0, 0},{0, 3} }; -const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] = +static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] = { { "DELETE", 2 }, { "INSERT", 0 }, diff --git a/src/proto/pg_logicaldec.pb-c.h b/src/proto/pg_logicaldec.pb-c.h index b7fec87..17206dd 100644 --- a/src/proto/pg_logicaldec.pb-c.h +++ b/src/proto/pg_logicaldec.pb-c.h @@ -1,8 +1,8 @@ /* Generated by the protocol buffer compiler. DO NOT EDIT! */ /* Generated from: pg_logicaldec.proto */ -#ifndef PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED -#define PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED +#ifndef PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED +#define PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED #include @@ -10,7 +10,7 @@ PROTOBUF_C__BEGIN_DECLS #if PROTOBUF_C_VERSION_NUMBER < 1000000 # error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers. -#elif 1001000 < PROTOBUF_C_MIN_COMPILER_VERSION +#elif 1001001 < PROTOBUF_C_MIN_COMPILER_VERSION # error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c. #endif @@ -42,37 +42,48 @@ struct _Decoderbufs__Point , 0, 0 } +typedef enum { + DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET = 0, + DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32 = 3, + DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64 = 4, + DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT = 5, + DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE = 6, + DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BOOL = 7, + DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8, + DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9, + DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10, +} Decoderbufs__DatumMessage__DatumCase; + struct _Decoderbufs__DatumMessage { ProtobufCMessage base; char *column_name; protobuf_c_boolean has_column_type; int64_t column_type; - protobuf_c_boolean has_datum_int32; - int32_t datum_int32; - protobuf_c_boolean has_datum_int64; - int64_t datum_int64; - protobuf_c_boolean has_datum_float; - float datum_float; - protobuf_c_boolean has_datum_double; - double datum_double; - protobuf_c_boolean has_datum_bool; - protobuf_c_boolean datum_bool; - char *datum_string; - protobuf_c_boolean has_datum_bytes; - ProtobufCBinaryData datum_bytes; - Decoderbufs__Point *datum_point; + Decoderbufs__DatumMessage__DatumCase datum_case; + union { + int32_t datum_int32; + int64_t datum_int64; + float datum_float; + double datum_double; + protobuf_c_boolean datum_bool; + char *datum_string; + ProtobufCBinaryData datum_bytes; + Decoderbufs__Point *datum_point; + }; }; #define DECODERBUFS__DATUM_MESSAGE__INIT \ { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \ - , NULL, 0,0, 0,0, 0,0, 0,0, 0,0, 0,0, NULL, 0,{0,NULL}, NULL } + , NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {} } struct _Decoderbufs__RowMessage { ProtobufCMessage base; + protobuf_c_boolean has_transaction_id; + uint32_t transaction_id; protobuf_c_boolean has_commit_time; - int64_t commit_time; + uint64_t commit_time; char *table; protobuf_c_boolean has_op; Decoderbufs__Op op; @@ -83,7 +94,7 @@ struct _Decoderbufs__RowMessage }; #define DECODERBUFS__ROW_MESSAGE__INIT \ { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \ - , 0,0, NULL, 0,0, 0,NULL, 0,NULL } + , 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL } /* Decoderbufs__Point methods */ @@ -168,4 +179,4 @@ extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor; PROTOBUF_C__END_DECLS -#endif /* PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED */ +#endif /* PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED */