Compare commits

..

No commits in common. "main" and "v0.9.0.Final" have entirely different histories.

7 changed files with 195 additions and 312 deletions

View File

@ -4,8 +4,13 @@ EXTENSION = decoderbufs
PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0') PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0')
PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0') PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0')
ifneq ($(USE_POSTGIS),false)
C_PARAMS = -DUSE_POSTGIS
POSTGIS_C_LDFLAGS = -L/usr/local/lib -llwgeom
endif
PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include $(C_PARAMS) PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include $(C_PARAMS)
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) $(POSTGIS_C_LDFLAGS)
OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o

116
README.md
View File

@ -10,17 +10,15 @@
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
## Thanks to ## Thanks to
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based - The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support - [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
## Dependencies ## Dependencies
This code depends on the following libraries and requires them for compilation: This code depends on the following libraries and requires them for compilation:
- [PostgreSQL](http://www.postgresql.org) 9.6+ * [PostgreSQL](http://www.postgresql.org) 9.6+
- [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization * [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
- [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support * [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
## Building ## Building
@ -28,107 +26,71 @@ This code depends on the following libraries and requires them for compilation:
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages. (for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
### Installing Dependencies ### Installing Dependencies
#### Debian #### Debian
```bash # Core build utilities
# Core build utilities apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
# PostGIS dependency # PostGIS dependency
apt-get install -f -y libproj-dev liblwgeom-dev apt-get install -f -y libproj-dev liblwgeom-dev
# Protobuf-c dependency (requires a non-stable Debian repo) # Protobuf-c dependency (requires a non-stable Debian repo)
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1 apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
```
When updating the ProtoBuf definition, also install the ProtoBuf C compiler: The above are taken from the Debezium [docker images](https://github.com/debezium/docker-images).
```bash
apt-get install -y protobuf-c-compiler=1.2.*
```
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
#### Other Linux distributions #### Other Linux distributions
You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro. You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1` Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1`
### Getting the source code
If you have all of the above prerequisites installed, clone this git repo to build from source:
```bash
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
```
### Optional: Re-generating ProtoBuf code
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
```bash
cd proto
protoc-c --c_out=../src/proto pg_logicaldec.proto
cd ..
```
Commit the generated files to git then.
### Building and installing decoderbufs ### Building and installing decoderbufs
If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version. If you have all of the above prerequisites installed, clone this git repo to build from source. If you have multiple
postgres versions installed, you can select which version to install by altering your `$PATH` to point to the right version.
Then `make` and `make install` for each version. Here is an example: Then `make` and `make install` for each version. Here is an example:
```bash git clone https://github.com/debezium/postgres-decoderbufs.git
# Install for Postgres 9.6 if I have multiple local versions cd postgres-decoderbufs
export PATH=/usr/lib/postgresql/9.6/bin:$PATH # Install for Postgres 9.6 if I have multiple local versions
make export PATH=/usr/lib/postgresql/9.6/bin:$PATH
make install make
``` make install
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf: Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
```bash # MODULES
# MODULES shared_preload_libraries = 'decoderbufs'
shared_preload_libraries = 'decoderbufs'
# REPLICATION # REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart) wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart) max_wal_senders = 8 # max number of walsender processes (change requires restart)
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart) max_replication_slots = 4 # max number of replication slots (change requires restart)
```
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so: In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
```make local replication <youruser> trust
local replication <youruser> trust host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> 127.0.0.1/32 trust host replication <youruser> ::1/128 trust
host replication <youruser> ::1/128 trust
```
And restart PostgreSQL. And restart PostgreSQL.
## Usage ## Usage
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
```sql -- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE) -- peek at WAL changes using decoderbufs debug mode for SQL console
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- peek at WAL changes using decoderbufs debug mode for SQL console -- check the WAL position of logical replicators
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1'); select * from pg_replication_slots where slot_type = 'logical';
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
```
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case. If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.

View File

@ -5,12 +5,9 @@ option java_outer_classname = "PgProto";
option optimize_for = SPEED; option optimize_for = SPEED;
enum Op { enum Op {
UNKNOWN = -1;
INSERT = 0; INSERT = 0;
UPDATE = 1; UPDATE = 1;
DELETE = 2; DELETE = 2;
BEGIN = 3;
COMMIT = 4;
} }
message Point { message Point {
@ -30,7 +27,6 @@ message DatumMessage {
string datum_string = 8; string datum_string = 8;
bytes datum_bytes = 9; bytes datum_bytes = 9;
Point datum_point = 10; Point datum_point = 10;
bool datum_missing = 11;
} }
} }

View File

@ -1,48 +0,0 @@
Name: postgres-decoderbufs
Version: 0.10.0
Release: 1%{?dist}
Summary: PostgreSQL Protocol Buffers logical decoder plugin
License: MIT
URL: https://github.com/debezium/postgres-decoderbufs
%global full_version %{version}.Final
Source0: https://github.com/debezium/%{name}/archive/v%{full_version}.tar.gz
BuildRequires: gcc
BuildRequires: postgresql-devel >= 9.6, postgresql-server-devel >= 9.6
BuildRequires: postgis-devel >= 2
BuildRequires: protobuf-c-devel
Requires: protobuf-c
%{?postgresql_module_requires}
Recommends: postgis
%description
A PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers messages.
%prep
%setup -qn postgres-decoderbufs-%{full_version}
%build
%make_build
%install
%make_install
%files
%doc README.md
%license LICENSE
%{_libdir}/pgsql/decoderbufs.so
%{_datadir}/pgsql/extension/decoderbufs.control
%changelog
* Wed Oct 9 2019 - Jiri Pechanec <jpechane@redhat.com> 0.10.0-1
* Tue May 14 2019 - Jiri Pechanec <jpechane@redhat.com> 0.9.5-1
- Initial RPM packaging

View File

@ -60,10 +60,10 @@
#error Expecting timestamps to be represented as integers, not as floating-point. #error Expecting timestamps to be represented as integers, not as floating-point.
#endif #endif
#if PG_VERSION_NUM >= 170000 #ifdef USE_POSTGIS
#define TUPLE_ACCESS(x) x /* POSTGIS version define so it doesn't redef macros */
#else #define POSTGIS_PGSQL_VERSION 94
#define TUPLE_ACCESS(x) &x->tuple #include "liblwgeom.h"
#endif #endif
PG_MODULE_MAGIC; PG_MODULE_MAGIC;
@ -80,6 +80,10 @@ typedef struct {
bool debug_mode; bool debug_mode;
} DecoderData; } DecoderData;
/* GLOBALs for PostGIS dynamic OIDs */
Oid geometry_oid = InvalidOid;
Oid geography_oid = InvalidOid;
/* these must be available to pg_dlsym() */ /* these must be available to pg_dlsym() */
extern void _PG_init(void); extern void _PG_init(void);
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
@ -170,6 +174,32 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
MemoryContextDelete(data->context); MemoryContextDelete(data->context);
} }
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
#ifdef USE_POSTGIS
// set PostGIS geometry type id (these are dynamic)
// TODO: Figure out how to make sure we get the typid's from postgis extension namespace
if (geometry_oid == InvalidOid) {
geometry_oid = TypenameGetTypid("geometry");
if (geometry_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geometry_oid);
}
}
if (geography_oid == InvalidOid) {
geography_oid = TypenameGetTypid("geography");
if (geography_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geography_oid);
}
}
#endif
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
}
/* print tuple datums (only used for debug-mode) */ /* print tuple datums (only used for debug-mode) */
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup, static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
size_t n) { size_t n) {
@ -250,6 +280,42 @@ static void print_row_msg(StringInfo out, Decoderbufs__RowMessage *rmsg) {
} }
static bool geography_point_as_decoderbufs_point(Datum datum,
Decoderbufs__Point *p) {
#ifdef USE_POSTGIS
GSERIALIZED *geom;
LWGEOM *lwgeom;
LWPOINT *point = NULL;
POINT2D p2d;
geom = (GSERIALIZED *)PG_DETOAST_DATUM(datum);
if (gserialized_get_type(geom) != POINTTYPE) {
return false;
}
lwgeom = lwgeom_from_gserialized(geom);
point = lwgeom_as_lwpoint(lwgeom);
if (lwgeom_is_empty(lwgeom)) {
return false;
}
getPoint2d_p(point->point, 0, &p2d);
if (p != NULL) {
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
dp.x = p2d.x;
dp.y = p2d.y;
memcpy(p, &dp, sizeof(dp));
elog(DEBUG1, "Translating geography to point: (x,y) = (%f,%f)", p->x, p->y);
}
return true;
#else
elog(DEBUG1, "PostGIS support is off, recompile decoderbufs with USE_POSTGIS option!");
return false;
#endif
}
/* set a datum value based on its OID specified by typid */ /* set a datum value based on its OID specified by typid */
static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
Oid typoutput, Datum datum) { Oid typoutput, Datum datum) {
@ -257,7 +323,9 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
const char *output = NULL; const char *output = NULL;
Point *p = NULL; Point *p = NULL;
Timestamp ts; Timestamp ts;
double duration;
TimeTzADT *timetz = NULL; TimeTzADT *timetz = NULL;
Interval *interval = NULL;
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT; Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
int size = 0; int size = 0;
@ -275,11 +343,8 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
break; break;
case INT8OID: case INT8OID:
datum_msg->datum_int64 = DatumGetInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case OIDOID: case OIDOID:
datum_msg->datum_int64 = (Oid) DatumGetUInt64(datum); datum_msg->datum_int64 = DatumGetInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break; break;
case FLOAT4OID: case FLOAT4OID:
@ -305,7 +370,6 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case BITOID: case BITOID:
case VARBITOID: case VARBITOID:
case UUIDOID: case UUIDOID:
case INTERVALOID:
output = OidOutputFunctionCall(typoutput, datum); output = OidOutputFunctionCall(typoutput, datum);
datum_msg->datum_string = pnstrdup(output, strlen(output)); datum_msg->datum_string = pnstrdup(output, strlen(output));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING;
@ -313,21 +377,23 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case TIMESTAMPOID: case TIMESTAMPOID:
ts = DatumGetTimestamp(datum); ts = DatumGetTimestamp(datum);
if (TIMESTAMP_NOT_FINITE(ts)) { if (TIMESTAMP_NOT_FINITE(ts)) {
datum_msg->datum_int64 = ts; ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else { } else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
} }
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case TIMESTAMPTZOID: case TIMESTAMPTZOID:
ts = DatumGetTimestampTz(datum); ts = DatumGetTimestampTz(datum);
if (TIMESTAMP_NOT_FINITE(ts)) { if (TIMESTAMP_NOT_FINITE(ts)) {
datum_msg->datum_int64 = ts; ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
} else { errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
} datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; break;
break; }
case DATEOID: case DATEOID:
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */ /* simply get the number of days as the stored 32 bit value and convert to EPOCH */
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum)); datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
@ -343,6 +409,15 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0)); datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break; break;
case INTERVALOID:
interval = DatumGetIntervalP(datum);
/*
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
*/
duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY);
datum_msg->datum_double = duration;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case BYTEAOID: case BYTEAOID:
valptr = DatumGetByteaPCopy(datum); valptr = DatumGetByteaPCopy(datum);
size = VARSIZE(valptr) - VARHDRSZ; size = VARSIZE(valptr) - VARHDRSZ;
@ -360,7 +435,13 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
break; break;
default: default:
{ // PostGIS uses dynamic OIDs so we need to check the type again here
if (typid == geometry_oid || typid == geography_oid) {
elog(DEBUG1, "Converting geography point to datum_point");
datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point));
geography_point_as_decoderbufs_point(datum, datum_msg->datum_point);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
} else {
int len; int len;
elog(DEBUG1, "Encountered unknown typid: %d, using bytes", typid); elog(DEBUG1, "Encountered unknown typid: %d, using bytes", typid);
output = OidOutputFunctionCall(typoutput, datum); output = OidOutputFunctionCall(typoutput, datum);
@ -433,8 +514,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
if (!isnull) { if (!isnull) {
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) { if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
datum_msg.datum_missing = true; // TODO: Is there a way we can handle this?
datum_msg.datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING;
elog(DEBUG1, "Not handling external on disk varlena at the moment."); elog(DEBUG1, "Not handling external on disk varlena at the moment.");
} else if (!typisvarlena) { } else if (!typisvarlena) {
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval); set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
@ -488,90 +568,6 @@ static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
} }
} }
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering begin callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__BEGIN;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering commit callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__COMMIT;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* callback for individual changed tuples */ /* callback for individual changed tuples */
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) { Relation relation, ReorderBufferChange *change) {
@ -604,11 +600,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* set common fields */ /* set common fields */
rmsg.transaction_id = txn->xid; rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true; rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time); rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true; rmsg.has_commit_time = true;
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
NameStr(class_form->relname))); NameStr(class_form->relname)));
@ -628,13 +620,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple; rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo = rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo); palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation, add_metadata_to_msg(rmsg.new_typeinfo, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
} }
break; break;
case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_UPDATE:
@ -649,7 +641,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple = rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation, tuple_to_tuple_msg(rmsg.old_tuple, relation,
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc); &change->data.tp.oldtuple->tuple, tupdesc);
} }
if (change->data.tp.newtuple != NULL) { if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information"); elog(DEBUG1, "decoding new tuple information");
@ -659,13 +651,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple; rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo = rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo); palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation, add_metadata_to_msg(rmsg.new_typeinfo, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
} }
} }
break; break;
@ -681,7 +673,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple = rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation, tuple_to_tuple_msg(rmsg.old_tuple, relation,
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc); &change->data.tp.oldtuple->tuple, tupdesc);
} else { } else {
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid "); elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
} }

