Compare commits

..

No commits in common. "main" and "v0.5.2" have entirely different histories.
main ... v0.5.2

7 changed files with 302 additions and 542 deletions

View File

@ -4,11 +4,11 @@ EXTENSION = decoderbufs
PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0')
PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0')
PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include $(C_PARAMS)
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS)
PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) -L/usr/local/lib -llwgeom
OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o
PG_CONFIG ?= pg_config
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

122
README.md
View File

@ -10,125 +10,77 @@
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
## Thanks to
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
## Dependencies
This code depends on the following libraries and requires them for compilation:
- [PostgreSQL](http://www.postgresql.org) 9.6+
- [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
- [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
* [PostgreSQL](http://www.postgresql.org) 9.6+
* [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.1+ - used for data serialization
* [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
## Building
`postgres-decoderbufs` has to be built from source after installing required dependencies. The required dependencies are first PostgreSQL
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
To build you will need to install PostgreSQL (for pg_config), PostgreSQL server development packages, protobuf-c for the
Protocol Buffer support and some PostGIS development packages
### Installing Dependencies
### Debian
#### Debian
# Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
```bash
# Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
# PostGIS dependency
apt-get install -f -y libproj-dev liblwgeom-dev
# PostGIS dependency
apt-get install -f -y libproj-dev liblwgeom-dev
# Protobuf-c dependency (requires a non-stable Debian repo)
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
# Protobuf-c dependency (requires a non-stable Debian repo)
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
```
The above are taken from the Debezium [docker images](https://github.com/debezium/docker-images).
When updating the ProtoBuf definition, also install the ProtoBuf C compiler:
```bash
apt-get install -y protobuf-c-compiler=1.2.*
```
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
#### Other Linux distributions
### Other Linux distributions
You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1`
### Getting the source code
If you have all of the prerequisites installed you should be able to just:
If you have all of the above prerequisites installed, clone this git repo to build from source:
```bash
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
```
### Optional: Re-generating ProtoBuf code
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
```bash
cd proto
protoc-c --c_out=../src/proto pg_logicaldec.proto
cd ..
```
Commit the generated files to git then.
### Building and installing decoderbufs
If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
Then `make` and `make install` for each version. Here is an example:
```bash
# Install for Postgres 9.6 if I have multiple local versions
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
make
make install
```
make && make install
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
```bash
# MODULES
shared_preload_libraries = 'decoderbufs'
# MODULES
shared_preload_libraries = 'decoderbufs'
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart)
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
```
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart)
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
```make
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
```
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
And restart PostgreSQL.
## Usage
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
```sql
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
-- peek at WAL changes using decoderbufs debug mode for SQL console
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- peek at WAL changes using decoderbufs debug mode for SQL console
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
```
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.

View File

@ -5,12 +5,9 @@ option java_outer_classname = "PgProto";
option optimize_for = SPEED;
enum Op {
UNKNOWN = -1;
INSERT = 0;
UPDATE = 1;
DELETE = 2;
BEGIN = 3;
COMMIT = 4;
}
message Point {
@ -30,15 +27,9 @@ message DatumMessage {
string datum_string = 8;
bytes datum_bytes = 9;
Point datum_point = 10;
bool datum_missing = 11;
}
}
message TypeInfo {
required string modifier = 1;
required bool value_optional = 2;
}
message RowMessage {
optional uint32 transaction_id = 1;
optional uint64 commit_time = 2;
@ -46,5 +37,4 @@ message RowMessage {
optional Op op = 4;
repeated DatumMessage new_tuple = 5;
repeated DatumMessage old_tuple = 6;
repeated TypeInfo new_typeinfo = 7;
}

View File

@ -1,48 +0,0 @@
Name: postgres-decoderbufs
Version: 0.10.0
Release: 1%{?dist}
Summary: PostgreSQL Protocol Buffers logical decoder plugin
License: MIT
URL: https://github.com/debezium/postgres-decoderbufs
%global full_version %{version}.Final
Source0: https://github.com/debezium/%{name}/archive/v%{full_version}.tar.gz
BuildRequires: gcc
BuildRequires: postgresql-devel >= 9.6, postgresql-server-devel >= 9.6
BuildRequires: postgis-devel >= 2
BuildRequires: protobuf-c-devel
Requires: protobuf-c
%{?postgresql_module_requires}
Recommends: postgis
%description
A PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers messages.
%prep
%setup -qn postgres-decoderbufs-%{full_version}
%build
%make_build
%install
%make_install
%files
%doc README.md
%license LICENSE
%{_libdir}/pgsql/decoderbufs.so
%{_datadir}/pgsql/extension/decoderbufs.control
%changelog
* Wed Oct 9 2019 - Jiri Pechanec <jpechane@redhat.com> 0.10.0-1
* Tue May 14 2019 - Jiri Pechanec <jpechane@redhat.com> 0.9.5-1
- Initial RPM packaging

View File

@ -60,11 +60,9 @@
#error Expecting timestamps to be represented as integers, not as floating-point.
#endif
#if PG_VERSION_NUM >= 170000
#define TUPLE_ACCESS(x) x
#else
#define TUPLE_ACCESS(x) &x->tuple
#endif
/* POSTGIS version define so it doesn't redef macros */
#define POSTGIS_PGSQL_VERSION 94
#include "liblwgeom.h"
PG_MODULE_MAGIC;
@ -80,6 +78,10 @@ typedef struct {
bool debug_mode;
} DecoderData;
/* GLOBALs for PostGIS dynamic OIDs */
Oid geometry_oid = InvalidOid;
Oid geography_oid = InvalidOid;
/* these must be available to pg_dlsym() */
extern void _PG_init(void);
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
@ -115,14 +117,9 @@ static void pg_decode_startup(LogicalDecodingContext *ctx,
elog(DEBUG1, "Entering startup callback");
data = palloc(sizeof(DecoderData));
#if PG_VERSION_NUM >= 90600
data->context = AllocSetContextCreate(
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_SIZES);
#else
data->context = AllocSetContextCreate(
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
#endif
data->debug_mode = false;
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
@ -170,6 +167,100 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
MemoryContextDelete(data->context);
}
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
// set PostGIS geometry type id (these are dynamic)
// TODO: Figure out how to make sure we get the typid's from postgis extension namespace
if (geometry_oid == InvalidOid) {
geometry_oid = TypenameGetTypid("geometry");
if (geometry_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geometry_oid);
}
}
if (geography_oid == InvalidOid) {
geography_oid = TypenameGetTypid("geography");
if (geography_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geography_oid);
}
}
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
}
/* convenience method to free up sub-messages */
static void row_message_destroy(Decoderbufs__RowMessage *msg) {
if (!msg) {
return;
}
if (msg->table) {
pfree(msg->table);
}
if (msg->n_new_tuple > 0) {
for (int i = 0; i < msg->n_new_tuple; i++) {
if (msg->new_tuple[i]) {
switch (msg->new_tuple[i]->datum_case) {
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
if (msg->new_tuple[i]->datum_string) {
pfree(msg->new_tuple[i]->datum_string);
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
if (msg->new_tuple[i]->datum_bytes.data) {
pfree(msg->new_tuple[i]->datum_bytes.data);
msg->new_tuple[i]->datum_bytes.data = NULL;
msg->new_tuple[i]->datum_bytes.len = 0;
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
if (msg->new_tuple[i]->datum_point) {
pfree(msg->new_tuple[i]->datum_point);
}
break;
default:
break;
}
pfree(msg->new_tuple[i]);
}
}
pfree(msg->new_tuple);
}
if (msg->n_old_tuple > 0) {
for (int i = 0; i < msg->n_old_tuple; i++) {
if (msg->old_tuple[i]) {
switch (msg->old_tuple[i]->datum_case) {
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
if (msg->old_tuple[i]->datum_string) {
pfree(msg->old_tuple[i]->datum_string);
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
if (msg->old_tuple[i]->datum_bytes.data) {
pfree(msg->old_tuple[i]->datum_bytes.data);
msg->old_tuple[i]->datum_bytes.data = NULL;
msg->old_tuple[i]->datum_bytes.len = 0;
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
if (msg->old_tuple[i]->datum_point) {
pfree(msg->old_tuple[i]->datum_point);
}
break;
default:
break;
}
pfree(msg->old_tuple[i]);
}
}
pfree(msg->old_tuple);
}
}
/* print tuple datums (only used for debug-mode) */
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
size_t n) {
@ -250,14 +341,71 @@ static void print_row_msg(StringInfo out, Decoderbufs__RowMessage *rmsg) {
}
/* this doesn't seem to be available in the public api (unfortunate) */
static double numeric_to_double_no_overflow(Numeric num) {
char *tmp;
double val;
char *endptr;
tmp = DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(num)));
/* unlike float8in, we ignore ERANGE from strtod */
val = strtod(tmp, &endptr);
if (*endptr != '\0') {
/* shouldn't happen ... */
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type double precision: \"%s\"",
tmp)));
}
pfree(tmp);
return val;
}
static bool geography_point_as_decoderbufs_point(Datum datum,
Decoderbufs__Point *p) {
GSERIALIZED *geom;
LWGEOM *lwgeom;
LWPOINT *point = NULL;
POINT2D p2d;
geom = (GSERIALIZED *)PG_DETOAST_DATUM(datum);
if (gserialized_get_type(geom) != POINTTYPE) {
return false;
}
lwgeom = lwgeom_from_gserialized(geom);
point = lwgeom_as_lwpoint(lwgeom);
if (lwgeom_is_empty(lwgeom)) {
return false;
}
getPoint2d_p(point->point, 0, &p2d);
if (p != NULL) {
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
dp.x = p2d.x;
dp.y = p2d.y;
memcpy(p, &dp, sizeof(dp));
elog(DEBUG1, "Translating geography to point: (x,y) = (%f,%f)", p->x, p->y);
}
return true;
}
/* set a datum value based on its OID specified by typid */
static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
Oid typoutput, Datum datum) {
Numeric num;
bytea *valptr = NULL;
const char *output = NULL;
Point *p = NULL;
Timestamp ts;
double duration;
TimeTzADT *timetz = NULL;
Interval *interval = NULL;
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
int size = 0;
@ -275,11 +423,8 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
break;
case INT8OID:
datum_msg->datum_int64 = DatumGetInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case OIDOID:
datum_msg->datum_int64 = (Oid) DatumGetUInt64(datum);
datum_msg->datum_int64 = DatumGetInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case FLOAT4OID:
@ -290,11 +435,17 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_double = DatumGetFloat8(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case NUMERICOID:
num = DatumGetNumeric(datum);
if (!numeric_is_nan(num)) {
datum_msg->datum_double = numeric_to_double_no_overflow(num);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
}
break;
case CASHOID:
datum_msg->datum_int64 = DatumGetCash(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case NUMERICOID:
case CHAROID:
case VARCHAROID:
case BPCHAROID:
@ -305,7 +456,6 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case BITOID:
case VARBITOID:
case UUIDOID:
case INTERVALOID:
output = OidOutputFunctionCall(typoutput, datum);
datum_msg->datum_string = pnstrdup(output, strlen(output));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING;
@ -313,21 +463,23 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case TIMESTAMPOID:
ts = DatumGetTimestamp(datum);
if (TIMESTAMP_NOT_FINITE(ts)) {
datum_msg->datum_int64 = ts;
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
}
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case TIMESTAMPTZOID:
ts = DatumGetTimestampTz(datum);
if (TIMESTAMP_NOT_FINITE(ts)) {
datum_msg->datum_int64 = ts;
} else {
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
}
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
}
case DATEOID:
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
@ -343,6 +495,15 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case INTERVALOID:
interval = DatumGetIntervalP(datum);
/*
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
*/
duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY);
datum_msg->datum_double = duration;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case BYTEAOID:
valptr = DatumGetByteaPCopy(datum);
size = VARSIZE(valptr) - VARHDRSZ;
@ -360,7 +521,13 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
break;
default:
{
// PostGIS uses dynamic OIDs so we need to check the type again here
if (typid == geometry_oid || typid == geography_oid) {
elog(DEBUG1, "Converting geography point to datum_point");
datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point));
geography_point_as_decoderbufs_point(datum, datum_msg->datum_point);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
} else {
int len;
elog(DEBUG1, "Encountered unknown typid: %d, using bytes", typid);
output = OidOutputFunctionCall(typoutput, datum);
@ -380,7 +547,7 @@ static int valid_attributes_count_from(TupleDesc tupdesc) {
int natt;
int count = 0;
for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr = TupleDescAttr(tupdesc, natt);
Form_pg_attribute attr = tupdesc->attrs[natt];
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
@ -408,7 +575,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
bool typisvarlena;
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
attr = TupleDescAttr(tupdesc, natt);
attr = tupdesc->attrs[natt];
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
@ -433,8 +600,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
if (!isnull) {
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
datum_msg.datum_missing = true;
datum_msg.datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING;
// TODO: Is there a way we can handle this?
elog(DEBUG1, "Not handling external on disk varlena at the moment.");
} else if (!typisvarlena) {
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
@ -453,125 +619,6 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
}
}
/* provide a metadata for new tuple */
static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
Relation relation, HeapTuple tuple,
TupleDesc tupdesc) {
int natt;
int valid_attr_cnt = 0;
elog(DEBUG1, "Adding metadata for %d columns", tupdesc->natts);
/* build column names and values */
for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr;
char *typ_mod;
Decoderbufs__TypeInfo typeinfo = DECODERBUFS__TYPE_INFO__INIT;
bool not_null;
attr = TupleDescAttr(tupdesc, natt);
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
continue;
}
not_null = attr->attnotnull;
typ_mod = TextDatumGetCString(DirectFunctionCall2(format_type, attr->atttypid, attr->atttypmod));
elog(DEBUG1, "Adding typemodifier '%s' for column %d, optional %s", typ_mod, natt, !not_null ? "true" : "false");
typeinfo.modifier = typ_mod;
typeinfo.value_optional = !not_null;
tmsg[valid_attr_cnt] = palloc(sizeof(typeinfo));
memcpy(tmsg[valid_attr_cnt], &typeinfo, sizeof(typeinfo));
valid_attr_cnt++;
}
}
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering begin callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__BEGIN;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering commit callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__COMMIT;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* callback for individual changed tuples */
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) {
@ -585,16 +632,17 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleDesc tupdesc;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering decode_change callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
elog(DEBUG1, "Entering decode_change callback");
replident = relation->rd_rel->relreplident;
class_form = RelationGetForm(relation);
data = ctx->output_plugin_private;
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
RelationGetIndexList(relation);
is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
(replident == REPLICA_IDENTITY_DEFAULT &&
@ -604,11 +652,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* set common fields */
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
NameStr(class_form->relname)));
@ -623,18 +667,11 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information");
tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
&change->data.tp.newtuple->tuple, tupdesc);
}
break;
case REORDER_BUFFER_CHANGE_UPDATE:
@ -649,23 +686,16 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation,
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
&change->data.tp.oldtuple->tuple, tupdesc);
}
if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information");
tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
&change->data.tp.newtuple->tuple, tupdesc);
}
}
break;
@ -681,7 +711,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation,
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
&change->data.tp.oldtuple->tuple, tupdesc);
} else {
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
}
@ -702,10 +732,14 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
/* free packed buffer */
pfree(packed);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
/* cleanup msg */
row_message_destroy(&rmsg);
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}

View File

@ -93,49 +93,6 @@ void decoderbufs__datum_message__free_unpacked
assert(message->base.descriptor == &decoderbufs__datum_message__descriptor);
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
void decoderbufs__type_info__init
(Decoderbufs__TypeInfo *message)
{
static Decoderbufs__TypeInfo init_value = DECODERBUFS__TYPE_INFO__INIT;
*message = init_value;
}
size_t decoderbufs__type_info__get_packed_size
(const Decoderbufs__TypeInfo *message)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
}
size_t decoderbufs__type_info__pack
(const Decoderbufs__TypeInfo *message,
uint8_t *out)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
}
size_t decoderbufs__type_info__pack_to_buffer
(const Decoderbufs__TypeInfo *message,
ProtobufCBuffer *buffer)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
}
Decoderbufs__TypeInfo *
decoderbufs__type_info__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data)
{
return (Decoderbufs__TypeInfo *)
protobuf_c_message_unpack (&decoderbufs__type_info__descriptor,
allocator, len, data);
}
void decoderbufs__type_info__free_unpacked
(Decoderbufs__TypeInfo *message,
ProtobufCAllocator *allocator)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
void decoderbufs__row_message__init
(Decoderbufs__RowMessage *message)
{
@ -230,7 +187,7 @@ const ProtobufCMessageDescriptor decoderbufs__point__descriptor =
(ProtobufCMessageInit) decoderbufs__point__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[11] =
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] =
{
{
"column_name",
@ -352,18 +309,6 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"datum_missing",
11,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_BOOL,
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_missing),
NULL,
NULL,
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
0, /* field[0] = column_name */
@ -374,14 +319,13 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
4, /* field[4] = datum_float */
2, /* field[2] = datum_int32 */
3, /* field[3] = datum_int64 */
10, /* field[10] = datum_missing */
9, /* field[9] = datum_point */
7, /* field[7] = datum_string */
};
static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 11 }
{ 0, 10 }
};
const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
{
@ -391,65 +335,14 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
"Decoderbufs__DatumMessage",
"decoderbufs",
sizeof(Decoderbufs__DatumMessage),
11,
10,
decoderbufs__datum_message__field_descriptors,
decoderbufs__datum_message__field_indices_by_name,
1, decoderbufs__datum_message__number_ranges,
(ProtobufCMessageInit) decoderbufs__datum_message__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__type_info__field_descriptors[2] =
{
{
"modifier",
1,
PROTOBUF_C_LABEL_REQUIRED,
PROTOBUF_C_TYPE_STRING,
0, /* quantifier_offset */
offsetof(Decoderbufs__TypeInfo, modifier),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"value_optional",
2,
PROTOBUF_C_LABEL_REQUIRED,
PROTOBUF_C_TYPE_BOOL,
0, /* quantifier_offset */
offsetof(Decoderbufs__TypeInfo, value_optional),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
static const unsigned decoderbufs__type_info__field_indices_by_name[] = {
0, /* field[0] = modifier */
1, /* field[1] = value_optional */
};
static const ProtobufCIntRange decoderbufs__type_info__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 2 }
};
const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor =
{
PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
"decoderbufs.TypeInfo",
"TypeInfo",
"Decoderbufs__TypeInfo",
"decoderbufs",
sizeof(Decoderbufs__TypeInfo),
2,
decoderbufs__type_info__field_descriptors,
decoderbufs__type_info__field_indices_by_name,
1, decoderbufs__type_info__number_ranges,
(ProtobufCMessageInit) decoderbufs__type_info__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[7] =
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[6] =
{
{
"transaction_id",
@ -523,23 +416,10 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"new_typeinfo",
7,
PROTOBUF_C_LABEL_REPEATED,
PROTOBUF_C_TYPE_MESSAGE,
offsetof(Decoderbufs__RowMessage, n_new_typeinfo),
offsetof(Decoderbufs__RowMessage, new_typeinfo),
&decoderbufs__type_info__descriptor,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
1, /* field[1] = commit_time */
4, /* field[4] = new_tuple */
6, /* field[6] = new_typeinfo */
5, /* field[5] = old_tuple */
3, /* field[3] = op */
2, /* field[2] = table */
@ -548,7 +428,7 @@ static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 7 }
{ 0, 6 }
};
const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
{
@ -558,33 +438,27 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
"Decoderbufs__RowMessage",
"decoderbufs",
sizeof(Decoderbufs__RowMessage),
7,
6,
decoderbufs__row_message__field_descriptors,
decoderbufs__row_message__field_indices_by_name,
1, decoderbufs__row_message__number_ranges,
(ProtobufCMessageInit) decoderbufs__row_message__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[6] =
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
{
{ "UNKNOWN", "DECODERBUFS__OP__UNKNOWN", -1 },
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 },
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
};
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
{-1, 0},{0, 6}
{0, 0},{0, 3}
};
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[6] =
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
{
{ "BEGIN", 4 },
{ "COMMIT", 5 },
{ "DELETE", 3 },
{ "INSERT", 1 },
{ "UNKNOWN", 0 },
{ "UPDATE", 2 },
{ "DELETE", 2 },
{ "INSERT", 0 },
{ "UPDATE", 1 },
};
const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
{
@ -593,9 +467,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
"Op",
"Decoderbufs__Op",
"decoderbufs",
6,
3,
decoderbufs__op__enum_values_by_number,
6,
3,
decoderbufs__op__enum_values_by_name,
1,
decoderbufs__op__value_ranges,

View File

@ -10,26 +10,22 @@ PROTOBUF_C__BEGIN_DECLS
#if PROTOBUF_C_VERSION_NUMBER < 1000000
# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
#elif 1002001 < PROTOBUF_C_MIN_COMPILER_VERSION
#elif 1001001 < PROTOBUF_C_MIN_COMPILER_VERSION
# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
#endif
typedef struct _Decoderbufs__Point Decoderbufs__Point;
typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage;
typedef struct _Decoderbufs__TypeInfo Decoderbufs__TypeInfo;
typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
/* --- enums --- */
typedef enum _Decoderbufs__Op {
DECODERBUFS__OP__UNKNOWN = -1,
DECODERBUFS__OP__INSERT = 0,
DECODERBUFS__OP__UPDATE = 1,
DECODERBUFS__OP__DELETE = 2,
DECODERBUFS__OP__BEGIN = 3,
DECODERBUFS__OP__COMMIT = 4
DECODERBUFS__OP__DELETE = 2
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
} Decoderbufs__Op;
@ -56,7 +52,6 @@ typedef enum {
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING = 11,
} Decoderbufs__DatumMessage__DatumCase;
struct _Decoderbufs__DatumMessage
@ -75,23 +70,11 @@ struct _Decoderbufs__DatumMessage
char *datum_string;
ProtobufCBinaryData datum_bytes;
Decoderbufs__Point *datum_point;
protobuf_c_boolean datum_missing;
};
};
#define DECODERBUFS__DATUM_MESSAGE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \
, NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {0} }
struct _Decoderbufs__TypeInfo
{
ProtobufCMessage base;
char *modifier;
protobuf_c_boolean value_optional;
};
#define DECODERBUFS__TYPE_INFO__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__type_info__descriptor) \
, NULL, 0 }
, NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {} }
struct _Decoderbufs__RowMessage
@ -108,12 +91,10 @@ struct _Decoderbufs__RowMessage
Decoderbufs__DatumMessage **new_tuple;
size_t n_old_tuple;
Decoderbufs__DatumMessage **old_tuple;
size_t n_new_typeinfo;
Decoderbufs__TypeInfo **new_typeinfo;
};
#define DECODERBUFS__ROW_MESSAGE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \
, 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL, 0,NULL }
, 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL }
/* Decoderbufs__Point methods */
@ -154,25 +135,6 @@ Decoderbufs__DatumMessage *
void decoderbufs__datum_message__free_unpacked
(Decoderbufs__DatumMessage *message,
ProtobufCAllocator *allocator);
/* Decoderbufs__TypeInfo methods */
void decoderbufs__type_info__init
(Decoderbufs__TypeInfo *message);
size_t decoderbufs__type_info__get_packed_size
(const Decoderbufs__TypeInfo *message);
size_t decoderbufs__type_info__pack
(const Decoderbufs__TypeInfo *message,
uint8_t *out);
size_t decoderbufs__type_info__pack_to_buffer
(const Decoderbufs__TypeInfo *message,
ProtobufCBuffer *buffer);
Decoderbufs__TypeInfo *
decoderbufs__type_info__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data);
void decoderbufs__type_info__free_unpacked
(Decoderbufs__TypeInfo *message,
ProtobufCAllocator *allocator);
/* Decoderbufs__RowMessage methods */
void decoderbufs__row_message__init
(Decoderbufs__RowMessage *message);
@ -200,9 +162,6 @@ typedef void (*Decoderbufs__Point_Closure)
typedef void (*Decoderbufs__DatumMessage_Closure)
(const Decoderbufs__DatumMessage *message,
void *closure_data);
typedef void (*Decoderbufs__TypeInfo_Closure)
(const Decoderbufs__TypeInfo *message,
void *closure_data);
typedef void (*Decoderbufs__RowMessage_Closure)
(const Decoderbufs__RowMessage *message,
void *closure_data);
@ -215,7 +174,6 @@ typedef void (*Decoderbufs__RowMessage_Closure)
extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor;
PROTOBUF_C__END_DECLS