Compare commits

..

No commits in common. "main" and "v0.3.0" have entirely different histories.
main ... v0.3.0

7 changed files with 320 additions and 573 deletions

View File

@ -4,11 +4,11 @@ EXTENSION = decoderbufs
PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0') PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0')
PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0') PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0')
PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include $(C_PARAMS) PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) -L/usr/local/lib -llwgeom
OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o
PG_CONFIG ?= pg_config PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs) PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS) include $(PGXS)

146
README.md
View File

@ -10,130 +10,82 @@
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
## Thanks to ## Thanks to
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support - [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
## Dependencies ## Dependencies
This code depends on the following libraries and requires them for compilation: This code depends on the following libraries and requires them for compilation:
- [PostgreSQL](http://www.postgresql.org) 9.6+ * [PostgreSQL](http://www.postgresql.org) 9.6+
- [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization * [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.1+ - used for data serialization
- [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support * [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
## Building ## Building
`postgres-decoderbufs` has to be built from source after installing required dependencies. The required dependencies are first PostgreSQL To build you will need to install PostgreSQL (for pg_config), PostgreSQL server development packages, protobuf-c for the
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages. Protocol Buffer support and some PostGIS development packages
### Installing Dependencies ### Debian
#### Debian # Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
# PostGIS dependency
apt-get install -f -y libproj-dev liblwgeom-dev
```bash # Protobuf-c dependency (requires a non-stable Debian repo)
# Core build utilities add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6 apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
# PostGIS dependency The above are taken from the Debezium [docker images](https://github.com/debezium/docker-images).
apt-get install -f -y libproj-dev liblwgeom-dev
# Protobuf-c dependency (requires a non-stable Debian repo) ### Other Linux distributions
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
```
When updating the ProtoBuf definition, also install the ProtoBuf C compiler: You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
```bash
apt-get install -y protobuf-c-compiler=1.2.*
```
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
#### Other Linux distributions
You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1` Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1`
### Getting the source code If you have all of the prerequisites installed you should be able to just:
If you have all of the above prerequisites installed, clone this git repo to build from source: make && make install
```bash
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
```
### Optional: Re-generating ProtoBuf code
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
```bash
cd proto
protoc-c --c_out=../src/proto pg_logicaldec.proto
cd ..
```
Commit the generated files to git then.
### Building and installing decoderbufs
If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
Then `make` and `make install` for each version. Here is an example:
```bash
# Install for Postgres 9.6 if I have multiple local versions
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
make
make install
```
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf: Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
```bash # MODULES
# MODULES shared_preload_libraries = 'decoderbufs'
shared_preload_libraries = 'decoderbufs'
# REPLICATION
# REPLICATION wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart) max_wal_senders = 8 # max number of walsender processes (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart) wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables max_replication_slots = 4 # max number of replication slots (change requires restart)
max_replication_slots = 4 # max number of replication slots (change requires restart)
```
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so: In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
```make local replication <youruser> trust
local replication <youruser> trust host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> 127.0.0.1/32 trust host replication <youruser> ::1/128 trust
host replication <youruser> ::1/128 trust
```
And restart PostgreSQL. And restart PostgreSQL.
## Usage ## Usage
-- can use SQL for demo purposes
```sql select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs'); -- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE) -- peek at WAL changes using decoderbufs debug mode for SQL console
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- peek at WAL changes using decoderbufs debug mode for SQL console -- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1'); select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1'); -- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
```
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case. If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.
The binary format will be consumed by the Debezium Postgres Connector. The binary format will be consumed by the Debezium Postgres Connector.
## Type Mappings ## Type Mappings
The following table shows how current PostgreSQL type OIDs are mapped to which decoderbuf fields: The following table shows how current PostgreSQL type OIDs are mapped to which decoderbuf fields:
@ -164,5 +116,5 @@ The following table shows how current PostgreSQL type OIDs are mapped to which d
## Support ## Support
File bug reports and feature requests using [Debezium's JIRA](https://issues.jboss.org/browse/DBZ) and the File bug reports and feature requests using [Debezium's JIRA](https://issues.jboss.org/browse/DBZ) and the
[postgresql-connector](https://issues.jboss.org/browse/DBZ/component/12323543) component [postgresql-connector](https://issues.jboss.org/browse/DBZ/component/12323543) component

View File

@ -5,12 +5,9 @@ option java_outer_classname = "PgProto";
option optimize_for = SPEED; option optimize_for = SPEED;
enum Op { enum Op {
UNKNOWN = -1;
INSERT = 0; INSERT = 0;
UPDATE = 1; UPDATE = 1;
DELETE = 2; DELETE = 2;
BEGIN = 3;
COMMIT = 4;
} }
message Point { message Point {
@ -30,15 +27,9 @@ message DatumMessage {
string datum_string = 8; string datum_string = 8;
bytes datum_bytes = 9; bytes datum_bytes = 9;
Point datum_point = 10; Point datum_point = 10;
bool datum_missing = 11;
} }
} }
message TypeInfo {
required string modifier = 1;
required bool value_optional = 2;
}
message RowMessage { message RowMessage {
optional uint32 transaction_id = 1; optional uint32 transaction_id = 1;
optional uint64 commit_time = 2; optional uint64 commit_time = 2;
@ -46,5 +37,4 @@ message RowMessage {
optional Op op = 4; optional Op op = 4;
repeated DatumMessage new_tuple = 5; repeated DatumMessage new_tuple = 5;
repeated DatumMessage old_tuple = 6; repeated DatumMessage old_tuple = 6;
repeated TypeInfo new_typeinfo = 7;
} }

View File

@ -1,48 +0,0 @@
Name: postgres-decoderbufs
Version: 0.10.0
Release: 1%{?dist}
Summary: PostgreSQL Protocol Buffers logical decoder plugin
License: MIT
URL: https://github.com/debezium/postgres-decoderbufs
%global full_version %{version}.Final
Source0: https://github.com/debezium/%{name}/archive/v%{full_version}.tar.gz
BuildRequires: gcc
BuildRequires: postgresql-devel >= 9.6, postgresql-server-devel >= 9.6
BuildRequires: postgis-devel >= 2
BuildRequires: protobuf-c-devel
Requires: protobuf-c
%{?postgresql_module_requires}
Recommends: postgis
%description
A PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers messages.
%prep
%setup -qn postgres-decoderbufs-%{full_version}
%build
%make_build
%install
%make_install
%files
%doc README.md
%license LICENSE
%{_libdir}/pgsql/decoderbufs.so
%{_datadir}/pgsql/extension/decoderbufs.control
%changelog
* Wed Oct 9 2019 - Jiri Pechanec <jpechane@redhat.com> 0.10.0-1
* Tue May 14 2019 - Jiri Pechanec <jpechane@redhat.com> 0.9.5-1
- Initial RPM packaging

View File

@ -60,11 +60,9 @@
#error Expecting timestamps to be represented as integers, not as floating-point. #error Expecting timestamps to be represented as integers, not as floating-point.
#endif #endif
#if PG_VERSION_NUM >= 170000 /* POSTGIS version define so it doesn't redef macros */
#define TUPLE_ACCESS(x) x #define POSTGIS_PGSQL_VERSION 94
#else #include "liblwgeom.h"
#define TUPLE_ACCESS(x) &x->tuple
#endif
PG_MODULE_MAGIC; PG_MODULE_MAGIC;
@ -80,6 +78,10 @@ typedef struct {
bool debug_mode; bool debug_mode;
} DecoderData; } DecoderData;
/* GLOBALs for PostGIS dynamic OIDs */
Oid geometry_oid = InvalidOid;
Oid geography_oid = InvalidOid;
/* these must be available to pg_dlsym() */ /* these must be available to pg_dlsym() */
extern void _PG_init(void); extern void _PG_init(void);
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
@ -109,20 +111,15 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) {
/* initialize this plugin */ /* initialize this plugin */
static void pg_decode_startup(LogicalDecodingContext *ctx, static void pg_decode_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt, bool is_init) { OutputPluginOptions *opt, bool is_init) {
elog(DEBUG1, "Entering startup callback");
ListCell *option; ListCell *option;
DecoderData *data; DecoderData *data;
elog(DEBUG1, "Entering startup callback");
data = palloc(sizeof(DecoderData)); data = palloc(sizeof(DecoderData));
#if PG_VERSION_NUM >= 90600
data->context = AllocSetContextCreate(
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_SIZES);
#else
data->context = AllocSetContextCreate( data->context = AllocSetContextCreate(
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE, ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
#endif
data->debug_mode = false; data->debug_mode = false;
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
@ -160,16 +157,107 @@ static void pg_decode_startup(LogicalDecodingContext *ctx,
/* cleanup this plugin's resources */ /* cleanup this plugin's resources */
static void pg_decode_shutdown(LogicalDecodingContext *ctx) { static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
DecoderData *data;
elog(DEBUG1, "Entering decode_shutdown callback"); elog(DEBUG1, "Entering decode_shutdown callback");
DecoderData *data = ctx->output_plugin_private;
data = ctx->output_plugin_private;
/* cleanup our own resources via memory context reset */ /* cleanup our own resources via memory context reset */
MemoryContextDelete(data->context); MemoryContextDelete(data->context);
} }
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
// set PostGIS geometry type id (these are dynamic)
// TODO: Figure out how to make sure we get the typid's from postgis extension namespace
if (geometry_oid == InvalidOid) {
geometry_oid = TypenameGetTypid("geometry");
if (geometry_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geometry_oid);
}
}
if (geography_oid == InvalidOid) {
geography_oid = TypenameGetTypid("geography");
if (geography_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geography_oid);
}
}
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
}
/* convenience method to free up sub-messages */
static void row_message_destroy(Decoderbufs__RowMessage *msg) {
if (!msg) {
return;
}
if (msg->table) {
pfree(msg->table);
}
if (msg->n_new_tuple > 0) {
for (int i = 0; i < msg->n_new_tuple; i++) {
if (msg->new_tuple[i]) {
switch (msg->new_tuple[i]->datum_case) {
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
if (msg->new_tuple[i]->datum_string) {
pfree(msg->new_tuple[i]->datum_string);
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
if (msg->new_tuple[i]->datum_bytes.data) {
pfree(msg->new_tuple[i]->datum_bytes.data);
msg->new_tuple[i]->datum_bytes.data = NULL;
msg->new_tuple[i]->datum_bytes.len = 0;
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
if (msg->new_tuple[i]->datum_point) {
pfree(msg->new_tuple[i]->datum_point);
}
break;
default:
break;
}
pfree(msg->new_tuple[i]);
}
}
pfree(msg->new_tuple);
}
if (msg->n_old_tuple > 0) {
for (int i = 0; i < msg->n_old_tuple; i++) {
if (msg->old_tuple[i]) {
switch (msg->old_tuple[i]->datum_case) {
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
if (msg->old_tuple[i]->datum_string) {
pfree(msg->old_tuple[i]->datum_string);
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
if (msg->old_tuple[i]->datum_bytes.data) {
pfree(msg->old_tuple[i]->datum_bytes.data);
msg->old_tuple[i]->datum_bytes.data = NULL;
msg->old_tuple[i]->datum_bytes.len = 0;
}
break;
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
if (msg->old_tuple[i]->datum_point) {
pfree(msg->old_tuple[i]->datum_point);
}
break;
default:
break;
}
pfree(msg->old_tuple[i]);
}
}
pfree(msg->old_tuple);
}
}
/* print tuple datums (only used for debug-mode) */ /* print tuple datums (only used for debug-mode) */
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup, static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
size_t n) { size_t n) {
@ -250,16 +338,72 @@ static void print_row_msg(StringInfo out, Decoderbufs__RowMessage *rmsg) {
} }
/* this doesn't seem to be available in the public api (unfortunate) */
static double numeric_to_double_no_overflow(Numeric num) {
char *tmp;
double val;
char *endptr;
tmp = DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(num)));
/* unlike float8in, we ignore ERANGE from strtod */
val = strtod(tmp, &endptr);
if (*endptr != '\0') {
/* shouldn't happen ... */
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type double precision: \"%s\"",
tmp)));
}
pfree(tmp);
return val;
}
static bool geography_point_as_decoderbufs_point(Datum datum,
Decoderbufs__Point *p) {
GSERIALIZED *geom;
LWGEOM *lwgeom;
LWPOINT *point = NULL;
POINT2D p2d;
geom = (GSERIALIZED *)PG_DETOAST_DATUM(datum);
if (gserialized_get_type(geom) != POINTTYPE) {
return false;
}
lwgeom = lwgeom_from_gserialized(geom);
point = lwgeom_as_lwpoint(lwgeom);
if (lwgeom_is_empty(lwgeom)) {
return false;
}
getPoint2d_p(point->point, 0, &p2d);
if (p != NULL) {
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
dp.x = p2d.x;
dp.y = p2d.y;
memcpy(p, &dp, sizeof(dp));
elog(DEBUG1, "Translating geography to point: (x,y) = (%f,%f)", p->x, p->y);
}
return true;
}
/* set a datum value based on its OID specified by typid */ /* set a datum value based on its OID specified by typid */
static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
Oid typoutput, Datum datum) { Oid typoutput, Datum datum) {
Numeric num;
bytea *valptr = NULL; bytea *valptr = NULL;
const char *output = NULL; const char *output = NULL;
Point *p = NULL; Point *p = NULL;
Timestamp ts; Timestamp ts = NULL;
double duration;
TimeTzADT *timetz = NULL; TimeTzADT *timetz = NULL;
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT; Interval *interval = NULL;
int size = 0; int size = 0;
switch (typid) { switch (typid) {
case BOOLOID: case BOOLOID:
@ -275,11 +419,8 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
break; break;
case INT8OID: case INT8OID:
datum_msg->datum_int64 = DatumGetInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case OIDOID: case OIDOID:
datum_msg->datum_int64 = (Oid) DatumGetUInt64(datum); datum_msg->datum_int64 = DatumGetInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break; break;
case FLOAT4OID: case FLOAT4OID:
@ -290,22 +431,27 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_double = DatumGetFloat8(datum); datum_msg->datum_double = DatumGetFloat8(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break; break;
case CASHOID: case NUMERICOID:
num = DatumGetNumeric(datum);
if (!numeric_is_nan(num)) {
datum_msg->datum_double = numeric_to_double_no_overflow(num);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
}
break;
case CASHOID:
datum_msg->datum_int64 = DatumGetCash(datum); datum_msg->datum_int64 = DatumGetCash(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break; break;
case NUMERICOID:
case CHAROID: case CHAROID:
case VARCHAROID: case VARCHAROID:
case BPCHAROID: case BPCHAROID:
case TEXTOID: case TEXTOID:
case JSONOID: case JSONOID:
case JSONBOID: case JSONBOID:
case XMLOID: case XMLOID:
case BITOID: case BITOID:
case VARBITOID: case VARBITOID:
case UUIDOID: case UUIDOID:
case INTERVALOID:
output = OidOutputFunctionCall(typoutput, datum); output = OidOutputFunctionCall(typoutput, datum);
datum_msg->datum_string = pnstrdup(output, strlen(output)); datum_msg->datum_string = pnstrdup(output, strlen(output));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING;
@ -313,22 +459,24 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case TIMESTAMPOID: case TIMESTAMPOID:
ts = DatumGetTimestamp(datum); ts = DatumGetTimestamp(datum);
if (TIMESTAMP_NOT_FINITE(ts)) { if (TIMESTAMP_NOT_FINITE(ts)) {
datum_msg->datum_int64 = ts; ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else { } else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
} }
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case TIMESTAMPTZOID: case TIMESTAMPTZOID:
ts = DatumGetTimestampTz(datum); ts = DatumGetTimestampTz(datum);
if (TIMESTAMP_NOT_FINITE(ts)) { if (TIMESTAMP_NOT_FINITE(ts)) {
datum_msg->datum_int64 = ts; ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
} else { errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
} datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; break;
break; }
case DATEOID: case DATEOID:
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */ /* simply get the number of days as the stored 32 bit value and convert to EPOCH */
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum)); datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
@ -336,12 +484,21 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case TIMEOID: case TIMEOID:
datum_msg->datum_int64 = DatumGetTimeADT(datum); datum_msg->datum_int64 = DatumGetTimeADT(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break; break;
case TIMETZOID: case TIMETZOID:
timetz = DatumGetTimeTzADTP(datum); timetz = DatumGetTimeTzADTP(datum);
/* use GMT-equivalent time */ /* use GMT-equivalent time */
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0)); datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case INTERVALOID:
interval = DatumGetIntervalP(datum);
/*
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
*/
duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY);
datum_msg->datum_double = duration;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break; break;
case BYTEAOID: case BYTEAOID:
valptr = DatumGetByteaPCopy(datum); valptr = DatumGetByteaPCopy(datum);
@ -353,6 +510,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
break; break;
case POINTOID: case POINTOID:
p = DatumGetPointP(datum); p = DatumGetPointP(datum);
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
dp.x = p->x; dp.x = p->x;
dp.y = p->y; dp.y = p->y;
datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point)); datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point));
@ -360,11 +518,16 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
break; break;
default: default:
{ // PostGIS uses dynamic OIDs so we need to check the type again here
int len; if (typid == geometry_oid || typid == geography_oid) {
elog(DEBUG1, "Encountered unknown typid: %d, using bytes", typid); elog(DEBUG1, "Converting geography point to datum_point");
datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point));
geography_point_as_decoderbufs_point(datum, datum_msg->datum_point);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
} else {
elog(WARNING, "Encountered unknown typid: %d, using bytes", typid);
output = OidOutputFunctionCall(typoutput, datum); output = OidOutputFunctionCall(typoutput, datum);
len = strlen(output); int len = strlen(output);
size = sizeof(char) * len; size = sizeof(char) * len;
datum_msg->datum_bytes.data = palloc(size); datum_msg->datum_bytes.data = palloc(size);
memcpy(datum_msg->datum_bytes.data, (uint8_t *)output, size); memcpy(datum_msg->datum_bytes.data, (uint8_t *)output, size);
@ -380,15 +543,15 @@ static int valid_attributes_count_from(TupleDesc tupdesc) {
int natt; int natt;
int count = 0; int count = 0;
for (natt = 0; natt < tupdesc->natts; natt++) { for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr = TupleDescAttr(tupdesc, natt); Form_pg_attribute attr = tupdesc->attrs[natt];
/* skip dropped columns and system columns */ /* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) { if (attr->attisdropped || attr->attnum < 0) {
continue; continue;
} }
count++; count++;
} }
return count; return count;
} }
/* convert a PG tuple to an array of DatumMessage(s) */ /* convert a PG tuple to an array of DatumMessage(s) */
@ -396,7 +559,6 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
Relation relation, HeapTuple tuple, Relation relation, HeapTuple tuple,
TupleDesc tupdesc) { TupleDesc tupdesc) {
int natt; int natt;
int valid_attr_cnt = 0;
elog(DEBUG1, "processing tuple with %d columns", tupdesc->natts); elog(DEBUG1, "processing tuple with %d columns", tupdesc->natts);
/* build column names and values */ /* build column names and values */
for (natt = 0; natt < tupdesc->natts; natt++) { for (natt = 0; natt < tupdesc->natts; natt++) {
@ -404,24 +566,22 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
Datum origval; Datum origval;
bool isnull; bool isnull;
const char *attrName; const char *attrName;
Oid typoutput;
bool typisvarlena; attr = tupdesc->attrs[natt];
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
attr = TupleDescAttr(tupdesc, natt);
/* skip dropped columns and system columns */ /* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) { if (attr->attisdropped || attr->attnum < 0) {
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column"); elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
continue; continue;
} }
attrName = quote_identifier(NameStr(attr->attname)); attrName = quote_identifier(NameStr(attr->attname));
elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName); elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName);
/* set the column name */ /* set the column name */
datum_msg.column_name = (char *)attrName; datum_msg.column_name = attrName;
/* set datum from tuple */ /* set datum from tuple */
origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull); origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
@ -429,13 +589,14 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
datum_msg.column_type = attr->atttypid; datum_msg.column_type = attr->atttypid;
datum_msg.has_column_type = true; datum_msg.has_column_type = true;
Oid typoutput;
bool typisvarlena;
/* query output function */ /* query output function */
getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
if (!isnull) { if (!isnull) {
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) { if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
datum_msg.datum_missing = true; // TODO: Is there a way we can handle this?
datum_msg.datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING; elog(WARNING, "Not handling external on disk varlena at the moment.");
elog(DEBUG1, "Not handling external on disk varlena at the moment.");
} else if (!typisvarlena) { } else if (!typisvarlena) {
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval); set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
} else { } else {
@ -443,177 +604,47 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
set_datum_value(&datum_msg, attr->atttypid, typoutput, val); set_datum_value(&datum_msg, attr->atttypid, typoutput, val);
} }
} else { } else {
elog(DEBUG1, "column %s is null, ignoring value", attrName); elog(DEBUG1, "column %s is null, ignoring value", attrName);
} }
tmsg[valid_attr_cnt] = palloc(sizeof(datum_msg)); tmsg[natt] = palloc(sizeof(datum_msg));
memcpy(tmsg[valid_attr_cnt], &datum_msg, sizeof(datum_msg)); memcpy(tmsg[natt], &datum_msg, sizeof(datum_msg));
valid_attr_cnt++;
} }
} }
/* provide a metadata for new tuple */
static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
Relation relation, HeapTuple tuple,
TupleDesc tupdesc) {
int natt;
int valid_attr_cnt = 0;
elog(DEBUG1, "Adding metadata for %d columns", tupdesc->natts);
/* build column names and values */
for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr;
char *typ_mod;
Decoderbufs__TypeInfo typeinfo = DECODERBUFS__TYPE_INFO__INIT;
bool not_null;
attr = TupleDescAttr(tupdesc, natt);
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
continue;
}
not_null = attr->attnotnull;
typ_mod = TextDatumGetCString(DirectFunctionCall2(format_type, attr->atttypid, attr->atttypmod));
elog(DEBUG1, "Adding typemodifier '%s' for column %d, optional %s", typ_mod, natt, !not_null ? "true" : "false");
typeinfo.modifier = typ_mod;
typeinfo.value_optional = !not_null;
tmsg[valid_attr_cnt] = palloc(sizeof(typeinfo));
memcpy(tmsg[valid_attr_cnt], &typeinfo, sizeof(typeinfo));
valid_attr_cnt++;
}
}
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering begin callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__BEGIN;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering commit callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__COMMIT;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* callback for individual changed tuples */ /* callback for individual changed tuples */
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) { Relation relation, ReorderBufferChange *change) {
elog(DEBUG1, "Entering decode_change callback");
DecoderData *data; DecoderData *data;
MemoryContext old; MemoryContext old;
Form_pg_class class_form; Form_pg_class class_form;
char replident; char replident = relation->rd_rel->relreplident;
bool is_rel_non_selective; bool is_rel_non_selective;
const char *selectiveInfo; const char *selectiveInfo = is_rel_non_selective ? "non selective" : "selective";
TupleDesc tupdesc;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT; Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
class_form = RelationGetForm(relation);
elog(DEBUG1, "Entering decode_change callback"); data = ctx->output_plugin_private;
/* Avoid leaking memory by using and resetting our own context */ /* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context); old = MemoryContextSwitchTo(data->context);
replident = relation->rd_rel->relreplident;
class_form = RelationGetForm(relation);
RelationGetIndexList(relation); RelationGetIndexList(relation);
is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING || is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
(replident == REPLICA_IDENTITY_DEFAULT && (replident == REPLICA_IDENTITY_DEFAULT &&
!OidIsValid(relation->rd_replidindex))); !OidIsValid(relation->rd_replidindex)));
selectiveInfo = is_rel_non_selective ? "non selective" : "selective";
/* set common fields */ /* set common fields */
rmsg.transaction_id = txn->xid; rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true; rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time); rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true; rmsg.has_commit_time = true;
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
NameStr(class_form->relname))); NameStr(class_form->relname)));
/* decode different operation types */ /* decode different operation types */
switch (change->action) { switch (change->action) {
case REORDER_BUFFER_CHANGE_INSERT: case REORDER_BUFFER_CHANGE_INSERT:
@ -622,19 +653,12 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.has_op = true; rmsg.has_op = true;
if (change->data.tp.newtuple != NULL) { if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information"); elog(DEBUG1, "decoding new tuple information");
tupdesc = RelationGetDescr(relation); TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
} }
break; break;
case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_UPDATE:
@ -644,28 +668,21 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (!is_rel_non_selective) { if (!is_rel_non_selective) {
if (change->data.tp.oldtuple != NULL) { if (change->data.tp.oldtuple != NULL) {
elog(DEBUG1, "decoding old tuple information"); elog(DEBUG1, "decoding old tuple information");
tupdesc = RelationGetDescr(relation); TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_old_tuple = valid_attributes_count_from(tupdesc); rmsg.n_old_tuple = valid_attributes_count_from(tupdesc);
rmsg.old_tuple = rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation, tuple_to_tuple_msg(rmsg.old_tuple, relation,
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc); &change->data.tp.oldtuple->tuple, tupdesc);
} }
if (change->data.tp.newtuple != NULL) { if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information"); elog(DEBUG1, "decoding new tuple information");
tupdesc = RelationGetDescr(relation); TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc); rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
} }
} }
break; break;
@ -676,18 +693,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* if there was no PK, we only know that a delete happened */ /* if there was no PK, we only know that a delete happened */
if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) { if (!is_rel_non_selective && change->data.tp.oldtuple != NULL) {
elog(DEBUG1, "decoding old tuple information"); elog(DEBUG1, "decoding old tuple information");
tupdesc = RelationGetDescr(relation); TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_old_tuple = valid_attributes_count_from(tupdesc); rmsg.n_old_tuple = valid_attributes_count_from(tupdesc);
rmsg.old_tuple = rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation, tuple_to_tuple_msg(rmsg.old_tuple, relation,
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc); &change->data.tp.oldtuple->tuple, tupdesc);
} else { } else {
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid "); elog(WARNING, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
} }
break; break;
default: default:
elog(WARNING, "unknown change action"); elog(WARNING, "unknown change action");
Assert(0); Assert(0);
break; break;
} }
@ -702,10 +719,14 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
void *packed = palloc(psize); void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed); size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize); appendBinaryStringInfo(ctx->out, packed, ssize);
/* free packed buffer */
pfree(packed);
} }
OutputPluginWrite(ctx, true); OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */ /* cleanup msg */
row_message_destroy(&rmsg);
MemoryContextSwitchTo(old); MemoryContextSwitchTo(old);
MemoryContextReset(data->context); MemoryContextReset(data->context);
} }

View File

@ -93,49 +93,6 @@ void decoderbufs__datum_message__free_unpacked
assert(message->base.descriptor == &decoderbufs__datum_message__descriptor); assert(message->base.descriptor == &decoderbufs__datum_message__descriptor);
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
} }
void decoderbufs__type_info__init
(Decoderbufs__TypeInfo *message)
{
static Decoderbufs__TypeInfo init_value = DECODERBUFS__TYPE_INFO__INIT;
*message = init_value;
}
size_t decoderbufs__type_info__get_packed_size
(const Decoderbufs__TypeInfo *message)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
}
size_t decoderbufs__type_info__pack
(const Decoderbufs__TypeInfo *message,
uint8_t *out)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
}
size_t decoderbufs__type_info__pack_to_buffer
(const Decoderbufs__TypeInfo *message,
ProtobufCBuffer *buffer)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
}
Decoderbufs__TypeInfo *
decoderbufs__type_info__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data)
{
return (Decoderbufs__TypeInfo *)
protobuf_c_message_unpack (&decoderbufs__type_info__descriptor,
allocator, len, data);
}
void decoderbufs__type_info__free_unpacked
(Decoderbufs__TypeInfo *message,
ProtobufCAllocator *allocator)
{
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
void decoderbufs__row_message__init void decoderbufs__row_message__init
(Decoderbufs__RowMessage *message) (Decoderbufs__RowMessage *message)
{ {
@ -230,7 +187,7 @@ const ProtobufCMessageDescriptor decoderbufs__point__descriptor =
(ProtobufCMessageInit) decoderbufs__point__init, (ProtobufCMessageInit) decoderbufs__point__init,
NULL,NULL,NULL /* reserved[123] */ NULL,NULL,NULL /* reserved[123] */
}; };
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[11] = static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] =
{ {
{ {
"column_name", "column_name",
@ -352,18 +309,6 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */ 0,NULL,NULL /* reserved1,reserved2, etc */
}, },
{
"datum_missing",
11,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_BOOL,
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_missing),
NULL,
NULL,
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
}; };
static const unsigned decoderbufs__datum_message__field_indices_by_name[] = { static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
0, /* field[0] = column_name */ 0, /* field[0] = column_name */
@ -374,14 +319,13 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
4, /* field[4] = datum_float */ 4, /* field[4] = datum_float */
2, /* field[2] = datum_int32 */ 2, /* field[2] = datum_int32 */
3, /* field[3] = datum_int64 */ 3, /* field[3] = datum_int64 */
10, /* field[10] = datum_missing */
9, /* field[9] = datum_point */ 9, /* field[9] = datum_point */
7, /* field[7] = datum_string */ 7, /* field[7] = datum_string */
}; };
static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] = static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] =
{ {
{ 1, 0 }, { 1, 0 },
{ 0, 11 } { 0, 10 }
}; };
const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
{ {
@ -391,65 +335,14 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
"Decoderbufs__DatumMessage", "Decoderbufs__DatumMessage",
"decoderbufs", "decoderbufs",
sizeof(Decoderbufs__DatumMessage), sizeof(Decoderbufs__DatumMessage),
11, 10,
decoderbufs__datum_message__field_descriptors, decoderbufs__datum_message__field_descriptors,
decoderbufs__datum_message__field_indices_by_name, decoderbufs__datum_message__field_indices_by_name,
1, decoderbufs__datum_message__number_ranges, 1, decoderbufs__datum_message__number_ranges,
(ProtobufCMessageInit) decoderbufs__datum_message__init, (ProtobufCMessageInit) decoderbufs__datum_message__init,
NULL,NULL,NULL /* reserved[123] */ NULL,NULL,NULL /* reserved[123] */
}; };
static const ProtobufCFieldDescriptor decoderbufs__type_info__field_descriptors[2] = static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[6] =
{
{
"modifier",
1,
PROTOBUF_C_LABEL_REQUIRED,
PROTOBUF_C_TYPE_STRING,
0, /* quantifier_offset */
offsetof(Decoderbufs__TypeInfo, modifier),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"value_optional",
2,
PROTOBUF_C_LABEL_REQUIRED,
PROTOBUF_C_TYPE_BOOL,
0, /* quantifier_offset */
offsetof(Decoderbufs__TypeInfo, value_optional),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
static const unsigned decoderbufs__type_info__field_indices_by_name[] = {
0, /* field[0] = modifier */
1, /* field[1] = value_optional */
};
static const ProtobufCIntRange decoderbufs__type_info__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 2 }
};
const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor =
{
PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
"decoderbufs.TypeInfo",
"TypeInfo",
"Decoderbufs__TypeInfo",
"decoderbufs",
sizeof(Decoderbufs__TypeInfo),
2,
decoderbufs__type_info__field_descriptors,
decoderbufs__type_info__field_indices_by_name,
1, decoderbufs__type_info__number_ranges,
(ProtobufCMessageInit) decoderbufs__type_info__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[7] =
{ {
{ {
"transaction_id", "transaction_id",
@ -523,23 +416,10 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
0, /* flags */ 0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */ 0,NULL,NULL /* reserved1,reserved2, etc */
}, },
{
"new_typeinfo",
7,
PROTOBUF_C_LABEL_REPEATED,
PROTOBUF_C_TYPE_MESSAGE,
offsetof(Decoderbufs__RowMessage, n_new_typeinfo),
offsetof(Decoderbufs__RowMessage, new_typeinfo),
&decoderbufs__type_info__descriptor,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
}; };
static const unsigned decoderbufs__row_message__field_indices_by_name[] = { static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
1, /* field[1] = commit_time */ 1, /* field[1] = commit_time */
4, /* field[4] = new_tuple */ 4, /* field[4] = new_tuple */
6, /* field[6] = new_typeinfo */
5, /* field[5] = old_tuple */ 5, /* field[5] = old_tuple */
3, /* field[3] = op */ 3, /* field[3] = op */
2, /* field[2] = table */ 2, /* field[2] = table */
@ -548,7 +428,7 @@ static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] = static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] =
{ {
{ 1, 0 }, { 1, 0 },
{ 0, 7 } { 0, 6 }
}; };
const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor = const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
{ {
@ -558,33 +438,27 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
"Decoderbufs__RowMessage", "Decoderbufs__RowMessage",
"decoderbufs", "decoderbufs",
sizeof(Decoderbufs__RowMessage), sizeof(Decoderbufs__RowMessage),
7, 6,
decoderbufs__row_message__field_descriptors, decoderbufs__row_message__field_descriptors,
decoderbufs__row_message__field_indices_by_name, decoderbufs__row_message__field_indices_by_name,
1, decoderbufs__row_message__number_ranges, 1, decoderbufs__row_message__number_ranges,
(ProtobufCMessageInit) decoderbufs__row_message__init, (ProtobufCMessageInit) decoderbufs__row_message__init,
NULL,NULL,NULL /* reserved[123] */ NULL,NULL,NULL /* reserved[123] */
}; };
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[6] = static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
{ {
{ "UNKNOWN", "DECODERBUFS__OP__UNKNOWN", -1 },
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 }, { "INSERT", "DECODERBUFS__OP__INSERT", 0 },
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 }, { "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 }, { "DELETE", "DECODERBUFS__OP__DELETE", 2 },
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
}; };
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = { static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
{-1, 0},{0, 6} {0, 0},{0, 3}
}; };
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[6] = static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
{ {
{ "BEGIN", 4 }, { "DELETE", 2 },
{ "COMMIT", 5 }, { "INSERT", 0 },
{ "DELETE", 3 }, { "UPDATE", 1 },
{ "INSERT", 1 },
{ "UNKNOWN", 0 },
{ "UPDATE", 2 },
}; };
const ProtobufCEnumDescriptor decoderbufs__op__descriptor = const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
{ {
@ -593,9 +467,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
"Op", "Op",
"Decoderbufs__Op", "Decoderbufs__Op",
"decoderbufs", "decoderbufs",
6, 3,
decoderbufs__op__enum_values_by_number, decoderbufs__op__enum_values_by_number,
6, 3,
decoderbufs__op__enum_values_by_name, decoderbufs__op__enum_values_by_name,
1, 1,
decoderbufs__op__value_ranges, decoderbufs__op__value_ranges,

View File

@ -10,26 +10,22 @@ PROTOBUF_C__BEGIN_DECLS
#if PROTOBUF_C_VERSION_NUMBER < 1000000 #if PROTOBUF_C_VERSION_NUMBER < 1000000
# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers. # error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
#elif 1002001 < PROTOBUF_C_MIN_COMPILER_VERSION #elif 1001001 < PROTOBUF_C_MIN_COMPILER_VERSION
# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c. # error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
#endif #endif
typedef struct _Decoderbufs__Point Decoderbufs__Point; typedef struct _Decoderbufs__Point Decoderbufs__Point;
typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage; typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage;
typedef struct _Decoderbufs__TypeInfo Decoderbufs__TypeInfo;
typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage; typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
/* --- enums --- */ /* --- enums --- */
typedef enum _Decoderbufs__Op { typedef enum _Decoderbufs__Op {
DECODERBUFS__OP__UNKNOWN = -1,
DECODERBUFS__OP__INSERT = 0, DECODERBUFS__OP__INSERT = 0,
DECODERBUFS__OP__UPDATE = 1, DECODERBUFS__OP__UPDATE = 1,
DECODERBUFS__OP__DELETE = 2, DECODERBUFS__OP__DELETE = 2
DECODERBUFS__OP__BEGIN = 3,
DECODERBUFS__OP__COMMIT = 4
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP) PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
} Decoderbufs__Op; } Decoderbufs__Op;
@ -56,7 +52,6 @@ typedef enum {
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING = 11,
} Decoderbufs__DatumMessage__DatumCase; } Decoderbufs__DatumMessage__DatumCase;
struct _Decoderbufs__DatumMessage struct _Decoderbufs__DatumMessage
@ -75,23 +70,11 @@ struct _Decoderbufs__DatumMessage
char *datum_string; char *datum_string;
ProtobufCBinaryData datum_bytes; ProtobufCBinaryData datum_bytes;
Decoderbufs__Point *datum_point; Decoderbufs__Point *datum_point;
protobuf_c_boolean datum_missing;
}; };
}; };
#define DECODERBUFS__DATUM_MESSAGE__INIT \ #define DECODERBUFS__DATUM_MESSAGE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \ { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \
, NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {0} } , NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {} }
struct _Decoderbufs__TypeInfo
{
ProtobufCMessage base;
char *modifier;
protobuf_c_boolean value_optional;
};
#define DECODERBUFS__TYPE_INFO__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__type_info__descriptor) \
, NULL, 0 }
struct _Decoderbufs__RowMessage struct _Decoderbufs__RowMessage
@ -108,12 +91,10 @@ struct _Decoderbufs__RowMessage
Decoderbufs__DatumMessage **new_tuple; Decoderbufs__DatumMessage **new_tuple;
size_t n_old_tuple; size_t n_old_tuple;
Decoderbufs__DatumMessage **old_tuple; Decoderbufs__DatumMessage **old_tuple;
size_t n_new_typeinfo;
Decoderbufs__TypeInfo **new_typeinfo;
}; };
#define DECODERBUFS__ROW_MESSAGE__INIT \ #define DECODERBUFS__ROW_MESSAGE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \ { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \
, 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL, 0,NULL } , 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL }
/* Decoderbufs__Point methods */ /* Decoderbufs__Point methods */
@ -154,25 +135,6 @@ Decoderbufs__DatumMessage *
void decoderbufs__datum_message__free_unpacked void decoderbufs__datum_message__free_unpacked
(Decoderbufs__DatumMessage *message, (Decoderbufs__DatumMessage *message,
ProtobufCAllocator *allocator); ProtobufCAllocator *allocator);
/* Decoderbufs__TypeInfo methods */
void decoderbufs__type_info__init
(Decoderbufs__TypeInfo *message);
size_t decoderbufs__type_info__get_packed_size
(const Decoderbufs__TypeInfo *message);
size_t decoderbufs__type_info__pack
(const Decoderbufs__TypeInfo *message,
uint8_t *out);
size_t decoderbufs__type_info__pack_to_buffer
(const Decoderbufs__TypeInfo *message,
ProtobufCBuffer *buffer);
Decoderbufs__TypeInfo *
decoderbufs__type_info__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data);
void decoderbufs__type_info__free_unpacked
(Decoderbufs__TypeInfo *message,
ProtobufCAllocator *allocator);
/* Decoderbufs__RowMessage methods */ /* Decoderbufs__RowMessage methods */
void decoderbufs__row_message__init void decoderbufs__row_message__init
(Decoderbufs__RowMessage *message); (Decoderbufs__RowMessage *message);
@ -200,9 +162,6 @@ typedef void (*Decoderbufs__Point_Closure)
typedef void (*Decoderbufs__DatumMessage_Closure) typedef void (*Decoderbufs__DatumMessage_Closure)
(const Decoderbufs__DatumMessage *message, (const Decoderbufs__DatumMessage *message,
void *closure_data); void *closure_data);
typedef void (*Decoderbufs__TypeInfo_Closure)
(const Decoderbufs__TypeInfo *message,
void *closure_data);
typedef void (*Decoderbufs__RowMessage_Closure) typedef void (*Decoderbufs__RowMessage_Closure)
(const Decoderbufs__RowMessage *message, (const Decoderbufs__RowMessage *message,
void *closure_data); void *closure_data);
@ -215,7 +174,6 @@ typedef void (*Decoderbufs__RowMessage_Closure)
extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor; extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor;
extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor;
PROTOBUF_C__END_DECLS PROTOBUF_C__END_DECLS