Compare commits

...

19 Commits

Author SHA1 Message Date
Bradford D. Boyle 84f30f3f44 DBZ-8403 Fix PostgreSQL 17 compilation
When compiling against PG17 on Debian testing with gcc 14, building
fails because of incompatible-pointer-type error. This commit updates
the macros added in e1f8714 to handle the pointer types.
2024-11-13 10:07:02 +01:00
Jiri Pechanec e1f87147b7 DBZ-8275 Support PostgreSQL 17 API 2024-10-16 10:41:33 +02:00
Mohammed Imran e709f326cb fixed md violations and added syntax hightlighted codeblocks 2023-05-19 09:08:15 -04:00
Polina Bungina cd4171a030 DBZ-5370 Add PostgreSQL 15 compatibility code 2022-10-18 08:55:01 +02:00
Jiri Pechanec c9b00aa8c0 DBZ-3937 Read OID as unsigned integer 2021-08-30 14:42:49 +02:00
Jiri Pechanec 7a8c8b8f46 DBZ-2565 Process infinite timestamps 2020-10-01 09:53:09 +02:00
Jiri Pechanec e29a2580a5 DBZ-1052 DBZ-1746 Add unknown message type 2020-01-30 12:27:04 +01:00
Jiri Pechanec 870ecfa976 DBZ-1052 Emit tx BEGIN/END messages 2020-01-30 12:27:04 +01:00
Jiri Pechanec 01126bfa89 DBZ-1549 Remove PostGIS dependency 2019-11-15 07:57:33 +01:00
Jiri Pechanec 7f1c6fefc3 DBZ-1540 Update RPM spec to 0.10.0 2019-10-09 12:22:34 +02:00
Jiri Pechanec 54e2c45f11 DBZ-1498 Provide precise interval value 2019-10-09 12:13:28 +02:00
Gunnar Morling 3c910fff12 DB-1367 Sending marker for unchanged TOAST columns 2019-09-25 15:06:32 +02:00
Gunnar Morling 44cf35d4b5
Adding instructions for re-generating ProtoBuf code 2019-07-01 12:26:09 +02:00
Jiri Pechanec 0b536f372e DBZ-1272 Relax Postgres version requirements 2019-05-21 17:15:57 +02:00
Jiri Pechanec c1a5f51179 DBZ-1272 Spec file for RPM package 2019-05-21 17:15:57 +02:00
Jiri Pechanec c719d3fa6a DBZ-1144 Do not translate geometry point (PostGIS) to PG point
This code was effectivelly dead. It was used only when PostGIS geometry type was created inside the public schema and was responsible for translating of PostGIS point to PG point. When we introduced the full support for PostGIS types in Debezium the code was no longer used as PostGIS types are typically created in separate schema.
So now we support

* POINT type (postgres point)
* postgis GEOMETRY in arbitrary schema (including geometry point)
2019-04-02 15:56:16 +02:00
Jeremy Finzel 2a60a18ab1 Clarify build instructions 2019-01-17 20:24:21 +01:00
Alexander Kukushkin c5068d5b42 DBZ-955 Remove unused function and variable 2018-10-31 15:55:59 +01:00
Alexander Kukushkin 105344ac58 DBZ-955 Fix compilation agains postgres 11 2018-10-31 15:55:43 +01:00
7 changed files with 328 additions and 218 deletions

View File

@ -4,13 +4,8 @@ EXTENSION = decoderbufs
PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0')
PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0')
ifneq ($(USE_POSTGIS),false)
C_PARAMS = -DUSE_POSTGIS
POSTGIS_C_LDFLAGS = -L/usr/local/lib -llwgeom
endif
PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include $(C_PARAMS)
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) $(POSTGIS_C_LDFLAGS)
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS)
OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o

144
README.md
View File

