Compare commits
No commits in common. "main" and "v1.6.0.Beta2" have entirely different histories.
main
...
v1.6.0.Bet
100
README.md
100
README.md
|
@ -10,17 +10,15 @@
|
|||
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
|
||||
|
||||
## Thanks to
|
||||
|
||||
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
|
||||
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
|
||||
|
||||
## Dependencies
|
||||
|
||||
This code depends on the following libraries and requires them for compilation:
|
||||
|
||||
- [PostgreSQL](http://www.postgresql.org) 9.6+
|
||||
- [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
|
||||
- [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
|
||||
* [PostgreSQL](http://www.postgresql.org) 9.6+
|
||||
* [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
|
||||
* [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
|
||||
|
||||
## Building
|
||||
|
||||
|
@ -28,26 +26,21 @@ This code depends on the following libraries and requires them for compilation:
|
|||
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
|
||||
|
||||
### Installing Dependencies
|
||||
|
||||
#### Debian
|
||||
|
||||
```bash
|
||||
# Core build utilities
|
||||
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
|
||||
# Core build utilities
|
||||
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
|
||||
|
||||
# PostGIS dependency
|
||||
apt-get install -f -y libproj-dev liblwgeom-dev
|
||||
# PostGIS dependency
|
||||
apt-get install -f -y libproj-dev liblwgeom-dev
|
||||
|
||||
# Protobuf-c dependency (requires a non-stable Debian repo)
|
||||
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
|
||||
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
|
||||
```
|
||||
# Protobuf-c dependency (requires a non-stable Debian repo)
|
||||
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
|
||||
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
|
||||
|
||||
When updating the ProtoBuf definition, also install the ProtoBuf C compiler:
|
||||
|
||||
```bash
|
||||
apt-get install -y protobuf-c-compiler=1.2.*
|
||||
```
|
||||
apt-get install -y protobuf-c-compiler=1.2.*
|
||||
|
||||
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
|
||||
|
||||
|
@ -60,20 +53,16 @@ Note that the last step from the above sequence is only required for Debian to b
|
|||
|
||||
If you have all of the above prerequisites installed, clone this git repo to build from source:
|
||||
|
||||
```bash
|
||||
git clone https://github.com/debezium/postgres-decoderbufs.git
|
||||
cd postgres-decoderbufs
|
||||
```
|
||||
git clone https://github.com/debezium/postgres-decoderbufs.git
|
||||
cd postgres-decoderbufs
|
||||
|
||||
### Optional: Re-generating ProtoBuf code
|
||||
|
||||
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
|
||||
|
||||
```bash
|
||||
cd proto
|
||||
protoc-c --c_out=../src/proto pg_logicaldec.proto
|
||||
cd ..
|
||||
```
|
||||
cd proto
|
||||
protoc-c --c_out=../src/proto pg_logicaldec.proto
|
||||
cd ..
|
||||
|
||||
Commit the generated files to git then.
|
||||
|
||||
|
@ -82,53 +71,44 @@ Commit the generated files to git then.
|
|||
If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
|
||||
Then `make` and `make install` for each version. Here is an example:
|
||||
|
||||
```bash
|
||||
# Install for Postgres 9.6 if I have multiple local versions
|
||||
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
|
||||
make
|
||||
make install
|
||||
```
|
||||
# Install for Postgres 9.6 if I have multiple local versions
|
||||
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
|
||||
make
|
||||
make install
|
||||
|
||||
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
|
||||
|
||||
```bash
|
||||
# MODULES
|
||||
shared_preload_libraries = 'decoderbufs'
|
||||
# MODULES
|
||||
shared_preload_libraries = 'decoderbufs'
|
||||
|
||||
# REPLICATION
|
||||
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
|
||||
max_wal_senders = 8 # max number of walsender processes (change requires restart)
|
||||
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
|
||||
#wal_sender_timeout = 60s # in milliseconds; 0 disables
|
||||
max_replication_slots = 4 # max number of replication slots (change requires restart)
|
||||
```
|
||||
# REPLICATION
|
||||
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
|
||||
max_wal_senders = 8 # max number of walsender processes (change requires restart)
|
||||
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
|
||||
#wal_sender_timeout = 60s # in milliseconds; 0 disables
|
||||
max_replication_slots = 4 # max number of replication slots (change requires restart)
|
||||
|
||||
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
|
||||
|
||||
```make
|
||||
local replication <youruser> trust
|
||||
host replication <youruser> 127.0.0.1/32 trust
|
||||
host replication <youruser> ::1/128 trust
|
||||
```
|
||||
local replication <youruser> trust
|
||||
host replication <youruser> 127.0.0.1/32 trust
|
||||
host replication <youruser> ::1/128 trust
|
||||
|
||||
And restart PostgreSQL.
|
||||
|
||||
## Usage
|
||||
-- can use SQL for demo purposes
|
||||
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
|
||||
|
||||
```sql
|
||||
-- can use SQL for demo purposes
|
||||
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
|
||||
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
|
||||
|
||||
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
|
||||
-- peek at WAL changes using decoderbufs debug mode for SQL console
|
||||
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
||||
-- get WAL changes using decoderbufs to update the WAL position
|
||||
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
||||
|
||||
-- peek at WAL changes using decoderbufs debug mode for SQL console
|
||||
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
||||
-- get WAL changes using decoderbufs to update the WAL position
|
||||
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
||||
|
||||
-- check the WAL position of logical replicators
|
||||
select * from pg_replication_slots where slot_type = 'logical';
|
||||
```
|
||||
-- check the WAL position of logical replicators
|
||||
select * from pg_replication_slots where slot_type = 'logical';
|
||||
|
||||
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.
|
||||
|
||||
|
|
|
@ -60,12 +60,6 @@
|
|||
#error Expecting timestamps to be represented as integers, not as floating-point.
|
||||
#endif
|
||||
|
||||
#if PG_VERSION_NUM >= 170000
|
||||
#define TUPLE_ACCESS(x) x
|
||||
#else
|
||||
#define TUPLE_ACCESS(x) &x->tuple
|
||||
#endif
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
/* define a time macro to convert TimestampTz into something more sane,
|
||||
|
@ -275,11 +269,8 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
|||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
|
||||
break;
|
||||
case INT8OID:
|
||||
datum_msg->datum_int64 = DatumGetInt64(datum);
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||
break;
|
||||
case OIDOID:
|
||||
datum_msg->datum_int64 = (Oid) DatumGetUInt64(datum);
|
||||
datum_msg->datum_int64 = DatumGetInt64(datum);
|
||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||
break;
|
||||
case FLOAT4OID:
|
||||
|
@ -506,11 +497,7 @@ static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
|||
rmsg.has_op = true;
|
||||
rmsg.transaction_id = txn->xid;
|
||||
rmsg.has_transaction_id = true;
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
|
||||
#else
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||
#endif
|
||||
rmsg.has_commit_time = true;
|
||||
|
||||
/* write msg */
|
||||
|
@ -548,11 +535,7 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
|||
rmsg.has_op = true;
|
||||
rmsg.transaction_id = txn->xid;
|
||||
rmsg.has_transaction_id = true;
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
|
||||
#else
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||
#endif
|
||||
rmsg.has_commit_time = true;
|
||||
|
||||
/* write msg */
|
||||
|
@ -604,11 +587,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
/* set common fields */
|
||||
rmsg.transaction_id = txn->xid;
|
||||
rmsg.has_transaction_id = true;
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
|
||||
#else
|
||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||
#endif
|
||||
rmsg.has_commit_time = true;
|
||||
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
|
||||
NameStr(class_form->relname)));
|
||||
|
@ -628,13 +607,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
rmsg.new_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
|
||||
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
||||
rmsg.new_typeinfo =
|
||||
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
||||
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
||||
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
}
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
|
@ -649,7 +628,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
rmsg.old_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
||||
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
|
||||
&change->data.tp.oldtuple->tuple, tupdesc);
|
||||
}
|
||||
if (change->data.tp.newtuple != NULL) {
|
||||
elog(DEBUG1, "decoding new tuple information");
|
||||
|
@ -659,13 +638,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
rmsg.new_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
|
||||
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
||||
rmsg.new_typeinfo =
|
||||
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
||||
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
||||
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||
&change->data.tp.newtuple->tuple, tupdesc);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -681,7 +660,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
rmsg.old_tuple =
|
||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
||||
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
|
||||
&change->data.tp.oldtuple->tuple, tupdesc);
|
||||
} else {
|
||||
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue