Compare commits

...

15 Commits

Author SHA1 Message Date
Bradford D. Boyle 84f30f3f44 DBZ-8403 Fix PostgreSQL 17 compilation
When compiling against PG17 on Debian testing with gcc 14, building
fails because of incompatible-pointer-type error. This commit updates
the macros added in e1f8714 to handle the pointer types.
2024-11-13 10:07:02 +01:00
Jiri Pechanec e1f87147b7 DBZ-8275 Support PostgreSQL 17 API 2024-10-16 10:41:33 +02:00
Mohammed Imran e709f326cb fixed md violations and added syntax hightlighted codeblocks 2023-05-19 09:08:15 -04:00
Polina Bungina cd4171a030 DBZ-5370 Add PostgreSQL 15 compatibility code 2022-10-18 08:55:01 +02:00
Jiri Pechanec c9b00aa8c0 DBZ-3937 Read OID as unsigned integer 2021-08-30 14:42:49 +02:00
Jiri Pechanec 7a8c8b8f46 DBZ-2565 Process infinite timestamps 2020-10-01 09:53:09 +02:00
Jiri Pechanec e29a2580a5 DBZ-1052 DBZ-1746 Add unknown message type 2020-01-30 12:27:04 +01:00
Jiri Pechanec 870ecfa976 DBZ-1052 Emit tx BEGIN/END messages 2020-01-30 12:27:04 +01:00
Jiri Pechanec 01126bfa89 DBZ-1549 Remove PostGIS dependency 2019-11-15 07:57:33 +01:00
Jiri Pechanec 7f1c6fefc3 DBZ-1540 Update RPM spec to 0.10.0 2019-10-09 12:22:34 +02:00
Jiri Pechanec 54e2c45f11 DBZ-1498 Provide precise interval value 2019-10-09 12:13:28 +02:00
Gunnar Morling 3c910fff12 DB-1367 Sending marker for unchanged TOAST columns 2019-09-25 15:06:32 +02:00
Gunnar Morling 44cf35d4b5
Adding instructions for re-generating ProtoBuf code 2019-07-01 12:26:09 +02:00
Jiri Pechanec 0b536f372e DBZ-1272 Relax Postgres version requirements 2019-05-21 17:15:57 +02:00
Jiri Pechanec c1a5f51179 DBZ-1272 Spec file for RPM package 2019-05-21 17:15:57 +02:00
7 changed files with 313 additions and 128 deletions

View File

@ -4,13 +4,8 @@ EXTENSION = decoderbufs
PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0') PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0')
PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0') PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0')
ifneq ($(USE_POSTGIS),false)
C_PARAMS = -DUSE_POSTGIS
POSTGIS_C_LDFLAGS = -L/usr/local/lib -llwgeom
endif
PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include $(C_PARAMS) PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include $(C_PARAMS)
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) $(POSTGIS_C_LDFLAGS) SHLIB_LINK += $(PROTOBUF_C_LDFLAGS)
OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o

116
README.md
View File