@ -10,82 +10,130 @@
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
## Thanks to
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
## Dependencies
This code depends on the following libraries and requires them for compilation:
* [PostgreSQL](http://www.postgresql.org) 9.6+
* [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
* [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
- [PostgreSQL](http://www.postgresql.org) 9.6+
- [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
- [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
## Building
To build you will need to install PostgreSQL (for pg_config), PostgreSQL server development packages, protobuf-c for the
Protocol Buffer support and some PostGIS development packages
`postgres-decoderbufs` has to be built from source after installing required dependencies. The required dependencies are first PostgreSQL
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
### Debian
### Installing Dependencies
# Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
# PostGIS dependency
apt-get install -f -y libproj-dev liblwgeom-dev
#### Debian
# Protobuf-c dependency (requires a non-stable Debian repo)
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
```bash
# Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
The above are taken from the Debezium [docker images](https://github.com/debezium/docker-images).
# PostGIS dependency
apt-get install -f -y libproj-dev liblwgeom-dev
### Other Linux distributions
# Protobuf-c dependency (requires a non-stable Debian repo)
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
```
You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
When updating the ProtoBuf definition, also install the ProtoBuf C compiler:
```bash
apt-get install -y protobuf-c-compiler=1.2.*
```
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
#### Other Linux distributions
You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1`
If you have all of the prerequisites installed you should be able to just:
### Getting the source code
make && make install
If you have all of the above prerequisites installed, clone this git repo to build from source:
```bash
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
```
### Optional: Re-generating ProtoBuf code
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
```bash
cd proto
protoc-c --c_out=../src/proto pg_logicaldec.proto
cd ..
```
Commit the generated files to git then.
### Building and installing decoderbufs
If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
Then `make` and `make install` for each version. Here is an example:
```bash
# Install for Postgres 9.6 if I have multiple local versions
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
make
make install
```
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
# MODULES
shared_preload_libraries = 'decoderbufs'
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart)
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
```bash
# MODULES
shared_preload_libraries = 'decoderbufs'
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart)
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
```
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
```make
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
```
And restart PostgreSQL.
## Usage
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
-- peek at WAL changes using decoderbufs debug mode for SQL console
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
```sql
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
-- peek at WAL changes using decoderbufs debug mode for SQL console
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
```
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.
The binary format will be consumed by the Debezium Postgres Connector.
## Type Mappings
The following table shows how current PostgreSQL type OIDs are mapped to which decoderbuf fields:
@ -116,5 +164,5 @@ The following table shows how current PostgreSQL type OIDs are mapped to which d
## Support
File bug reports and feature requests using [Debezium's JIRA](https://issues.jboss.org/browse/DBZ) and the
File bug reports and feature requests using [Debezium's JIRA](https://issues.jboss.org/browse/DBZ) and the
[postgresql-connector](https://issues.jboss.org/browse/DBZ/component/12323543) component

View File

@ -5,9 +5,12 @@ option java_outer_classname = "PgProto";
option optimize_for = SPEED;
enum Op {
UNKNOWN = -1;
INSERT = 0;
UPDATE = 1;
DELETE = 2;
BEGIN = 3;
COMMIT = 4;
}
message Point {
@ -27,6 +30,7 @@ message DatumMessage {
string datum_string = 8;
bytes datum_bytes = 9;
Point datum_point = 10;
bool datum_missing = 11;
}
}

View File

@ -0,0 +1,48 @@
Name: postgres-decoderbufs
Version: 0.10.0
Release: 1%{?dist}
Summary: PostgreSQL Protocol Buffers logical decoder plugin
License: MIT
URL: https://github.com/debezium/postgres-decoderbufs
%global full_version %{version}.Final
Source0: https://github.com/debezium/%{name}/archive/v%{full_version}.tar.gz
BuildRequires: gcc
BuildRequires: postgresql-devel >= 9.6, postgresql-server-devel >= 9.6
BuildRequires: postgis-devel >= 2
BuildRequires: protobuf-c-devel
Requires: protobuf-c
%{?postgresql_module_requires}
Recommends: postgis
%description
A PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers messages.
%prep
%setup -qn postgres-decoderbufs-%{full_version}
%build
%make_build
%install
%make_install
%files
%doc README.md
%license LICENSE
%{_libdir}/pgsql/decoderbufs.so
%{_datadir}/pgsql/extension/decoderbufs.control
%changelog
* Wed Oct 9 2019 - Jiri Pechanec <jpechane@redhat.com> 0.10.0-1
* Tue May 14 2019 - Jiri Pechanec <jpechane@redhat.com> 0.9.5-1
- Initial RPM packaging

View File

@ -60,10 +60,10 @@
#error Expecting timestamps to be represented as integers, not as floating-point.
#endif
#ifdef USE_POSTGIS
/* POSTGIS version define so it doesn't redef macros */
#define POSTGIS_PGSQL_VERSION 94
#include "liblwgeom.h"
#if PG_VERSION_NUM >= 170000
#define TUPLE_ACCESS(x) x
#else
#define TUPLE_ACCESS(x) &x->tuple
#endif
PG_MODULE_MAGIC;
@ -80,10 +80,6 @@ typedef struct {
bool debug_mode;
} DecoderData;
/* GLOBALs for PostGIS dynamic OIDs */
Oid geometry_oid = InvalidOid;
Oid geography_oid = InvalidOid;
/* these must be available to pg_dlsym() */
extern void _PG_init(void);
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
@ -119,9 +115,14 @@ static void pg_decode_startup(LogicalDecodingContext *ctx,
elog(DEBUG1, "Entering startup callback");
data = palloc(sizeof(DecoderData));
#if PG_VERSION_NUM >= 90600
data->context = AllocSetContextCreate(
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_SIZES);
#else
data->context = AllocSetContextCreate(
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
#endif
data->debug_mode = false;
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
@ -169,32 +170,6 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
MemoryContextDelete(data->context);
}
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
#ifdef USE_POSTGIS
// set PostGIS geometry type id (these are dynamic)
// TODO: Figure out how to make sure we get the typid's from postgis extension namespace
if (geometry_oid == InvalidOid) {
geometry_oid = TypenameGetTypid("geometry");
if (geometry_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geometry_oid);
}
}
if (geography_oid == InvalidOid) {
geography_oid = TypenameGetTypid("geography");
if (geography_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geography_oid);
}
}
#endif
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
}
/* print tuple datums (only used for debug-mode) */
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
size_t n) {
@ -275,76 +250,16 @@ static void print_row_msg(StringInfo out, Decoderbufs__RowMessage *rmsg) {
}
/* this doesn't seem to be available in the public api (unfortunate) */
static double numeric_to_double_no_overflow(Numeric num) {
char *tmp;
double val;
char *endptr;
tmp = DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(num)));
/* unlike float8in, we ignore ERANGE from strtod */
val = strtod(tmp, &endptr);
if (*endptr != '\0') {
/* shouldn't happen ... */
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type double precision: \"%s\"",
tmp)));
}
return val;
}
static bool geography_point_as_decoderbufs_point(Datum datum,
Decoderbufs__Point *p) {
#ifdef USE_POSTGIS
GSERIALIZED *geom;
LWGEOM *lwgeom;
LWPOINT *point = NULL;
POINT2D p2d;
geom = (GSERIALIZED *)PG_DETOAST_DATUM(datum);
if (gserialized_get_type(geom) != POINTTYPE) {
return false;
}
lwgeom = lwgeom_from_gserialized(geom);
point = lwgeom_as_lwpoint(lwgeom);
if (lwgeom_is_empty(lwgeom)) {
return false;
}
getPoint2d_p(point->point, 0, &p2d);
if (p != NULL) {
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
dp.x = p2d.x;
dp.y = p2d.y;
memcpy(p, &dp, sizeof(dp));
elog(DEBUG1, "Translating geography to point: (x,y) = (%f,%f)", p->x, p->y);
}
return true;
#else
elog(DEBUG1, "PostGIS support is off, recompile decoderbufs with USE_POSTGIS option!");
return false;
#endif
}
/* set a datum value based on its OID specified by typid */
static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
Oid typoutput, Datum datum) {
Numeric num;
bytea *valptr = NULL;
const char *output = NULL;
Point *p = NULL;
Timestamp ts;
double duration;
TimeTzADT *timetz = NULL;
Interval *interval = NULL;
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
int size = 0;
switch (typid) {
case BOOLOID:
@ -360,10 +275,13 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
break;
case INT8OID:
case OIDOID:
datum_msg->datum_int64 = DatumGetInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case OIDOID:
datum_msg->datum_int64 = (Oid) DatumGetUInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case FLOAT4OID:
datum_msg->datum_float = DatumGetFloat4(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT;
@ -372,7 +290,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_double = DatumGetFloat8(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case CASHOID:
case CASHOID:
datum_msg->datum_int64 = DatumGetCash(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
@ -382,11 +300,12 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case BPCHAROID:
case TEXTOID:
case JSONOID:
case JSONBOID:
case JSONBOID:
case XMLOID:
case BITOID:
case VARBITOID:
case UUIDOID:
case INTERVALOID:
output = OidOutputFunctionCall(typoutput, datum);
datum_msg->datum_string = pnstrdup(output, strlen(output));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING;
@ -394,24 +313,22 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case TIMESTAMPOID:
ts = DatumGetTimestamp(datum);
if (TIMESTAMP_NOT_FINITE(ts)) {
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
datum_msg->datum_int64 = ts;
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
}
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case TIMESTAMPTZOID:
ts = DatumGetTimestampTz(datum);
if (TIMESTAMP_NOT_FINITE(ts)) {
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else {
datum_msg->datum_int64 = ts;
} else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
}
case DATEOID:
}
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case DATEOID:
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
@ -419,21 +336,12 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case TIMEOID:
datum_msg->datum_int64 = DatumGetTimeADT(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
break;
case TIMETZOID:
timetz = DatumGetTimeTzADTP(datum);
/* use GMT-equivalent time */
/* use GMT-equivalent time */
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case INTERVALOID:
interval = DatumGetIntervalP(datum);
/*
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
*/
duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY);
datum_msg->datum_double = duration;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case BYTEAOID:
valptr = DatumGetByteaPCopy(datum);
@ -452,13 +360,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
break;
default:
// PostGIS uses dynamic OIDs so we need to check the type again here
if (typid == geometry_oid || typid == geography_oid) {
elog(DEBUG1, "Converting geography point to datum_point");
datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point));
geography_point_as_decoderbufs_point(datum, datum_msg->datum_point);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
} else {
{
int len;
elog(DEBUG1, "Encountered unknown typid: %d, using bytes", typid);
output = OidOutputFunctionCall(typoutput, datum);
@ -478,15 +380,15 @@ static int valid_attributes_count_from(TupleDesc tupdesc) {
int natt;
int count = 0;
for (natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr = tupdesc->attrs[natt];
Form_pg_attribute attr = TupleDescAttr(tupdesc, natt);
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
if (attr->attisdropped || attr->attnum < 0) {
continue;
}
count++;
}
count++;
}
return count;
return count;
}
/* convert a PG tuple to an array of DatumMessage(s) */
@ -506,20 +408,20 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
bool typisvarlena;
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
attr = tupdesc->attrs[natt];
attr = TupleDescAttr(tupdesc, natt);
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
continue;
}
}
attrName = quote_identifier(NameStr(attr->attname));
elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName);
attrName = quote_identifier(NameStr(attr->attname));
elog(DEBUG1, "processing column %d with name %s", natt + 1, attrName);
/* set the column name */
datum_msg.column_name = (char *)attrName;
/* set datum from tuple */
origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
@ -531,7 +433,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
if (!isnull) {
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
// TODO: Is there a way we can handle this?
datum_msg.datum_missing = true;
datum_msg.datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING;
elog(DEBUG1, "Not handling external on disk varlena at the moment.");
} else if (!typisvarlena) {
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
@ -540,7 +443,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
set_datum_value(&datum_msg, attr->atttypid, typoutput, val);
}
} else {
elog(DEBUG1, "column %s is null, ignoring value", attrName);
elog(DEBUG1, "column %s is null, ignoring value", attrName);
}
tmsg[valid_attr_cnt] = palloc(sizeof(datum_msg));
@ -564,7 +467,7 @@ static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
Decoderbufs__TypeInfo typeinfo = DECODERBUFS__TYPE_INFO__INIT;
bool not_null;
attr = tupdesc->attrs[natt];
attr = TupleDescAttr(tupdesc, natt);
/* skip dropped columns and system columns */
if (attr->attisdropped || attr->attnum < 0) {
@ -585,6 +488,90 @@ static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
}
}
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering begin callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__BEGIN;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering commit callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__COMMIT;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* callback for individual changed tuples */
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) {
@ -617,12 +604,16 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* set common fields */
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
NameStr(class_form->relname)));
/* decode different operation types */
switch (change->action) {
case REORDER_BUFFER_CHANGE_INSERT:
@ -637,13 +628,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc);
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
&change->data.tp.newtuple->tuple, tupdesc);
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
}
break;
case REORDER_BUFFER_CHANGE_UPDATE:
@ -658,7 +649,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation,
&change->data.tp.oldtuple->tuple, tupdesc);
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
}
if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information");
@ -668,13 +659,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc);
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
&change->data.tp.newtuple->tuple, tupdesc);
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
}
}
break;
@ -690,13 +681,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation,
&change->data.tp.oldtuple->tuple, tupdesc);
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
} else {
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
}
break;
default:
elog(WARNING, "unknown change action");
elog(WARNING, "unknown change action");
Assert(0);
break;
}

View File

@ -230,7 +230,7 @@ const ProtobufCMessageDescriptor decoderbufs__point__descriptor =
(ProtobufCMessageInit) decoderbufs__point__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] =
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[11] =
{
{
"column_name",
@ -352,6 +352,18 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"datum_missing",
11,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_BOOL,
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_missing),
NULL,
NULL,
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
0, /* field[0] = column_name */
@ -362,13 +374,14 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
4, /* field[4] = datum_float */
2, /* field[2] = datum_int32 */
3, /* field[3] = datum_int64 */
10, /* field[10] = datum_missing */
9, /* field[9] = datum_point */
7, /* field[7] = datum_string */
};
static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 10 }
{ 0, 11 }
};
const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
{
@ -378,7 +391,7 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
"Decoderbufs__DatumMessage",
"decoderbufs",
sizeof(Decoderbufs__DatumMessage),
10,
11,
decoderbufs__datum_message__field_descriptors,
decoderbufs__datum_message__field_indices_by_name,
1, decoderbufs__datum_message__number_ranges,
@ -552,20 +565,26 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
(ProtobufCMessageInit) decoderbufs__row_message__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[6] =
{
{ "UNKNOWN", "DECODERBUFS__OP__UNKNOWN", -1 },
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 },
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
};
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
{0, 0},{0, 3}
{-1, 0},{0, 6}
};
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[6] =
{
{ "DELETE", 2 },
{ "INSERT", 0 },
{ "UPDATE", 1 },
{ "BEGIN", 4 },
{ "COMMIT", 5 },
{ "DELETE", 3 },
{ "INSERT", 1 },
{ "UNKNOWN", 0 },
{ "UPDATE", 2 },
};
const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
{
@ -574,9 +593,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
"Op",
"Decoderbufs__Op",
"decoderbufs",
3,
6,
decoderbufs__op__enum_values_by_number,
3,
6,
decoderbufs__op__enum_values_by_name,
1,
decoderbufs__op__value_ranges,

View File

@ -24,9 +24,12 @@ typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
/* --- enums --- */
typedef enum _Decoderbufs__Op {
DECODERBUFS__OP__UNKNOWN = -1,
DECODERBUFS__OP__INSERT = 0,
DECODERBUFS__OP__UPDATE = 1,
DECODERBUFS__OP__DELETE = 2
DECODERBUFS__OP__DELETE = 2,
DECODERBUFS__OP__BEGIN = 3,
DECODERBUFS__OP__COMMIT = 4
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
} Decoderbufs__Op;
@ -53,6 +56,7 @@ typedef enum {
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING = 11,
} Decoderbufs__DatumMessage__DatumCase;
struct _Decoderbufs__DatumMessage
@ -71,6 +75,7 @@ struct _Decoderbufs__DatumMessage
char *datum_string;
ProtobufCBinaryData datum_bytes;
Decoderbufs__Point *datum_point;
protobuf_c_boolean datum_missing;
};
};
#define DECODERBUFS__DATUM_MESSAGE__INIT \