View File

@ -230,7 +230,7 @@ const ProtobufCMessageDescriptor decoderbufs__point__descriptor =
(ProtobufCMessageInit) decoderbufs__point__init, (ProtobufCMessageInit) decoderbufs__point__init,
NULL,NULL,NULL /* reserved[123] */ NULL,NULL,NULL /* reserved[123] */
}; };
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[11] = static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] =
{ {
{ {
"column_name", "column_name",
@ -352,18 +352,6 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */ 0,NULL,NULL /* reserved1,reserved2, etc */
}, },
{
"datum_missing",
11,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_BOOL,
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_missing),
NULL,
NULL,
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
}; };
static const unsigned decoderbufs__datum_message__field_indices_by_name[] = { static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
0, /* field[0] = column_name */ 0, /* field[0] = column_name */
@ -374,14 +362,13 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
4, /* field[4] = datum_float */ 4, /* field[4] = datum_float */
2, /* field[2] = datum_int32 */ 2, /* field[2] = datum_int32 */
3, /* field[3] = datum_int64 */ 3, /* field[3] = datum_int64 */
10, /* field[10] = datum_missing */
9, /* field[9] = datum_point */ 9, /* field[9] = datum_point */
7, /* field[7] = datum_string */ 7, /* field[7] = datum_string */
}; };
static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] = static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] =
{ {
{ 1, 0 }, { 1, 0 },
{ 0, 11 } { 0, 10 }
}; };
const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
{ {
@ -391,7 +378,7 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
"Decoderbufs__DatumMessage", "Decoderbufs__DatumMessage",
"decoderbufs", "decoderbufs",
sizeof(Decoderbufs__DatumMessage), sizeof(Decoderbufs__DatumMessage),
11, 10,
decoderbufs__datum_message__field_descriptors, decoderbufs__datum_message__field_descriptors,
decoderbufs__datum_message__field_indices_by_name, decoderbufs__datum_message__field_indices_by_name,
1, decoderbufs__datum_message__number_ranges, 1, decoderbufs__datum_message__number_ranges,
@ -565,26 +552,20 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
(ProtobufCMessageInit) decoderbufs__row_message__init, (ProtobufCMessageInit) decoderbufs__row_message__init,
NULL,NULL,NULL /* reserved[123] */ NULL,NULL,NULL /* reserved[123] */
}; };
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[6] = static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
{ {
{ "UNKNOWN", "DECODERBUFS__OP__UNKNOWN", -1 },
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 }, { "INSERT", "DECODERBUFS__OP__INSERT", 0 },
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 }, { "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 }, { "DELETE", "DECODERBUFS__OP__DELETE", 2 },
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
}; };
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = { static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
{-1, 0},{0, 6} {0, 0},{0, 3}
}; };
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[6] = static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
{ {
{ "BEGIN", 4 }, { "DELETE", 2 },
{ "COMMIT", 5 }, { "INSERT", 0 },
{ "DELETE", 3 }, { "UPDATE", 1 },
{ "INSERT", 1 },
{ "UNKNOWN", 0 },
{ "UPDATE", 2 },
}; };
const ProtobufCEnumDescriptor decoderbufs__op__descriptor = const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
{ {
@ -593,9 +574,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
"Op", "Op",
"Decoderbufs__Op", "Decoderbufs__Op",
"decoderbufs", "decoderbufs",
6, 3,
decoderbufs__op__enum_values_by_number, decoderbufs__op__enum_values_by_number,
6, 3,
decoderbufs__op__enum_values_by_name, decoderbufs__op__enum_values_by_name,
1, 1,
decoderbufs__op__value_ranges, decoderbufs__op__value_ranges,

View File

@ -24,12 +24,9 @@ typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
/* --- enums --- */ /* --- enums --- */
typedef enum _Decoderbufs__Op { typedef enum _Decoderbufs__Op {
DECODERBUFS__OP__UNKNOWN = -1,
DECODERBUFS__OP__INSERT = 0, DECODERBUFS__OP__INSERT = 0,
DECODERBUFS__OP__UPDATE = 1, DECODERBUFS__OP__UPDATE = 1,
DECODERBUFS__OP__DELETE = 2, DECODERBUFS__OP__DELETE = 2
DECODERBUFS__OP__BEGIN = 3,
DECODERBUFS__OP__COMMIT = 4
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP) PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
} Decoderbufs__Op; } Decoderbufs__Op;
@ -56,7 +53,6 @@ typedef enum {
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING = 11,
} Decoderbufs__DatumMessage__DatumCase; } Decoderbufs__DatumMessage__DatumCase;
struct _Decoderbufs__DatumMessage struct _Decoderbufs__DatumMessage
@ -75,7 +71,6 @@ struct _Decoderbufs__DatumMessage
char *datum_string; char *datum_string;
ProtobufCBinaryData datum_bytes; ProtobufCBinaryData datum_bytes;
Decoderbufs__Point *datum_point; Decoderbufs__Point *datum_point;
protobuf_c_boolean datum_missing;
}; };
}; };
#define DECODERBUFS__DATUM_MESSAGE__INIT \ #define DECODERBUFS__DATUM_MESSAGE__INIT \