@ -10,15 +10,17 @@
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
## Thanks to ## Thanks to
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based - The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support - [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
## Dependencies ## Dependencies
This code depends on the following libraries and requires them for compilation: This code depends on the following libraries and requires them for compilation:
* [PostgreSQL](http://www.postgresql.org) 9.6+ - [PostgreSQL](http://www.postgresql.org) 9.6+
* [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization - [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
* [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support - [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
## Building ## Building
@ -26,71 +28,107 @@ This code depends on the following libraries and requires them for compilation:
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages. (for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
### Installing Dependencies ### Installing Dependencies
#### Debian #### Debian
# Core build utilities ```bash
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6 # Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
# PostGIS dependency # PostGIS dependency
apt-get install -f -y libproj-dev liblwgeom-dev apt-get install -f -y libproj-dev liblwgeom-dev
# Protobuf-c dependency (requires a non-stable Debian repo) # Protobuf-c dependency (requires a non-stable Debian repo)
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1 apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
```
The above are taken from the Debezium [docker images](https://github.com/debezium/docker-images). When updating the ProtoBuf definition, also install the ProtoBuf C compiler:
```bash
apt-get install -y protobuf-c-compiler=1.2.*
```
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
#### Other Linux distributions #### Other Linux distributions
You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro. You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1` Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1`
### Getting the source code
If you have all of the above prerequisites installed, clone this git repo to build from source:
```bash
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
```
### Optional: Re-generating ProtoBuf code
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
```bash
cd proto
protoc-c --c_out=../src/proto pg_logicaldec.proto
cd ..
```
Commit the generated files to git then.
### Building and installing decoderbufs ### Building and installing decoderbufs
If you have all of the above prerequisites installed, clone this git repo to build from source. If you have multiple If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
postgres versions installed, you can select which version to install by altering your `$PATH` to point to the right version.
Then `make` and `make install` for each version. Here is an example: Then `make` and `make install` for each version. Here is an example:
git clone https://github.com/debezium/postgres-decoderbufs.git ```bash
cd postgres-decoderbufs # Install for Postgres 9.6 if I have multiple local versions
# Install for Postgres 9.6 if I have multiple local versions export PATH=/usr/lib/postgresql/9.6/bin:$PATH
export PATH=/usr/lib/postgresql/9.6/bin:$PATH make
make make install
make install ```
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf: Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
# MODULES ```bash
shared_preload_libraries = 'decoderbufs' # MODULES
shared_preload_libraries = 'decoderbufs'
# REPLICATION # REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart) wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart) max_wal_senders = 8 # max number of walsender processes (change requires restart)
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart) max_replication_slots = 4 # max number of replication slots (change requires restart)
```
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so: In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
local replication <youruser> trust ```make
host replication <youruser> 127.0.0.1/32 trust local replication <youruser> trust
host replication <youruser> ::1/128 trust host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
```
And restart PostgreSQL. And restart PostgreSQL.
## Usage ## Usage
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE) ```sql
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
-- peek at WAL changes using decoderbufs debug mode for SQL console -- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- check the WAL position of logical replicators -- peek at WAL changes using decoderbufs debug mode for SQL console
select * from pg_replication_slots where slot_type = 'logical'; select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
```
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case. If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.

View File

@ -5,9 +5,12 @@ option java_outer_classname = "PgProto";
option optimize_for = SPEED; option optimize_for = SPEED;
enum Op { enum Op {
UNKNOWN = -1;
INSERT = 0; INSERT = 0;
UPDATE = 1; UPDATE = 1;
DELETE = 2; DELETE = 2;
BEGIN = 3;
COMMIT = 4;
} }
message Point { message Point {
@ -27,6 +30,7 @@ message DatumMessage {
string datum_string = 8; string datum_string = 8;
bytes datum_bytes = 9; bytes datum_bytes = 9;
Point datum_point = 10; Point datum_point = 10;
bool datum_missing = 11;
} }
} }

View File

@ -0,0 +1,48 @@
Name: postgres-decoderbufs
Version: 0.10.0
Release: 1%{?dist}
Summary: PostgreSQL Protocol Buffers logical decoder plugin
License: MIT
URL: https://github.com/debezium/postgres-decoderbufs
%global full_version %{version}.Final
Source0: https://github.com/debezium/%{name}/archive/v%{full_version}.tar.gz
BuildRequires: gcc
BuildRequires: postgresql-devel >= 9.6, postgresql-server-devel >= 9.6
BuildRequires: postgis-devel >= 2
BuildRequires: protobuf-c-devel
Requires: protobuf-c
%{?postgresql_module_requires}
Recommends: postgis
%description
A PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers messages.
%prep
%setup -qn postgres-decoderbufs-%{full_version}
%build
%make_build
%install
%make_install
%files
%doc README.md
%license LICENSE
%{_libdir}/pgsql/decoderbufs.so
%{_datadir}/pgsql/extension/decoderbufs.control
%changelog
* Wed Oct 9 2019 - Jiri Pechanec <jpechane@redhat.com> 0.10.0-1
* Tue May 14 2019 - Jiri Pechanec <jpechane@redhat.com> 0.9.5-1
- Initial RPM packaging

View File

@ -60,6 +60,12 @@
#error Expecting timestamps to be represented as integers, not as floating-point. #error Expecting timestamps to be represented as integers, not as floating-point.
#endif #endif
#if PG_VERSION_NUM >= 170000
#define TUPLE_ACCESS(x) x
#else
#define TUPLE_ACCESS(x) &x->tuple
#endif
PG_MODULE_MAGIC; PG_MODULE_MAGIC;
/* define a time macro to convert TimestampTz into something more sane, /* define a time macro to convert TimestampTz into something more sane,
@ -164,16 +170,6 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
MemoryContextDelete(data->context); MemoryContextDelete(data->context);
} }
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
}
/* print tuple datums (only used for debug-mode) */ /* print tuple datums (only used for debug-mode) */
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup, static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
size_t n) { size_t n) {
@ -261,9 +257,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
const char *output = NULL; const char *output = NULL;
Point *p = NULL; Point *p = NULL;
Timestamp ts; Timestamp ts;
double duration;
TimeTzADT *timetz = NULL; TimeTzADT *timetz = NULL;
Interval *interval = NULL;
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT; Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
int size = 0; int size = 0;
@ -281,10 +275,13 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
break; break;
case INT8OID: case INT8OID:
case OIDOID:
datum_msg->datum_int64 = DatumGetInt64(datum); datum_msg->datum_int64 = DatumGetInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break; break;
case OIDOID:
datum_msg->datum_int64 = (Oid) DatumGetUInt64(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break;
case FLOAT4OID: case FLOAT4OID:
datum_msg->datum_float = DatumGetFloat4(datum); datum_msg->datum_float = DatumGetFloat4(datum);
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT;
@ -308,6 +305,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case BITOID: case BITOID:
case VARBITOID: case VARBITOID:
case UUIDOID: case UUIDOID:
case INTERVALOID:
output = OidOutputFunctionCall(typoutput, datum); output = OidOutputFunctionCall(typoutput, datum);
datum_msg->datum_string = pnstrdup(output, strlen(output)); datum_msg->datum_string = pnstrdup(output, strlen(output));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING;
@ -315,23 +313,21 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
case TIMESTAMPOID: case TIMESTAMPOID:
ts = DatumGetTimestamp(datum); ts = DatumGetTimestamp(datum);
if (TIMESTAMP_NOT_FINITE(ts)) { if (TIMESTAMP_NOT_FINITE(ts)) {
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE), datum_msg->datum_int64 = ts;
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else { } else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
}
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break; break;
}
case TIMESTAMPTZOID: case TIMESTAMPTZOID:
ts = DatumGetTimestampTz(datum); ts = DatumGetTimestampTz(datum);
if (TIMESTAMP_NOT_FINITE(ts)) { if (TIMESTAMP_NOT_FINITE(ts)) {
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE), datum_msg->datum_int64 = ts;
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
} else { } else {
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts); datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
}
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
break; break;
}
case DATEOID: case DATEOID:
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */ /* simply get the number of days as the stored 32 bit value and convert to EPOCH */
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum)); datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
@ -347,15 +343,6 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0)); datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE; datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break; break;
case INTERVALOID:
interval = DatumGetIntervalP(datum);
/*
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
*/
duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY);
datum_msg->datum_double = duration;
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
break;
case BYTEAOID: case BYTEAOID:
valptr = DatumGetByteaPCopy(datum); valptr = DatumGetByteaPCopy(datum);
size = VARSIZE(valptr) - VARHDRSZ; size = VARSIZE(valptr) - VARHDRSZ;
@ -446,7 +433,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena); getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
if (!isnull) { if (!isnull) {
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) { if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
// TODO: Is there a way we can handle this? datum_msg.datum_missing = true;
datum_msg.datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING;
elog(DEBUG1, "Not handling external on disk varlena at the moment."); elog(DEBUG1, "Not handling external on disk varlena at the moment.");
} else if (!typisvarlena) { } else if (!typisvarlena) {
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval); set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
@ -500,6 +488,90 @@ static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
} }
} }
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering begin callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__BEGIN;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
DecoderData *data;
MemoryContext old;
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
elog(DEBUG1, "Entering commit callback");
/* Avoid leaking memory by using and resetting our own context */
data = ctx->output_plugin_private;
old = MemoryContextSwitchTo(data->context);
rmsg.op = DECODERBUFS__OP__COMMIT;
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
OutputPluginPrepareWrite(ctx, true);
if (data->debug_mode) {
print_row_msg(ctx->out, &rmsg);
} else {
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
void *packed = palloc(psize);
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
appendBinaryStringInfo(ctx->out, packed, ssize);
}
OutputPluginWrite(ctx, true);
/* Cleanup, freeing memory */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/* callback for individual changed tuples */ /* callback for individual changed tuples */
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) { Relation relation, ReorderBufferChange *change) {
@ -532,7 +604,11 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* set common fields */ /* set common fields */
rmsg.transaction_id = txn->xid; rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true; rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time); rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true; rmsg.has_commit_time = true;
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
NameStr(class_form->relname))); NameStr(class_form->relname)));
@ -552,13 +628,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc); TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple; rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo = rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo); palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation, add_metadata_to_msg(rmsg.new_typeinfo, relation,
&change->data.tp.newtuple->tuple, tupdesc); TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
} }
break; break;
case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_UPDATE:
@ -573,7 +649,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple = rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation, tuple_to_tuple_msg(rmsg.old_tuple, relation,
&change->data.tp.oldtuple->tuple, tupdesc); TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
} }
if (change->data.tp.newtuple != NULL) { if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information"); elog(DEBUG1, "decoding new tuple information");
@ -583,13 +659,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc); TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple; rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo = rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo); palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation, add_metadata_to_msg(rmsg.new_typeinfo, relation,
&change->data.tp.newtuple->tuple, tupdesc); TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
} }
} }
break; break;
@ -605,7 +681,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple = rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple); palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation, tuple_to_tuple_msg(rmsg.old_tuple, relation,
&change->data.tp.oldtuple->tuple, tupdesc); TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
} else { } else {
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid "); elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
} }

View File

@ -230,7 +230,7 @@ const ProtobufCMessageDescriptor decoderbufs__point__descriptor =
(ProtobufCMessageInit) decoderbufs__point__init, (ProtobufCMessageInit) decoderbufs__point__init,
NULL,NULL,NULL /* reserved[123] */ NULL,NULL,NULL /* reserved[123] */
}; };
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] = static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[11] =
{ {
{ {
"column_name", "column_name",
@ -352,6 +352,18 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */ 0,NULL,NULL /* reserved1,reserved2, etc */
}, },
{
"datum_missing",
11,
PROTOBUF_C_LABEL_OPTIONAL,
PROTOBUF_C_TYPE_BOOL,
offsetof(Decoderbufs__DatumMessage, datum_case),
offsetof(Decoderbufs__DatumMessage, datum_missing),
NULL,
NULL,
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
}; };
static const unsigned decoderbufs__datum_message__field_indices_by_name[] = { static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
0, /* field[0] = column_name */ 0, /* field[0] = column_name */
@ -362,13 +374,14 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
4, /* field[4] = datum_float */ 4, /* field[4] = datum_float */
2, /* field[2] = datum_int32 */ 2, /* field[2] = datum_int32 */
3, /* field[3] = datum_int64 */ 3, /* field[3] = datum_int64 */
10, /* field[10] = datum_missing */
9, /* field[9] = datum_point */ 9, /* field[9] = datum_point */
7, /* field[7] = datum_string */ 7, /* field[7] = datum_string */
}; };
static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] = static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] =
{ {
{ 1, 0 }, { 1, 0 },
{ 0, 10 } { 0, 11 }
}; };
const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
{ {
@ -378,7 +391,7 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
"Decoderbufs__DatumMessage", "Decoderbufs__DatumMessage",
"decoderbufs", "decoderbufs",
sizeof(Decoderbufs__DatumMessage), sizeof(Decoderbufs__DatumMessage),
10, 11,
decoderbufs__datum_message__field_descriptors, decoderbufs__datum_message__field_descriptors,
decoderbufs__datum_message__field_indices_by_name, decoderbufs__datum_message__field_indices_by_name,
1, decoderbufs__datum_message__number_ranges, 1, decoderbufs__datum_message__number_ranges,
@ -552,20 +565,26 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
(ProtobufCMessageInit) decoderbufs__row_message__init, (ProtobufCMessageInit) decoderbufs__row_message__init,
NULL,NULL,NULL /* reserved[123] */ NULL,NULL,NULL /* reserved[123] */
}; };
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] = static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[6] =
{ {
{ "UNKNOWN", "DECODERBUFS__OP__UNKNOWN", -1 },
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 }, { "INSERT", "DECODERBUFS__OP__INSERT", 0 },
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 }, { "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 }, { "DELETE", "DECODERBUFS__OP__DELETE", 2 },
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
}; };
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = { static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
{0, 0},{0, 3} {-1, 0},{0, 6}
}; };
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] = static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[6] =
{ {
{ "DELETE", 2 }, { "BEGIN", 4 },
{ "INSERT", 0 }, { "COMMIT", 5 },
{ "UPDATE", 1 }, { "DELETE", 3 },
{ "INSERT", 1 },
{ "UNKNOWN", 0 },
{ "UPDATE", 2 },
}; };
const ProtobufCEnumDescriptor decoderbufs__op__descriptor = const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
{ {
@ -574,9 +593,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
"Op", "Op",
"Decoderbufs__Op", "Decoderbufs__Op",
"decoderbufs", "decoderbufs",
3, 6,
decoderbufs__op__enum_values_by_number, decoderbufs__op__enum_values_by_number,
3, 6,
decoderbufs__op__enum_values_by_name, decoderbufs__op__enum_values_by_name,
1, 1,
decoderbufs__op__value_ranges, decoderbufs__op__value_ranges,

View File

@ -24,9 +24,12 @@ typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
/* --- enums --- */ /* --- enums --- */
typedef enum _Decoderbufs__Op { typedef enum _Decoderbufs__Op {
DECODERBUFS__OP__UNKNOWN = -1,
DECODERBUFS__OP__INSERT = 0, DECODERBUFS__OP__INSERT = 0,
DECODERBUFS__OP__UPDATE = 1, DECODERBUFS__OP__UPDATE = 1,
DECODERBUFS__OP__DELETE = 2 DECODERBUFS__OP__DELETE = 2,
DECODERBUFS__OP__BEGIN = 3,
DECODERBUFS__OP__COMMIT = 4
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP) PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
} Decoderbufs__Op; } Decoderbufs__Op;
@ -53,6 +56,7 @@ typedef enum {
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10, DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING = 11,
} Decoderbufs__DatumMessage__DatumCase; } Decoderbufs__DatumMessage__DatumCase;
struct _Decoderbufs__DatumMessage struct _Decoderbufs__DatumMessage
@ -71,6 +75,7 @@ struct _Decoderbufs__DatumMessage
char *datum_string; char *datum_string;
ProtobufCBinaryData datum_bytes; ProtobufCBinaryData datum_bytes;
Decoderbufs__Point *datum_point; Decoderbufs__Point *datum_point;
protobuf_c_boolean datum_missing;
}; };
}; };
#define DECODERBUFS__DATUM_MESSAGE__INIT \ #define DECODERBUFS__DATUM_MESSAGE__INIT \