Compare commits

..

6 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| Bradford D. Boyle | 84f30f3f44 | DBZ-8403 Fix PostgreSQL 17 compilation<br>When compiling against PG17 on Debian testing with gcc 14, building fails because of incompatible-pointer-type error. This commit updates the macros added in e1f8714 to handle the pointer types. | 2024-11-13 10:07:02 +01:00 |
| Jiri Pechanec | e1f87147b7 | DBZ-8275 Support PostgreSQL 17 API | 2024-10-16 10:41:33 +02:00 |
| Mohammed Imran | e709f326cb | fixed md violations and added syntax hightlighted codeblocks | 2023-05-19 09:08:15 -04:00 |
| Polina Bungina | cd4171a030 | DBZ-5370 Add PostgreSQL 15 compatibility code | 2022-10-18 08:55:01 +02:00 |
| Jiri Pechanec | c9b00aa8c0 | DBZ-3937 Read OID as unsigned integer | 2021-08-30 14:42:49 +02:00 |
| Jiri Pechanec | 7a8c8b8f46 | DBZ-2565 Process infinite timestamps | 2020-10-01 09:53:09 +02:00 |
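For context on the top commit: PostgreSQL 17 changed the reorder-buffer change struct so that the new/old tuple fields are plain heap tuples rather than wrapper structs, and GCC 14 turns the resulting incompatible-pointer-type warning into a hard error. Below is a minimal, self-contained sketch of the accessor-macro idea used in the fix. The structs are simplified stand-ins, not the PostgreSQL headers, and `MODEL_PG_VERSION_NUM` stands in for the real `PG_VERSION_NUM`; the actual macro, `TUPLE_ACCESS`, appears in the first hunk of the C file further down.

```c
#include <stdio.h>

/* Toggle to model building against PostgreSQL >= 17 vs. an older release. */
#define MODEL_PG_VERSION_NUM 170000

/* Simplified stand-ins for PostgreSQL's HeapTupleData / ReorderBufferTupleBuf. */
typedef struct HeapTupleData { int natts; } HeapTupleData;
typedef HeapTupleData *HeapTuple;
typedef struct ReorderBufferTupleBuf { HeapTupleData tuple; } ReorderBufferTupleBuf;

#if MODEL_PG_VERSION_NUM >= 170000
/* PG17+: the change struct hands out a HeapTuple directly. */
typedef HeapTuple ChangeTuple;
#define TUPLE_ACCESS(x) (x)
#else
/* Pre-17: the change struct stores a wrapper, and the tuple lives inside it. */
typedef ReorderBufferTupleBuf *ChangeTuple;
#define TUPLE_ACCESS(x) (&(x)->tuple)
#endif

static void tuple_to_tuple_msg(HeapTuple tuple) {
    printf("natts = %d\n", tuple->natts);
}

int main(void) {
#if MODEL_PG_VERSION_NUM >= 170000
    HeapTupleData data = { 3 };
    ChangeTuple newtuple = &data;
#else
    ReorderBufferTupleBuf buf = { { 3 } };
    ChangeTuple newtuple = &buf;
#endif
    /* One call site compiles against either layout; without the macro, gcc 14
     * rejects the mismatched pointer type outright instead of warning. */
    tuple_to_tuple_msg(TUPLE_ACCESS(newtuple));
    return 0;
}
```

With the macro in place, each decode path can stay identical across server versions instead of scattering version checks through every call site.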
2 changed files with 107 additions and 68 deletions

README.md

````diff
@@ -10,15 +10,17 @@
 A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
 ## Thanks to
 - The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
 - [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
 ## Dependencies
 This code depends on the following libraries and requires them for compilation:
-* [PostgreSQL](http://www.postgresql.org) 9.6+
-* [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
-* [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
+- [PostgreSQL](http://www.postgresql.org) 9.6+
+- [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
+- [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
 ## Building
@@ -26,43 +28,52 @@ This code depends on the following libraries and requires them for compilation:
 (for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
 ### Installing Dependencies
 #### Debian
+```bash
 # Core build utilities
 apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
 # PostGIS dependency
 apt-get install -f -y libproj-dev liblwgeom-dev
 # Protobuf-c dependency (requires a non-stable Debian repo)
 add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
 apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
+```
 When updating the ProtoBuf definition, also install the ProtoBuf C compiler:
+```bash
 apt-get install -y protobuf-c-compiler=1.2.*
+```
 The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
 #### Other Linux distributions
 You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
 Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1`
 ### Getting the source code
 If you have all of the above prerequisites installed, clone this git repo to build from source:
+```bash
 git clone https://github.com/debezium/postgres-decoderbufs.git
 cd postgres-decoderbufs
+```
 ### Optional: Re-generating ProtoBuf code
 This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
+```bash
 cd proto
 protoc-c --c_out=../src/proto pg_logicaldec.proto
 cd ..
+```
 Commit the generated files to git then.
@@ -71,49 +82,58 @@ Commit the generated files to git then.
 If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
 Then `make` and `make install` for each version. Here is an example:
+```bash
 # Install for Postgres 9.6 if I have multiple local versions
 export PATH=/usr/lib/postgresql/9.6/bin:$PATH
 make
 make install
+```
 Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
+```bash
 # MODULES
 shared_preload_libraries = 'decoderbufs'
 # REPLICATION
 wal_level = logical            # minimal, archive, hot_standby, or logical (change requires restart)
 max_wal_senders = 8            # max number of walsender processes (change requires restart)
 wal_keep_segments = 4          # in logfile segments, 16MB each; 0 disables
 #wal_sender_timeout = 60s      # in milliseconds; 0 disables
 max_replication_slots = 4      # max number of replication slots (change requires restart)
+```
 In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
+```make
 local replication <youruser> trust
 host replication <youruser> 127.0.0.1/32 trust
 host replication <youruser> ::1/128 trust
+```
 And restart PostgreSQL.
 ## Usage
+```sql
 -- can use SQL for demo purposes
 select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
 -- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
 -- peek at WAL changes using decoderbufs debug mode for SQL console
 select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
 -- get WAL changes using decoderbufs to update the WAL position
 select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
 -- check the WAL position of logical replicators
 select * from pg_replication_slots where slot_type = 'logical';
+```
 If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.
 The binary format will be consumed by the Debezium Postgres Connector.
 ## Type Mappings
 The following table shows how current PostgreSQL type OIDs are mapped to which decoderbuf fields:
@@ -144,5 +164,5 @@ The following table shows how current PostgreSQL type OIDs are mapped to which decoderbuf fields:
 ## Support
 File bug reports and feature requests using [Debezium's JIRA](https://issues.jboss.org/browse/DBZ) and the
 [postgresql-connector](https://issues.jboss.org/browse/DBZ/component/12323543) component
````

Second changed file (the plugin's C source):

```diff
@@ -60,6 +60,12 @@
 #error Expecting timestamps to be represented as integers, not as floating-point.
 #endif
 
+#if PG_VERSION_NUM >= 170000
+#define TUPLE_ACCESS(x) x
+#else
+#define TUPLE_ACCESS(x) &x->tuple
+#endif
+
 PG_MODULE_MAGIC;
 
 /* define a time macro to convert TimestampTz into something more sane,
@@ -269,10 +275,13 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
       datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
       break;
     case INT8OID:
-    case OIDOID:
       datum_msg->datum_int64 = DatumGetInt64(datum);
       datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
       break;
+    case OIDOID:
+      datum_msg->datum_int64 = (Oid) DatumGetUInt64(datum);
+      datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
+      break;
     case FLOAT4OID:
       datum_msg->datum_float = DatumGetFloat4(datum);
       datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT;
```
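On the OIDOID split above (DBZ-3937): OIDs are unsigned 32-bit values, so widening them through a signed 32-bit view sign-extends anything above 2^31 into a negative number, which is the general hazard a dedicated unsigned read avoids. A small stand-alone illustration of the two widening paths (plain C arithmetic, not the PostgreSQL Datum macros):

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
    /* An OID in the upper half of the unsigned 32-bit range. */
    uint32_t oid = 3000000000u;

    /* Treating the 32-bit value as signed first sign-extends it into a
     * negative 64-bit number... */
    int64_t as_signed = (int32_t) oid;
    /* ...while widening the unsigned value preserves it. */
    int64_t as_unsigned = (uint32_t) oid;

    printf("signed path:   %" PRId64 "\n", as_signed);   /* -1294967296 */
    printf("unsigned path: %" PRId64 "\n", as_unsigned); /* 3000000000  */
    return 0;
}
```

Values in the lower half of the OID range look identical either way, which is why the difference only surfaces on databases whose OID counter has passed 2^31.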
```diff
@@ -304,23 +313,21 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
     case TIMESTAMPOID:
       ts = DatumGetTimestamp(datum);
       if (TIMESTAMP_NOT_FINITE(ts)) {
-        ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
-                errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
+        datum_msg->datum_int64 = ts;
       } else {
         datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
-        datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
-        break;
       }
+      datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
+      break;
     case TIMESTAMPTZOID:
       ts = DatumGetTimestampTz(datum);
       if (TIMESTAMP_NOT_FINITE(ts)) {
-        ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
-                errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
+        datum_msg->datum_int64 = ts;
       } else {
         datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
-        datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
-        break;
       }
+      datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
+      break;
     case DATEOID:
       /* simply get the number of days as the stored 32 bit value and convert to EPOCH */
       datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
```
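The two timestamp cases above (DBZ-2565) stop raising an error for infinite timestamps and instead forward the raw value. PostgreSQL stores `infinity` and `-infinity` timestamps as the extreme int64 sentinels, so passing them through keeps a recognizable marker for the consumer rather than aborting decoding. A self-contained sketch of that convention follows; the sentinel values mirror PostgreSQL's, while `to_usec_since_unix_epoch` is a hypothetical stand-in for the plugin's `TIMESTAMPTZ_TO_USEC_SINCE_EPOCH` macro:

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Mirrors PostgreSQL's convention: microseconds since 2000-01-01, with the
 * int64 extremes reserved for -infinity / infinity. */
typedef int64_t TimestampTz;

#define DT_NOBEGIN INT64_MIN
#define DT_NOEND   INT64_MAX
#define TIMESTAMP_NOT_FINITE(ts) ((ts) == DT_NOBEGIN || (ts) == DT_NOEND)

/* Hypothetical stand-in for the plugin's epoch conversion of finite values. */
static int64_t to_usec_since_unix_epoch(TimestampTz ts) {
    const int64_t pg_epoch_offset_usec = INT64_C(946684800000000); /* 2000-01-01 in Unix usec */
    return ts + pg_epoch_offset_usec;
}

static int64_t encode(TimestampTz ts) {
    /* Same shape as the patched switch cases: keep the sentinel, convert the rest. */
    return TIMESTAMP_NOT_FINITE(ts) ? ts : to_usec_since_unix_epoch(ts);
}

int main(void) {
    printf("finite:    %" PRId64 "\n", encode(0));          /* 2000-01-01 00:00:00 UTC */
    printf("infinity:  %" PRId64 "\n", encode(DT_NOEND));   /* passed through unchanged */
    printf("-infinity: %" PRId64 "\n", encode(DT_NOBEGIN)); /* passed through unchanged */
    return 0;
}
```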
```diff
@@ -499,7 +506,11 @@ static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
   rmsg.has_op = true;
   rmsg.transaction_id = txn->xid;
   rmsg.has_transaction_id = true;
+#if PG_VERSION_NUM >= 150000
+  rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
+#else
   rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
+#endif
   rmsg.has_commit_time = true;
 
   /* write msg */
@@ -537,7 +548,11 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
   rmsg.has_op = true;
   rmsg.transaction_id = txn->xid;
   rmsg.has_transaction_id = true;
+#if PG_VERSION_NUM >= 150000
+  rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
+#else
   rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
+#endif
   rmsg.has_commit_time = true;
 
   /* write msg */
@@ -589,7 +604,11 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
   /* set common fields */
   rmsg.transaction_id = txn->xid;
   rmsg.has_transaction_id = true;
+#if PG_VERSION_NUM >= 150000
+  rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
+#else
   rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
+#endif
   rmsg.has_commit_time = true;
   rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
                                                   NameStr(class_form->relname)));
```
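The three `commit_time` hunks above exist because PostgreSQL 15 moved `ReorderBufferTXN`'s commit timestamp into a union-typed `xact_time` member (DBZ-5370), so the field is spelled differently depending on the headers you build against. A compact stand-alone model of the same pattern, with a simplified struct (not the real `ReorderBufferTXN`) and `MODEL_PG_VERSION_NUM` in place of `PG_VERSION_NUM`:

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Toggle to model building against PostgreSQL >= 15 vs. an older release. */
#define MODEL_PG_VERSION_NUM 150000

typedef int64_t TimestampTz;

/* Simplified stand-in for ReorderBufferTXN. */
typedef struct {
#if MODEL_PG_VERSION_NUM >= 150000
    union {
        TimestampTz commit_time;
        TimestampTz prepare_time;
    } xact_time;
#else
    TimestampTz commit_time;
#endif
} ModelTxn;

/* One helper keeps the version guard in a single place. */
static TimestampTz txn_commit_time(const ModelTxn *txn) {
#if MODEL_PG_VERSION_NUM >= 150000
    return txn->xact_time.commit_time;
#else
    return txn->commit_time;
#endif
}

int main(void) {
    ModelTxn txn;
#if MODEL_PG_VERSION_NUM >= 150000
    txn.xact_time.commit_time = INT64_C(1666076101000000); /* sample value in usec */
#else
    txn.commit_time = INT64_C(1666076101000000);
#endif
    printf("commit_time = %" PRId64 "\n", txn_commit_time(&txn));
    return 0;
}
```

The plugin repeats the `#if` at each of its three call sites; a small helper like `txn_commit_time` above is an equivalent way to keep the guard in one place.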
```diff
@@ -609,13 +628,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
         rmsg.new_tuple =
             palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
         tuple_to_tuple_msg(rmsg.new_tuple, relation,
-                           &change->data.tp.newtuple->tuple, tupdesc);
+                           TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
         rmsg.n_new_typeinfo = rmsg.n_new_tuple;
         rmsg.new_typeinfo =
             palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
         add_metadata_to_msg(rmsg.new_typeinfo, relation,
-                            &change->data.tp.newtuple->tuple, tupdesc);
+                            TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
       }
       break;
     case REORDER_BUFFER_CHANGE_UPDATE:
@@ -630,7 +649,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
         rmsg.old_tuple =
             palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
         tuple_to_tuple_msg(rmsg.old_tuple, relation,
-                           &change->data.tp.oldtuple->tuple, tupdesc);
+                           TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
       }
       if (change->data.tp.newtuple != NULL) {
         elog(DEBUG1, "decoding new tuple information");
@@ -640,13 +659,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
         rmsg.new_tuple =
             palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
         tuple_to_tuple_msg(rmsg.new_tuple, relation,
-                           &change->data.tp.newtuple->tuple, tupdesc);
+                           TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
         rmsg.n_new_typeinfo = rmsg.n_new_tuple;
         rmsg.new_typeinfo =
             palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
         add_metadata_to_msg(rmsg.new_typeinfo, relation,
-                            &change->data.tp.newtuple->tuple, tupdesc);
+                            TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
         }
       }
       break;
@@ -662,7 +681,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
         rmsg.old_tuple =
             palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
         tuple_to_tuple_msg(rmsg.old_tuple, relation,
-                           &change->data.tp.oldtuple->tuple, tupdesc);
+                           TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
       } else {
         elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
       }
```