Merge pull request #2 from jarreds/master

Use `oneof` for different datum field types
pull/1/head
Xavier 2015-04-20 20:36:11 -07:00
commit ba4a480643
4 changed files with 140 additions and 93 deletions

View File

@ -1,7 +1,7 @@
package decoderbufs;
option java_package="decoderbufs.proto";
option java_outer_classname = "PgldProto";
option java_outer_classname = "PgldProtos";
option optimize_for = SPEED;
enum Op {
@ -18,20 +18,23 @@ message Point {
message DatumMessage {
optional string column_name = 1;
optional int64 column_type = 2;
optional int32 datum_int32 = 3;
optional int64 datum_int64 = 4;
optional float datum_float = 5;
optional double datum_double = 6;
optional bool datum_bool = 7;
optional string datum_string = 8;
optional bytes datum_bytes = 9;
optional Point datum_point = 10;
oneof datum {
int32 datum_int32 = 3;
int64 datum_int64 = 4;
float datum_float = 5;
double datum_double = 6;
bool datum_bool = 7;
string datum_string = 8;
bytes datum_bytes = 9;
Point datum_point = 10;
}
}
message RowMessage {
optional sint64 commit_time = 1;
optional string table = 2;
optional Op op = 3;
repeated DatumMessage new_tuple = 4;
repeated DatumMessage old_tuple = 5;
optional uint32 transaction_id = 1;
optional uint64 commit_time = 2;
optional string table = 3;
optional Op op = 4;
repeated DatumMessage new_tuple = 5;
repeated DatumMessage old_tuple = 6;
}

View File

@ -190,19 +190,34 @@ static void row_message_destroy(Decoderbufs__RowMessage *msg) {
return;
}
if (msg->table) {
pfree(msg->table);
}
if (msg->n_new_tuple > 0) {
for (int i = 0; i < msg->n_new_tuple; i++) {
if (msg->new_tuple[i]) {
switch (msg->new_tuple[i]->datum_case) {
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
if (msg->new_tuple[i]->datum_string) {
pfree(msg->new_tuple[i]->datum_string);
} else if (msg->new_tuple[i]->has_datum_bytes) {
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
if (msg->new_tuple[i]->datum_bytes.data) {
pfree(msg->new_tuple[i]->datum_bytes.data);
msg->new_tuple[i]->datum_bytes.data = NULL;
msg->new_tuple[i]->datum_bytes.len = 0;
} else if (msg->new_tuple[i]->datum_point) {
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
if (msg->new_tuple[i]->datum_point) {
pfree(msg->new_tuple[i]->datum_point);
}
break;
default:
break;
}
pfree(msg->new_tuple[i]);
}
}
@ -211,15 +226,27 @@ static void row_message_destroy(Decoderbufs__RowMessage *msg) {
if (msg->n_old_tuple > 0) {
for (int i = 0; i < msg->n_old_tuple; i++) {
if (msg->old_tuple[i]) {
switch (msg->old_tuple[i]->datum_case) {
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
if (msg->old_tuple[i]->datum_string) {
pfree(msg->old_tuple[i]->datum_string);
} else if (msg->old_tuple[i]->has_datum_bytes) {
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
if (msg->old_tuple[i]->datum_bytes.data) {
pfree(msg->old_tuple[i]->datum_bytes.data);
msg->old_tuple[i]->datum_bytes.data = NULL;
msg->old_tuple[i]->datum_bytes.len = 0;
} else if (msg->old_tuple[i]->datum_point) {
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
if (msg->old_tuple[i]->datum_point) {
pfree(msg->old_tuple[i]->datum_point);
}
break;
default:
break;
}
pfree(msg->old_tuple[i]);
}
}
@ -350,34 +377,27 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
switch (typid) {
case BOOLOID:
datum_msg->datum_bool = DatumGetBool(datum);
datum_msg->has_datum_bool = true;
break;
case INT2OID:
datum_msg->datum_int32 = DatumGetInt16(datum);
datum_msg->has_datum_int32 = true;
break;
case INT4OID:
datum_msg->datum_int32 = DatumGetInt32(datum);
datum_msg->has_datum_int32 = true;
break;
case INT8OID:
case OIDOID:
datum_msg->datum_int64 = DatumGetInt64(datum);
datum_msg->has_datum_int64 = true;
break;
case FLOAT4OID:
datum_msg->datum_float = DatumGetFloat4(datum);
datum_msg->has_datum_float = true;
break;
case FLOAT8OID:
datum_msg->datum_double = DatumGetFloat8(datum);
datum_msg->has_datum_double = true;
break;
case NUMERICOID:
num = DatumGetNumeric(datum);
if (!numeric_is_nan(num)) {
datum_msg->datum_double = numeric_to_double_no_overflow(num);
datum_msg->has_datum_double = true;
}
break;
case CHAROID:
@ -404,7 +424,6 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_bytes.data = palloc(size);
memcpy(datum_msg->datum_bytes.data, (uint8_t *)VARDATA(valptr), size);
datum_msg->datum_bytes.len = size;
datum_msg->has_datum_bytes = true;
break;
case POINTOID:
p = DatumGetPointP(datum);
@ -428,7 +447,6 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_bytes.data = palloc(size);
memcpy(datum_msg->datum_bytes.data, (uint8_t *)output, size);
datum_msg->datum_bytes.len = len;
datum_msg->has_datum_bytes = true;
}
break;
}
@ -509,6 +527,8 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
!OidIsValid(relation->rd_replidindex)));
/* set common fields */
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
rmsg.has_commit_time = true;
rmsg.table = pstrdup(NameStr(class_form->relname));

View File

@ -218,11 +218,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
3,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_INT32,
offsetof(Decoderbufs__DatumMessage, has_datum_int32),
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_int32),
NULL,
NULL,
0, /* flags */
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
@ -230,11 +230,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
4,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_INT64,
offsetof(Decoderbufs__DatumMessage, has_datum_int64),
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_int64),
NULL,
NULL,
0, /* flags */
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
@ -242,11 +242,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
5,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_FLOAT,
offsetof(Decoderbufs__DatumMessage, has_datum_float),
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_float),
NULL,
NULL,
0, /* flags */
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
@ -254,11 +254,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
6,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_DOUBLE,
offsetof(Decoderbufs__DatumMessage, has_datum_double),
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_double),
NULL,
NULL,
0, /* flags */
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
@ -266,11 +266,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
7,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_BOOL,
offsetof(Decoderbufs__DatumMessage, has_datum_bool),
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_bool),
NULL,
NULL,
0, /* flags */
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
@ -278,11 +278,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
8,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_STRING,
0, /* quantifier_offset */
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_string),
NULL,
NULL,
0, /* flags */
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
@ -290,11 +290,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
9,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_BYTES,
offsetof(Decoderbufs__DatumMessage, has_datum_bytes),
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_bytes),
NULL,
NULL,
0, /* flags */
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
@ -302,11 +302,11 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
10,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_MESSAGE,
0, /* quantifier_offset */
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_point),
&decoderbufs__point__descriptor,
NULL,
0, /* flags */
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
@ -342,13 +342,25 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
(ProtobufCMessageInit) decoderbufs__datum_message__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[5] =
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[6] =
{
{
"commit_time",
"transaction_id",
1,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_SINT64,
PROTOBUF_C_TYPE_UINT32,
offsetof(Decoderbufs__RowMessage, has_transaction_id),
offsetof(Decoderbufs__RowMessage, transaction_id),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"commit_time",
2,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_UINT64,
offsetof(Decoderbufs__RowMessage, has_commit_time),
offsetof(Decoderbufs__RowMessage, commit_time),
NULL,
@ -358,7 +370,7 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
},
{
"table",
2,
3,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_STRING,
0, /* quantifier_offset */
@ -370,7 +382,7 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
},
{
"op",
3,
4,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_ENUM,
offsetof(Decoderbufs__RowMessage, has_op),
@ -382,7 +394,7 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
},
{
"new_tuple",
4,
5,
PROTOBUF_C_LABEL_REPEATED,
PROTOBUF_C_TYPE_MESSAGE,
offsetof(Decoderbufs__RowMessage, n_new_tuple),
@ -394,7 +406,7 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
},
{
"old_tuple",
5,
6,
PROTOBUF_C_LABEL_REPEATED,
PROTOBUF_C_TYPE_MESSAGE,
offsetof(Decoderbufs__RowMessage, n_old_tuple),
@ -406,16 +418,17 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
},
};
static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
0, /* field[0] = commit_time */
3, /* field[3] = new_tuple */
4, /* field[4] = old_tuple */
2, /* field[2] = op */
1, /* field[1] = table */
1, /* field[1] = commit_time */
4, /* field[4] = new_tuple */
5, /* field[5] = old_tuple */
3, /* field[3] = op */
2, /* field[2] = table */
0, /* field[0] = transaction_id */
};
static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 5 }
{ 0, 6 }
};
const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
{
@ -425,14 +438,14 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
"Decoderbufs__RowMessage",
"decoderbufs",
sizeof(Decoderbufs__RowMessage),
5,
6,
decoderbufs__row_message__field_descriptors,
decoderbufs__row_message__field_indices_by_name,
1, decoderbufs__row_message__number_ranges,
(ProtobufCMessageInit) decoderbufs__row_message__init,
NULL,NULL,NULL /* reserved[123] */
};
const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
{
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
@ -441,7 +454,7 @@ const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
{0, 0},{0, 3}
};
const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
{
{ "DELETE", 2 },
{ "INSERT", 0 },

View File

@ -1,8 +1,8 @@
/* Generated by the protocol buffer compiler. DO NOT EDIT! */
/* Generated from: pg_logicaldec.proto */
#ifndef PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED
#define PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED
#ifndef PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED
#define PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED
#include <protobuf-c/protobuf-c.h>
@ -10,7 +10,7 @@ PROTOBUF_C__BEGIN_DECLS
#if PROTOBUF_C_VERSION_NUMBER < 1000000
# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
#elif 1001000 < PROTOBUF_C_MIN_COMPILER_VERSION
#elif 1001001 < PROTOBUF_C_MIN_COMPILER_VERSION
# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
#endif
@ -42,37 +42,48 @@ struct _Decoderbufs__Point
, 0, 0 }
typedef enum {
DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET = 0,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32 = 3,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64 = 4,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT = 5,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE = 6,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BOOL = 7,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
} Decoderbufs__DatumMessage__DatumCase;
struct _Decoderbufs__DatumMessage
{
ProtobufCMessage base;
char *column_name;
protobuf_c_boolean has_column_type;
int64_t column_type;
protobuf_c_boolean has_datum_int32;
Decoderbufs__DatumMessage__DatumCase datum_case;
union {
int32_t datum_int32;
protobuf_c_boolean has_datum_int64;
int64_t datum_int64;
protobuf_c_boolean has_datum_float;
float datum_float;
protobuf_c_boolean has_datum_double;
double datum_double;
protobuf_c_boolean has_datum_bool;
protobuf_c_boolean datum_bool;
char *datum_string;
protobuf_c_boolean has_datum_bytes;
ProtobufCBinaryData datum_bytes;
Decoderbufs__Point *datum_point;
};
};
#define DECODERBUFS__DATUM_MESSAGE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \
, NULL, 0,0, 0,0, 0,0, 0,0, 0,0, 0,0, NULL, 0,{0,NULL}, NULL }
, NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {} }
struct _Decoderbufs__RowMessage
{
ProtobufCMessage base;
protobuf_c_boolean has_transaction_id;
uint32_t transaction_id;
protobuf_c_boolean has_commit_time;
int64_t commit_time;
uint64_t commit_time;
char *table;
protobuf_c_boolean has_op;
Decoderbufs__Op op;
@ -83,7 +94,7 @@ struct _Decoderbufs__RowMessage
};
#define DECODERBUFS__ROW_MESSAGE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \
, 0,0, NULL, 0,0, 0,NULL, 0,NULL }
, 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL }
/* Decoderbufs__Point methods */
@ -168,4 +179,4 @@ extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor;
PROTOBUF_C__END_DECLS
#endif /* PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED */
#endif /* PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED */