Compare commits

..

No commits in common. "main" and "v1.8.0.Alpha2" have entirely different histories.

2 changed files with 57 additions and 95 deletions

100
README.md
View File

@ -10,17 +10,15 @@
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
## Thanks to
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
## Dependencies
This code depends on the following libraries and requires them for compilation:
- [PostgreSQL](http://www.postgresql.org) 9.6+
- [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
- [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
* [PostgreSQL](http://www.postgresql.org) 9.6+
* [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
* [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
## Building
@ -28,26 +26,21 @@ This code depends on the following libraries and requires them for compilation:
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
### Installing Dependencies
#### Debian
```bash
# Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
# Core build utilities
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
# PostGIS dependency
apt-get install -f -y libproj-dev liblwgeom-dev
# PostGIS dependency
apt-get install -f -y libproj-dev liblwgeom-dev
# Protobuf-c dependency (requires a non-stable Debian repo)
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
```
# Protobuf-c dependency (requires a non-stable Debian repo)
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
When updating the ProtoBuf definition, also install the ProtoBuf C compiler:
```bash
apt-get install -y protobuf-c-compiler=1.2.*
```
apt-get install -y protobuf-c-compiler=1.2.*
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
@ -60,20 +53,16 @@ Note that the last step from the above sequence is only required for Debian to b
If you have all of the above prerequisites installed, clone this git repo to build from source:
```bash
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
```
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
### Optional: Re-generating ProtoBuf code
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
```bash
cd proto
protoc-c --c_out=../src/proto pg_logicaldec.proto
cd ..
```
cd proto
protoc-c --c_out=../src/proto pg_logicaldec.proto
cd ..
Commit the generated files to git then.
@ -82,53 +71,44 @@ Commit the generated files to git then.
If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
Then `make` and `make install` for each version. Here is an example:
```bash
# Install for Postgres 9.6 if I have multiple local versions
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
make
make install
```
# Install for Postgres 9.6 if I have multiple local versions
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
make
make install
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
```bash
# MODULES
shared_preload_libraries = 'decoderbufs'
# MODULES
shared_preload_libraries = 'decoderbufs'
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart)
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
```
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart)
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
```make
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
```
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
And restart PostgreSQL.
## Usage
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
```sql
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
-- peek at WAL changes using decoderbufs debug mode for SQL console
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- peek at WAL changes using decoderbufs debug mode for SQL console
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
```
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.

View File

@ -60,12 +60,6 @@
#error Expecting timestamps to be represented as integers, not as floating-point.
#endif
#if PG_VERSION_NUM >= 170000
#define TUPLE_ACCESS(x) x
#else
#define TUPLE_ACCESS(x) &x->tuple
#endif
PG_MODULE_MAGIC;
/* define a time macro to convert TimestampTz into something more sane,
@ -506,11 +500,7 @@ static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
@ -548,11 +538,7 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
rmsg.has_op = true;
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
/* write msg */
@ -604,11 +590,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* set common fields */
rmsg.transaction_id = txn->xid;
rmsg.has_transaction_id = true;
#if PG_VERSION_NUM >= 150000
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
#else
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
#endif
rmsg.has_commit_time = true;
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
NameStr(class_form->relname)));
@ -628,13 +610,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
&change->data.tp.newtuple->tuple, tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
&change->data.tp.newtuple->tuple, tupdesc);
}
break;
case REORDER_BUFFER_CHANGE_UPDATE:
@ -649,7 +631,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation,
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
&change->data.tp.oldtuple->tuple, tupdesc);
}
if (change->data.tp.newtuple != NULL) {
elog(DEBUG1, "decoding new tuple information");
@ -659,13 +641,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
tuple_to_tuple_msg(rmsg.new_tuple, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
&change->data.tp.newtuple->tuple, tupdesc);
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
rmsg.new_typeinfo =
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
add_metadata_to_msg(rmsg.new_typeinfo, relation,
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
&change->data.tp.newtuple->tuple, tupdesc);
}
}
break;
@ -681,7 +663,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
tuple_to_tuple_msg(rmsg.old_tuple, relation,
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
&change->data.tp.oldtuple->tuple, tupdesc);
} else {
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
}